From b186dca79dc474f2c453370e3437fe84152d731d Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Wed, 21 Aug 2024 09:47:16 +0200 Subject: [PATCH] Resampler implemented --- .gitignore | 4 + config.json | 220 +++++++++++++++++++++++----- internal/api/api.go | 20 +-- internal/memorystore/level.go | 4 + internal/memorystore/memorystore.go | 22 ++- pkg/resampler/resampler.go | 112 ++++++++++++++ pkg/resampler/util.go | 25 ++++ 7 files changed, 351 insertions(+), 56 deletions(-) create mode 100644 pkg/resampler/resampler.go create mode 100644 pkg/resampler/util.go diff --git a/.gitignore b/.gitignore index 81accb2..c5d48a2 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,7 @@ # Project specific ignores /var /configs + + +migrateTimestamps.pl +test_ccms_api.sh diff --git a/config.json b/config.json index 6bd9f84..c0d1197 100644 --- a/config.json +++ b/config.json @@ -1,45 +1,185 @@ { - "metrics": { - "flops_any": { - "frequency": 15, - "aggregation": "sum" + "metrics": { + "debug_metric": { + "frequency": 60, + "aggregation": "avg" + }, + "clock": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_idle": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_iowait": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_irq": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_system": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_user": { + "frequency": 60, + "aggregation": "avg" + }, + "nv_mem_util": { + "frequency": 60, + "aggregation": "avg" + }, + "nv_temp": { + "frequency": 60, + "aggregation": "avg" + }, + "nv_sm_clock": { + "frequency": 60, + "aggregation": "avg" + }, + "acc_utilization": { + "frequency": 60, + "aggregation": "avg" + }, + "acc_mem_used": { + "frequency": 60, + "aggregation": "sum" + }, + "acc_power": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_any": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_dp": { + "frequency": 60, + "aggregation": "sum" + }, + "flops_sp": { + "frequency": 60, + 
"aggregation": "sum" + }, + "ib_recv": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_xmit": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_recv_pkts": { + "frequency": 60, + "aggregation": "sum" + }, + "ib_xmit_pkts": { + "frequency": 60, + "aggregation": "sum" + }, + "cpu_power": { + "frequency": 60, + "aggregation": "sum" + }, + "core_power": { + "frequency": 60, + "aggregation": "sum" + }, + "mem_power": { + "frequency": 60, + "aggregation": "sum" + }, + "ipc": { + "frequency": 60, + "aggregation": "avg" + }, + "cpu_load": { + "frequency": 60, + "aggregation": null + }, + "lustre_close": { + "frequency": 60, + "aggregation": null + }, + "lustre_open": { + "frequency": 60, + "aggregation": null + }, + "lustre_statfs": { + "frequency": 60, + "aggregation": null + }, + "lustre_read_bytes": { + "frequency": 60, + "aggregation": null + }, + "lustre_write_bytes": { + "frequency": 60, + "aggregation": null + }, + "net_bw": { + "frequency": 60, + "aggregation": null + }, + "file_bw": { + "frequency": 60, + "aggregation": null + }, + "mem_bw": { + "frequency": 60, + "aggregation": "sum" + }, + "mem_cached": { + "frequency": 60, + "aggregation": null + }, + "mem_used": { + "frequency": 60, + "aggregation": null + }, + "net_bytes_in": { + "frequency": 60, + "aggregation": null + }, + "net_bytes_out": { + "frequency": 60, + "aggregation": null + }, + "nfs4_read": { + "frequency": 60, + "aggregation": null + }, + "nfs4_total": { + "frequency": 60, + "aggregation": null + }, + "nfs4_write": { + "frequency": 60, + "aggregation": null + }, + "vectorization_ratio": { + "frequency": 60, + "aggregation": "avg" + } }, - "flops_dp": { - "frequency": 15, - "aggregation": "sum" + "checkpoints": { + "interval": "12h", + "directory": "./var/checkpoints", + "restore": "48h" }, - "flops_sp": { - "frequency": 15, - "aggregation": "sum" + "archive": { + "interval": "168h", + "directory": "./var/archive" }, - "mem_bw": { - "frequency": 15, - "aggregation": "sum" + 
"http-api": { + "address": "localhost:8081", + "https-cert-file": null, + "https-key-file": null }, - "load_one": { - "frequency": 15, - "aggregation": null - }, - "load_five": { - "frequency": 15, - "aggregation": null - } - }, - "checkpoints": { - "interval": "12h", - "directory": "./var/checkpoints", - "restore": "48h" - }, - "archive": { - "interval": "168h", - "directory": "./var/archive" - }, - "http-api": { - "address": "127.0.0.1:8081", - "https-cert-file": null, - "https-key-file": null - }, - "retention-in-memory": "48h", - "nats": null, - "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" -} + "retention-in-memory": "48h", + "nats": null, + "jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" +} \ No newline at end of file diff --git a/internal/api/api.go b/internal/api/api.go index b5e55fa..197cee9 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -47,13 +47,14 @@ type ErrorResponse struct { } type ApiMetricData struct { - Error *string `json:"error,omitempty"` - Data util.FloatArray `json:"data,omitempty"` - From int64 `json:"from"` - To int64 `json:"to"` - Avg util.Float `json:"avg"` - Min util.Float `json:"min"` - Max util.Float `json:"max"` + Error *string `json:"error,omitempty"` + Data util.FloatArray `json:"data,omitempty"` + From int64 `json:"from"` + To int64 `json:"to"` + Resolution int64 `json:"resolution"` + Avg util.Float `json:"avg"` + Min util.Float `json:"min"` + Max util.Float `json:"max"` } func handleError(err error, statusCode int, rw http.ResponseWriter) { @@ -234,6 +235,7 @@ type ApiQuery struct { SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` Hostname string `json:"host"` + Resolution int64 `json:"resolution"` TypeIds []string `json:"type-ids,omitempty"` SubTypeIds []string `json:"subtype-ids,omitempty"` ScaleFactor util.Float `json:"scale-by,omitempty"` @@ -336,8 +338,8 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { res := make([]ApiMetricData, 0, 
len(sels)) for _, sel := range sels { data := ApiMetricData{} - data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To) - // log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err) + data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution) + if err != nil { msg := err.Error() data.Error = &msg diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index 4bbfe7c..f5f133a 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -107,6 +107,10 @@ func (l *Level) sizeInBytes() int64 { } } + for _, child := range l.children { + size += child.sizeInBytes() + } + return size } diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 4868a85..229ab00 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -10,6 +10,7 @@ import ( "github.com/ClusterCockpit/cc-metric-store/internal/config" "github.com/ClusterCockpit/cc-metric-store/internal/util" + "github.com/ClusterCockpit/cc-metric-store/pkg/resampler" ) var ( @@ -178,17 +179,18 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // The second and third return value are the actual from/to for the data. Those can be different from // the range asked for if no data was available. 
-func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { +func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) { if from > to { - return nil, 0, 0, errors.New("invalid time range") + return nil, 0, 0, 0, errors.New("invalid time range") } minfo, ok := m.Metrics[metric] if !ok { - return nil, 0, 0, errors.New("unkown metric: " + metric) + return nil, 0, 0, 0, errors.New("unknown metric: " + metric) } n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1) + err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { @@ -221,9 +223,9 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64 }) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, 0, err } else if n == 0 { - return nil, 0, 0, errors.New("metric or host not found") + return nil, 0, 0, 0, errors.New("metric or host not found") } else if n > 1 { if minfo.Aggregation == config.AvgAggregation { normalize := 1.
/ util.Float(n) @@ -231,11 +233,17 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64 data[i] *= normalize } } else if minfo.Aggregation != config.SumAggregation { - return nil, 0, 0, errors.New("invalid aggregation") + return nil, 0, 0, 0, errors.New("invalid aggregation") } } - return data, from, to, nil + data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution) + + if err != nil { + return nil, 0, 0, 0, err + } + + return data, from, to, resolution, nil } // Release all buffers for the selected level and all its children that contain only diff --git a/pkg/resampler/resampler.go b/pkg/resampler/resampler.go new file mode 100644 index 0000000..f9748db --- /dev/null +++ b/pkg/resampler/resampler.go @@ -0,0 +1,112 @@ +package resampler + +import ( + "errors" + "math" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +func SimpleResampler(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, error) { + if old_frequency == 0 || new_frequency == 0 { + return nil, errors.New("either old or new frequency is set to 0") + } + + if new_frequency%old_frequency != 0 { + return nil, errors.New("new sampling frequency should be multiple of the old frequency") + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, nil + } + + new_data := make([]util.Float, new_data_length) + + for i := 0; i < new_data_length; i++ { + new_data[i] = data[i*step] + } + + return new_data, nil +} + +// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf +// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go +func LargestTriangleThreeBucket(data []util.Float, old_frequency int64, new_frequency int64) ([]util.Float, int64, error) { + + if old_frequency == 0 || new_frequency == 0 { + 
return data, old_frequency, nil + } + + if new_frequency%old_frequency != 0 { + return nil, 0, errors.New("new sampling frequency should be multiple of the old frequency") + } + + var step int = int(new_frequency / old_frequency) + var new_data_length = len(data) / step + + if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) { + return data, old_frequency, nil + } + + new_data := make([]util.Float, 0, new_data_length) + + // Bucket size. Leave room for start and end data points + bucketSize := float64(len(data)-2) / float64(new_data_length-2) + + new_data = append(new_data, data[0]) // Always add the first point + + // We keep 3 pointers: + // > bucketLow - the current bucket's beginning location + // > bucketMiddle - the current bucket's ending location, + // also the beginning location of next bucket + // > bucketHigh - the next bucket's ending location. + bucketLow := 1 + bucketMiddle := int(math.Floor(bucketSize)) + 1 + + var prevMaxAreaPoint int + + for i := 0; i < new_data_length-2; i++ { + + bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1 + if bucketHigh >= len(data)-1 { + bucketHigh = len(data) - 2 + } + + // Calculate point average for next bucket (containing c) + avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle)) + + // Get the range for current bucket + currBucketStart := bucketLow + currBucketEnd := bucketMiddle + + // Point a + pointX := prevMaxAreaPoint + pointY := data[prevMaxAreaPoint] + + maxArea := -1.0 + + var maxAreaPoint int + for ; currBucketStart < currBucketEnd; currBucketStart++ { + + area := calculateTriangleArea(util.Float(pointX), pointY, avgPointX, avgPointY, util.Float(currBucketStart), data[currBucketStart]) + if area > maxArea { + maxArea = area + maxAreaPoint = currBucketStart + } + } + + new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket + prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the
next's prevMaxAreaPoint + + //move to the next window + bucketLow = bucketMiddle + bucketMiddle = bucketHigh + } + + new_data = append(new_data, data[len(data)-1]) // Always add last + + return new_data, new_frequency, nil +} diff --git a/pkg/resampler/util.go b/pkg/resampler/util.go new file mode 100644 index 0000000..2e4fa73 --- /dev/null +++ b/pkg/resampler/util.go @@ -0,0 +1,25 @@ +package resampler + +import ( + "math" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY util.Float) float64 { + area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5 + return math.Abs(float64(area)) +} + +func calculateAverageDataPoint(points []util.Float, xStart int64) (avgX util.Float, avgY util.Float) { + + for _, point := range points { + avgX += util.Float(xStart) + avgY += point + xStart++ + } + l := util.Float(len(points)) + avgX /= l + avgY /= l + return avgX, avgY +}