diff --git a/.gitignore b/.gitignore index 81accb2..e1cd189 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,9 @@ # Project specific ignores /var /configs + +sample.txt + +migrateTimestamps.pl +test_ccms_api.sh +test_ccms_free_api.sh diff --git a/api/swagger.json b/api/swagger.json index 402fd11..08522ca 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -75,7 +75,7 @@ } }, "/free/": { - "get": { + "post": { "security": [ { "ApiKeyAuth": [] @@ -273,6 +273,9 @@ "min": { "type": "number" }, + "resolution": { + "type": "integer" + }, "to": { "type": "integer" } @@ -290,6 +293,9 @@ "metric": { "type": "string" }, + "resolution": { + "type": "integer" + }, "scale-by": { "type": "number" }, diff --git a/api/swagger.yaml b/api/swagger.yaml index 51c7322..b14ba8a 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -16,6 +16,8 @@ definitions: type: number min: type: number + resolution: + type: integer to: type: integer type: object @@ -27,6 +29,8 @@ definitions: type: string metric: type: string + resolution: + type: integer scale-by: type: number subtype: @@ -137,7 +141,7 @@ paths: tags: - debug /free/: - get: + post: parameters: - description: up to timestamp in: query diff --git a/config.json b/config.json index 6bd9f84..23d8a10 100644 --- a/config.json +++ b/config.json @@ -1,45 +1,185 @@ { - "metrics": { - "flops_any": { - "frequency": 15, - "aggregation": "sum" + "metrics": { + "debug_metric": { + "frequency": 60, + "aggregation": "avg" + }, + "clock": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_idle": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_iowait": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_irq": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_system": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_user": { + "frequency": 60, + "aggregation": "avg" + }, + "nv_mem_util": { + "frequency": 60, + "aggregation": "avg" + }, + "nv_temp": { + "frequency": 60, + "aggregation": "avg" + }, + 
"nv_sm_clock": { + "frequency": 60, + "aggregation": "avg" + }, + "acc_utilization": { + "frequency": 60, + "aggregation": "avg" + }, + "acc_mem_used": { + "frequency": 60, + "aggregation": "sum" + }, + "acc_power": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_any": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_dp": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_sp": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_recv": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_xmit": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_recv_pkts": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_xmit_pkts": { + "frequency": 60, + "aggregation": "sum" + }, + "cpu_power": { + "frequency": 60, + "aggregation": "sum" + }, + "core_power": { + "frequency": 60, + "aggregation": "sum" + }, + "mem_power": { + "frequency": 60, + "aggregation": "sum" + }, + "ipc": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_load": { + "frequency": 60, + "aggregation": null + }, + "lustre_close": { + "frequency": 60, + "aggregation": null + }, + "lustre_open": { + "frequency": 60, + "aggregation": null + }, + "lustre_statfs": { + "frequency": 60, + "aggregation": null + }, + "lustre_read_bytes": { + "frequency": 60, + "aggregation": null + }, + "lustre_write_bytes": { + "frequency": 60, + "aggregation": null + }, + "net_bw": { + "frequency": 60, + "aggregation": null + }, + "file_bw": { + "frequency": 60, + "aggregation": null + }, + "mem_bw": { + "frequency": 60, + "aggregation": "sum" + }, + "mem_cached": { + "frequency": 60, + "aggregation": null + }, + "mem_used": { + "frequency": 60, + "aggregation": null + }, + "net_bytes_in": { + "frequency": 60, + "aggregation": null + }, + "net_bytes_out": { + "frequency": 60, + "aggregation": null + }, + "nfs4_read": { + "frequency": 60, + "aggregation": null + }, + "nfs4_total": { + "frequency": 60, + "aggregation": null + }, + "nfs4_write": { + "frequency": 60, + "aggregation": null + }, 
+ "vectorization_ratio": { + "frequency": 60, + "aggregation": "avg" + } }, - "flops_dp": { - "frequency": 15, - "aggregation": "sum" + "checkpoints": { + "interval": "1h", + "directory": "./var/checkpoints", + "restore": "1h" }, - "flops_sp": { - "frequency": 15, - "aggregation": "sum" + "archive": { + "interval": "2h", + "directory": "./var/archive" }, - "mem_bw": { - "frequency": 15, - "aggregation": "sum" + "http-api": { + "address": "localhost:8082", + "https-cert-file": null, + "https-key-file": null }, - "load_one": { - "frequency": 15, - "aggregation": null - }, - "load_five": { - "frequency": 15, - "aggregation": null - } - }, - "checkpoints": { - "interval": "12h", - "directory": "./var/checkpoints", - "restore": "48h" - }, - "archive": { - "interval": "168h", - "directory": "./var/archive" - }, - "http-api": { - "address": "127.0.0.1:8081", - "https-cert-file": null, - "https-key-file": null - }, - "retention-in-memory": "48h", - "nats": null, - "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" -} + "retention-in-memory": "1h", + "nats": null, + "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" +} \ No newline at end of file diff --git a/internal/api/api.go b/internal/api/api.go index b5e55fa..6e5968c 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -47,13 +47,14 @@ type ErrorResponse struct { } type ApiMetricData struct { - Error *string `json:"error,omitempty"` - Data util.FloatArray `json:"data,omitempty"` - From int64 `json:"from"` - To int64 `json:"to"` - Avg util.Float `json:"avg"` - Min util.Float `json:"min"` - Max util.Float `json:"max"` + Error *string `json:"error,omitempty"` + Data util.FloatArray `json:"data,omitempty"` + From int64 `json:"from"` + To int64 `json:"to"` + Resolution int64 `json:"resolution"` + Avg util.Float `json:"avg"` + Min util.Float `json:"min"` + Max util.Float `json:"max"` } func handleError(err error, statusCode int, rw http.ResponseWriter) { @@ -135,7 +136,7 @@ func (data 
*ApiMetricData) PadDataWithNull(ms *memorystore.MemoryStore, from, to // @failure 403 {object} api.ErrorResponse "Forbidden" // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth -// @router /free/ [get] +// @router /free/ [post] func handleFree(rw http.ResponseWriter, r *http.Request) { rawTo := r.URL.Query().Get("to") if rawTo == "" { @@ -234,6 +235,7 @@ type ApiQuery struct { SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` Hostname string `json:"host"` + Resolution int64 `json:"resolution"` TypeIds []string `json:"type-ids,omitempty"` SubTypeIds []string `json:"subtype-ids,omitempty"` ScaleFactor util.Float `json:"scale-by,omitempty"` @@ -256,6 +258,10 @@ type ApiQuery struct { // @router /query/ [get] func handleQuery(rw http.ResponseWriter, r *http.Request) { var err error + ver := r.URL.Query().Get("version") + if ver == "" { + ver = "v2" + } req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { handleError(err, http.StatusBadRequest, rw) @@ -336,8 +342,11 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { res := make([]ApiMetricData, 0, len(sels)) for _, sel := range sels { data := ApiMetricData{} - data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To) - // log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err) + if ver == "v1" { + data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, 0) + } else { + data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution) + } if err != nil { msg := err.Error() data.Error = &msg diff --git a/internal/api/docs.go b/internal/api/docs.go index 9d49c13..84cec70 100644 --- a/internal/api/docs.go +++ b/internal/api/docs.go @@ -81,7 +81,7 @@ const docTemplate = `{ } }, "/free/": { - "get": { + "post": { "security": [ { 
"ApiKeyAuth": [] @@ -279,6 +279,9 @@ const docTemplate = `{ "min": { "type": "number" }, + "resolution": { + "type": "integer" + }, "to": { "type": "integer" } @@ -296,6 +299,9 @@ const docTemplate = `{ "metric": { "type": "string" }, + "resolution": { + "type": "integer" + }, "scale-by": { "type": "number" }, diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index 9081638..f675a3a 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -255,7 +255,8 @@ func decodeLine(dec *lineprotocol.Decoder, if len(subTypeBuf) == 0 { subTypeBuf = append(subTypeBuf, val...) } else { - subTypeBuf = reorder(typeBuf, val) + subTypeBuf = reorder(subTypeBuf, val) + // subTypeBuf = reorder(typeBuf, val) } case "stype-id": subTypeBuf = append(subTypeBuf, val...) @@ -308,7 +309,7 @@ func decodeLine(dec *lineprotocol.Decoder, } if t, err = dec.Time(lineprotocol.Second, t); err != nil { - return err + return fmt.Errorf("timestamp : %#v with error : %#v", lineprotocol.Second, err.Error()) } if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil { diff --git a/internal/api/server.go b/internal/api/server.go index 3f7bcd9..3b69c4c 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -20,11 +20,23 @@ func MountRoutes(r *http.ServeMux) { log.Fatalf("starting server failed: %v", err) } publicKey := ed25519.PublicKey(buf) + // Compatibility + r.Handle("POST /api/free", authHandler(http.HandlerFunc(handleFree), publicKey)) + r.Handle("POST /api/write", authHandler(http.HandlerFunc(handleWrite), publicKey)) + r.Handle("GET /api/query", authHandler(http.HandlerFunc(handleQuery), publicKey)) + r.Handle("GET /api/debug", authHandler(http.HandlerFunc(handleDebug), publicKey)) + // Refactor r.Handle("POST /api/free/", authHandler(http.HandlerFunc(handleFree), publicKey)) r.Handle("POST /api/write/", authHandler(http.HandlerFunc(handleWrite), publicKey)) r.Handle("GET /api/query/", 
authHandler(http.HandlerFunc(handleQuery), publicKey)) r.Handle("GET /api/debug/", authHandler(http.HandlerFunc(handleDebug), publicKey)) } else { + // Compatibility + r.HandleFunc("POST /api/free", handleFree) + r.HandleFunc("POST /api/write", handleWrite) + r.HandleFunc("GET /api/query", handleQuery) + r.HandleFunc("GET /api/debug", handleDebug) + // Refactor r.HandleFunc("POST /api/free/", handleFree) r.HandleFunc("POST /api/write/", handleWrite) r.HandleFunc("GET /api/query/", handleQuery) diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 9b036d5..93585b6 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -478,8 +478,10 @@ func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]s e := direntries[i] ts1 := nums[e.Name()] - if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 { + if findMoreRecentFiles && t <= ts1 { filenames = append(filenames, e.Name()) + } + if i == len(direntries)-1 { continue } diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index 4bbfe7c..f5f133a 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -107,6 +107,10 @@ func (l *Level) sizeInBytes() int64 { } } + for _, child := range l.children { + size += child.sizeInBytes() + } + return size } diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 4868a85..229ab00 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -10,6 +10,7 @@ import ( "github.com/ClusterCockpit/cc-metric-store/internal/config" "github.com/ClusterCockpit/cc-metric-store/internal/util" + "github.com/ClusterCockpit/cc-metric-store/pkg/resampler" ) var ( @@ -178,17 +179,18 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric // If the level does not hold the metric itself, the data will be aggregated recursively from the children. 
// The second and third return value are the actual from/to for the data. Those can be different from // the range asked for if no data was available. -func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { +func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) { if from > to { - return nil, 0, 0, errors.New("invalid time range") + return nil, 0, 0, 0, errors.New("invalid time range") } minfo, ok := m.Metrics[metric] if !ok { - return nil, 0, 0, errors.New("unkown metric: " + metric) + return nil, 0, 0, 0, errors.New("unkown metric: " + metric) } n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1) + err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { @@ -221,9 +223,9 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64 }) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } else if n == 0 { - return nil, 0, 0, errors.New("metric or host not found") + return nil, 0, 0, 0, errors.New("metric or host not found") } else if n > 1 { if minfo.Aggregation == config.AvgAggregation { normalize := 1. 
/ util.Float(n) @@ -231,11 +233,17 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64 data[i] *= normalize } } else if minfo.Aggregation != config.SumAggregation { - return nil, 0, 0, errors.New("invalid aggregation") + return nil, 0, 0, 0, errors.New("invalid aggregation") } } - return data, from, to, nil + data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution) + + if err != nil { + return nil, 0, 0, 0, err + } + + return data, from, to, resolution, nil } // Release all buffers for the selected level and all its children that contain only diff --git a/pkg/resampler/resampler.go b/pkg/resampler/resampler.go new file mode 100644 index 0000000..59288c5 --- /dev/null +++ b/pkg/resampler/resampler.go @@ -0,0 +1,122 @@ +package resampler + +import ( + "errors" + "fmt" + "math" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +func SimpleResampler(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, error) { + if old_frequency == 0 || new_frequency == 0 { + return nil, errors.New("either old or new frequency is set to 0") + } + + if new_frequency%old_frequency != 0 { + return nil, errors.New("new sampling frequency should be multiple of the old frequency") + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, nil + } + + new_data := make([]util.Float, new_data_length) + + for i := 0; i < new_data_length; i++ { + new_data[i] = data[i*step] + } + + return new_data, nil +} + +// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf +// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go +func LargestTriangleThreeBucket(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, int64, error) { + + if old_frequency == 0 || new_frequency == 0 
{ + return data, old_frequency, nil + } + + if new_frequency%old_frequency != 0 { + return nil, 0, fmt.Errorf("new sampling frequency : %d should be multiple of the old frequency : %d", new_frequency, old_frequency) + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, old_frequency, nil + } + + new_data := make([]util.Float, 0, new_data_length) + + // Bucket size. Leave room for start and end data points + bucketSize := float64(len(data)-2) / float64(new_data_length-2) + + new_data = append(new_data, data[0]) // Always add the first point + + // We have 3 pointers to represent + // > bucketLow - the current bucket's beginning location + // > bucketMiddle - the current bucket's ending location, + // also the beginning location of next bucket + // > bucketHigh - the next bucket's ending location. + bucketLow := 1 + bucketMiddle := int(math.Floor(bucketSize)) + 1 + + var prevMaxAreaPoint int + + for i := 0; i < new_data_length-2; i++ { + + bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1 + if bucketHigh >= len(data)-1 { + bucketHigh = len(data) - 2 + } + + // Calculate point average for next bucket (containing c) + avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle)) + + // Get the range for current bucket + currBucketStart := bucketLow + currBucketEnd := bucketMiddle + + // Point a + pointX := prevMaxAreaPoint + pointY := data[prevMaxAreaPoint] + + maxArea := -1.0 + + var maxAreaPoint int + flag_ := 0 + for ; currBucketStart < currBucketEnd; currBucketStart++ { + + area := calculateTriangleArea(util.Float(pointX), pointY, avgPointX, avgPointY, util.Float(currBucketStart), data[currBucketStart]) + if area > maxArea { + maxArea = area + maxAreaPoint = currBucketStart + } + if math.IsNaN(float64(avgPointY)) { + flag_ = 1 + } + } + + if flag_ == 1 { + new_data = 
append(new_data, util.NaN) // Pick this point from the bucket + + } else { + new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket + } + prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the next's prevMaxAreaPoint + + // move to the next window + bucketLow = bucketMiddle + bucketMiddle = bucketHigh + } + + new_data = append(new_data, data[len(data)-1]) // Always add last + + return new_data, new_frequency, nil +} diff --git a/pkg/resampler/util.go b/pkg/resampler/util.go new file mode 100644 index 0000000..61397bf --- /dev/null +++ b/pkg/resampler/util.go @@ -0,0 +1,35 @@ +package resampler + +import ( + "math" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY util.Float) float64 { + area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5 + return math.Abs(float64(area)) +} + +func calculateAverageDataPoint(points []util.Float, xStart int64) (avgX util.Float, avgY util.Float) { + flag := 0 + for _, point := range points { + avgX += util.Float(xStart) + avgY += point + xStart++ + if math.IsNaN(float64(point)) { + flag = 1 + } + } + + l := util.Float(len(points)) + + avgX /= l + avgY /= l + + if flag == 1 { + return avgX, util.NaN + } else { + return avgX, avgY + } +}