Start working on pre-computed stats

This commit is contained in:
Lou Knauer 2021-12-15 09:59:33 +01:00
parent 5d89d87a2d
commit 878e9d7154
6 changed files with 93 additions and 34 deletions

22
api.go
View File

@ -35,19 +35,15 @@ type ApiMetricData struct {
From int64 `json:"from"` From int64 `json:"from"`
To int64 `json:"to"` To int64 `json:"to"`
Data []Float `json:"data"` Data []Float `json:"data"`
Avg *float64 `json:"avg"` Avg Float `json:"avg"`
Min *float64 `json:"min"` Min Float `json:"min"`
Max *float64 `json:"max"` Max Float `json:"max"`
} }
// TODO: Optimize this, just like the stats endpoint! // TODO: Optimize this, just like the stats endpoint!
func (data *ApiMetricData) AddStats() { func (data *ApiMetricData) AddStats() {
if len(data.Data) == 0 || data.Error != nil {
return
}
n := 0 n := 0
sum, min, max := 0.0, float64(data.Data[0]), float64(data.Data[0]) sum, min, max := 0.0, math.MaxFloat64, -math.MaxFloat64
for _, x := range data.Data { for _, x := range data.Data {
if x.IsNaN() { if x.IsNaN() {
continue continue
@ -59,10 +55,14 @@ func (data *ApiMetricData) AddStats() {
max = math.Max(max, float64(x)) max = math.Max(max, float64(x))
} }
if n > 0 {
avg := sum / float64(n) avg := sum / float64(n)
data.Avg = &avg data.Avg = Float(avg)
data.Min = &min data.Min = Float(min)
data.Max = &max data.Max = Float(max)
} else {
data.Avg, data.Min, data.Max = NaN, NaN, NaN
}
} }
type ApiStatsData struct { type ApiStatsData struct {

View File

@ -245,6 +245,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
next: nil, next: nil,
archived: true, archived: true,
} }
b.close()
minfo, ok := m.metrics[name] minfo, ok := m.metrics[name]
if !ok { if !ok {

View File

@ -1,30 +1,41 @@
{ {
"metrics": { "metrics": {
"load_one": { "frequency": 10, "aggregation": null, "scope": "node" }, "load_one": { "frequency": 3, "aggregation": null, "scope": "node" },
"load_five": { "frequency": 10, "aggregation": null, "scope": "node" }, "load_five": { "frequency": 3, "aggregation": null, "scope": "node" },
"load_fifteen": { "frequency": 10, "aggregation": null, "scope": "node" }, "load_fifteen": { "frequency": 3, "aggregation": null, "scope": "node" },
"proc_run": { "frequency": 10, "aggregation": null, "scope": "node" }, "proc_run": { "frequency": 3, "aggregation": null, "scope": "node" },
"proc_total": { "frequency": 10, "aggregation": null, "scope": "node" }, "proc_total": { "frequency": 3, "aggregation": null, "scope": "node" },
"mem_free": { "frequency": 10, "aggregation": null, "scope": "node" }, "mem_free": { "frequency": 3, "aggregation": null, "scope": "node" },
"mem_used": { "frequency": 10, "aggregation": null, "scope": "node" }, "mem_cached": { "frequency": 3, "aggregation": null, "scope": "node" },
"power": { "frequency": 10, "aggregation": "sum", "scope": "socket" }, "mem_total": { "frequency": 3, "aggregation": null, "scope": "node" },
"mem_bw": { "frequency": 10, "aggregation": "sum", "scope": "socket" }, "swap_total": { "frequency": 3, "aggregation": null, "scope": "node" },
"flops_sp": { "frequency": 10, "aggregation": "sum", "scope": "cpu" }, "mem_slab": { "frequency": 3, "aggregation": null, "scope": "node" },
"flops_dp": { "frequency": 10, "aggregation": "sum", "scope": "cpu" }, "mem_buffers": { "frequency": 3, "aggregation": null, "scope": "node" },
"flops_any": { "frequency": 10, "aggregation": "sum", "scope": "cpu" }, "mem_sreclaimable": { "frequency": 3, "aggregation": null, "scope": "node" },
"clock": { "frequency": 10, "aggregation": "avg", "scope": "cpu" }, "mem_available": { "frequency": 3, "aggregation": null, "scope": "node" },
"cpi": { "frequency": 10, "aggregation": "avg", "scope": "cpu" } "swap_free": { "frequency": 3, "aggregation": null, "scope": "node" },
"mem_used": { "frequency": 3, "aggregation": null, "scope": "node" },
"cpu_user": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_nice": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_system": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_idle": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_iowait": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_irq": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_softirq": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_steal": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_guest": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"cpu_guest_nice": { "frequency": 3, "aggregation": "sum", "scope": "cpu" }
}, },
"checkpoints": { "checkpoints": {
"interval": 21600, "interval": 60,
"directory": "./var/checkpoints", "directory": "./var/checkpoints",
"restore": 43200 "restore": 120
}, },
"archive": { "archive": {
"interval": 86400, "interval": 180,
"directory": "./var/archive" "directory": "./var/archive"
}, },
"retention-in-memory": 86400, "retention-in-memory": 120,
"http-api-address": "0.0.0.0:8081", "http-api-address": "0.0.0.0:8081",
"nats": null, "nats": null,
"jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="

View File

@ -2,6 +2,7 @@ package main
import ( import (
"errors" "errors"
"math"
"sync" "sync"
) )
@ -37,6 +38,14 @@ type buffer struct {
data []Float // The slice should never reallocacte as `cap(data)` is respected. data []Float // The slice should never reallocacte as `cap(data)` is respected.
prev, next *buffer // `prev` contains older data, `next` newer data. prev, next *buffer // `prev` contains older data, `next` newer data.
archived bool // If true, this buffer is already archived archived bool // If true, this buffer is already archived
closed bool
statisticts struct {
samples int
min Float
max Float
avg Float
}
} }
func newBuffer(ts, freq int64) *buffer { func newBuffer(ts, freq int64) *buffer {
@ -46,6 +55,7 @@ func newBuffer(ts, freq int64) *buffer {
b.prev = nil b.prev = nil
b.next = nil b.next = nil
b.archived = false b.archived = false
b.closed = false
b.data = b.data[:0] b.data = b.data[:0]
return b return b
} }
@ -70,6 +80,7 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
newbuf := newBuffer(ts, b.frequency) newbuf := newBuffer(ts, b.frequency)
newbuf.prev = b newbuf.prev = b
b.next = newbuf b.next = newbuf
b.close()
b = newbuf b = newbuf
idx = 0 idx = 0
} }
@ -93,6 +104,37 @@ func (b *buffer) end() int64 {
return b.start + int64(len(b.data))*b.frequency return b.start + int64(len(b.data))*b.frequency
} }
func (b *buffer) close() {
if b.closed {
return
}
b.closed = true
n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64
for _, x := range b.data {
if x.IsNaN() {
continue
}
n += 1
f := float64(x)
sum += f
min = math.Min(min, f)
max = math.Max(max, f)
}
b.statisticts.samples = n
if n > 0 {
b.statisticts.avg = Float(sum / float64(n))
b.statisticts.min = Float(min)
b.statisticts.max = Float(max)
} else {
b.statisticts.avg = NaN
b.statisticts.min = NaN
b.statisticts.max = NaN
}
}
// func interpolate(idx int, data []Float) Float { // func interpolate(idx int, data []Float) Float {
// if idx == 0 || idx+1 == len(data) { // if idx == 0 || idx+1 == len(data) {
// return NaN // return NaN

View File

@ -80,6 +80,8 @@ func handleLine(dec *lineprotocol.Decoder) error {
typeName = string(val) typeName = string(val)
case "type-id": case "type-id":
typeId = string(val) typeId = string(val)
case "unit", "group":
// Ignore... (Important only for ganglia)
default: default:
return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
} }

View File

@ -20,6 +20,9 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
from = b.start from = b.start
} }
// TODO: Check if b.closed and if so and the full buffer is queried,
// use b.statistics instead of iterating over the buffer.
samples := 0 samples := 0
sum, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 sum, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32