Merge pull request #23 from ClusterCockpit/devel

Resampling support. Endpoint versioning.
This commit is contained in:
Jan Eitzinger 2024-10-21 09:43:04 +02:00 committed by GitHub
commit 2c2e7decc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 418 additions and 63 deletions

6
.gitignore vendored
View File

@ -18,3 +18,9 @@
# Project specific ignores # Project specific ignores
/var /var
/configs /configs
sample.txt
migrateTimestamps.pl
test_ccms_api.sh
test_ccms_free_api.sh

View File

@ -75,7 +75,7 @@
} }
}, },
"/free/": { "/free/": {
"get": { "post": {
"security": [ "security": [
{ {
"ApiKeyAuth": [] "ApiKeyAuth": []
@ -273,6 +273,9 @@
"min": { "min": {
"type": "number" "type": "number"
}, },
"resolution": {
"type": "integer"
},
"to": { "to": {
"type": "integer" "type": "integer"
} }
@ -290,6 +293,9 @@
"metric": { "metric": {
"type": "string" "type": "string"
}, },
"resolution": {
"type": "integer"
},
"scale-by": { "scale-by": {
"type": "number" "type": "number"
}, },

View File

@ -16,6 +16,8 @@ definitions:
type: number type: number
min: min:
type: number type: number
resolution:
type: integer
to: to:
type: integer type: integer
type: object type: object
@ -27,6 +29,8 @@ definitions:
type: string type: string
metric: metric:
type: string type: string
resolution:
type: integer
scale-by: scale-by:
type: number type: number
subtype: subtype:
@ -137,7 +141,7 @@ paths:
tags: tags:
- debug - debug
/free/: /free/:
get: post:
parameters: parameters:
- description: up to timestamp - description: up to timestamp
in: query in: query

View File

@ -1,45 +1,185 @@
{ {
"metrics": { "metrics": {
"flops_any": { "debug_metric": {
"frequency": 15, "frequency": 60,
"aggregation": "sum" "aggregation": "avg"
},
"clock": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_idle": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_iowait": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_irq": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_system": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_user": {
"frequency": 60,
"aggregation": "avg"
},
"nv_mem_util": {
"frequency": 60,
"aggregation": "avg"
},
"nv_temp": {
"frequency": 60,
"aggregation": "avg"
},
"nv_sm_clock": {
"frequency": 60,
"aggregation": "avg"
},
"acc_utilization": {
"frequency": 60,
"aggregation": "avg"
},
"acc_mem_used": {
"frequency": 60,
"aggregation": "sum"
},
"acc_power": {
"frequency": 60,
"aggregation": "sum"
},
"flops_any": {
"frequency": 60,
"aggregation": "sum"
},
"flops_dp": {
"frequency": 60,
"aggregation": "sum"
},
"flops_sp": {
"frequency": 60,
"aggregation": "sum"
},
"ib_recv": {
"frequency": 60,
"aggregation": "sum"
},
"ib_xmit": {
"frequency": 60,
"aggregation": "sum"
},
"ib_recv_pkts": {
"frequency": 60,
"aggregation": "sum"
},
"ib_xmit_pkts": {
"frequency": 60,
"aggregation": "sum"
},
"cpu_power": {
"frequency": 60,
"aggregation": "sum"
},
"core_power": {
"frequency": 60,
"aggregation": "sum"
},
"mem_power": {
"frequency": 60,
"aggregation": "sum"
},
"ipc": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_load": {
"frequency": 60,
"aggregation": null
},
"lustre_close": {
"frequency": 60,
"aggregation": null
},
"lustre_open": {
"frequency": 60,
"aggregation": null
},
"lustre_statfs": {
"frequency": 60,
"aggregation": null
},
"lustre_read_bytes": {
"frequency": 60,
"aggregation": null
},
"lustre_write_bytes": {
"frequency": 60,
"aggregation": null
},
"net_bw": {
"frequency": 60,
"aggregation": null
},
"file_bw": {
"frequency": 60,
"aggregation": null
},
"mem_bw": {
"frequency": 60,
"aggregation": "sum"
},
"mem_cached": {
"frequency": 60,
"aggregation": null
},
"mem_used": {
"frequency": 60,
"aggregation": null
},
"net_bytes_in": {
"frequency": 60,
"aggregation": null
},
"net_bytes_out": {
"frequency": 60,
"aggregation": null
},
"nfs4_read": {
"frequency": 60,
"aggregation": null
},
"nfs4_total": {
"frequency": 60,
"aggregation": null
},
"nfs4_write": {
"frequency": 60,
"aggregation": null
},
"vectorization_ratio": {
"frequency": 60,
"aggregation": "avg"
}
}, },
"flops_dp": { "checkpoints": {
"frequency": 15, "interval": "1h",
"aggregation": "sum" "directory": "./var/checkpoints",
"restore": "1h"
}, },
"flops_sp": { "archive": {
"frequency": 15, "interval": "2h",
"aggregation": "sum" "directory": "./var/archive"
}, },
"mem_bw": { "http-api": {
"frequency": 15, "address": "localhost:8082",
"aggregation": "sum" "https-cert-file": null,
"https-key-file": null
}, },
"load_one": { "retention-in-memory": "1h",
"frequency": 15, "nats": null,
"aggregation": null "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
},
"load_five": {
"frequency": 15,
"aggregation": null
}
},
"checkpoints": {
"interval": "12h",
"directory": "./var/checkpoints",
"restore": "48h"
},
"archive": {
"interval": "168h",
"directory": "./var/archive"
},
"http-api": {
"address": "127.0.0.1:8081",
"https-cert-file": null,
"https-key-file": null
},
"retention-in-memory": "48h",
"nats": null,
"jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
} }

View File

@ -47,13 +47,14 @@ type ErrorResponse struct {
} }
type ApiMetricData struct { type ApiMetricData struct {
Error *string `json:"error,omitempty"` Error *string `json:"error,omitempty"`
Data util.FloatArray `json:"data,omitempty"` Data util.FloatArray `json:"data,omitempty"`
From int64 `json:"from"` From int64 `json:"from"`
To int64 `json:"to"` To int64 `json:"to"`
Avg util.Float `json:"avg"` Resolution int64 `json:"resolution"`
Min util.Float `json:"min"` Avg util.Float `json:"avg"`
Max util.Float `json:"max"` Min util.Float `json:"min"`
Max util.Float `json:"max"`
} }
func handleError(err error, statusCode int, rw http.ResponseWriter) { func handleError(err error, statusCode int, rw http.ResponseWriter) {
@ -135,7 +136,7 @@ func (data *ApiMetricData) PadDataWithNull(ms *memorystore.MemoryStore, from, to
// @failure 403 {object} api.ErrorResponse "Forbidden" // @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error" // @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth // @security ApiKeyAuth
// @router /free/ [get] // @router /free/ [post]
func handleFree(rw http.ResponseWriter, r *http.Request) { func handleFree(rw http.ResponseWriter, r *http.Request) {
rawTo := r.URL.Query().Get("to") rawTo := r.URL.Query().Get("to")
if rawTo == "" { if rawTo == "" {
@ -234,6 +235,7 @@ type ApiQuery struct {
SubType *string `json:"subtype,omitempty"` SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"` Metric string `json:"metric"`
Hostname string `json:"host"` Hostname string `json:"host"`
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"` TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"` SubTypeIds []string `json:"subtype-ids,omitempty"`
ScaleFactor util.Float `json:"scale-by,omitempty"` ScaleFactor util.Float `json:"scale-by,omitempty"`
@ -256,6 +258,10 @@ type ApiQuery struct {
// @router /query/ [get] // @router /query/ [get]
func handleQuery(rw http.ResponseWriter, r *http.Request) { func handleQuery(rw http.ResponseWriter, r *http.Request) {
var err error var err error
ver := r.URL.Query().Get("version")
if ver == "" {
ver = "v2"
}
req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true} req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
handleError(err, http.StatusBadRequest, rw) handleError(err, http.StatusBadRequest, rw)
@ -336,8 +342,11 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
res := make([]ApiMetricData, 0, len(sels)) res := make([]ApiMetricData, 0, len(sels))
for _, sel := range sels { for _, sel := range sels {
data := ApiMetricData{} data := ApiMetricData{}
data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To) if ver == "v1" {
// log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err) data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, 0)
} else {
data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution)
}
if err != nil { if err != nil {
msg := err.Error() msg := err.Error()
data.Error = &msg data.Error = &msg

View File

@ -81,7 +81,7 @@ const docTemplate = `{
} }
}, },
"/free/": { "/free/": {
"get": { "post": {
"security": [ "security": [
{ {
"ApiKeyAuth": [] "ApiKeyAuth": []
@ -279,6 +279,9 @@ const docTemplate = `{
"min": { "min": {
"type": "number" "type": "number"
}, },
"resolution": {
"type": "integer"
},
"to": { "to": {
"type": "integer" "type": "integer"
} }
@ -296,6 +299,9 @@ const docTemplate = `{
"metric": { "metric": {
"type": "string" "type": "string"
}, },
"resolution": {
"type": "integer"
},
"scale-by": { "scale-by": {
"type": "number" "type": "number"
}, },

View File

@ -255,7 +255,8 @@ func decodeLine(dec *lineprotocol.Decoder,
if len(subTypeBuf) == 0 { if len(subTypeBuf) == 0 {
subTypeBuf = append(subTypeBuf, val...) subTypeBuf = append(subTypeBuf, val...)
} else { } else {
subTypeBuf = reorder(typeBuf, val) subTypeBuf = reorder(subTypeBuf, val)
// subTypeBuf = reorder(typeBuf, val)
} }
case "stype-id": case "stype-id":
subTypeBuf = append(subTypeBuf, val...) subTypeBuf = append(subTypeBuf, val...)
@ -308,7 +309,7 @@ func decodeLine(dec *lineprotocol.Decoder,
} }
if t, err = dec.Time(lineprotocol.Second, t); err != nil { if t, err = dec.Time(lineprotocol.Second, t); err != nil {
return err return fmt.Errorf("timestamp : %#v with error : %#v", lineprotocol.Second, err.Error())
} }
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil { if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {

View File

@ -20,11 +20,23 @@ func MountRoutes(r *http.ServeMux) {
log.Fatalf("starting server failed: %v", err) log.Fatalf("starting server failed: %v", err)
} }
publicKey := ed25519.PublicKey(buf) publicKey := ed25519.PublicKey(buf)
// Compatibility
r.Handle("POST /api/free", authHandler(http.HandlerFunc(handleFree), publicKey))
r.Handle("POST /api/write", authHandler(http.HandlerFunc(handleWrite), publicKey))
r.Handle("GET /api/query", authHandler(http.HandlerFunc(handleQuery), publicKey))
r.Handle("GET /api/debug", authHandler(http.HandlerFunc(handleDebug), publicKey))
// Refactor
r.Handle("POST /api/free/", authHandler(http.HandlerFunc(handleFree), publicKey)) r.Handle("POST /api/free/", authHandler(http.HandlerFunc(handleFree), publicKey))
r.Handle("POST /api/write/", authHandler(http.HandlerFunc(handleWrite), publicKey)) r.Handle("POST /api/write/", authHandler(http.HandlerFunc(handleWrite), publicKey))
r.Handle("GET /api/query/", authHandler(http.HandlerFunc(handleQuery), publicKey)) r.Handle("GET /api/query/", authHandler(http.HandlerFunc(handleQuery), publicKey))
r.Handle("GET /api/debug/", authHandler(http.HandlerFunc(handleDebug), publicKey)) r.Handle("GET /api/debug/", authHandler(http.HandlerFunc(handleDebug), publicKey))
} else { } else {
// Compatibility
r.HandleFunc("POST /api/free", handleFree)
r.HandleFunc("POST /api/write", handleWrite)
r.HandleFunc("GET /api/query", handleQuery)
r.HandleFunc("GET /api/debug", handleDebug)
// Refactor
r.HandleFunc("POST /api/free/", handleFree) r.HandleFunc("POST /api/free/", handleFree)
r.HandleFunc("POST /api/write/", handleWrite) r.HandleFunc("POST /api/write/", handleWrite)
r.HandleFunc("GET /api/query/", handleQuery) r.HandleFunc("GET /api/query/", handleQuery)

View File

@ -478,8 +478,10 @@ func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]s
e := direntries[i] e := direntries[i]
ts1 := nums[e.Name()] ts1 := nums[e.Name()]
if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 { if findMoreRecentFiles && t <= ts1 {
filenames = append(filenames, e.Name()) filenames = append(filenames, e.Name())
}
if i == len(direntries)-1 {
continue continue
} }

View File

@ -107,6 +107,10 @@ func (l *Level) sizeInBytes() int64 {
} }
} }
for _, child := range l.children {
size += child.sizeInBytes()
}
return size return size
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/ClusterCockpit/cc-metric-store/internal/config" "github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util" "github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
) )
var ( var (
@ -178,17 +179,18 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
// If the level does not hold the metric itself, the data will be aggregated recursively from the children. // If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from // The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available. // the range asked for if no data was available.
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
if from > to { if from > to {
return nil, 0, 0, errors.New("invalid time range") return nil, 0, 0, 0, errors.New("invalid time range")
} }
minfo, ok := m.Metrics[metric] minfo, ok := m.Metrics[metric]
if !ok { if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric) return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
} }
n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1) n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)
err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
cdata, cfrom, cto, err := b.read(from, to, data) cdata, cfrom, cto, err := b.read(from, to, data)
if err != nil { if err != nil {
@ -221,9 +223,9 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64
}) })
if err != nil { if err != nil {
return nil, 0, 0, err return nil, 0, 0, 0, err
} else if n == 0 { } else if n == 0 {
return nil, 0, 0, errors.New("metric or host not found") return nil, 0, 0, 0, errors.New("metric or host not found")
} else if n > 1 { } else if n > 1 {
if minfo.Aggregation == config.AvgAggregation { if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / util.Float(n) normalize := 1. / util.Float(n)
@ -231,11 +233,17 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64
data[i] *= normalize data[i] *= normalize
} }
} else if minfo.Aggregation != config.SumAggregation { } else if minfo.Aggregation != config.SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation") return nil, 0, 0, 0, errors.New("invalid aggregation")
} }
} }
return data, from, to, nil data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution)
if err != nil {
return nil, 0, 0, 0, err
}
return data, from, to, resolution, nil
} }
// Release all buffers for the selected level and all its children that contain only // Release all buffers for the selected level and all its children that contain only

122
pkg/resampler/resampler.go Normal file
View File

@ -0,0 +1,122 @@
package resampler
import (
"errors"
"fmt"
"math"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
)
func SimpleResampler(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, error) {
if old_frequency == 0 || new_frequency == 0 {
return nil, errors.New("either old or new frequency is set to 0")
}
if new_frequency%old_frequency != 0 {
return nil, errors.New("new sampling frequency should be multiple of the old frequency")
}
var step int = int(new_frequency / old_frequency)
var new_data_length = len(data) / step
if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
return data, nil
}
new_data := make([]util.Float, new_data_length)
for i := 0; i < new_data_length; i++ {
new_data[i] = data[i*step]
}
return new_data, nil
}
// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf
// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go
func LargestTriangleThreeBucket(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, int64, error) {
if old_frequency == 0 || new_frequency == 0 {
return data, old_frequency, nil
}
if new_frequency%old_frequency != 0 {
return nil, 0, fmt.Errorf("new sampling frequency : %d should be multiple of the old frequency : %d", new_frequency, old_frequency)
}
var step int = int(new_frequency / old_frequency)
var new_data_length = len(data) / step
if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
return data, old_frequency, nil
}
new_data := make([]util.Float, 0, new_data_length)
// Bucket size. Leave room for start and end data points
bucketSize := float64(len(data)-2) / float64(new_data_length-2)
new_data = append(new_data, data[0]) // Always add the first point
// We have 3 pointers represent for
// > bucketLow - the current bucket's beginning location
// > bucketMiddle - the current bucket's ending location,
// also the beginning location of next bucket
// > bucketHight - the next bucket's ending location.
bucketLow := 1
bucketMiddle := int(math.Floor(bucketSize)) + 1
var prevMaxAreaPoint int
for i := 0; i < new_data_length-2; i++ {
bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1
if bucketHigh >= len(data)-1 {
bucketHigh = len(data) - 2
}
// Calculate point average for next bucket (containing c)
avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle))
// Get the range for current bucket
currBucketStart := bucketLow
currBucketEnd := bucketMiddle
// Point a
pointX := prevMaxAreaPoint
pointY := data[prevMaxAreaPoint]
maxArea := -1.0
var maxAreaPoint int
flag_ := 0
for ; currBucketStart < currBucketEnd; currBucketStart++ {
area := calculateTriangleArea(util.Float(pointX), pointY, avgPointX, avgPointY, util.Float(currBucketStart), data[currBucketStart])
if area > maxArea {
maxArea = area
maxAreaPoint = currBucketStart
}
if math.IsNaN(float64(avgPointY)) {
flag_ = 1
}
}
if flag_ == 1 {
new_data = append(new_data, util.NaN) // Pick this point from the bucket
} else {
new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket
}
prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the next's prevMAxAreaPoint
//move to the next window
bucketLow = bucketMiddle
bucketMiddle = bucketHigh
}
new_data = append(new_data, data[len(data)-1]) // Always add last
return new_data, new_frequency, nil
}

35
pkg/resampler/util.go Normal file
View File

@ -0,0 +1,35 @@
package resampler
import (
"math"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
)
func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY util.Float) float64 {
area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5
return math.Abs(float64(area))
}
func calculateAverageDataPoint(points []util.Float, xStart int64) (avgX util.Float, avgY util.Float) {
flag := 0
for _, point := range points {
avgX += util.Float(xStart)
avgY += point
xStart++
if math.IsNaN(float64(point)) {
flag = 1
}
}
l := util.Float(len(points))
avgX /= l
avgY /= l
if flag == 1 {
return avgX, util.NaN
} else {
return avgX, avgY
}
}