Pull out metric List build from metricstore Init

This commit is contained in:
2026-01-27 17:06:52 +01:00
parent 28a3ff8d67
commit 752e19c276
5 changed files with 58 additions and 140 deletions

View File

@@ -321,7 +321,8 @@ func runServer(ctx context.Context) error {
haveMetricstore := false haveMetricstore := false
mscfg := ccconf.GetPackageConfig("metric-store") mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg != nil { if mscfg != nil {
metricstore.Init(mscfg, &wg) metrics := metricstore.BuildMetricList()
metricstore.Init(mscfg, metrics, &wg)
// Inject repository as NodeProvider to break import cycle // Inject repository as NodeProvider to break import cycle
ms := metricstore.GetMemoryStore() ms := metricstore.GetMemoryStore()

View File

@@ -24,6 +24,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done() defer wg.Done()
ms := GetMemoryStore()
var avroLevel *AvroLevel var avroLevel *AvroLevel
oldSelector := make([]string, 0) oldSelector := make([]string, 0)
@@ -39,7 +40,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
return return
} }
// Process remaining message // Process remaining message
freq, err := GetMetricFrequency(val.MetricName) freq, err := ms.GetMetricFrequency(val.MetricName)
if err != nil { if err != nil {
continue continue
} }
@@ -76,7 +77,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
} }
// Fetch the frequency of the metric from the global configuration // Fetch the frequency of the metric from the global configuration
freq, err := GetMetricFrequency(val.MetricName) freq, err := ms.GetMetricFrequency(val.MetricName)
if err != nil { if err != nil {
cclog.Errorf("Error fetching metric frequency: %s\n", err) cclog.Errorf("Error fetching metric frequency: %s\n", err)
continue continue

View File

@@ -45,6 +45,10 @@ package metricstore
import ( import (
"fmt" "fmt"
"time" "time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
) )
const ( const (
@@ -207,55 +211,51 @@ type MetricConfig struct {
offset int offset int
} }
// Metrics is the global map of metric configurations. func BuildMetricList() map[string]MetricConfig {
// var metrics map[string]MetricConfig = make(map[string]MetricConfig)
// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init()
// from cluster configuration and checkpoint restoration. Each MetricConfig.offset
// corresponds to the buffer slice index in Level.metrics.
var Metrics map[string]MetricConfig
// GetMetricFrequency retrieves the measurement interval for a metric. addMetric := func(name string, metric MetricConfig) error {
// if metrics == nil {
// Parameters: metrics = make(map[string]MetricConfig, 0)
// - metricName: Metric name (e.g., "cpu_load") }
//
// Returns:
// - int64: Frequency in seconds
// - error: Non-nil if metric not found in Metrics map
func GetMetricFrequency(metricName string) (int64, error) {
if metric, ok := Metrics[metricName]; ok {
return metric.Frequency, nil
}
return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName)
}
// AddMetric registers a new metric or updates an existing one. if existingMetric, ok := metrics[name]; ok {
// if existingMetric.Frequency != metric.Frequency {
// If the metric already exists with a different frequency, uses the higher frequency if existingMetric.Frequency < metric.Frequency {
// (finer granularity). This handles cases where different clusters report the same existingMetric.Frequency = metric.Frequency
// metric at different intervals. metrics[name] = existingMetric
// }
// Parameters: }
// - name: Metric name (e.g., "cpu_load") } else {
// - metric: Configuration (frequency, aggregation strategy) metrics[name] = metric
// }
// Returns:
// - error: Always nil (signature for future error handling) return nil
func AddMetric(name string, metric MetricConfig) error {
if Metrics == nil {
Metrics = make(map[string]MetricConfig, 0)
} }
if existingMetric, ok := Metrics[name]; ok { // Helper function to add metric configuration
if existingMetric.Frequency != metric.Frequency { addMetricConfig := func(mc *schema.MetricConfig) {
if existingMetric.Frequency < metric.Frequency { agg, err := AssignAggregationStrategy(mc.Aggregation)
existingMetric.Frequency = metric.Frequency if err != nil {
Metrics[name] = existingMetric cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error())
}
addMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
for _, c := range archive.Clusters {
for _, mc := range c.MetricConfig {
addMetricConfig(mc)
}
for _, sc := range c.SubClusters {
for _, mc := range sc.MetricConfig {
addMetricConfig(mc)
} }
} }
} else {
Metrics[name] = metric
} }
return nil return metrics
} }

View File

