From 752e19c27663a7e18974a0aad88f8a7f61fca7f6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:06:52 +0100 Subject: [PATCH] Pull out metric List build from metricstore Init --- cmd/cc-backend/main.go | 3 +- pkg/metricstore/avroHelper.go | 5 +- pkg/metricstore/config.go | 86 ++++++++++++++--------------- pkg/metricstore/metricstore.go | 38 ++++--------- pkg/metricstore/metricstore_test.go | 66 ---------------------- 5 files changed, 58 insertions(+), 140 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9ded95ba..deefce34 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -321,7 +321,8 @@ func runServer(ctx context.Context) error { haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { - metricstore.Init(mscfg, &wg) + metrics := metricstore.BuildMetricList() + metricstore.Init(mscfg, metrics, &wg) // Inject repository as NodeProvider to break import cycle ms := metricstore.GetMemoryStore() diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 985fdd78..36594e73 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -24,6 +24,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) @@ -39,7 +40,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return } // Process remaining message - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { continue } @@ -76,7 +77,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { } // Fetch the frequency of the metric from the global configuration - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { cclog.Errorf("Error fetching metric frequency: %s\n", err) continue diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 44a24f7d..69ee3563 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -45,6 +45,10 @@ package metricstore import ( "fmt" "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" ) const ( @@ -207,55 +211,51 @@ type MetricConfig struct { offset int } -// Metrics is the global map of metric configurations. -// -// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init() -// from cluster configuration and checkpoint restoration. Each MetricConfig.offset -// corresponds to the buffer slice index in Level.metrics. -var Metrics map[string]MetricConfig +func BuildMetricList() map[string]MetricConfig { + var metrics map[string]MetricConfig = make(map[string]MetricConfig) -// GetMetricFrequency retrieves the measurement interval for a metric. -// -// Parameters: -// - metricName: Metric name (e.g., "cpu_load") -// -// Returns: -// - int64: Frequency in seconds -// - error: Non-nil if metric not found in Metrics map -func GetMetricFrequency(metricName string) (int64, error) { - if metric, ok := Metrics[metricName]; ok { - return metric.Frequency, nil - } - return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) -} + addMetric := func(name string, metric MetricConfig) error { + if metrics == nil { + metrics = make(map[string]MetricConfig, 0) + } -// AddMetric registers a new metric or updates an existing one. -// -// If the metric already exists with a different frequency, uses the higher frequency -// (finer granularity). This handles cases where different clusters report the same -// metric at different intervals. -// -// Parameters: -// - name: Metric name (e.g., "cpu_load") -// - metric: Configuration (frequency, aggregation strategy) -// -// Returns: -// - error: Always nil (signature for future error handling) -func AddMetric(name string, metric MetricConfig) error { - if Metrics == nil { - Metrics = make(map[string]MetricConfig, 0) + if existingMetric, ok := metrics[name]; ok { + if existingMetric.Frequency != metric.Frequency { + if existingMetric.Frequency < metric.Frequency { + existingMetric.Frequency = metric.Frequency + metrics[name] = existingMetric + } + } + } else { + metrics[name] = metric + } + + return nil } - if existingMetric, ok := Metrics[name]; ok { - if existingMetric.Frequency != metric.Frequency { - if existingMetric.Frequency < metric.Frequency { - existingMetric.Frequency = metric.Frequency - Metrics[name] = existingMetric + // Helper function to add metric configuration + addMetricConfig := func(mc *schema.MetricConfig) { + agg, err := AssignAggregationStrategy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) + } + + addMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + for _, c := range archive.Clusters { + for _, mc := range c.MetricConfig { + addMetricConfig(mc) + } + + for _, sc := range c.SubClusters { + for _, mc := range sc.MetricConfig { + addMetricConfig(mc) } } - } else { - Metrics[name] = metric } - return nil + return metrics } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 0d1f19c9..6246f520 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -24,13 +24,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "runtime" "slices" "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -120,7 +120,7 @@ type MemoryStore struct { // // Note: Signal handling must be implemented by the caller. Call Shutdown() when // receiving termination signals to ensure checkpoint data is persisted. -func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { +func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) { startupTime := time.Now() if rawConfig != nil { @@ -138,33 +138,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) - // Helper function to add metric configuration - addMetricConfig := func(mc *schema.MetricConfig) { - agg, err := AssignAggregationStrategy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) - } - - for _, c := range archive.Clusters { - for _, mc := range c.MetricConfig { - addMetricConfig(mc) - } - - for _, sc := range c.SubClusters { - for _, mc := range sc.MetricConfig { - addMetricConfig(mc) - } - } - } - // Pass the config.MetricStoreKeys - InitMetrics(Metrics) + InitMetrics(metrics) ms := GetMemoryStore() @@ -279,6 +254,13 @@ func GetMemoryStore() *MemoryStore { return msInstance } +func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) { + if metric, ok := ms.Metrics[metricName]; ok { + return metric.Frequency, nil + } + return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) +} + // SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // This must be called during initialization to provide job state information // for selective buffer retention during Free operations. diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index fd7c963f..90cec2bd 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) { } } -func TestAddMetric(t *testing.T) { - // Reset Metrics before test - Metrics = make(map[string]MetricConfig) - - err := AddMetric("test_metric", MetricConfig{ - Frequency: 60, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if _, ok := Metrics["test_metric"]; !ok { - t.Error("AddMetric() did not add metric to Metrics map") - } - - // Test updating with higher frequency - err = AddMetric("test_metric", MetricConfig{ - Frequency: 120, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency) - } - - // Test updating with lower frequency (should not update) - err = AddMetric("test_metric", MetricConfig{ - Frequency: 30, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency) - } -} - -func TestGetMetricFrequency(t *testing.T) { - // Reset Metrics before test - Metrics = map[string]MetricConfig{ - "test_metric": { - Frequency: 60, - Aggregation: SumAggregation, - }, - } - - freq, err := GetMetricFrequency("test_metric") - if err != nil { - t.Errorf("GetMetricFrequency() error = %v", err) - } - if freq != 60 { - t.Errorf("GetMetricFrequency() = %d, want 60", freq) - } - - _, err = GetMetricFrequency("nonexistent") - if err == nil { - t.Error("GetMetricFrequency() expected error for nonexistent metric") - } -} - func TestBufferWrite(t *testing.T) { b := newBuffer(100, 10)