mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2024-12-27 09:19:06 +01:00
Add API endpoints, update README.md
This commit is contained in:
parent
18e0155c95
commit
a125bd5bfd
112
README.md
112
README.md
@ -1,3 +1,113 @@
|
|||||||
# ClusterCockpit Metric Store
|
# ClusterCockpit Metric Store
|
||||||
|
|
||||||
FileStore is only a code fragment. To test the memoryStore move away the fileStore and run go test.
|
Unusable yet. Go look at the [GitHub Issues](https://github.com/ClusterCockpit/cc-metric-store/issues) for a progress overview.
|
||||||
|
|
||||||
|
### REST API Endpoints
|
||||||
|
|
||||||
|
The following endpoints are implemented (not properly tested, subject to change):
|
||||||
|
|
||||||
|
- *from* and *to* need to be Unix timestamps in seconds
|
||||||
|
- *class* needs to be `node`, `socket` or `cpu` (The class of each metric is documented in [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector))
|
||||||
|
- If the *class* is `socket`, the hostname needs to be appended by `:s<socket-index>`. The same goes for `cpu` and `:c<cpu-index>`
|
||||||
|
- Fetch all datapoints from *from* to *to* for the hosts *h1*, *h2* and *h3* and metrics *m1* and *m2*
|
||||||
|
- Request: `GET /api/<cluster>/<class>/<from>/<to>/timeseries?host=<h1>&host=<h2>&host=<h3>&metric=<m1>&metric=<m2>&...`
|
||||||
|
- Response: `{ "m1": { "hosts": [{ "host": "h1", "start": <start-time>, "data": [1.0, 2.0, 3.0, ...] }, { "host": "h2" , ...}, { "host": "h3", ...}] }, ... }`
|
||||||
|
- Fetch the average, minimum and maximum values from *from* to *to* for the hosts *h1*, *h2* and *h3* and metrics *m1* and *m2*
|
||||||
|
- Request: `GET /api/<cluster>/<class>/<from>/<to>/stats?host=<h1>&host=<h2>&host=<h3>&metric=<m1>&metric=<m2>&...`
|
||||||
|
- Response: `{ "m1": { "hosts": [{ "host": "h1", "samples": 123, "avg": 0.5, "min": 0.0, "max": 1.0 }, ...] }, ... }`
|
||||||
|
- `samples` is the number of actual measurements taken into account. This can be lower than expected if data ponts are missing
|
||||||
|
- Fetch the newest value for each host and metric on a specified cluster
|
||||||
|
- Request: `GET /api/<cluster>/<class>/peak`
|
||||||
|
- Response: `{ "host1": { "metric1": 1., "metric2": 2., ... }, "host2": { ... }, ... }`
|
||||||
|
|
||||||
|
### Run tests
|
||||||
|
|
||||||
|
```sh
|
||||||
|
# Test the line protocol parser
|
||||||
|
go test ./lineprotocol -v
|
||||||
|
# Test the memory store
|
||||||
|
go test . -v
|
||||||
|
```
|
||||||
|
|
||||||
|
### Test the complete setup (excluding ClusterCockpit itself)
|
||||||
|
|
||||||
|
First, get a NATS server running:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
# Only needed once, downloads the docker image
|
||||||
|
docker pull nats:latest
|
||||||
|
|
||||||
|
# Start the NATS server
|
||||||
|
docker run -p 4222:4222 -ti nats:latest
|
||||||
|
```
|
||||||
|
|
||||||
|
Second, build and start start the [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector) using the following as `config.json`:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"sink": {
|
||||||
|
"type": "nats",
|
||||||
|
"host": "localhost",
|
||||||
|
"port": "4222",
|
||||||
|
"database": "updates"
|
||||||
|
},
|
||||||
|
"interval" : 3,
|
||||||
|
"duration" : 1,
|
||||||
|
"collectors": [ "likwid", "loadavg" ],
|
||||||
|
"default_tags": { "cluster": "testcluster" },
|
||||||
|
"receiver": { "type": "none" }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Third, build and start the metric store.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
# Assuming you have a clone of this repo in ./cc-metric-store:
|
||||||
|
cd cc-metric-store
|
||||||
|
go get
|
||||||
|
go build
|
||||||
|
./cc-metric-store
|
||||||
|
```
|
||||||
|
|
||||||
|
Use this as `config.json`:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"metrics": {
|
||||||
|
"node": {
|
||||||
|
"frequency": 3,
|
||||||
|
"metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
|
||||||
|
},
|
||||||
|
"socket": {
|
||||||
|
"frequency": 3,
|
||||||
|
"metrics": ["power", "mem_bw"]
|
||||||
|
},
|
||||||
|
"cpu": {
|
||||||
|
"frequency": 3,
|
||||||
|
"metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
And finally, use the API to fetch some data:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
# If the collector and store and nats-server have been running for at least 60 seconds on the same host, you may run:
|
||||||
|
curl -D - "http://localhost:8080/api/testcluster/node/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=load_one&host=$(hostname)"
|
||||||
|
|
||||||
|
# or:
|
||||||
|
curl -D - "http://localhost:8080/api/testcluster/socket/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=mem_bw&metric=power&host=$(hostname):s0"
|
||||||
|
|
||||||
|
# or:
|
||||||
|
curl -D - "http://localhost:8080/api/testcluster/cpu/$(expr $(date +%s) - 60)/$(date +%s)/timeseries?metric=flops_any&host=$(hostname):c0&host=$(hostname):c1"
|
||||||
|
|
||||||
|
# or:
|
||||||
|
curl -D - "http://localhost:8080/api/testcluster/node/peak"
|
||||||
|
|
||||||
|
# or:
|
||||||
|
curl -D - "http://localhost:8080/api/testcluster/socket/$(expr $(date +%s) - 60)/$(date +%s)/stats?metric=mem_bw&metric=power&host=$(hostname):s0"
|
||||||
|
|
||||||
|
# ...
|
||||||
|
```
|
||||||
|
|
||||||
|
120
api.go
120
api.go
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@ -22,9 +23,23 @@ type MetricData struct {
|
|||||||
Hosts []HostData `json:"hosts"`
|
Hosts []HostData `json:"hosts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TimeseriesNodeResponse map[string]MetricData
|
type TimeseriesResponse map[string]MetricData
|
||||||
|
|
||||||
func handleTimeseriesNode(rw http.ResponseWriter, r *http.Request) {
|
type HostStats struct {
|
||||||
|
Host string `json:"host"`
|
||||||
|
Sampels int `json:"sampels"`
|
||||||
|
Avg lineprotocol.Float `json:"avg"`
|
||||||
|
Min lineprotocol.Float `json:"min"`
|
||||||
|
Max lineprotocol.Float `json:"max"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetricStats struct {
|
||||||
|
Hosts []HostStats `json:"hosts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type StatsResponse map[string]MetricStats
|
||||||
|
|
||||||
|
func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
cluster := vars["cluster"]
|
cluster := vars["cluster"]
|
||||||
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
||||||
@ -46,8 +61,13 @@ func handleTimeseriesNode(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := TimeseriesNodeResponse{}
|
response := TimeseriesResponse{}
|
||||||
store := metricStores["node"]
|
store, ok := metricStores[vars["class"]]
|
||||||
|
if !ok {
|
||||||
|
http.Error(rw, "invalid class", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
hostsdata := []HostData{}
|
hostsdata := []HostData{}
|
||||||
for _, host := range hosts {
|
for _, host := range hosts {
|
||||||
@ -76,10 +96,100 @@ func handleTimeseriesNode(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleStats(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
cluster := vars["cluster"]
|
||||||
|
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
to, err := strconv.ParseInt(vars["to"], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
values := r.URL.Query()
|
||||||
|
hosts := values["host"]
|
||||||
|
metrics := values["metric"]
|
||||||
|
if len(hosts) < 1 || len(metrics) < 1 {
|
||||||
|
http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := StatsResponse{}
|
||||||
|
store, ok := metricStores[vars["class"]]
|
||||||
|
if !ok {
|
||||||
|
http.Error(rw, "invalid class", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, metric := range metrics {
|
||||||
|
hoststats := []HostStats{}
|
||||||
|
for _, host := range hosts {
|
||||||
|
key := cluster + ":" + host
|
||||||
|
min, max := math.MaxFloat64, -math.MaxFloat64
|
||||||
|
samples := 0
|
||||||
|
|
||||||
|
sum, err := store.Reduce(key, metric, from, to, func(t int64, sum, x lineprotocol.Float) lineprotocol.Float {
|
||||||
|
if math.IsNaN(float64(x)) {
|
||||||
|
return sum
|
||||||
|
}
|
||||||
|
|
||||||
|
samples += 1
|
||||||
|
min = math.Min(min, float64(x))
|
||||||
|
max = math.Max(max, float64(x))
|
||||||
|
return sum + x
|
||||||
|
}, 0.)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hoststats = append(hoststats, HostStats{
|
||||||
|
Host: host,
|
||||||
|
Sampels: samples,
|
||||||
|
Avg: sum / lineprotocol.Float(samples),
|
||||||
|
Min: lineprotocol.Float(min),
|
||||||
|
Max: lineprotocol.Float(max),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
response[metric] = MetricStats{
|
||||||
|
Hosts: hoststats,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rw.Header().Set("Content-Type", "application/json")
|
||||||
|
err = json.NewEncoder(rw).Encode(response)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handlePeak(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
cluster := vars["cluster"]
|
||||||
|
store, ok := metricStores[vars["class"]]
|
||||||
|
if !ok {
|
||||||
|
http.Error(rw, "invalid class", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := store.Peak(cluster + ":")
|
||||||
|
rw.Header().Set("Content-Type", "application/json")
|
||||||
|
err := json.NewEncoder(rw).Encode(response)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func StartApiServer(address string, done chan bool) error {
|
func StartApiServer(address string, done chan bool) error {
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
|
|
||||||
r.HandleFunc("/api/{cluster}/timeseries/node/{from:[0-9]+}/{to:[0-9]+}", handleTimeseriesNode)
|
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
|
||||||
|
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
|
||||||
|
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/peak", handlePeak)
|
||||||
|
|
||||||
server := &http.Server{
|
server := &http.Server{
|
||||||
Handler: r,
|
Handler: r,
|
||||||
|
10
config.json
10
config.json
@ -2,7 +2,15 @@
|
|||||||
"metrics": {
|
"metrics": {
|
||||||
"node": {
|
"node": {
|
||||||
"frequency": 3,
|
"frequency": 3,
|
||||||
"metrics": ["load_one", "load_five", "load_fifteen", "proc_total", "proc_run"]
|
"metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
|
||||||
|
},
|
||||||
|
"socket": {
|
||||||
|
"frequency": 3,
|
||||||
|
"metrics": ["power", "mem_bw"]
|
||||||
|
},
|
||||||
|
"cpu": {
|
||||||
|
"frequency": 3,
|
||||||
|
"metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user