Basic stats support

This commit is contained in:
Lou Knauer 2021-09-01 08:48:35 +02:00
parent 61bc7df93a
commit cbe5a0714c
5 changed files with 308 additions and 29 deletions

View File

@ -1,6 +1,6 @@
# ClusterCockpit Metric Store
![test workflow](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml/badge.svg)
[![Build & Test](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml/badge.svg)](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml)
Barely unusable yet. Go look at the [GitHub Issues](https://github.com/ClusterCockpit/cc-metric-store/issues) for a progress overview.
@ -27,7 +27,7 @@ go test -bench=. -race -v ./...
Tags in InfluxDB are used to build indexes over the stored data. InfluxDB-Tags have no
relation to each other, they do not depend on each other and have no hierarchy.
Different tags build up different indexes.
Different tags build up different indexes (I am no expert at all, but this is how i think they work).
This project also works as a time-series database and uses the InfluxDB line protocol.
Unlike InfluxDB, the data is indexed by one single strictly hierarchical tree structure.
@ -83,16 +83,16 @@ The plan is later to have the structure look like this (for this, the socket of
### Config file
- `metrics`: Map of metric-name to objects with the following properties
- `frequency`: Timestep/Interval/Resolution of this metric
- `aggregation`: Can be `"sum"`, `"avg"` or `null`.
- `null` means "horizontal" aggregation is disabled
- `frequency`: Timestep/Interval/Resolution of this metric (In seconds)
- `aggregation`: Can be `"sum"`, `"avg"` or `null`
- `null` means aggregation across nodes is forbidden for this metric
- `"sum"` means that values from the child levels are summed up for the parent level
- `"avg"` means that values from the child levels are averaged for the parent level
- `scope`: Unused at the moment, should be something like `"node"`, `"socket"` or `"cpu"`
- `nats`: Url of NATS.io server (The `updates` channel will be subscribed for metrics)
- `archive-root`: Directory to be used as archive (__Unimplemented__)
- `restore-last-hours`: After restart, load data from the past *X* hours back to memory (__Unimplemented__)
- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk (__Unimplemented__)
- `archive-root`: Directory to be used as archive
- `restore-last-hours`: After restart, load data from the past *X* hours back to memory
- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk
### Test the complete setup (excluding ClusterCockpit itself)
@ -106,7 +106,7 @@ docker pull nats:latest
docker run -p 4222:4222 -ti nats:latest
```
Second, build and start start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`:
Second, build and start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`:
```json
{
@ -139,13 +139,20 @@ And finally, use the API to fetch some data:
```sh
# If the collector and store and nats-server have been running for at least 60 seconds on the same host, you may run:
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\"], \"metrics\": [\"load_one\"] } ]"
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\"]], \"metrics\": [\"load_one\"] }"
# Get flops_any for all CPUs:
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\", \"cpu\"], \"metrics\": [\"flops_any\"] } ]"
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\"]], \"metrics\": [\"flops_any\"] }"
# Get flops_any for CPU 0:
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\", \"cpu\", \"0\"], \"metrics\": [\"flops_any\"] } ]"
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\", \"0\"]], \"metrics\": [\"flops_any\"] }"
# Stats for load_one and proc_run:
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/stats" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\"]], \"metrics\": [\"load_one\", \"proc_run\"] }"
# Stats for *all* CPUs aggregated both from CPU to node and over time:
curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/stats" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\"]], \"metrics\": [\"flops_sp\", \"flops_dp\"] }"
# ...
```

11
TODO.md
View File

@ -10,13 +10,10 @@
- Implement API endpoint for releasing old data
- Make sure data is written to disk before it is released
- Automatically free up old buffers periodically?
- Implement basic support for aggregations over time (stats like min/max/avg)
- Optimization: Once a buffer is full, calculate min, max and avg
- Calculate averages buffer-wise, average weighted by length of buffer
- Optimization: Once a buffer is full, calculate min, max and avg
- Calculate averages buffer-wise, average weighted by length of buffer
- Only the head-buffer needs to be fully traversed
- Implement basic support for query of most recent value for every metric on every host
- Optimize horizontal aggregations
- Optimize locking of levels in the tree structure
- In 99.9% of cases, no new level will need to be created, so all lookups into `level.children` will be read only
- `level.metrics` will be modified more often and will accesses will need to be serialized here
- Suggestion: Use a proper Mutex for `level.metrics`, but something read-optimized and possibly lock-free for `level.children`
- All metrics are known in advance, including the level: Use this to replace `level.metrics` hashmap by slice?
- ...

86
api.go
View File

@ -12,12 +12,13 @@ import (
)
// Example:
// [
// { "selector": ["emmy", "host123"], "metrics": ["load_one"] }
// ]
type ApiRequestBody []struct {
Selector []string `json:"selector"`
Metrics []string `json:"metrics"`
// {
// "metrics": ["flops_sp", "flops_dp"]
// "selectors": [["emmy", "host123", "cpu", "0"], ["emmy", "host123", "cpu", "1"]]
// }
type ApiRequestBody struct {
Metrics []string `json:"metrics"`
Selectors [][]string `json:"selectors"`
}
type ApiMetricData struct {
@ -26,6 +27,15 @@ type ApiMetricData struct {
Data []Float `json:"data"`
}
type ApiStatsData struct {
From int64 `json:"from"`
To int64 `json:"to"`
Samples int `json:"samples"`
Avg Float `json:"avg"`
Min Float `json:"min"`
Max Float `json:"max"`
}
func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
from, err := strconv.ParseInt(vars["from"], 10, 64)
@ -52,11 +62,11 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
return
}
res := make([]map[string]ApiMetricData, 0, len(reqBody))
for _, req := range reqBody {
res := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors))
for _, selector := range reqBody.Selectors {
metrics := make(map[string]ApiMetricData)
for _, metric := range req.Metrics {
data, f, t, err := memoryStore.Read(req.Selector, metric, from, to)
for _, metric := range reqBody.Metrics {
data, f, t, err := memoryStore.Read(selector, metric, from, to)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
@ -78,10 +88,66 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
}
}
func handleStats(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
from, err := strconv.ParseInt(vars["from"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
to, err := strconv.ParseInt(vars["to"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
if r.Method != http.MethodPost {
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
bodyDec := json.NewDecoder(r.Body)
var reqBody ApiRequestBody
err = bodyDec.Decode(&reqBody)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
res := make([]map[string]ApiStatsData, 0, len(reqBody.Selectors))
for _, selector := range reqBody.Selectors {
metrics := make(map[string]ApiStatsData)
for _, metric := range reqBody.Metrics {
stats, f, t, err := memoryStore.Stats(selector, metric, from, to)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
metrics[metric] = ApiStatsData{
From: f,
To: t,
Samples: stats.Samples,
Avg: stats.Avg,
Min: stats.Min,
Max: stats.Max,
}
}
res = append(res, metrics)
}
rw.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(rw).Encode(res)
if err != nil {
log.Println(err.Error())
}
}
func StartApiServer(address string, done chan bool) error {
r := mux.NewRouter()
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
server := &http.Server{
Handler: r,

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"math"
"math/rand"
"sync"
"testing"
)
@ -172,6 +173,75 @@ func TestMemoryStoreAggregation(t *testing.T) {
}
}
func TestMemoryStoreStats(t *testing.T) {
count := 3000
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1},
"b": {Frequency: 1, Aggregation: "avg"},
})
sel1 := []string{"cluster", "host1"}
sel2 := []string{"cluster", "host2", "left"}
sel3 := []string{"cluster", "host2", "right"}
samples := 0
asum, amin, amax := 0., math.MaxFloat32, -math.MaxFloat32
bsum, bmin, bmax := 0., math.MaxFloat32, -math.MaxFloat32
for i := 0; i < count; i++ {
if i%5 == 0 {
// Skip some writes, test if samples is calculated correctly
continue
}
samples += 1
a := float64(rand.Int()%100 - 50)
asum += a
amin = math.Min(amin, a)
amax = math.Max(amax, a)
b := float64(rand.Int()%100 - 50)
bsum += b * 2
bmin = math.Min(bmin, b)
bmax = math.Max(bmax, b)
store.Write(sel1, int64(i), []Metric{
{Name: "a", Value: Float(a)},
})
store.Write(sel2, int64(i), []Metric{
{Name: "b", Value: Float(b)},
})
store.Write(sel3, int64(i), []Metric{
{Name: "b", Value: Float(b)},
})
}
stats, from, to, err := store.Stats(sel1, "a", 0, int64(count))
if err != nil {
t.Fatal(err)
}
if from != 1 || to != int64(count) || stats.Samples != samples {
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples)
}
if stats.Avg != Float(asum/float64(samples)) || stats.Min != Float(amin) || stats.Max != Float(amax) {
t.Fatalf("wrong stats: %#v\n", stats)
}
stats, from, to, err = store.Stats([]string{"cluster", "host2"}, "b", 0, int64(count))
if err != nil {
t.Fatal(err)
}
if from != 1 || to != int64(count) || stats.Samples != samples*2 {
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2)
}
if stats.Avg != Float(bsum/float64(samples*2)) || stats.Min != Float(bmin) || stats.Max != Float(bmax) {
t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax)
}
}
func TestMemoryStoreArchive(t *testing.T) {
store1 := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1},

139
stats.go Normal file
View File

@ -0,0 +1,139 @@
package main
import (
"errors"
"math"
)
type Stats struct {
Samples int
Avg Float
Min Float
Max Float
}
// Return `Stats` by value for less allocations/GC?
func (b *buffer) stats(from, to int64) (*Stats, int64, int64, error) {
if from < b.start {
if b.prev != nil {
return b.prev.stats(from, to)
}
from = b.start
}
samples := 0
sum, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
var t int64
for t = from; t < to; t += b.frequency {
idx := int((t - b.start) / b.frequency)
if idx >= cap(b.data) {
b = b.next
if b == nil {
break
}
idx = 0
}
if t < b.start || idx >= len(b.data) {
continue
}
xf := float64(b.data[idx])
if math.IsNaN(xf) {
continue
}
samples += 1
sum += xf
min = math.Min(min, xf)
max = math.Max(max, xf)
}
return &Stats{
Samples: samples,
Avg: Float(sum) / Float(samples),
Min: Float(min),
Max: Float(max),
}, from, t, nil
}
// This function assmumes that `l.lock` is LOCKED!
// It basically works just like level.read but calculates min/max/avg for that data level.read would return.
// TODO: Make this DRY?
func (l *level) stats(metric string, from, to int64, aggregation string) (*Stats, int64, int64, error) {
if b, ok := l.metrics[metric]; ok {
return b.stats(from, to)
}
if len(l.children) == 0 {
return nil, 0, 0, errors.New("no data for that metric/level")
}
if len(l.children) == 1 {
for _, child := range l.children {
child.lock.Lock()
stats, from, to, err := child.stats(metric, from, to, aggregation)
child.lock.Unlock()
return stats, from, to, err
}
}
samples := 0
avgSum, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32)
for _, child := range l.children {
child.lock.Lock()
stats, cfrom, cto, err := child.stats(metric, from, to, aggregation)
child.lock.Unlock()
if err != nil {
return nil, 0, 0, err
}
if cfrom != from || cto != to {
// See level.read for more on this
if samples == 0 {
from = cfrom
to = cto
} else {
return nil, 0, 0, errors.New("data for metrics at child levels does not align")
}
}
samples += stats.Samples
avgSum += stats.Avg
min = Float(math.Min(float64(min), float64(stats.Min)))
max = Float(math.Max(float64(max), float64(stats.Max)))
}
avg := avgSum
if aggregation == "avg" {
avg /= Float(len(l.children))
} else if aggregation != "sum" {
return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation)
}
return &Stats{
Samples: samples,
Avg: avg,
Min: min,
Max: max,
}, from, to, nil
}
func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*Stats, int64, int64, error) {
l := m.root.findLevelOrCreate(selector)
l.lock.RLock()
defer l.lock.RUnlock()
if from > to {
return nil, 0, 0, errors.New("invalid time range")
}
minfo, ok := m.metrics[metric]
if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric)
}
return l.stats(metric, from, to, minfo.Aggregation)
}