From bac51891b759454548a3ea41ef4eb8966e971903 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 18 Oct 2025 08:30:42 +0200 Subject: [PATCH] Move avro into memorystore. Refactor Does not compile --- cmd/cc-backend/main.go | 5 +- cmd/cc-backend/server.go | 4 +- internal/memorystore/api.go | 47 +++-- internal/memorystore/archive.go | 12 +- .../{avro => memorystore}/avroCheckpoint.go | 70 +++---- internal/{avro => memorystore}/avroHelper.go | 13 +- internal/{avro => memorystore}/avroStruct.go | 17 +- internal/memorystore/buffer.go | 13 +- internal/memorystore/checkpoint.go | 65 +++--- .../memorystore.go => memorystore/config.go} | 28 ++- internal/memorystore/configSchema.go | 190 ++++++++++++++++++ internal/memorystore/debug.go | 7 +- internal/memorystore/healthcheck.go | 16 +- internal/memorystore/level.go | 7 +- internal/memorystore/lineprotocol.go | 24 ++- internal/memorystore/memorystore.go | 70 +++---- internal/memorystore/stats.go | 12 +- .../metricdata/cc-metric-store-internal.go | 80 ++++---- internal/metricdata/metricdata.go | 4 +- 19 files changed, 456 insertions(+), 228 deletions(-) rename internal/{avro => memorystore}/avroCheckpoint.go (88%) rename internal/{avro => memorystore}/avroHelper.go (86%) rename internal/{avro => memorystore}/avroStruct.go (94%) rename internal/{config/memorystore.go => memorystore/config.go} (78%) create mode 100644 internal/memorystore/configSchema.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 2dbbe9f..659998d 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -246,9 +246,9 @@ func main() { var wg sync.WaitGroup // Metric Store starts after all flags have been processes - if config.InternalCCMSFlag { + if memorystore.InternalCCMSFlag { if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil { - config.InitMetricStore(mscfg) + memorystore.InitMetricStore(mscfg) } else { cclog.Abort("Metric Store configuration must be present") } @@ -257,7 +257,6 @@ func main() { } 
archiver.Start(repository.GetJobRepository()) - // // Comment out taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index fd1a0c4..95f6464 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -244,7 +244,7 @@ func serverInit() { apiHandle.MountConfigApiRoutes(configapi) apiHandle.MountFrontendApiRoutes(frontendapi) - if config.InternalCCMSFlag { + if memorystore.InternalCCMSFlag { apiHandle.MountMetricStoreApiRoutes(metricstoreapi) } @@ -369,7 +369,7 @@ func serverShutdown() { server.Shutdown(context.Background()) // Archive all the metric store data - if config.InternalCCMSFlag { + if memorystore.InternalCCMSFlag { memorystore.Shutdown() } diff --git a/internal/memorystore/api.go b/internal/memorystore/api.go index 367f245..6382090 100644 --- a/internal/memorystore/api.go +++ b/internal/memorystore/api.go @@ -2,6 +2,7 @@ // All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package memorystore import ( @@ -47,7 +48,7 @@ type ErrorResponse struct { Error string `json:"error"` // Error Message } -type ApiMetricData struct { +type APIMetricData struct { Error *string `json:"error,omitempty"` Data schema.FloatArray `json:"data,omitempty"` From int64 `json:"from"` @@ -69,7 +70,7 @@ func handleError(err error, statusCode int, rw http.ResponseWriter) { } // TODO: Optimize this, just like the stats endpoint! 
-func (data *ApiMetricData) AddStats() { +func (data *APIMetricData) AddStats() { n := 0 sum, min, max := 0.0, math.MaxFloat64, -math.MaxFloat64 for _, x := range data.Data { @@ -93,7 +94,7 @@ func (data *ApiMetricData) AddStats() { } } -func (data *ApiMetricData) ScaleBy(f schema.Float) { +func (data *APIMetricData) ScaleBy(f schema.Float) { if f == 0 || f == 1 { return } @@ -106,7 +107,7 @@ func (data *ApiMetricData) ScaleBy(f schema.Float) { } } -func (data *ApiMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) { +func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) { minfo, ok := ms.Metrics[metric] if !ok { return @@ -115,7 +116,7 @@ func (data *ApiMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr if (data.From / minfo.Frequency) > (from / minfo.Frequency) { padfront := int((data.From / minfo.Frequency) - (from / minfo.Frequency)) ndata := make([]schema.Float, 0, padfront+len(data.Data)) - for i := 0; i < padfront; i++ { + for range padfront { ndata = append(ndata, schema.NaN) } for j := 0; j < len(data.Data); j++ { @@ -218,9 +219,9 @@ func HandleWrite(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusOK) } -type ApiQueryRequest struct { +type APIQueryRequest struct { Cluster string `json:"cluster"` - Queries []ApiQuery `json:"queries"` + Queries []APIQuery `json:"queries"` ForAllNodes []string `json:"for-all-nodes"` From int64 `json:"from"` To int64 `json:"to"` @@ -229,12 +230,12 @@ type ApiQueryRequest struct { WithPadding bool `json:"with-padding"` } -type ApiQueryResponse struct { - Queries []ApiQuery `json:"queries,omitempty"` - Results [][]ApiMetricData `json:"results"` +type APIQueryResponse struct { + Queries []APIQuery `json:"queries,omitempty"` + Results [][]APIMetricData `json:"results"` } -type ApiQuery struct { +type APIQuery struct { Type *string `json:"type,omitempty"` SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` @@ 
-246,22 +247,21 @@ type ApiQuery struct { Aggregate bool `json:"aggreg"` } -func FetchData(req ApiQueryRequest) (*ApiQueryResponse, error) { - +func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { req.WithData = true req.WithData = true req.WithData = true ms := GetMemoryStore() - response := ApiQueryResponse{ - Results: make([][]ApiMetricData, 0, len(req.Queries)), + response := APIQueryResponse{ + Results: make([][]APIMetricData, 0, len(req.Queries)), } if req.ForAllNodes != nil { nodes := ms.ListChildren([]string{req.Cluster}) for _, node := range nodes { for _, metric := range req.ForAllNodes { - q := ApiQuery{ + q := APIQuery{ Metric: metric, Hostname: node, } @@ -300,21 +300,21 @@ func FetchData(req ApiQueryRequest) (*ApiQueryResponse, error) { } sels = append(sels, sel) } else { - for _, typeId := range query.TypeIds { + for _, typeID := range query.TypeIds { if query.SubType != nil { - for _, subTypeId := range query.SubTypeIds { + for _, subTypeID := range query.SubTypeIds { sels = append(sels, util.Selector{ {String: req.Cluster}, {String: query.Hostname}, - {String: *query.Type + typeId}, - {String: *query.SubType + subTypeId}, + {String: *query.Type + typeID}, + {String: *query.SubType + subTypeID}, }) } } else { sels = append(sels, util.Selector{ {String: req.Cluster}, {String: query.Hostname}, - {String: *query.Type + typeId}, + {String: *query.Type + typeID}, }) } } @@ -323,12 +323,11 @@ func FetchData(req ApiQueryRequest) (*ApiQueryResponse, error) { // log.Printf("query: %#v\n", query) // log.Printf("sels: %#v\n", sels) var err error - res := make([]ApiMetricData, 0, len(sels)) + res := make([]APIMetricData, 0, len(sels)) for _, sel := range sels { - data := ApiMetricData{} + data := APIMetricData{} data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution) - if err != nil { msg := err.Error() data.Error = &msg diff --git a/internal/memorystore/archive.go 
b/internal/memorystore/archive.go index 9720d20..0b65869 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package memorystore import ( @@ -18,14 +19,13 @@ import ( "sync/atomic" "time" - "github.com/ClusterCockpit/cc-backend/internal/config" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) func Archiving(wg *sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() - d, err := time.ParseDuration(config.MetricStoreKeys.Archive.Interval) + d, err := time.ParseDuration(Keys.Archive.Interval) if err != nil { log.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) } @@ -46,8 +46,8 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(config.MetricStoreKeys.Checkpoints.RootDir, - config.MetricStoreKeys.Archive.RootDir, t.Unix(), config.MetricStoreKeys.Archive.DeleteInstead) + n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, + Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) if err != nil { log.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error()) @@ -59,7 +59,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { }() } -var ErrNoNewData error = errors.New("all data already archived") +var ErrNoNewArchiveData error = errors.New("all data already archived") // ZIP all checkpoint files older than `from` together and write them to the `archiveDir`, // deleting them from the `checkpointsDir`. 
@@ -129,7 +129,7 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead return 0, err } - extension := config.MetricStoreKeys.Checkpoints.FileFormat + extension := Keys.Checkpoints.FileFormat files, err := findFiles(entries, from, extension, false) if err != nil { return 0, err diff --git a/internal/avro/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go similarity index 88% rename from internal/avro/avroCheckpoint.go rename to internal/memorystore/avroCheckpoint.go index b7c2ea1..8c82364 100644 --- a/internal/avro/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package avro + +package memorystore import ( "bufio" @@ -19,12 +20,11 @@ import ( "sync/atomic" "time" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-lib/schema" "github.com/linkedin/goavro/v2" ) -var NumWorkers int = 4 +var NumAvroWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") @@ -58,9 +58,9 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { n, errs := int32(0), int32(0) var wg sync.WaitGroup - wg.Add(NumWorkers) - work := make(chan workItem, NumWorkers*2) - for range NumWorkers { + wg.Add(NumAvroWorkers) + work := make(chan workItem, NumAvroWorkers*2) + for range NumAvroWorkers { go func() { defer wg.Done() @@ -68,7 +68,7 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { from := getTimestamp(workItem.dir) if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil { - if err == ErrNoNewData { + if err == ErrNoNewArchiveData { continue } @@ -113,7 +113,7 @@ func getTimestamp(dir string) int64 { if err != nil { return 0 } - var maxTs int64 = 0 + var maxTS int64 = 0 if len(files) == 0 { return 0 @@ -135,19 +135,19 @@ func getTimestamp(dir 
string) int64 { continue } - if ts > maxTs { - maxTs = ts + if ts > maxTS { + maxTS = ts } } - interval, _ := time.ParseDuration(config.MetricStoreKeys.Checkpoints.Interval) - updateTime := time.Unix(maxTs, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() + interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) + updateTime := time.Unix(maxTS, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() if updateTime < time.Now().Unix() { return 0 } - return maxTs + return maxTS } func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { @@ -156,27 +156,27 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { // fmt.Printf("Checkpointing directory: %s\n", dir) // filepath contains the resolution - int_res, _ := strconv.Atoi(path.Base(dir)) + intRes, _ := strconv.Atoi(path.Base(dir)) // find smallest overall timestamp in l.data map and delete it from l.data - minTs := int64(1<<63 - 1) + minTS := int64(1<<63 - 1) for ts, dat := range l.data { - if ts < minTs && len(dat) != 0 { - minTs = ts + if ts < minTS && len(dat) != 0 { + minTS = ts } } - if from == 0 && minTs != int64(1<<63-1) { - from = minTs + if from == 0 && minTS != int64(1<<63-1) { + from = minTS } if from == 0 { - return ErrNoNewData + return ErrNoNewArchiveData } var schema string var codec *goavro.Codec - record_list := make([]map[string]any, 0) + recordList := make([]map[string]any, 0) var f *os.File @@ -208,19 +208,19 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { f.Close() } - time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() + timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() if dumpAll { - time_ref = time.Now().Unix() + timeRef = time.Now().Unix() } // Empty values if len(l.data) == 0 { // we checkpoint avro files every 60 seconds - repeat := 60 / int_res + repeat := 60 / 
intRes for range repeat { - record_list = append(record_list, make(map[string]any)) + recordList = append(recordList, make(map[string]any)) } } @@ -228,15 +228,15 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { for ts := range l.data { flag := false - if ts < time_ref { + if ts < timeRef { data := l.data[ts] - schema_gen, err := generateSchema(data) + schemaGen, err := generateSchema(data) if err != nil { return err } - flag, schema, err = compareSchema(schema, schema_gen) + flag, schema, err = compareSchema(schema, schemaGen) if err != nil { return fmt.Errorf("failed to compare read and generated schema: %v", err) } @@ -262,7 +262,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to read record: %v", err) } - record_list = append(record_list, record.(map[string]any)) + recordList = append(recordList, record.(map[string]any)) } f.Close() @@ -279,13 +279,13 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to create codec after merged schema: %v", err) } - record_list = append(record_list, generateRecord(data)) + recordList = append(recordList, generateRecord(data)) delete(l.data, ts) } } - if len(record_list) == 0 { - return ErrNoNewData + if len(recordList) == 0 { + return ErrNoNewArchiveData } f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644) @@ -305,7 +305,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { } // Append the new record - if err := writer.Append(record_list); err != nil { + if err := writer.Append(recordList); err != nil { return fmt.Errorf("failed to append record: %v", err) } @@ -401,12 +401,12 @@ func compareSchema(schemaRead, schemaGen string) (bool, string, error) { } // Marshal the merged schema back to JSON - mergedSchemaJson, err := json.Marshal(mergedSchema) + mergedSchemaJSON, err := json.Marshal(mergedSchema) if err != nil { return 
false, "", fmt.Errorf("failed to marshal merged schema: %v", err) } - return true, string(mergedSchemaJson), nil + return true, string(mergedSchemaJSON), nil } func generateSchema(data map[string]schema.Float) (string, error) { diff --git a/internal/avro/avroHelper.go b/internal/memorystore/avroHelper.go similarity index 86% rename from internal/avro/avroHelper.go rename to internal/memorystore/avroHelper.go index 7710f0f..dadacdc 100644 --- a/internal/avro/avroHelper.go +++ b/internal/memorystore/avroHelper.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package avro + +package memorystore import ( "context" @@ -10,14 +11,12 @@ import ( "slices" "strconv" "sync" - - "github.com/ClusterCockpit/cc-backend/internal/config" ) func DataStaging(wg *sync.WaitGroup, ctx context.Context) { // AvroPool is a pool of Avro writers. go func() { - if config.MetricStoreKeys.Checkpoints.FileFormat == "json" { + if Keys.Checkpoints.FileFormat == "json" { wg.Done() // Mark this goroutine as done return // Exit the goroutine } @@ -33,7 +32,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return case val := <-LineProtocolMessages: // Fetch the frequency of the metric from the global configuration - freq, err := config.GetMetricFrequency(val.MetricName) + freq, err := GetMetricFrequency(val.MetricName) if err != nil { log.Printf("Error fetching metric frequency: %s\n", err) continue @@ -41,8 +40,8 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { metricName := "" - for _, selector_name := range val.Selector { - metricName += selector_name + Delimiter + for _, selectorName := range val.Selector { + metricName += selectorName + Delimiter } metricName += val.MetricName diff --git a/internal/avro/avroStruct.go b/internal/memorystore/avroStruct.go similarity index 94% rename from internal/avro/avroStruct.go rename to 
internal/memorystore/avroStruct.go index b0ded94..cc8005c 100644 --- a/internal/avro/avroStruct.go +++ b/internal/memorystore/avroStruct.go @@ -2,7 +2,8 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package avro + +package memorystore import ( "sync" @@ -112,18 +113,18 @@ func (l *AvroLevel) addMetric(metricName string, value schema.Float, timestamp i } } else { // Get the last timestamp - var lastTs int64 + var lastTS int64 for ts := range l.data { - if ts > lastTs { - lastTs = ts + if ts > lastTS { + lastTS = ts } } // Create keys for the next KeyCounter timestamps - l.data[lastTs+int64(Freq)] = make(map[string]schema.Float, 0) + l.data[lastTS+int64(Freq)] = make(map[string]schema.Float, 0) } } - closestTs := int64(0) + closestTS := int64(0) minDiff := int64(Freq) + 1 // Start with diff just outside the valid range found := false @@ -144,13 +145,13 @@ func (l *AvroLevel) addMetric(metricName string, value schema.Float, timestamp i // Check if this is the closest timestamp so far if Abs(diff) < minDiff { minDiff = Abs(diff) - closestTs = ts + closestTS = ts found = true } } if found { - l.data[closestTs][metricName] = value + l.data[closestTS][metricName] = value } } diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index 39e9abc..a942d0a 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + package memorystore import ( @@ -12,14 +17,14 @@ import ( // in the buffer chain will be created if needed so that no copying // of data or reallocation needs to happen on writes. 
const ( - BUFFER_CAP int = 512 + BufferCap int = 512 ) // So that we can reuse allocations var bufferPool sync.Pool = sync.Pool{ - New: func() interface{} { + New: func() any { return &buffer{ - data: make([]schema.Float, 0, BUFFER_CAP), + data: make([]schema.Float, 0, BufferCap), } }, } @@ -192,7 +197,7 @@ func (b *buffer) free(t int64) (delme bool, n int) { n += m if delme { b.prev.next = nil - if cap(b.prev.data) == BUFFER_CAP { + if cap(b.prev.data) == BufferCap { bufferPool.Put(b.prev) } b.prev = nil diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index adee443..e649ee0 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + package memorystore import ( @@ -19,8 +24,6 @@ import ( "sync/atomic" "time" - "github.com/ClusterCockpit/cc-backend/internal/avro" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-lib/schema" "github.com/linkedin/goavro/v2" ) @@ -44,12 +47,12 @@ var lastCheckpoint time.Time func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { lastCheckpoint = time.Now() - if config.MetricStoreKeys.Checkpoints.FileFormat == "json" { + if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() go func() { defer wg.Done() - d, err := time.ParseDuration(config.MetricStoreKeys.Checkpoints.Interval) + d, err := time.ParseDuration(Keys.Checkpoints.Interval) if err != nil { log.Fatal(err) } @@ -70,7 +73,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: log.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) now := time.Now() - n, err := ms.ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, + n, err := 
ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) if err != nil { log.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error()) @@ -89,9 +92,9 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { select { case <-ctx.Done(): return - case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute): + case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute): // This is the first tick untill we collect the data for given minutes. - avro.GetAvroStore().ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, false) + GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) // log.Printf("Checkpointing %d avro files", count) } @@ -109,7 +112,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { return case <-ticks: // Regular ticks of 1 minute to write data. - avro.GetAvroStore().ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, false) + GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) // log.Printf("Checkpointing %d avro files", count) } } @@ -176,7 +179,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { for workItem := range work { if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil { - if err == ErrNoNewData { + if err == ErrNoNewArchiveData { continue } @@ -219,7 +222,7 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil } for metric, minfo := range m.Metrics { - b := l.metrics[minfo.Offset] + b := l.metrics[minfo.offset] if b == nil { continue } @@ -278,7 +281,7 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { } if cf == nil { - return ErrNoNewData + return ErrNoNewArchiveData } filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) @@ -376,7 +379,7 @@ done: func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { // The directory does not exist, so create it 
using os.MkdirAll() - err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory + err := os.MkdirAll(dir, 0o755) // 0755 sets the permissions for the directory if err != nil { log.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) } @@ -384,7 +387,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { } // Config read (replace with your actual config read) - fileFormat := config.MetricStoreKeys.Checkpoints.FileFormat + fileFormat := Keys.Checkpoints.FileFormat if fileFormat == "" { fileFormat = "avro" } @@ -445,10 +448,10 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return fmt.Errorf("[METRICSTORE]> error while reading avro file (resolution parsing) : %s", err) } - from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) + fromTimestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) // Same logic according to lineprotocol - from_timestamp -= (resolution / 2) + fromTimestamp -= (resolution / 2) if err != nil { return fmt.Errorf("[METRICSTORE]> error converting timestamp from the avro file : %s", err) @@ -472,7 +475,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return fmt.Errorf("[METRICSTORE]> error while reading avro file : %s", err) } - record, ok := datum.(map[string]interface{}) + record, ok := datum.(map[string]any) if !ok { panic("[METRICSTORE]> failed to assert datum as map[string]interface{}") } @@ -484,16 +487,16 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { recordCounter += 1 } - to := (from_timestamp + (recordCounter / (60 / resolution) * 60)) + to := (fromTimestamp + (recordCounter / (60 / resolution) * 60)) if to < from { return nil } for key, floatArray := range metricsData { - metricName := avro.ReplaceKey(key) + metricName := ReplaceKey(key) - if strings.Contains(metricName, avro.Delimiter) { - 
subString := strings.Split(metricName, avro.Delimiter) + if strings.Contains(metricName, Delimiter) { + subString := strings.Split(metricName, Delimiter) lvl := l @@ -517,12 +520,12 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { } leafMetricName := subString[len(subString)-1] - err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution) + err = lvl.createBuffer(m, leafMetricName, floatArray, fromTimestamp, resolution) if err != nil { return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) } } else { - err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution) + err = l.createBuffer(m, metricName, floatArray, fromTimestamp, resolution) if err != nil { return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) } @@ -551,9 +554,9 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem // return errors.New("Unkown metric: " + name) } - prev := l.metrics[minfo.Offset] + prev := l.metrics[minfo.offset] if prev == nil { - l.metrics[minfo.Offset] = b + l.metrics[minfo.offset] = b } else { if prev.start > b.start { return errors.New("wooops") @@ -573,12 +576,12 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem prev.data = prev.data[0:len(prev.data):len(prev.data)] } } - l.metrics[minfo.Offset] = b + l.metrics[minfo.offset] = b return nil } -func (l *Level) loadJsonFile(m *MemoryStore, f *os.File, from int64) error { +func (l *Level) loadJSONFile(m *MemoryStore, f *os.File, from int64) error { br := bufio.NewReader(f) cf := &CheckpointFile{} if err := json.NewDecoder(br).Decode(cf); err != nil { @@ -615,9 +618,9 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { // return errors.New("Unkown metric: " + name) } - prev := l.metrics[minfo.Offset] + prev := l.metrics[minfo.offset] if prev == nil { - l.metrics[minfo.Offset] = b + l.metrics[minfo.offset] = b } 
else { if prev.start > b.start { return errors.New("wooops") @@ -626,7 +629,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { b.prev = prev prev.next = b } - l.metrics[minfo.Offset] = b + l.metrics[minfo.offset] = b } if len(cf.Children) > 0 && l.children == nil { @@ -690,7 +693,7 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension } loaders := map[string]func(*MemoryStore, *os.File, int64) error{ - "json": l.loadJsonFile, + "json": l.loadJSONFile, "avro": l.loadAvroFile, } @@ -736,7 +739,7 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece }) filenames := make([]string, 0) - for i := 0; i < len(direntries); i++ { + for i := range direntries { e := direntries[i] ts1 := nums[e.Name()] diff --git a/internal/config/memorystore.go b/internal/memorystore/config.go similarity index 78% rename from internal/config/memorystore.go rename to internal/memorystore/config.go index e9c85da..3bbca27 100644 --- a/internal/config/memorystore.go +++ b/internal/memorystore/config.go @@ -1,10 +1,16 @@ -package config +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package memorystore import ( "bytes" "encoding/json" "fmt" + "github.com/ClusterCockpit/cc-backend/internal/config" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) @@ -50,7 +56,7 @@ type NatsConfig struct { } `json:"subscriptions"` } -var MetricStoreKeys MetricStoreConfig +var Keys MetricStoreConfig // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! type AggregationStrategy int @@ -75,24 +81,26 @@ func AssignAggregationStratergy(str string) (AggregationStrategy, error) { } type MetricConfig struct { - // Interval in seconds at which measurements will arive. 
+ // Interval in seconds at which measurements are stored Frequency int64 // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. Aggregation AggregationStrategy // Private, used internally... - Offset int + offset int } var Metrics map[string]MetricConfig -func InitMetricStore(msConfig json.RawMessage) { - // Validate(msConfigSchema, msConfig) - dec := json.NewDecoder(bytes.NewReader(msConfig)) - // dec.DisallowUnknownFields() - if err := dec.Decode(&MetricStoreKeys); err != nil { - cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", msConfig, err.Error()) +func InitMetricStore(rawConfig json.RawMessage) { + if rawConfig != nil { + config.Validate(configSchema, rawConfig) + dec := json.NewDecoder(bytes.NewReader(rawConfig)) + // dec.DisallowUnknownFields() + if err := dec.Decode(&Keys); err != nil { + cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error()) + } } } diff --git a/internal/memorystore/configSchema.go b/internal/memorystore/configSchema.go new file mode 100644 index 0000000..f548ac6 --- /dev/null +++ b/internal/memorystore/configSchema.go @@ -0,0 +1,190 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package memorystore + +const configSchema = `{ + "type": "object", + "properties": { + "jobList": { + "description": "Job list defaults. 
Applies to user- and jobs views.", + "type": "object", + "properties": { + "usePaging": { + "description": "If classic paging is used instead of continuous scrolling by default.", + "type": "boolean" + }, + "showFootprint": { + "description": "If footprint bars are shown as first column by default.", + "type": "boolean" + } + } + }, + "nodeList": { + "description": "Node list defaults. Applies to node list view.", + "type": "object", + "properties": { + "usePaging": { + "description": "If classic paging is used instead of continuous scrolling by default.", + "type": "boolean" + } + } + }, + "jobView": { + "description": "Job view defaults.", + "type": "object", + "properties": { + "showPolarPlot": { + "description": "If the job metric footprints polar plot is shown by default.", + "type": "boolean" + }, + "showFootprint": { + "description": "If the annotated job metric footprint bars are shown by default.", + "type": "boolean" + }, + "showRoofline": { + "description": "If the job roofline plot is shown by default.", + "type": "boolean" + }, + "showStatTable": { + "description": "If the job metric statistics table is shown by default.", + "type": "boolean" + } + } + }, + "metricConfig": { + "description": "Global initial metric selections for primary views of all clusters.", + "type": "object", + "properties": { + "jobListMetrics": { + "description": "Initial metrics shown for new users in job lists (User and jobs view).", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewPlotMetrics": { + "description": "Initial metrics shown for new users as job view metric plots.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewTableMetrics": { + "description": "Initial metrics shown for new users in job view statistics table.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "clusters": { + "description": "Overrides for global defaults by cluster and subcluster.", + 
"type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "description": "The name of the cluster." + }, + "jobListMetrics": { + "description": "Initial metrics shown for new users in job lists (User and jobs view) for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewPlotMetrics": { + "description": "Initial metrics shown for new users as job view timeplots for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewTableMetrics": { + "description": "Initial metrics shown for new users in job view statistics table for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "subClusters": { + "description": "The array of overrides per subcluster.", + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "description": "The name of the subcluster.", + "type": "string" + }, + "jobListMetrics": { + "description": "Initial metrics shown for new users in job lists (User and jobs view) for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewPlotMetrics": { + "description": "Initial metrics shown for new users as job view timeplots for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + }, + "jobViewTableMetrics": { + "description": "Initial metrics shown for new users in job view statistics table for subcluster.", + "type": "array", + "items": { + "type": "string", + "minItems": 1 + } + } + }, + "required": ["name"], + "minItems": 1 + } + } + }, + "required": ["name", "subClusters"], + "minItems": 1 + } + } + } + }, + "plotConfiguration": { + "description": "Initial settings for plot render options.", + "type": "object", + "properties": { + "colorBackground": { + "description": "If the metric plot backgrounds are initially colored by threshold limits.", + "type": "boolean" + }, + "plotsPerRow": { + 
"description": "How many plots are initially rendered in per row. Applies to job, single node, and analysis views.", + "type": "integer" + }, + "lineWidth": { + "description": "Initial thickness of rendered plotlines. Applies to metric plot, job compare plot and roofline.", + "type": "integer" + }, + "colorScheme": { + "description": "Initial colorScheme to be used for metric plots.", + "type": "array", + "items": { + "type": "string" + } + } + } + } + } +}` diff --git a/internal/memorystore/debug.go b/internal/memorystore/debug.go index 0f85024..b56cf25 100644 --- a/internal/memorystore/debug.go +++ b/internal/memorystore/debug.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + package memorystore import ( @@ -41,7 +46,7 @@ func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [ depth += 1 objitems := 0 for name, mc := range m.Metrics { - if b := l.metrics[mc.Offset]; b != nil { + if b := l.metrics[mc.offset]; b != nil { for i := 0; i < depth; i++ { buf = append(buf, '\t') } diff --git a/internal/memorystore/healthcheck.go b/internal/memorystore/healthcheck.go index d655db3..b1052f3 100644 --- a/internal/memorystore/healthcheck.go +++ b/internal/memorystore/healthcheck.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + package memorystore import ( @@ -6,29 +11,28 @@ import ( "time" ) -// This is a threshold that allows a node to be healthy with certain number of data points missing. +// MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing. 
// Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a // node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy. const MaxMissingDataPoints int64 = 5 -// This is a threshold which allows upto certain number of metrics in a node to be unhealthly. +// MaxUnhealthyMetrics is a threshold which allows up to a certain number of metrics in a node to be unhealthy. // Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last // MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does // not receive data for MaxMissingDataPoints data points will deem the node unhealthy. const MaxUnhealthyMetrics int64 = 5 func (b *buffer) healthCheck() int64 { - // Check if the buffer is empty if b.data == nil { return 1 } - buffer_end := b.start + b.frequency*int64(len(b.data)) + bufferEnd := b.start + b.frequency*int64(len(b.data)) t := time.Now().Unix() // Check if the buffer is too old - if t-buffer_end > MaxMissingDataPoints*b.frequency { + if t-bufferEnd > MaxMissingDataPoints*b.frequency { return 1 } @@ -40,7 +44,7 @@ func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) { defer l.lock.RUnlock() for _, mc := range m.Metrics { - if b := l.metrics[mc.Offset]; b != nil { + if b := l.metrics[mc.offset]; b != nil { count += b.healthCheck() } } diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index 76916e6..54f7c4a 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file.
+ package memorystore import ( @@ -77,7 +82,7 @@ func (l *Level) free(t int64) (int, error) { delme, m := b.free(t) n += m if delme { - if cap(b.data) == BUFFER_CAP { + if cap(b.data) == BufferCap { bufferPool.Put(b) } l.metrics[i] = nil diff --git a/internal/memorystore/lineprotocol.go b/internal/memorystore/lineprotocol.go index 495197d..5265848 100644 --- a/internal/memorystore/lineprotocol.go +++ b/internal/memorystore/lineprotocol.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + package memorystore import ( @@ -7,8 +12,6 @@ import ( "sync" "time" - "github.com/ClusterCockpit/cc-backend/internal/avro" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-lib/schema" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" @@ -77,10 +80,10 @@ import ( // return nil // } -// Connect to a nats server and subscribe to "updates". This is a blocking -// function. handleLine will be called for each line recieved via nats. -// Send `true` through the done channel for gracefull termination. -func ReceiveNats(conf *(config.NatsConfig), +// ReceiveNats connects to a nats server and subscribes to "updates". This is a +// blocking function. handleLine will be called for each line received via +// nats. Send `true` through the done channel for graceful termination.
+func ReceiveNats(conf *(NatsConfig), ms *MemoryStore, workers int, ctx context.Context, @@ -170,7 +173,7 @@ func reorder(buf, prefix []byte) []byte { for i := m - 1; i >= 0; i-- { buf[i+n] = buf[i] } - for i := 0; i < n; i++ { + for i := range n { buf[i] = prefix[i] } return buf @@ -329,14 +332,15 @@ func decodeLine(dec *lineprotocol.Decoder, time := t.Unix() - if config.MetricStoreKeys.Checkpoints.FileFormat != "json" { - avro.LineProtocolMessages <- &avro.AvroStruct{ + if Keys.Checkpoints.FileFormat != "json" { + LineProtocolMessages <- &AvroStruct{ MetricName: string(metricBuf), Cluster: cluster, Node: host, Selector: append([]string{}, selector...), Value: metric.Value, - Timestamp: time} + Timestamp: time, + } } if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil { diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 4a631c2..11b4a1c 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -1,3 +1,8 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ package memorystore import ( @@ -11,8 +16,6 @@ import ( "syscall" "time" - "github.com/ClusterCockpit/cc-backend/internal/avro" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-lib/resampler" "github.com/ClusterCockpit/cc-lib/runtimeEnv" "github.com/ClusterCockpit/cc-lib/schema" @@ -30,39 +33,36 @@ var NumWorkers int = 4 func init() { maxWorkers := 10 - NumWorkers = runtime.NumCPU()/2 + 1 - if NumWorkers > maxWorkers { - NumWorkers = maxWorkers - } + NumWorkers = min(runtime.NumCPU()/2+1, maxWorkers) } type Metric struct { Name string Value schema.Float - MetricConfig config.MetricConfig + MetricConfig MetricConfig } type MemoryStore struct { - Metrics map[string]config.MetricConfig + Metrics map[string]MetricConfig root Level } func Init(wg *sync.WaitGroup) { startupTime := time.Now() - //Pass the config.MetricStoreKeys - InitMetrics(config.Metrics) + // Pass the config.MetricStoreKeys + InitMetrics(Metrics) ms := GetMemoryStore() - d, err := time.ParseDuration(config.MetricStoreKeys.Checkpoints.Restore) + d, err := time.ParseDuration(Keys.Checkpoints.Restore) if err != nil { log.Fatal(err) } restoreFrom := startupTime.Add(-d) log.Printf("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) - files, err := ms.FromCheckpointFiles(config.MetricStoreKeys.Checkpoints.RootDir, restoreFrom.Unix()) + files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix()) loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB if err != nil { log.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error()) @@ -85,7 +85,7 @@ func Init(wg *sync.WaitGroup) { Retention(wg, ctx) Checkpointing(wg, ctx) Archiving(wg, ctx) - avro.DataStaging(wg, ctx) + DataStaging(wg, ctx) wg.Add(1) sigs := make(chan os.Signal, 1) @@ -97,8 +97,8 @@ func Init(wg *sync.WaitGroup) { shutdown() }() - if config.MetricStoreKeys.Nats != nil { - for _, natsConf := range config.MetricStoreKeys.Nats { + if 
Keys.Nats != nil { + for _, natsConf := range Keys.Nats { // TODO: When multiple nats configs share a URL, do a single connect. wg.Add(1) nc := natsConf @@ -114,9 +114,9 @@ func Init(wg *sync.WaitGroup) { } } -// Create a new, initialized instance of a MemoryStore. +// InitMetrics creates a new, initialized instance of a MemoryStore. // Will panic if values in the metric configurations are invalid. -func InitMetrics(metrics map[string]config.MetricConfig) { +func InitMetrics(metrics map[string]MetricConfig) { singleton.Do(func() { offset := 0 for key, cfg := range metrics { @@ -124,10 +124,10 @@ func InitMetrics(metrics map[string]config.MetricConfig) { panic("[METRICSTORE]> invalid frequency") } - metrics[key] = config.MetricConfig{ + metrics[key] = MetricConfig{ Frequency: cfg.Frequency, Aggregation: cfg.Aggregation, - Offset: offset, + offset: offset, } offset += 1 } @@ -151,17 +151,17 @@ func GetMemoryStore() *MemoryStore { } func Shutdown() { - log.Printf("[METRICSTORE]> Writing to '%s'...\n", config.MetricStoreKeys.Checkpoints.RootDir) + log.Printf("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) var files int var err error ms := GetMemoryStore() - if config.MetricStoreKeys.Checkpoints.FileFormat == "json" { - files, err = ms.ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + if Keys.Checkpoints.FileFormat == "json" { + files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) } else { - files, err = avro.GetAvroStore().ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, true) - close(avro.LineProtocolMessages) + files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true) + close(LineProtocolMessages) } if err != nil { @@ -234,7 +234,7 @@ func Shutdown() { func getName(m *MemoryStore, i int) string { for key, val := range m.Metrics { - if val.Offset == i { + if val.offset == i { return key } } @@ -246,7 +246,7 @@ func Retention(wg 
*sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() - d, err := time.ParseDuration(config.MetricStoreKeys.RetentionInMemory) + d, err := time.ParseDuration(Keys.RetentionInMemory) if err != nil { log.Fatal(err) } @@ -311,11 +311,11 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric continue } - b := l.metrics[metric.MetricConfig.Offset] + b := l.metrics[metric.MetricConfig.offset] if b == nil { // First write to this metric and level b = newBuffer(ts, metric.MetricConfig.Frequency) - l.metrics[metric.MetricConfig.Offset] = b + l.metrics[metric.MetricConfig.offset] = b } nb, err := b.write(ts, metric.Value) @@ -325,7 +325,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric // Last write created a new buffer... if b != nb { - l.metrics[metric.MetricConfig.Offset] = nb + l.metrics[metric.MetricConfig.offset] = nb } } return nil @@ -337,17 +337,17 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric // the range asked for if no data was available. 
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error) { if from > to { - return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range\n") + return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range") } minfo, ok := m.Metrics[metric] if !ok { - return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: \n" + metric) + return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric) } n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1) - err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { + err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { return err @@ -381,14 +381,14 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso if err != nil { return nil, 0, 0, 0, err } else if n == 0 { - return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found\n") + return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found") } else if n > 1 { - if minfo.Aggregation == config.AvgAggregation { + if minfo.Aggregation == AvgAggregation { normalize := 1. / schema.Float(n) for i := 0; i < len(data); i++ { data[i] *= normalize } - } else if minfo.Aggregation != config.SumAggregation { + } else if minfo.Aggregation != SumAggregation { return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid aggregation") } } diff --git a/internal/memorystore/stats.go b/internal/memorystore/stats.go index 1066dcb..91b1f2c 100644 --- a/internal/memorystore/stats.go +++ b/internal/memorystore/stats.go @@ -1,10 +1,14 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ package memorystore import ( "errors" "math" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-lib/util" ) @@ -78,7 +82,7 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6 n, samples := 0, 0 avg, min, max := util.Float(0), math.MaxFloat32, -math.MaxFloat32 - err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { + err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { stats, cfrom, cto, err := b.stats(from, to) if err != nil { return err @@ -105,9 +109,9 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6 return nil, 0, 0, ErrNoData } - if minfo.Aggregation == config.AvgAggregation { + if minfo.Aggregation == AvgAggregation { avg /= util.Float(n) - } else if n > 1 && minfo.Aggregation != config.SumAggregation { + } else if n > 1 && minfo.Aggregation != SumAggregation { return nil, 0, 0, errors.New("invalid aggregation") } diff --git a/internal/metricdata/cc-metric-store-internal.go b/internal/metricdata/cc-metric-store-internal.go index 2babb09..01f4a05 100644 --- a/internal/metricdata/cc-metric-store-internal.go +++ b/internal/metricdata/cc-metric-store-internal.go @@ -54,7 +54,7 @@ func (ccms *CCMetricStoreInternal) LoadData( return nil, err } - req := memorystore.ApiQueryRequest{ + req := memorystore.APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -156,8 +156,8 @@ func (ccms *CCMetricStoreInternal) buildQueries( metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.ApiQuery, []schema.MetricScope, error) { - queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) +) ([]memorystore.APIQuery, []schema.MetricScope, error) { + queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} subcluster, scerr := archive.GetSubCluster(job.Cluster, 
job.SubCluster) @@ -219,7 +219,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -237,7 +237,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -251,7 +251,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -267,7 +267,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -284,7 +284,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -299,7 +299,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -314,7 +314,7 @@ func (ccms 
*CCMetricStoreInternal) buildQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -330,7 +330,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -346,7 +346,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -361,7 +361,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -376,7 +376,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -391,7 +391,7 @@ func (ccms *CCMetricStoreInternal) 
buildQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -406,7 +406,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -420,7 +420,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: host.Hostname, Resolution: resolution, @@ -448,7 +448,7 @@ func (ccms *CCMetricStoreInternal) LoadStats( return nil, err } - req := memorystore.ApiQueryRequest{ + req := memorystore.APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -507,7 +507,7 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats( return nil, err } - req := memorystore.ApiQueryRequest{ + req := memorystore.APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -593,7 +593,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( from, to time.Time, ctx context.Context, ) (map[string]map[string][]*schema.JobMetric, error) { - req := memorystore.ApiQueryRequest{ + req := memorystore.APIQueryRequest{ Cluster: cluster, From: from.Unix(), To: to.Unix(), @@ -606,7 +606,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( } else { for _, node := range nodes { for _, metric := range metrics { - req.Queries = 
append(req.Queries, memorystore.ApiQuery{ + req.Queries = append(req.Queries, memorystore.APIQuery{ Hostname: node, Metric: metric, Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution @@ -624,7 +624,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( var errors []string data := make(map[string]map[string][]*schema.JobMetric) for i, res := range resBody.Results { - var query memorystore.ApiQuery + var query memorystore.APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -737,7 +737,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( return nil, totalNodes, hasNextPage, err } - req := memorystore.ApiQueryRequest{ + req := memorystore.APIQueryRequest{ Cluster: cluster, Queries: queries, From: from.Unix(), @@ -755,7 +755,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( var errors []string data := make(map[string]schema.JobData) for i, row := range resBody.Results { - var query memorystore.ApiQuery + var query memorystore.APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -842,8 +842,8 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.ApiQuery, []schema.MetricScope, error) { - queries := make([]memorystore.ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) +) ([]memorystore.APIQuery, []schema.MetricScope, error) { + queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} // Get Topol before loop if subCluster given @@ -926,7 +926,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -944,7 +944,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, 
memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -958,7 +958,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -974,7 +974,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) for _, core := range cores { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -991,7 +991,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1006,7 +1006,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1021,7 +1021,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1037,7 +1037,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if 
nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1053,7 +1053,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1068,7 +1068,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1083,7 +1083,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1098,7 +1098,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1113,7 +1113,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // 
Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1127,7 +1127,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.ApiQuery{ + queries = append(queries, memorystore.APIQuery{ Metric: metric, Hostname: hostname, Resolution: resolution, diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 4cfff34..3219611 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package metricdata import ( @@ -12,6 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/memorystore" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) @@ -56,7 +58,7 @@ func Init() error { mdr = &CCMetricStore{} case "cc-metric-store-internal": mdr = &CCMetricStoreInternal{} - config.InternalCCMSFlag = true + memorystore.InternalCCMSFlag = true case "prometheus": mdr = &PrometheusDataRepository{} case "test":