Cleanup; /api/debug returns JSON

This commit is contained in:
Lou Knauer 2022-03-31 14:17:27 +02:00
parent aca13c1769
commit f3d7e5c28a
6 changed files with 136 additions and 38 deletions

33
api.go
View File

@ -57,6 +57,19 @@ func (data *ApiMetricData) AddStats() {
} }
} }
func (data *ApiMetricData) ScaleBy(f Float) {
if f == 0 || f == 1 {
return
}
data.Avg *= f
data.Min *= f
data.Max *= f
for i := 0; i < len(data.Data); i++ {
data.Data[i] *= f
}
}
func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) { func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) {
minfo, ok := memoryStore.metrics[metric] minfo, ok := memoryStore.metrics[metric]
if !ok { if !ok {
@ -165,6 +178,7 @@ type ApiQuery struct {
Metric string `json:"metric"` Metric string `json:"metric"`
Hostname string `json:"host"` Hostname string `json:"host"`
Aggregate bool `json:"aggreg"` Aggregate bool `json:"aggreg"`
ScaleFactor Float `json:"scale-by,omitempty"`
Type *string `json:"type,omitempty"` Type *string `json:"type,omitempty"`
TypeIds []int `json:"type-ids,omitempty"` TypeIds []int `json:"type-ids,omitempty"`
SubType *string `json:"subtype,omitempty"` SubType *string `json:"subtype,omitempty"`
@ -260,6 +274,9 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
if req.WithStats { if req.WithStats {
data.AddStats() data.AddStats()
} }
if query.ScaleFactor != 0 {
data.ScaleBy(query.ScaleFactor)
}
if req.WithPadding { if req.WithPadding {
data.PadDataWithNull(req.From, req.To, query.Metric) data.PadDataWithNull(req.From, req.To, query.Metric)
} }
@ -333,17 +350,23 @@ func StartApiServer(ctx context.Context, httpConfig *HttpConfig) error {
r.HandleFunc("/api/write", handleWrite) r.HandleFunc("/api/write", handleWrite)
r.HandleFunc("/api/query", handleQuery) r.HandleFunc("/api/query", handleQuery)
r.HandleFunc("/api/debug", func(rw http.ResponseWriter, r *http.Request) { r.HandleFunc("/api/debug", func(rw http.ResponseWriter, r *http.Request) {
bw := bufio.NewWriter(rw) raw := r.URL.Query().Get("selector")
defer bw.Flush() selector := []string{}
if len(raw) != 0 {
selector = strings.Split(raw, ":")
}
memoryStore.DebugDump(bw) if err := memoryStore.DebugDump(bufio.NewWriter(rw), selector); err != nil {
rw.WriteHeader(http.StatusBadRequest)
rw.Write([]byte(err.Error()))
}
}) })
server := &http.Server{ server := &http.Server{
Handler: r, Handler: r,
Addr: httpConfig.Address, Addr: httpConfig.Address,
WriteTimeout: 15 * time.Second, WriteTimeout: 30 * time.Second,
ReadTimeout: 15 * time.Second, ReadTimeout: 30 * time.Second,
} }
if len(conf.JwtPublicKey) > 0 { if len(conf.JwtPublicKey) > 0 {

View File

@ -277,7 +277,7 @@ func main() {
for { for {
sig := <-sigs sig := <-sigs
if sig == syscall.SIGUSR1 { if sig == syscall.SIGUSR1 {
memoryStore.DebugDump(bufio.NewWriter(os.Stdout)) memoryStore.DebugDump(bufio.NewWriter(os.Stdout), nil)
continue continue
} }

105
debug.go
View File

@ -3,48 +3,105 @@ package main
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"strconv"
) )
func (b *buffer) debugDump(w *bufio.Writer) { func (b *buffer) debugDump(buf []byte) []byte {
if b.prev != nil { if b.prev != nil {
b.prev.debugDump(w) buf = b.prev.debugDump(buf)
} }
end := "" start, len, end := b.start, len(b.data), b.start+b.frequency*int64(len(b.data))
buf = append(buf, `{"start":`...)
buf = strconv.AppendInt(buf, start, 10)
buf = append(buf, `,"len":`...)
buf = strconv.AppendInt(buf, int64(len), 10)
buf = append(buf, `,"end":`...)
buf = strconv.AppendInt(buf, end, 10)
if b.archived {
buf = append(buf, `,"saved":true`...)
}
if b.next != nil { if b.next != nil {
end = " -> " buf = append(buf, `},`...)
} else {
buf = append(buf, `}`...)
}
return buf
} }
to := b.start + b.frequency*int64(len(b.data)) func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) {
fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d, archived=%v)%s", b.start, len(b.data), to, b.archived, end)
}
func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()
for i := 0; i < depth; i++ {
buf = append(buf, '\t')
}
buf = append(buf, '"')
buf = append(buf, lvlname...)
buf = append(buf, "\":{\n"...)
depth += 1
objitems := 0
for name, mc := range m.metrics {
if b := l.metrics[mc.offset]; b != nil {
for i := 0; i < depth; i++ {
buf = append(buf, '\t')
}
for name, minfo := range m.metrics { buf = append(buf, '"')
b := l.metrics[minfo.offset] buf = append(buf, name...)
if b != nil { buf = append(buf, `":[`...)
fmt.Fprintf(w, "%smetric '%s': ", indent, name) buf = b.debugDump(buf)
b.debugDump(w) buf = append(buf, "],\n"...)
fmt.Fprint(w, "\n") objitems++
} }
} }
if l.children != nil && len(l.children) > 0 {
fmt.Fprintf(w, "%schildren:\n", indent)
for name, lvl := range l.children { for name, lvl := range l.children {
fmt.Fprintf(w, "%s'%s':\n", indent, name) _, err := w.Write(buf)
lvl.debugDump(w, m, "\t"+indent) if err != nil {
} return nil, err
} }
return nil buf = buf[0:0]
buf, err = lvl.debugDump(m, w, name, buf, depth)
if err != nil {
return nil, err
}
buf = append(buf, ',', '\n')
objitems++
}
// remove final `,`:
if objitems > 0 {
buf = append(buf[0:len(buf)-1], '\n')
}
depth -= 1
for i := 0; i < depth; i++ {
buf = append(buf, '\t')
}
buf = append(buf, '}')
return buf, nil
}
func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error {
lvl := m.root.findLevel(selector)
if lvl == nil {
return fmt.Errorf("not found: %#v", selector)
}
buf := make([]byte, 0, 2048)
buf = append(buf, "{"...)
buf, err := lvl.debugDump(m, w, "data", buf, 0)
if err != nil {
return err
}
buf = append(buf, "}\n"...)
if _, err = w.Write(buf); err != nil {
return err
} }
func (m *MemoryStore) DebugDump(w *bufio.Writer) error {
fmt.Fprintf(w, "MemoryStore (%d MB):\n", m.SizeInBytes()/1024/1024)
m.root.debugDump(w, m, " ")
return w.Flush() return w.Flush()
} }

View File

@ -23,7 +23,7 @@ func (f Float) MarshalJSON() ([]byte, error) {
return nullAsBytes, nil return nullAsBytes, nil
} }
return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', -1, 64), nil return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 3, 64), nil
} }
func (f *Float) UnmarshalJSON(input []byte) error { func (f *Float) UnmarshalJSON(input []byte) error {
@ -54,7 +54,7 @@ func (fa FloatArray) MarshalJSON() ([]byte, error) {
if fa[i].IsNaN() { if fa[i].IsNaN() {
buf = append(buf, `null`...) buf = append(buf, `null`...)
} else { } else {
buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', -1, 64) buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', 3, 64)
} }
} }

View File

@ -295,6 +295,8 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
metric.Value = Float(val.FloatV()) metric.Value = Float(val.FloatV())
} else if val.Kind() == lineprotocol.Int { } else if val.Kind() == lineprotocol.Int {
metric.Value = Float(val.IntV()) metric.Value = Float(val.IntV())
} else if val.Kind() == lineprotocol.Uint {
metric.Value = Float(val.UintV())
} else { } else {
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
} }

View File

@ -50,6 +50,22 @@ func (se *SelectorElement) MarshalJSON() ([]byte, error) {
type Selector []SelectorElement type Selector []SelectorElement
func (l *level) findLevel(selector []string) *level {
if len(selector) == 0 {
return l
}
l.lock.RLock()
defer l.lock.RUnlock()
lvl := l.children[selector[0]]
if lvl == nil {
return nil
}
return lvl.findLevel(selector[1:])
}
func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()