From a125bd5bfd400c0558c0b93b1c4c74c333a6faed Mon Sep 17 00:00:00 2001
From: Lou Knauer
Date: Tue, 24 Aug 2021 10:41:30 +0200
Subject: [PATCH] Add API endpoints, update README.md

---
 README.md   | 112 +++++++++++++++++++++++++++++++++++++++++++++++-
 api.go      | 120 +++++++++++++++++++++++++++++++++++++++++++++++++---
 config.json |  10 ++++-
 3 files changed, 235 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index 16f9052..a48d7e1 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,113 @@
 # ClusterCockpit Metric Store
 
-FileStore is only a code fragment. To test the memoryStore move away the fileStore and run go test.
+Unusable yet. Go look at the [GitHub Issues](https://github.com/ClusterCockpit/cc-metric-store/issues) for a progress overview.
+
+### REST API Endpoints
+
+The following endpoints are implemented (not properly tested, subject to change):
+
+- *from* and *to* need to be Unix timestamps in seconds
+- *class* needs to be `node`, `socket` or `cpu` (the class of each metric is documented in [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector))
+- If the *class* is `socket`, the hostname needs to be suffixed with `:s<socket-id>` (e.g. `:s0`); the same goes for `cpu` and `:c<cpu-id>`
+- Fetch all datapoints from *from* to *to* for the hosts *h1*, *h2* and *h3* and metrics *m1* and *m2*
+  - Request: `GET /api/<cluster>/<class>/<from>/<to>/timeseries?host=<h1>&host=<h2>&host=<h3>&metric=<m1>&metric=<m2>&...`
+  - Response: `{ "m1": { "hosts": [{ "host": "h1", "start": <start>, "data": [1.0, 2.0, 3.0, ...] }, { "host": "h2", ... }, { "host": "h3", ... }] }, ... }`
+- Fetch the average, minimum and maximum values from *from* to *to* for the hosts *h1*, *h2* and *h3* and metrics *m1* and *m2*
+  - Request: `GET /api/<cluster>/<class>/<from>/<to>/stats?host=<h1>&host=<h2>&host=<h3>&metric=<m1>&metric=<m2>&...`
+  - Response: `{ "m1": { "hosts": [{ "host": "h1", "samples": 123, "avg": 0.5, "min": 0.0, "max": 1.0 }, ...] }, ... }`
+  - `samples` is the number of actual measurements taken into account; it can be lower than expected if data points are missing
+- Fetch the newest value for each host and metric on a specified cluster
+  - Request: `GET /api/<cluster>/<class>/peak`
+  - Response: `{ "host1": { "metric1": 1.0, "metric2": 2.0, ... }, "host2": { ... }, ... }`
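+
+To get a feel for the responses, here is a minimal Go client sketch for the timeseries endpoint (not part of this repository; cluster, host and metric names are placeholders, and the structs simply mirror the JSON documented above):
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+)
+
+// hostData mirrors one per-host series of the documented timeseries response.
+type hostData struct {
+	Host  string    `json:"host"`
+	Start int64     `json:"start"`
+	Data  []float64 `json:"data"`
+}
+
+func main() {
+	// Ask for the last 60 seconds of `load_one` on a hypothetical host.
+	to := time.Now().Unix()
+	from := to - 60
+	url := fmt.Sprintf("http://localhost:8080/api/testcluster/node/%d/%d/timeseries?metric=load_one&host=host1", from, to)
+
+	res, err := http.Get(url)
+	if err != nil {
+		panic(err)
+	}
+	defer res.Body.Close()
+
+	// The body maps each metric to its per-host series:
+	// { "load_one": { "hosts": [{ "host": "host1", "start": ..., "data": [...] }] } }
+	response := map[string]struct {
+		Hosts []hostData `json:"hosts"`
+	}{}
+	if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
+		panic(err)
+	}
+
+	for metric, md := range response {
+		for _, h := range md.Hosts {
+			fmt.Printf("%s on %s: start=%d, %d datapoints\n", metric, h.Host, h.Start, len(h.Data))
+		}
+	}
+}
+```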
+
+### Run tests
+
+```sh
+# Test the line protocol parser
+go test ./lineprotocol -v
+
+# Test the memory store
+go test . -v
+```
+
+### Test the complete setup (excluding ClusterCockpit itself)
+
+First, get a NATS server running:
+
+```sh
+# Only needed once, downloads the docker image
+docker pull nats:latest
+
+# Start the NATS server
+docker run -p 4222:4222 -ti nats:latest
+```
+
+Second, build and start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`:
+
+```json
+{
+  "sink": {
+    "type": "nats",
+    "host": "localhost",
+    "port": "4222",
+    "database": "updates"
+  },
+  "interval": 3,
+  "duration": 1,
+  "collectors": [ "likwid", "loadavg" ],
+  "default_tags": { "cluster": "testcluster" },
+  "receiver": { "type": "none" }
+}
+```
+
+Third, build and start the metric store:
+
+```sh
+# Assuming you have a clone of this repo in ./cc-metric-store:
+cd cc-metric-store
+go get
+go build
+./cc-metric-store
+```
+
+Use this as `config.json`:
+
+```json
+{
+  "metrics": {
+    "node": {
+      "frequency": 3,
+      "metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
+    },
+    "socket": {
+      "frequency": 3,
+      "metrics": ["power", "mem_bw"]
+    },
+    "cpu": {
+      "frequency": 3,
+      "metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
+    }
+  }
+}
+```
+
+And finally, use the API to fetch some data:
+
+```sh
+# If the collector, the metric store and the NATS server have been running on the same host for at least 60 seconds, you may run:
+curl -D - "http://localhost:8080/api/testcluster/node/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=load_one&host=$(hostname)"
+
+# or:
+curl -D - "http://localhost:8080/api/testcluster/socket/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=mem_bw&metric=power&host=$(hostname):s0"
+
+# or:
+curl -D - "http://localhost:8080/api/testcluster/cpu/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=flops_any&host=$(hostname):c0&host=$(hostname):c1"
+
+# or:
+curl -D - "http://localhost:8080/api/testcluster/node/peak"
+
+# or:
+curl -D - "http://localhost:8080/api/testcluster/socket/$(expr $(date +%s) - 60)/$(date +%s)/stats?metric=mem_bw&metric=power&host=$(hostname):s0"
+
+# ...
+```
+
diff --git a/api.go b/api.go
index 39e198f..20f93eb 100644
--- a/api.go
+++ b/api.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"log"
+	"math"
 	"net/http"
 	"strconv"
 	"time"
@@ -22,9 +23,23 @@ type MetricData struct {
 	Hosts []HostData `json:"hosts"`
 }
 
-type TimeseriesNodeResponse map[string]MetricData
+type TimeseriesResponse map[string]MetricData
 
-func handleTimeseriesNode(rw http.ResponseWriter, r *http.Request) {
+type HostStats struct {
+	Host    string             `json:"host"`
+	Samples int                `json:"samples"`
+	Avg     lineprotocol.Float `json:"avg"`
+	Min     lineprotocol.Float `json:"min"`
+	Max     lineprotocol.Float `json:"max"`
+}
+
+type MetricStats struct {
+	Hosts []HostStats `json:"hosts"`
+}
+
+type StatsResponse map[string]MetricStats
+
+func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
 	vars := mux.Vars(r)
 	cluster := vars["cluster"]
 	from, err := strconv.ParseInt(vars["from"], 10, 64)
@@ -46,8 +61,13 @@
 		return
 	}
 
-	response := TimeseriesNodeResponse{}
-	store := metricStores["node"]
+	response := TimeseriesResponse{}
+	store, ok := metricStores[vars["class"]]
+	if !ok {
+		http.Error(rw, "invalid class", http.StatusInternalServerError)
+		return
+	}
+
 	for _, metric := range metrics {
 		hostsdata := []HostData{}
 		for _, host := range hosts {
@@ -76,10 +96,100 @@
 	}
 }
 
+func handleStats(rw http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	cluster := vars["cluster"]
+	from, err := strconv.ParseInt(vars["from"], 10, 64)
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+	to, err := strconv.ParseInt(vars["to"], 10, 64)
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	values := r.URL.Query()
+	hosts := values["host"]
+	metrics := values["metric"]
+	if len(hosts) < 1 || len(metrics) < 1 {
+		http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest)
+		return
+	}
+
+	response := StatsResponse{}
+	store, ok := metricStores[vars["class"]]
+	if !ok {
+		http.Error(rw, "invalid class", http.StatusInternalServerError)
+		return
+	}
+
+	for _, metric := range metrics {
+		hoststats := []HostStats{}
+		for _, host := range hosts {
+			key := cluster + ":" + host
+			min, max := math.MaxFloat64, -math.MaxFloat64
+			samples := 0
+
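+			// Fold over all datapoints of this key in [from, to]: NaN values
+			// (missing samples) are skipped, everything else updates the
+			// running count, minimum and maximum and is accumulated into the
+			// sum (starting at 0.) from which the average is computed below.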
+			sum, err := store.Reduce(key, metric, from, to, func(t int64, sum, x lineprotocol.Float) lineprotocol.Float {
+				if math.IsNaN(float64(x)) {
+					return sum
+				}
+
+				samples += 1
+				min = math.Min(min, float64(x))
+				max = math.Max(max, float64(x))
+				return sum + x
+			}, 0.)
+			if err != nil {
+				http.Error(rw, err.Error(), http.StatusInternalServerError)
+				return
+			}
+
+			hoststats = append(hoststats, HostStats{
+				Host:    host,
+				Samples: samples,
+				Avg:     sum / lineprotocol.Float(samples),
+				Min:     lineprotocol.Float(min),
+				Max:     lineprotocol.Float(max),
+			})
+		}
+		response[metric] = MetricStats{
+			Hosts: hoststats,
+		}
+	}
+
+	rw.Header().Set("Content-Type", "application/json")
+	err = json.NewEncoder(rw).Encode(response)
+	if err != nil {
+		log.Println(err.Error())
+	}
+}
+
+func handlePeak(rw http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	cluster := vars["cluster"]
+	store, ok := metricStores[vars["class"]]
+	if !ok {
+		http.Error(rw, "invalid class", http.StatusInternalServerError)
+		return
+	}
+
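+	// Peak yields the newest value of every metric for each host of this
+	// cluster; "<cluster>:" is the key prefix under which hosts are stored.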
+	response := store.Peak(cluster + ":")
+	rw.Header().Set("Content-Type", "application/json")
+	err := json.NewEncoder(rw).Encode(response)
+	if err != nil {
+		log.Println(err.Error())
+	}
+}
+
 func StartApiServer(address string, done chan bool) error {
 	r := mux.NewRouter()
 
-	r.HandleFunc("/api/{cluster}/timeseries/node/{from:[0-9]+}/{to:[0-9]+}", handleTimeseriesNode)
+	r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
+	r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
+	r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/peak", handlePeak)
 
 	server := &http.Server{
 		Handler: r,
diff --git a/config.json b/config.json
index 5298ab1..644ec7c 100644
--- a/config.json
+++ b/config.json
@@ -2,7 +2,15 @@
 	"metrics": {
 		"node": {
 			"frequency": 3,
-			"metrics": ["load_one", "load_five", "load_fifteen", "proc_total", "proc_run"]
+			"metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
+		},
+		"socket": {
+			"frequency": 3,
+			"metrics": ["power", "mem_bw"]
+		},
+		"cpu": {
+			"frequency": 3,
+			"metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
 		}
 	}
 }