@@ -24,13 +24,13 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"runtime" "runtime"
"slices" "slices"
"sync" "sync"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/resampler"
"github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -120,7 +120,7 @@ type MemoryStore struct {
// //
// Note: Signal handling must be implemented by the caller. Call Shutdown() when // Note: Signal handling must be implemented by the caller. Call Shutdown() when
// receiving termination signals to ensure checkpoint data is persisted. // receiving termination signals to ensure checkpoint data is persisted.
func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) {
startupTime := time.Now() startupTime := time.Now()
if rawConfig != nil { if rawConfig != nil {
@@ -138,33 +138,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
} }
cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers)
// Helper function to add metric configuration
addMetricConfig := func(mc *schema.MetricConfig) {
agg, err := AssignAggregationStrategy(mc.Aggregation)
if err != nil {
cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error())
}
AddMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
for _, c := range archive.Clusters {
for _, mc := range c.MetricConfig {
addMetricConfig(mc)
}
for _, sc := range c.SubClusters {
for _, mc := range sc.MetricConfig {
addMetricConfig(mc)
}
}
}
// Pass the config.MetricStoreKeys // Pass the config.MetricStoreKeys
InitMetrics(Metrics) InitMetrics(metrics)
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -279,6 +254,13 @@ func GetMemoryStore() *MemoryStore {
return msInstance return msInstance
} }
// GetMetricFrequency retrieves the measurement interval configured for a
// metric by looking metricName up in the store's Metrics map.
//
// Parameters:
//   - metricName: Metric name used as the key into ms.Metrics
//
// Returns:
//   - int64: The Frequency field of the matching entry (0 when not found)
//   - error: Non-nil if metricName is not a key of ms.Metrics
func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) {
	cfg, found := ms.Metrics[metricName]
	if !found {
		return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName)
	}

	return cfg.Frequency, nil
}
// SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // SetNodeProvider sets the NodeProvider implementation for the MemoryStore.
// This must be called during initialization to provide job state information // This must be called during initialization to provide job state information
// for selective buffer retention during Free operations. // for selective buffer retention during Free operations.

View File

@@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) {
} }
} }
// TestAddMetric checks how AddMetric handles repeated registration of the same
// metric name against the package-level Metrics map: the first call must
// create the entry, a later call with a larger Frequency value must replace
// the stored Frequency, and a later call with a smaller Frequency value must
// leave the stored Frequency untouched.
func TestAddMetric(t *testing.T) {
// Start from an empty package-level Metrics map so earlier tests cannot
// influence the assertions below.
Metrics = make(map[string]MetricConfig)
// First registration: the name is not yet present in Metrics.
err := AddMetric("test_metric", MetricConfig{
Frequency: 60,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if _, ok := Metrics["test_metric"]; !ok {
t.Error("AddMetric() did not add metric to Metrics map")
}
// Re-register with a larger Frequency value (120 > 60): the stored
// Frequency is expected to become 120.
err = AddMetric("test_metric", MetricConfig{
Frequency: 120,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if Metrics["test_metric"].Frequency != 120 {
t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency)
}
// Re-register with a smaller Frequency value (30 < 120): the stored
// Frequency is expected to stay at 120.
err = AddMetric("test_metric", MetricConfig{
Frequency: 30,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if Metrics["test_metric"].Frequency != 120 {
t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency)
}
}
// TestGetMetricFrequency checks the package-level GetMetricFrequency lookup:
// a name present in Metrics must yield its Frequency without an error, and a
// name absent from Metrics must yield a non-nil error.
func TestGetMetricFrequency(t *testing.T) {
// Replace the package-level Metrics map with a single known entry so the
// lookups below are independent of other tests.
Metrics = map[string]MetricConfig{
"test_metric": {
Frequency: 60,
Aggregation: SumAggregation,
},
}
// Hit: the configured Frequency (60) must come back unchanged.
freq, err := GetMetricFrequency("test_metric")
if err != nil {
t.Errorf("GetMetricFrequency() error = %v", err)
}
if freq != 60 {
t.Errorf("GetMetricFrequency() = %d, want 60", freq)
}
// Miss: only the error is inspected; the returned value is ignored.
_, err = GetMetricFrequency("nonexistent")
if err == nil {
t.Error("GetMetricFrequency() expected error for nonexistent metric")
}
}
func TestBufferWrite(t *testing.T) { func TestBufferWrite(t *testing.T) {
b := newBuffer(100, 10) b := newBuffer(100, 10)