New peek query (newest values)

This commit is contained in:
Lou Knauer 2021-09-13 12:25:56 +02:00
parent 53339eb8eb
commit 53d5734fd5
3 changed files with 79 additions and 63 deletions

17
api.go
View File

@ -187,12 +187,29 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
} }
func handlePeek(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cluster := vars["cluster"]
res, err := memoryStore.Peek(cluster)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(rw).Encode(res)
if err != nil {
log.Println(err.Error())
}
}
func StartApiServer(address string, ctx context.Context) error { func StartApiServer(address string, ctx context.Context) error {
r := mux.NewRouter() r := mux.NewRouter()
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree)
r.HandleFunc("/api/{cluster}/peek", handlePeek)
server := &http.Server{ server := &http.Server{
Handler: r, Handler: r,

View File

@ -67,7 +67,7 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err
} }
sel := selector[0] sel := selector[0]
if len(sel.String) != 0 { if len(sel.String) != 0 && l.children != nil {
lvl, ok := l.children[sel.String] lvl, ok := l.children[sel.String]
if ok { if ok {
err := lvl.findBuffers(selector[1:], offset, f) err := lvl.findBuffers(selector[1:], offset, f)
@ -78,7 +78,7 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err
return nil return nil
} }
if sel.Group != nil { if sel.Group != nil && l.children != nil {
for _, key := range sel.Group { for _, key := range sel.Group {
lvl, ok := l.children[key] lvl, ok := l.children[key]
if ok { if ok {

121
stats.go
View File

@ -12,7 +12,6 @@ type Stats struct {
Max Float Max Float
} }
// Return `Stats` by value for less allocations/GC?
func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
if from < b.start { if from < b.start {
if b.prev != nil { if b.prev != nil {
@ -58,66 +57,9 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
}, from, t, nil }, from, t, nil
} }
// This function assmumes that `l.lock` is LOCKED! // Returns statistics for the requested metric on the selected node/level.
// It basically works just like level.read but calculates min/max/avg for that data level.read would return. // Data is aggregated to the selected level the same way as in `MemoryStore.Read`.
// TODO: Make this DRY? // If `Stats.Samples` is zero, the statistics should not be considered as valid.
func (l *level) stats(offset int, from, to int64, aggreg AggregationStrategy) (Stats, int64, int64, error) {
if b := l.metrics[offset]; b != nil {
return b.stats(from, to)
}
if len(l.children) == 0 {
return Stats{}, 0, 0, ErrNoData
}
n := 0
samples := 0
avg, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32)
for _, child := range l.children {
child.lock.RLock()
stats, cfrom, cto, err := child.stats(offset, from, to, aggreg)
child.lock.RUnlock()
if err == ErrNoData {
continue
}
if err != nil {
return Stats{}, 0, 0, err
}
if n == 0 {
from = cfrom
to = cto
} else if cfrom != from || cto != to {
return Stats{}, 0, 0, ErrDataDoesNotAlign
}
samples += stats.Samples
avg += stats.Avg
min = Float(math.Min(float64(min), float64(stats.Min)))
max = Float(math.Max(float64(max), float64(stats.Max)))
n += 1
}
if n == 0 {
return Stats{}, 0, 0, ErrNoData
}
if aggreg == AvgAggregation {
avg /= Float(n)
} else if aggreg != SumAggregation {
return Stats{}, 0, 0, errors.New("invalid aggregation")
}
return Stats{
Samples: samples,
Avg: avg,
Min: min,
Max: max,
}, from, to, nil
}
func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) { func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) {
if from > to { if from > to {
return nil, 0, 0, errors.New("invalid time range") return nil, 0, 0, errors.New("invalid time range")
@ -171,3 +113,60 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*
Max: Float(max), Max: Float(max),
}, from, to, nil }, from, to, nil
} }
// Return the newest value of the metric at offset `offset`.
// In case the level does not hold the metric itself,
// sum up the values from all lower levels.
func (l *level) peek(offset int) (Float, int) {
b := l.metrics[offset]
if b != nil {
x := b.data[len(b.data)-1]
return x, 1
}
n, sum := 0, Float(0)
for _, lvl := range l.children {
lvl.lock.RLock()
x, m := lvl.peek(offset)
lvl.lock.RUnlock()
n += m
sum += x
}
return sum, n
}
// Return the newest value for every metric of every node for the given cluster.
// All values are always aggregated to a node.
func (m *MemoryStore) Peek(cluster string) (map[string]map[string]Float, error) {
m.root.lock.RLock()
clusterLevel, ok := m.root.children[cluster]
m.root.lock.RUnlock()
if !ok {
return nil, errors.New("no such cluster: " + cluster)
}
clusterLevel.lock.RLock()
defer clusterLevel.lock.RUnlock()
nodes := make(map[string]map[string]Float)
for node, l := range clusterLevel.children {
l.lock.RLock()
metrics := make(map[string]Float)
for metric, minfo := range m.metrics {
x, n := l.peek(minfo.offset)
if n > 1 {
if minfo.aggregation == NoAggregation {
return nil, errors.New("cannot aggregate: " + metric)
} else if minfo.aggregation == AvgAggregation {
x /= Float(n)
}
}
metrics[metric] = x
}
nodes[node] = metrics
l.lock.RUnlock()
}
return nodes, nil
}