diff --git a/api.go b/api.go index 9244052..8484f58 100644 --- a/api.go +++ b/api.go @@ -57,6 +57,19 @@ func (data *ApiMetricData) AddStats() { } } +func (data *ApiMetricData) ScaleBy(f Float) { + if f == 0 || f == 1 { + return + } + + data.Avg *= f + data.Min *= f + data.Max *= f + for i := 0; i < len(data.Data); i++ { + data.Data[i] *= f + } +} + func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) { minfo, ok := memoryStore.metrics[metric] if !ok { @@ -162,13 +175,14 @@ type ApiQueryResponse struct { } type ApiQuery struct { - Metric string `json:"metric"` - Hostname string `json:"host"` - Aggregate bool `json:"aggreg"` - Type *string `json:"type,omitempty"` - TypeIds []int `json:"type-ids,omitempty"` - SubType *string `json:"subtype,omitempty"` - SubTypeIds []int `json:"subtype-ids,omitempty"` + Metric string `json:"metric"` + Hostname string `json:"host"` + Aggregate bool `json:"aggreg"` + ScaleFactor Float `json:"scale-by,omitempty"` + Type *string `json:"type,omitempty"` + TypeIds []int `json:"type-ids,omitempty"` + SubType *string `json:"subtype,omitempty"` + SubTypeIds []int `json:"subtype-ids,omitempty"` } func handleQuery(rw http.ResponseWriter, r *http.Request) { @@ -260,6 +274,9 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { if req.WithStats { data.AddStats() } + if query.ScaleFactor != 0 { + data.ScaleBy(query.ScaleFactor) + } if req.WithPadding { data.PadDataWithNull(req.From, req.To, query.Metric) } @@ -333,17 +350,23 @@ func StartApiServer(ctx context.Context, httpConfig *HttpConfig) error { r.HandleFunc("/api/write", handleWrite) r.HandleFunc("/api/query", handleQuery) r.HandleFunc("/api/debug", func(rw http.ResponseWriter, r *http.Request) { - bw := bufio.NewWriter(rw) - defer bw.Flush() + raw := r.URL.Query().Get("selector") + selector := []string{} + if len(raw) != 0 { + selector = strings.Split(raw, ":") + } - memoryStore.DebugDump(bw) + if err := memoryStore.DebugDump(bufio.NewWriter(rw), selector); err != nil { + rw.WriteHeader(http.StatusBadRequest) + rw.Write([]byte(err.Error())) + } }) server := &http.Server{ Handler: r, Addr: httpConfig.Address, - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, + WriteTimeout: 30 * time.Second, + ReadTimeout: 30 * time.Second, } if len(conf.JwtPublicKey) > 0 { diff --git a/cc-metric-store.go b/cc-metric-store.go index b9bc135..5dfe64b 100644 --- a/cc-metric-store.go +++ b/cc-metric-store.go @@ -277,7 +277,7 @@ func main() { for { sig := <-sigs if sig == syscall.SIGUSR1 { - memoryStore.DebugDump(bufio.NewWriter(os.Stdout)) + memoryStore.DebugDump(bufio.NewWriter(os.Stdout), nil) continue } diff --git a/debug.go b/debug.go index 712c928..88af59f 100644 --- a/debug.go +++ b/debug.go @@ -3,48 +3,105 @@ package main import ( "bufio" "fmt" + "strconv" ) -func (b *buffer) debugDump(w *bufio.Writer) { +func (b *buffer) debugDump(buf []byte) []byte { if b.prev != nil { - b.prev.debugDump(w) + buf = b.prev.debugDump(buf) } - end := "" + start, len, end := b.start, len(b.data), b.start+b.frequency*int64(len(b.data)) + buf = append(buf, `{"start":`...) + buf = strconv.AppendInt(buf, start, 10) + buf = append(buf, `,"len":`...) + buf = strconv.AppendInt(buf, int64(len), 10) + buf = append(buf, `,"end":`...) + buf = strconv.AppendInt(buf, end, 10) + if b.archived { + buf = append(buf, `,"saved":true`...) + } if b.next != nil { - end = " -> " + buf = append(buf, `},`...) + } else { + buf = append(buf, `}`...) } - - to := b.start + b.frequency*int64(len(b.data)) - fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d, archived=%v)%s", b.start, len(b.data), to, b.archived, end) + return buf } -func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error { +func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { l.lock.RLock() defer l.lock.RUnlock() + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } + buf = append(buf, '"') + buf = append(buf, lvlname...) + buf = append(buf, "\":{\n"...) + depth += 1 + objitems := 0 + for name, mc := range m.metrics { + if b := l.metrics[mc.offset]; b != nil { + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } - for name, minfo := range m.metrics { - b := l.metrics[minfo.offset] - if b != nil { - fmt.Fprintf(w, "%smetric '%s': ", indent, name) - b.debugDump(w) - fmt.Fprint(w, "\n") + buf = append(buf, '"') + buf = append(buf, name...) + buf = append(buf, `":[`...) + buf = b.debugDump(buf) + buf = append(buf, "],\n"...) + objitems++ } } - if l.children != nil && len(l.children) > 0 { - fmt.Fprintf(w, "%schildren:\n", indent) - for name, lvl := range l.children { - fmt.Fprintf(w, "%s'%s':\n", indent, name) - lvl.debugDump(w, m, "\t"+indent) + for name, lvl := range l.children { + _, err := w.Write(buf) + if err != nil { + return nil, err } + + buf = buf[0:0] + buf, err = lvl.debugDump(m, w, name, buf, depth) + if err != nil { + return nil, err + } + + buf = append(buf, ',', '\n') + objitems++ } - return nil + // remove final `,`: + if objitems > 0 { + buf = append(buf[0:len(buf)-1], '\n') + } + + depth -= 1 + for i := 0; i < depth; i++ { + buf = append(buf, '\t') + } + buf = append(buf, '}') + return buf, nil } -func (m *MemoryStore) DebugDump(w *bufio.Writer) error { - fmt.Fprintf(w, "MemoryStore (%d MB):\n", m.SizeInBytes()/1024/1024) - m.root.debugDump(w, m, " ") +func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error { + lvl := m.root.findLevel(selector) + if lvl == nil { + return fmt.Errorf("not found: %#v", selector) + } + + buf := make([]byte, 0, 2048) + buf = append(buf, "{"...) + + buf, err := lvl.debugDump(m, w, "data", buf, 0) + if err != nil { + return err + } + + buf = append(buf, "}\n"...) + if _, err = w.Write(buf); err != nil { + return err + } + return w.Flush() } diff --git a/float.go b/float.go index 43c8eba..eae2d98 100644 --- a/float.go +++ b/float.go @@ -23,7 +23,7 @@ func (f Float) MarshalJSON() ([]byte, error) { return nullAsBytes, nil } - return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', -1, 64), nil + return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 3, 64), nil } func (f *Float) UnmarshalJSON(input []byte) error { @@ -54,7 +54,7 @@ func (fa FloatArray) MarshalJSON() ([]byte, error) { if fa[i].IsNaN() { buf = append(buf, `null`...) } else { - buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', -1, 64) + buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', 3, 64) } } diff --git a/lineprotocol.go b/lineprotocol.go index ab7e332..b2e1692 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -295,6 +295,8 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { metric.Value = Float(val.FloatV()) } else if val.Kind() == lineprotocol.Int { metric.Value = Float(val.IntV()) + } else if val.Kind() == lineprotocol.Uint { + metric.Value = Float(val.UintV()) } else { return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } diff --git a/selector.go b/selector.go index de17ed5..25fe209 100644 --- a/selector.go +++ b/selector.go @@ -50,6 +50,22 @@ func (se *SelectorElement) MarshalJSON() ([]byte, error) { type Selector []SelectorElement +func (l *level) findLevel(selector []string) *level { + if len(selector) == 0 { + return l + } + + l.lock.RLock() + defer l.lock.RUnlock() + + lvl := l.children[selector[0]] + if lvl == nil { + return nil + } + + return lvl.findLevel(selector[1:]) +} + func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { l.lock.RLock() defer l.lock.RUnlock()