diff --git a/README.md b/README.md index bdcbf3e..76625ff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # ClusterCockpit Metric Store -![test workflow](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml/badge.svg) +[![Build & Test](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml/badge.svg)](https://github.com/ClusterCockpit/cc-metric-store/actions/workflows/test.yml) Barely unusable yet. Go look at the [GitHub Issues](https://github.com/ClusterCockpit/cc-metric-store/issues) for a progress overview. @@ -27,7 +27,7 @@ go test -bench=. -race -v ./... Tags in InfluxDB are used to build indexes over the stored data. InfluxDB-Tags have no relation to each other, they do not depend on each other and have no hierarchy. -Different tags build up different indexes. +Different tags build up different indexes (I am no expert at all, but this is how i think they work). This project also works as a time-series database and uses the InfluxDB line protocol. Unlike InfluxDB, the data is indexed by one single strictly hierarchical tree structure. @@ -83,16 +83,16 @@ The plan is later to have the structure look like this (for this, the socket of ### Config file - `metrics`: Map of metric-name to objects with the following properties - - `frequency`: Timestep/Interval/Resolution of this metric - - `aggregation`: Can be `"sum"`, `"avg"` or `null`. - - `null` means "horizontal" aggregation is disabled + - `frequency`: Timestep/Interval/Resolution of this metric (In seconds) + - `aggregation`: Can be `"sum"`, `"avg"` or `null` + - `null` means aggregation across nodes is forbidden for this metric - `"sum"` means that values from the child levels are summed up for the parent level - `"avg"` means that values from the child levels are averaged for the parent level - `scope`: Unused at the moment, should be something like `"node"`, `"socket"` or `"cpu"` - `nats`: Url of NATS.io server (The `updates` channel will be subscribed for metrics) -- `archive-root`: Directory to be used as archive (__Unimplemented__) -- `restore-last-hours`: After restart, load data from the past *X* hours back to memory (__Unimplemented__) -- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk (__Unimplemented__) +- `archive-root`: Directory to be used as archive +- `restore-last-hours`: After restart, load data from the past *X* hours back to memory +- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk ### Test the complete setup (excluding ClusterCockpit itself) @@ -106,7 +106,7 @@ docker pull nats:latest docker run -p 4222:4222 -ti nats:latest ``` -Second, build and start start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`: +Second, build and start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`: ```json { @@ -139,13 +139,20 @@ And finally, use the API to fetch some data: ```sh # If the collector and store and nats-server have been running for at least 60 seconds on the same host, you may run: -curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\"], \"metrics\": [\"load_one\"] } ]" +curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\"]], \"metrics\": [\"load_one\"] }" # Get flops_any for all CPUs: -curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\", \"cpu\"], \"metrics\": [\"flops_any\"] } ]" +curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\"]], \"metrics\": [\"flops_any\"] }" # Get flops_any for CPU 0: -curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "[ { \"selector\": [\"testcluster\", \"$(hostname)\", \"cpu\", \"0\"], \"metrics\": [\"flops_any\"] } ]" +curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/timeseries" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\", \"0\"]], \"metrics\": [\"flops_any\"] }" + +# Stats for load_one and proc_run: +curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/stats" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\"]], \"metrics\": [\"load_one\", \"proc_run\"] }" + +# Stats for *all* CPUs aggregated both from CPU to node and over time: +curl -D - "http://localhost:8080/api/$(expr $(date +%s) - 60)/$(date +%s)/stats" -d "{ \"selectors\": [[\"testcluster\", \"$(hostname)\", \"cpu\"]], \"metrics\": [\"flops_sp\", \"flops_dp\"] }" + # ... ``` diff --git a/TODO.md b/TODO.md index b916541..8349e5f 100644 --- a/TODO.md +++ b/TODO.md @@ -10,13 +10,10 @@ - Implement API endpoint for releasing old data - Make sure data is written to disk before it is released - Automatically free up old buffers periodically? -- Implement basic support for aggregations over time (stats like min/max/avg) - - Optimization: Once a buffer is full, calculate min, max and avg - - Calculate averages buffer-wise, average weighted by length of buffer +- Optimization: Once a buffer is full, calculate min, max and avg + - Calculate averages buffer-wise, average weighted by length of buffer + - Only the head-buffer needs to be fully traversed - Implement basic support for query of most recent value for every metric on every host - Optimize horizontal aggregations -- Optimize locking of levels in the tree structure - - In 99.9% of cases, no new level will need to be created, so all lookups into `level.children` will be read only - - `level.metrics` will be modified more often and will accesses will need to be serialized here - - Suggestion: Use a proper Mutex for `level.metrics`, but something read-optimized and possibly lock-free for `level.children` +- All metrics are known in advance, including the level: Use this to replace `level.metrics` hashmap by slice? - ... diff --git a/api.go b/api.go index 5da75d9..045a4fa 100644 --- a/api.go +++ b/api.go @@ -12,12 +12,13 @@ import ( ) // Example: -// [ -// { "selector": ["emmy", "host123"], "metrics": ["load_one"] } -// ] -type ApiRequestBody []struct { - Selector []string `json:"selector"` - Metrics []string `json:"metrics"` +// { +// "metrics": ["flops_sp", "flops_dp"] +// "selectors": [["emmy", "host123", "cpu", "0"], ["emmy", "host123", "cpu", "1"]] +// } +type ApiRequestBody struct { + Metrics []string `json:"metrics"` + Selectors [][]string `json:"selectors"` } type ApiMetricData struct { @@ -26,6 +27,15 @@ type ApiMetricData struct { Data []Float `json:"data"` } +type ApiStatsData struct { + From int64 `json:"from"` + To int64 `json:"to"` + Samples int `json:"samples"` + Avg Float `json:"avg"` + Min Float `json:"min"` + Max Float `json:"max"` +} + func handleTimeseries(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) from, err := strconv.ParseInt(vars["from"], 10, 64) @@ -52,11 +62,11 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) { return } - res := make([]map[string]ApiMetricData, 0, len(reqBody)) - for _, req := range reqBody { + res := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors)) + for _, selector := range reqBody.Selectors { metrics := make(map[string]ApiMetricData) - for _, metric := range req.Metrics { - data, f, t, err := memoryStore.Read(req.Selector, metric, from, to) + for _, metric := range reqBody.Metrics { + data, f, t, err := memoryStore.Read(selector, metric, from, to) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -78,10 +88,66 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) { } } +func handleStats(rw http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + from, err := strconv.ParseInt(vars["from"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + to, err := strconv.ParseInt(vars["to"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + bodyDec := json.NewDecoder(r.Body) + var reqBody ApiRequestBody + err = bodyDec.Decode(&reqBody) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + res := make([]map[string]ApiStatsData, 0, len(reqBody.Selectors)) + for _, selector := range reqBody.Selectors { + metrics := make(map[string]ApiStatsData) + for _, metric := range reqBody.Metrics { + stats, f, t, err := memoryStore.Stats(selector, metric, from, to) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + metrics[metric] = ApiStatsData{ + From: f, + To: t, + Samples: stats.Samples, + Avg: stats.Avg, + Min: stats.Min, + Max: stats.Max, + } + } + res = append(res, metrics) + } + + rw.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(rw).Encode(res) + if err != nil { + log.Println(err.Error()) + } +} + func StartApiServer(address string, done chan bool) error { r := mux.NewRouter() r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) + r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) server := &http.Server{ Handler: r, diff --git a/memstore_test.go b/memstore_test.go index 4326651..b15d91f 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -3,6 +3,7 @@ package main import ( "fmt" "math" + "math/rand" "sync" "testing" ) @@ -172,6 +173,75 @@ func TestMemoryStoreAggregation(t *testing.T) { } } +func TestMemoryStoreStats(t *testing.T) { + count := 3000 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 1}, + "b": {Frequency: 1, Aggregation: "avg"}, + }) + + sel1 := []string{"cluster", "host1"} + sel2 := []string{"cluster", "host2", "left"} + sel3 := []string{"cluster", "host2", "right"} + + samples := 0 + asum, amin, amax := 0., math.MaxFloat32, -math.MaxFloat32 + bsum, bmin, bmax := 0., math.MaxFloat32, -math.MaxFloat32 + + for i := 0; i < count; i++ { + if i%5 == 0 { + // Skip some writes, test if samples is calculated correctly + continue + } + + samples += 1 + a := float64(rand.Int()%100 - 50) + asum += a + amin = math.Min(amin, a) + amax = math.Max(amax, a) + b := float64(rand.Int()%100 - 50) + bsum += b * 2 + bmin = math.Min(bmin, b) + bmax = math.Max(bmax, b) + + store.Write(sel1, int64(i), []Metric{ + {Name: "a", Value: Float(a)}, + }) + store.Write(sel2, int64(i), []Metric{ + {Name: "b", Value: Float(b)}, + }) + store.Write(sel3, int64(i), []Metric{ + {Name: "b", Value: Float(b)}, + }) + } + + stats, from, to, err := store.Stats(sel1, "a", 0, int64(count)) + if err != nil { + t.Fatal(err) + } + + if from != 1 || to != int64(count) || stats.Samples != samples { + t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples) + } + + if stats.Avg != Float(asum/float64(samples)) || stats.Min != Float(amin) || stats.Max != Float(amax) { + t.Fatalf("wrong stats: %#v\n", stats) + } + + stats, from, to, err = store.Stats([]string{"cluster", "host2"}, "b", 0, int64(count)) + if err != nil { + t.Fatal(err) + } + + if from != 1 || to != int64(count) || stats.Samples != samples*2 { + t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2) + } + + if stats.Avg != Float(bsum/float64(samples*2)) || stats.Min != Float(bmin) || stats.Max != Float(bmax) { + t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax) + } +} + func TestMemoryStoreArchive(t *testing.T) { store1 := NewMemoryStore(map[string]MetricConfig{ "a": {Frequency: 1}, diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..1d2b10b --- /dev/null +++ b/stats.go @@ -0,0 +1,139 @@ +package main + +import ( + "errors" + "math" +) + +type Stats struct { + Samples int + Avg Float + Min Float + Max Float +} + +// Return `Stats` by value for less allocations/GC? +func (b *buffer) stats(from, to int64) (*Stats, int64, int64, error) { + if from < b.start { + if b.prev != nil { + return b.prev.stats(from, to) + } + from = b.start + } + + samples := 0 + sum, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + + var t int64 + for t = from; t < to; t += b.frequency { + idx := int((t - b.start) / b.frequency) + if idx >= cap(b.data) { + b = b.next + if b == nil { + break + } + idx = 0 + } + + if t < b.start || idx >= len(b.data) { + continue + } + + xf := float64(b.data[idx]) + if math.IsNaN(xf) { + continue + } + + samples += 1 + sum += xf + min = math.Min(min, xf) + max = math.Max(max, xf) + } + + return &Stats{ + Samples: samples, + Avg: Float(sum) / Float(samples), + Min: Float(min), + Max: Float(max), + }, from, t, nil +} + +// This function assmumes that `l.lock` is LOCKED! +// It basically works just like level.read but calculates min/max/avg for that data level.read would return. +// TODO: Make this DRY? +func (l *level) stats(metric string, from, to int64, aggregation string) (*Stats, int64, int64, error) { + if b, ok := l.metrics[metric]; ok { + return b.stats(from, to) + } + + if len(l.children) == 0 { + return nil, 0, 0, errors.New("no data for that metric/level") + } + + if len(l.children) == 1 { + for _, child := range l.children { + child.lock.Lock() + stats, from, to, err := child.stats(metric, from, to, aggregation) + child.lock.Unlock() + return stats, from, to, err + } + } + + samples := 0 + avgSum, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32) + for _, child := range l.children { + child.lock.Lock() + stats, cfrom, cto, err := child.stats(metric, from, to, aggregation) + child.lock.Unlock() + + if err != nil { + return nil, 0, 0, err + } + + if cfrom != from || cto != to { + // See level.read for more on this + if samples == 0 { + from = cfrom + to = cto + } else { + return nil, 0, 0, errors.New("data for metrics at child levels does not align") + } + } + + samples += stats.Samples + avgSum += stats.Avg + min = Float(math.Min(float64(min), float64(stats.Min))) + max = Float(math.Max(float64(max), float64(stats.Max))) + } + + avg := avgSum + if aggregation == "avg" { + avg /= Float(len(l.children)) + } else if aggregation != "sum" { + return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation) + } + + return &Stats{ + Samples: samples, + Avg: avg, + Min: min, + Max: max, + }, from, to, nil +} + +func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*Stats, int64, int64, error) { + l := m.root.findLevelOrCreate(selector) + l.lock.RLock() + defer l.lock.RUnlock() + + if from > to { + return nil, 0, 0, errors.New("invalid time range") + } + + minfo, ok := m.metrics[metric] + if !ok { + return nil, 0, 0, errors.New("unkown metric: " + metric) + } + + return l.stats(metric, from, to, minfo.Aggregation) +}