From 53d5734fd5e4e2b03ff5e133b62d766213513875 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 13 Sep 2021 12:25:56 +0200 Subject: [PATCH] New peek query (newest values) --- api.go | 17 ++++++++ selector.go | 4 +- stats.go | 121 ++++++++++++++++++++++++++-------------------------- 3 files changed, 79 insertions(+), 63 deletions(-) diff --git a/api.go b/api.go index 00593c4..69267d6 100644 --- a/api.go +++ b/api.go @@ -187,12 +187,29 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) } +func handlePeek(rw http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + cluster := vars["cluster"] + res, err := memoryStore.Peek(cluster) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + rw.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(rw).Encode(res) + if err != nil { + log.Println(err.Error()) + } +} + func StartApiServer(address string, ctx context.Context) error { r := mux.NewRouter() r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) + r.HandleFunc("/api/{cluster}/peek", handlePeek) server := &http.Server{ Handler: r, diff --git a/selector.go b/selector.go index e857dee..357b02f 100644 --- a/selector.go +++ b/selector.go @@ -67,7 +67,7 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err } sel := selector[0] - if len(sel.String) != 0 { + if len(sel.String) != 0 && l.children != nil { lvl, ok := l.children[sel.String] if ok { err := lvl.findBuffers(selector[1:], offset, f) @@ -78,7 +78,7 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err return nil } - if sel.Group != nil { + if sel.Group != nil && l.children != nil { for _, key := range sel.Group { lvl, ok := l.children[key] if ok { diff --git a/stats.go b/stats.go index 2e13b66..28f79b4 100644 --- a/stats.go +++ b/stats.go @@ -12,7 +12,6 @@ type Stats struct { Max Float } -// Return `Stats` by value for less allocations/GC? func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { if from < b.start { if b.prev != nil { @@ -58,66 +57,9 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { }, from, t, nil } -// This function assmumes that `l.lock` is LOCKED! -// It basically works just like level.read but calculates min/max/avg for that data level.read would return. -// TODO: Make this DRY? -func (l *level) stats(offset int, from, to int64, aggreg AggregationStrategy) (Stats, int64, int64, error) { - if b := l.metrics[offset]; b != nil { - return b.stats(from, to) - } - - if len(l.children) == 0 { - return Stats{}, 0, 0, ErrNoData - } - - n := 0 - samples := 0 - avg, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32) - for _, child := range l.children { - child.lock.RLock() - stats, cfrom, cto, err := child.stats(offset, from, to, aggreg) - child.lock.RUnlock() - - if err == ErrNoData { - continue - } - - if err != nil { - return Stats{}, 0, 0, err - } - - if n == 0 { - from = cfrom - to = cto - } else if cfrom != from || cto != to { - return Stats{}, 0, 0, ErrDataDoesNotAlign - } - - samples += stats.Samples - avg += stats.Avg - min = Float(math.Min(float64(min), float64(stats.Min))) - max = Float(math.Max(float64(max), float64(stats.Max))) - n += 1 - } - - if n == 0 { - return Stats{}, 0, 0, ErrNoData - } - - if aggreg == AvgAggregation { - avg /= Float(n) - } else if aggreg != SumAggregation { - return Stats{}, 0, 0, errors.New("invalid aggregation") - } - - return Stats{ - Samples: samples, - Avg: avg, - Min: min, - Max: max, - }, from, to, nil -} - +// Returns statistics for the requested metric on the selected node/level. +// Data is aggregated to the selected level the same way as in `MemoryStore.Read`. +// If `Stats.Samples` is zero, the statistics should not be considered as valid. func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") @@ -171,3 +113,60 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* Max: Float(max), }, from, to, nil } + +// Return the newest value of the metric at offset `offset`. +// In case the level does not hold the metric itself, +// sum up the values from all lower levels. +func (l *level) peek(offset int) (Float, int) { + b := l.metrics[offset] + if b != nil { + x := b.data[len(b.data)-1] + return x, 1 + } + + n, sum := 0, Float(0) + for _, lvl := range l.children { + lvl.lock.RLock() + x, m := lvl.peek(offset) + lvl.lock.RUnlock() + n += m + sum += x + } + + return sum, n +} + +// Return the newest value for every metric of every node for the given cluster. +// All values are always aggregated to a node. +func (m *MemoryStore) Peek(cluster string) (map[string]map[string]Float, error) { + m.root.lock.RLock() + clusterLevel, ok := m.root.children[cluster] + m.root.lock.RUnlock() + if !ok { + return nil, errors.New("no such cluster: " + cluster) + } + + clusterLevel.lock.RLock() + defer clusterLevel.lock.RUnlock() + + nodes := make(map[string]map[string]Float) + for node, l := range clusterLevel.children { + l.lock.RLock() + metrics := make(map[string]Float) + for metric, minfo := range m.metrics { + x, n := l.peek(minfo.offset) + if n > 1 { + if minfo.aggregation == NoAggregation { + return nil, errors.New("cannot aggregate: " + metric) + } else if minfo.aggregation == AvgAggregation { + x /= Float(n) + } + } + metrics[metric] = x + } + nodes[node] = metrics + l.lock.RUnlock() + } + + return nodes, nil +}