From 752e19c27663a7e18974a0aad88f8a7f61fca7f6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:06:52 +0100 Subject: [PATCH 01/10] Pull out metric List build from metricstore Init --- cmd/cc-backend/main.go | 3 +- pkg/metricstore/avroHelper.go | 5 +- pkg/metricstore/config.go | 86 ++++++++++++++--------------- pkg/metricstore/metricstore.go | 38 ++++--------- pkg/metricstore/metricstore_test.go | 66 ---------------------- 5 files changed, 58 insertions(+), 140 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9ded95ba..deefce34 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -321,7 +321,8 @@ func runServer(ctx context.Context) error { haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { - metricstore.Init(mscfg, &wg) + metrics := metricstore.BuildMetricList() + metricstore.Init(mscfg, metrics, &wg) // Inject repository as NodeProvider to break import cycle ms := metricstore.GetMemoryStore() diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 985fdd78..36594e73 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -24,6 +24,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) @@ -39,7 +40,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return } // Process remaining message - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { continue } @@ -76,7 +77,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { } // Fetch the frequency of the metric from the global configuration - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { cclog.Errorf("Error fetching metric frequency: %s\n", err) continue diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 44a24f7d..69ee3563 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -45,6 +45,10 @@ package metricstore import ( "fmt" "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" ) const ( @@ -207,55 +211,51 @@ type MetricConfig struct { offset int } -// Metrics is the global map of metric configurations. -// -// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init() -// from cluster configuration and checkpoint restoration. Each MetricConfig.offset -// corresponds to the buffer slice index in Level.metrics. -var Metrics map[string]MetricConfig +func BuildMetricList() map[string]MetricConfig { + var metrics map[string]MetricConfig = make(map[string]MetricConfig) -// GetMetricFrequency retrieves the measurement interval for a metric. -// -// Parameters: -// - metricName: Metric name (e.g., "cpu_load") -// -// Returns: -// - int64: Frequency in seconds -// - error: Non-nil if metric not found in Metrics map -func GetMetricFrequency(metricName string) (int64, error) { - if metric, ok := Metrics[metricName]; ok { - return metric.Frequency, nil - } - return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) -} + addMetric := func(name string, metric MetricConfig) error { + if metrics == nil { + metrics = make(map[string]MetricConfig, 0) + } -// AddMetric registers a new metric or updates an existing one. -// -// If the metric already exists with a different frequency, uses the higher frequency -// (finer granularity). This handles cases where different clusters report the same -// metric at different intervals. -// -// Parameters: -// - name: Metric name (e.g., "cpu_load") -// - metric: Configuration (frequency, aggregation strategy) -// -// Returns: -// - error: Always nil (signature for future error handling) -func AddMetric(name string, metric MetricConfig) error { - if Metrics == nil { - Metrics = make(map[string]MetricConfig, 0) + if existingMetric, ok := metrics[name]; ok { + if existingMetric.Frequency != metric.Frequency { + if existingMetric.Frequency < metric.Frequency { + existingMetric.Frequency = metric.Frequency + metrics[name] = existingMetric + } + } + } else { + metrics[name] = metric + } + + return nil } - if existingMetric, ok := Metrics[name]; ok { - if existingMetric.Frequency != metric.Frequency { - if existingMetric.Frequency < metric.Frequency { - existingMetric.Frequency = metric.Frequency - Metrics[name] = existingMetric + // Helper function to add metric configuration + addMetricConfig := func(mc *schema.MetricConfig) { + agg, err := AssignAggregationStrategy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) + } + + addMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + for _, c := range archive.Clusters { + for _, mc := range c.MetricConfig { + addMetricConfig(mc) + } + + for _, sc := range c.SubClusters { + for _, mc := range sc.MetricConfig { + addMetricConfig(mc) } } - } else { - Metrics[name] = metric } - return nil + return metrics } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 0d1f19c9..6246f520 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -24,13 +24,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "runtime" "slices" "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -120,7 +120,7 @@ type MemoryStore struct { // // Note: Signal handling must be implemented by the caller. Call Shutdown() when // receiving termination signals to ensure checkpoint data is persisted. -func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { +func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) { startupTime := time.Now() if rawConfig != nil { @@ -138,33 +138,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) - // Helper function to add metric configuration - addMetricConfig := func(mc *schema.MetricConfig) { - agg, err := AssignAggregationStrategy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) - } - - for _, c := range archive.Clusters { - for _, mc := range c.MetricConfig { - addMetricConfig(mc) - } - - for _, sc := range c.SubClusters { - for _, mc := range sc.MetricConfig { - addMetricConfig(mc) - } - } - } - // Pass the config.MetricStoreKeys - InitMetrics(Metrics) + InitMetrics(metrics) ms := GetMemoryStore() @@ -279,6 +254,13 @@ func GetMemoryStore() *MemoryStore { return msInstance } +func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) { + if metric, ok := ms.Metrics[metricName]; ok { + return metric.Frequency, nil + } + return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) +} + // SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // This must be called during initialization to provide job state information // for selective buffer retention during Free operations. diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index fd7c963f..90cec2bd 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) { } } -func TestAddMetric(t *testing.T) { - // Reset Metrics before test - Metrics = make(map[string]MetricConfig) - - err := AddMetric("test_metric", MetricConfig{ - Frequency: 60, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if _, ok := Metrics["test_metric"]; !ok { - t.Error("AddMetric() did not add metric to Metrics map") - } - - // Test updating with higher frequency - err = AddMetric("test_metric", MetricConfig{ - Frequency: 120, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency) - } - - // Test updating with lower frequency (should not update) - err = AddMetric("test_metric", MetricConfig{ - Frequency: 30, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency) - } -} - -func TestGetMetricFrequency(t *testing.T) { - // Reset Metrics before test - Metrics = map[string]MetricConfig{ - "test_metric": { - Frequency: 60, - Aggregation: SumAggregation, - }, - } - - freq, err := GetMetricFrequency("test_metric") - if err != nil { - t.Errorf("GetMetricFrequency() error = %v", err) - } - if freq != 60 { - t.Errorf("GetMetricFrequency() = %d, want 60", freq) - } - - _, err = GetMetricFrequency("nonexistent") - if err == nil { - t.Error("GetMetricFrequency() expected error for nonexistent metric") - } -} - func TestBufferWrite(t *testing.T) { b := newBuffer(100, 10) From 55cb2cb6d647f26f280f24c8b76b5cd9a6c30a88 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:10:26 +0100 Subject: [PATCH 02/10] Prevent file not closed on error in avro checkpoint --- pkg/metricstore/avroCheckpoint.go | 52 ++++++++++++++++--------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go index aa14ce5a..14898186 100644 --- a/pkg/metricstore/avroCheckpoint.go +++ b/pkg/metricstore/avroCheckpoint.go @@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to open existing avro file: %v", err) } + defer f.Close() br := bufio.NewReader(f) @@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { } codec = reader.Codec() schema = codec.Schema() - - f.Close() } timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() @@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to compare read and generated schema: %v", err) } if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - - f.Close() - - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - - br := bufio.NewReader(f) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() + // Use closure to ensure file is closed even on error + err := func() error { + f2, err := os.Open(filePath) if err != nil { - return fmt.Errorf("failed to read record: %v", err) + return fmt.Errorf("failed to open Avro file: %v", err) + } + defer f2.Close() + + br := bufio.NewReader(f2) + + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) } - recordList = append(recordList, record.(map[string]any)) - } + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("failed to read record: %v", err) + } - f.Close() + recordList = append(recordList, record.(map[string]any)) + } + + return nil + }() + if err != nil { + return err + } err = os.Remove(filePath) if err != nil { @@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to append new avro file: %v", err) } + defer f.Close() // fmt.Printf("Codec : %#v\n", codec) @@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to append record: %v", err) } - f.Close() - return nil } From bbde91a1f9ba5cf061b661b2370106d32344e533 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:25:29 +0100 Subject: [PATCH 03/10] Move wg increment inside goroutines. Make GC calls less aggressive --- pkg/metricstore/archive.go | 1 + pkg/metricstore/avroHelper.go | 11 +++-- pkg/metricstore/checkpoint.go | 2 + pkg/metricstore/metricstore.go | 78 ++++++++++++++++------------------ 4 files changed, 44 insertions(+), 48 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 6abcb183..cab4c24f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { // runWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 36594e73..62827afd 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -15,15 +15,14 @@ import ( ) func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - // AvroPool is a pool of Avro writers. + wg.Add(1) go func() { - if Keys.Checkpoints.FileFormat == "json" { - wg.Done() // Mark this goroutine as done - return // Exit the goroutine - } - defer wg.Done() + if Keys.Checkpoints.FileFormat == "json" { + return + } + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index b90b1c22..c301058b 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -100,6 +100,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) @@ -139,6 +140,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } }() } else { + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 6246f520..63161814 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "runtime" + "runtime/debug" "slices" "sync" "time" @@ -168,20 +169,6 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W ctx, shutdown := context.WithCancel(context.Background()) - retentionGoroutines := 1 - checkpointingGoroutines := 1 - dataStagingGoroutines := 1 - archivingGoroutines := 1 - memoryUsageTracker := 1 - - totalGoroutines := retentionGoroutines + - checkpointingGoroutines + - dataStagingGoroutines + - archivingGoroutines + - memoryUsageTracker - - wg.Add(totalGoroutines) - Retention(wg, ctx) Checkpointing(wg, ctx) CleanUp(wg, ctx) @@ -325,6 +312,7 @@ func Shutdown() { func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.RetentionInMemory) @@ -370,9 +358,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { // MemoryUsageTracker starts a background goroutine that monitors memory usage. // -// This worker checks memory usage every minute and force-frees buffers if memory -// exceeds the configured cap. It protects against infinite loops by limiting -// iterations and forcing garbage collection between attempts. +// This worker checks memory usage periodically and force-frees buffers if memory +// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS +// after freeing buffers, avoiding aggressive GC that causes performance issues. // // Parameters: // - wg: WaitGroup to signal completion when context is cancelled @@ -382,6 +370,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d := DefaultMemoryUsageTrackerInterval @@ -398,62 +387,67 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - state.mu.RLock() - memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB) - freedTotal := 0 + freedExcluded := 0 + freedEmergency := 0 var err error - // First force-free all the checkpoints that were - if state.lastRetentionTime != 0 && state.selectorsExcluded { - freedTotal, err = ms.Free(nil, state.lastRetentionTime) + state.mu.RLock() + lastRetention := state.lastRetentionTime + selectorsExcluded := state.selectorsExcluded + state.mu.RUnlock() + + if lastRetention != 0 && selectorsExcluded { + freedExcluded, err = ms.Free(nil, lastRetention) if err != nil { cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err) } - // Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool) - runtime.GC() - runtime.GC() - - cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal) + if freedExcluded > 0 { + debug.FreeOSMemory() + cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded) + } } - state.mu.RUnlock() - memoryUsageGB = ms.SizeInGB() if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap) - cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") + cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap) const maxIterations = 100 - for range maxIterations { - memoryUsageGB = ms.SizeInGB() + for i := 0; i < maxIterations; i++ { if memoryUsageGB < float64(Keys.MemoryCap) { break } freed, err := ms.ForceFree() if err != nil { - cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) + cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) } if freed == 0 { - cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) break } - freedTotal += freed + freedEmergency += freed - runtime.GC() + if i%10 == 0 && freedEmergency > 0 { + memoryUsageGB = ms.SizeInGB() + } } + if freedEmergency > 0 { + debug.FreeOSMemory() + } + + memoryUsageGB = ms.SizeInGB() + if memoryUsageGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) + cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) } else { - cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal) - cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB) } } From 719aaaff4bb213f99dbc76cde45512c4b217eb49 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 27 Jan 2026 17:39:16 +0100 Subject: [PATCH 04/10] fix data flipping on doublemetric render --- web/frontend/src/DashPublic.root.svelte | 8 ++++--- .../src/generic/plots/DoubleMetricPlot.svelte | 21 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/web/frontend/src/DashPublic.root.svelte b/web/frontend/src/DashPublic.root.svelte index 91f4664c..9c17e7d8 100644 --- a/web/frontend/src/DashPublic.root.svelte +++ b/web/frontend/src/DashPublic.root.svelte @@ -286,6 +286,8 @@ sort((a, b) => b.count - a.count) }); + const sortedClusterMetrics = $derived($statusQuery?.data?.clusterMetrics?.metrics.sort((a, b) => b.name.localeCompare(a.name))); + /* Functions */ function transformNodesStatsToData(subclusterData) { let data = null @@ -516,10 +518,10 @@
Cluster Utilization ( - {`${$statusQuery?.data?.clusterMetrics?.metrics[0]?.name} (${$statusQuery?.data?.clusterMetrics?.metrics[0]?.unit?.prefix}${$statusQuery?.data?.clusterMetrics?.metrics[0]?.unit?.base})`} + {`${sortedClusterMetrics[0]?.name} (${sortedClusterMetrics[0]?.unit?.prefix}${sortedClusterMetrics[0]?.unit?.base})`} , - {`${$statusQuery?.data?.clusterMetrics?.metrics[1]?.name} (${$statusQuery?.data?.clusterMetrics?.metrics[1]?.unit?.prefix}${$statusQuery?.data?.clusterMetrics?.metrics[1]?.unit?.base})`} + {`${sortedClusterMetrics[1]?.name} (${sortedClusterMetrics[1]?.unit?.prefix}${sortedClusterMetrics[1]?.unit?.base})`} )
@@ -528,7 +530,7 @@ diff --git a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte index c865dd68..e94e269d 100644 --- a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte +++ b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte @@ -4,7 +4,7 @@ Only width/height should change reactively. Properties: - - `metricData [Data]`: Two series of metric data including unit info + - `metricData [Data]`: Two series of metric data including unit info, unsorted - `timestep Number`: Data timestep - `numNodes Number`: Number of nodes from which metric data is aggregated - `cluster String`: Cluster name of the parent job / data [Default: ""] @@ -46,10 +46,11 @@ let uplot = $state(null); /* Derived */ + const sortedMetricData = $derived(publicMode ? [...metricData] : metricData.sort((a, b) => b.name.localeCompare(a.name))); // PublicMode: Presorted const maxX = $derived(longestSeries * timestep); const lineWidth = $derived(publicMode ? 2 : clusterCockpitConfig.plotConfiguration_lineWidth / window.devicePixelRatio); const longestSeries = $derived.by(() => { - return metricData.reduce((n, m) => Math.max(n, m.data.length), 0); + return sortedMetricData.reduce((n, m) => Math.max(n, m.data.length), 0); }); // Derive Plot Params @@ -68,8 +69,8 @@ }; }; // Y - for (let i = 0; i < metricData.length; i++) { - pendingData.push(metricData[i]?.data); + for (let i = 0; i < sortedMetricData.length; i++) { + pendingData.push(sortedMetricData[i]?.data); }; return pendingData; }) @@ -84,9 +85,9 @@ } ]; // Y - for (let i = 0; i < metricData.length; i++) { + for (let i = 0; i < sortedMetricData.length; i++) { pendingSeries.push({ - label: publicMode ? null : `${metricData[i]?.name} (${metricData[i]?.unit?.prefix}${metricData[i]?.unit?.base})`, + label: publicMode ? null : `${sortedMetricData[i]?.name} (${sortedMetricData[i]?.unit?.prefix}${sortedMetricData[i]?.unit?.base})`, scale: `y${i+1}`, width: lineWidth, stroke: fixedLineColors[i], @@ -156,9 +157,9 @@ // X baseOpts.axes[0].label = 'Time'; // Y1 - baseOpts.axes[1].label = `${metricData[0]?.name} (${metricData[0]?.unit?.prefix}${metricData[0]?.unit?.base})`; + baseOpts.axes[1].label = `${sortedMetricData[0]?.name} (${sortedMetricData[0]?.unit?.prefix}${sortedMetricData[0]?.unit?.base})`; // Y2 - baseOpts.axes[2].label = `${metricData[1]?.name} (${metricData[1]?.unit?.prefix}${metricData[1]?.unit?.base})`; + baseOpts.axes[2].label = `${sortedMetricData[1]?.name} (${sortedMetricData[1]?.unit?.prefix}${sortedMetricData[1]?.unit?.base})`; baseOpts.hooks.draw = [ (u) => { // Draw plot type label: @@ -212,7 +213,7 @@ style = { backgroundColor: "rgba(255, 249, 196, 0.92)", color: "black" }, } = {}) { let legendEl; - const dataSize = metricData.length; + const dataSize = sortedMetricData.length; function init(u, opts) { legendEl = u.root.querySelector(".u-legend"); @@ -311,7 +312,7 @@ -{#if metricData[0]?.data && metricData[0]?.data?.length > 0} +{#if sortedMetricData[0]?.data && sortedMetricData[0]?.data?.length > 0}
From 9d15a87c88b46ba519b599ddea5b619cb554a3fe Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 18:23:09 +0100 Subject: [PATCH 05/10] Take into account the real allocated heap memory in MemoryUsageTracker --- pkg/metricstore/metricstore.go | 42 +++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 63161814..25f4547e 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -358,9 +358,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { // MemoryUsageTracker starts a background goroutine that monitors memory usage. // -// This worker checks memory usage periodically and force-frees buffers if memory -// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS -// after freeing buffers, avoiding aggressive GC that causes performance issues. +// This worker checks actual process memory usage (via runtime.MemStats) periodically +// and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory() +// to return memory to the OS after freeing buffers, avoiding aggressive GC that causes +// performance issues. +// +// The tracker logs both actual memory usage (heap allocated) and metric data size for +// visibility into memory overhead from Go runtime structures and allocations. // // Parameters: // - wg: WaitGroup to signal completion when context is cancelled @@ -387,8 +391,11 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB) + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + actualMemoryGB := float64(mem.Alloc) / 1e9 + metricDataGB := ms.SizeInGB() + cclog.Infof("[METRICSTORE]> memory usage: %.2f GB actual (%.2f GB metric data)", actualMemoryGB, metricDataGB) freedExcluded := 0 freedEmergency := 0 @@ -411,15 +418,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { } } - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 - if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap) + if actualMemoryGB > float64(Keys.MemoryCap) { + cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) const maxIterations = 100 - for i := 0; i < maxIterations; i++ { - if memoryUsageGB < float64(Keys.MemoryCap) { + for i := range maxIterations { + if actualMemoryGB < float64(Keys.MemoryCap) { break } @@ -428,13 +436,14 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) } if freed == 0 { - cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) break } freedEmergency += freed if i%10 == 0 && freedEmergency > 0 { - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 } } @@ -442,12 +451,13 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { debug.FreeOSMemory() } - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 - if memoryUsageGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) + if actualMemoryGB >= float64(Keys.MemoryCap) { + cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) } else { - cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB) + cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } From 9d9babe94ddd3f4aa793b1b9d2c5e5bd1706bac5 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 27 Jan 2026 19:04:29 +0100 Subject: [PATCH 06/10] review clusterMetrics aggregation handling, fixes index error --- internal/graph/schema.resolvers.go | 36 +++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 3e142f9a..9bc8811d 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -905,26 +905,32 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr for _, metrics := range data { clusterMetrics.NodeCount += 1 for metric, scopedMetrics := range metrics { - _, ok := collectorData[metric] - if !ok { - collectorData[metric] = make([]schema.Float, 0) - for _, scopedMetric := range scopedMetrics { - // Collect Info + for _, scopedMetric := range scopedMetrics { + // Collect Info Once + _, okTimestep := collectorTimestep[metric] + if !okTimestep { collectorTimestep[metric] = scopedMetric.Timestep - collectorUnit[metric] = scopedMetric.Unit - // Collect Initial Data - for _, ser := range scopedMetric.Series { - collectorData[metric] = append(collectorData[metric], ser.Data...) - } } - } else { - // Sum up values by index - for _, scopedMetric := range scopedMetrics { - // For This Purpose (Cluster_Wide-Sum of Node Metrics) OK - for _, ser := range scopedMetric.Series { + _, okUnit := collectorUnit[metric] + if !okUnit { + collectorUnit[metric] = scopedMetric.Unit + } + // Collect Data + for _, ser := range scopedMetric.Series { + _, okData := collectorData[metric] + // Init With Datasize > 0 + if !okData && len(ser.Data) != 0 { + collectorData[metric] = make([]schema.Float, len(ser.Data)) + } else if !okData { + cclog.Debugf("ClusterMetrics Skip Init: No Data -> %s at %s; Size %d", metric, ser.Hostname, len(ser.Data)) + } + // Sum if init'd and matching size + if okData && len(ser.Data) == len(collectorData[metric]) { for i, val := range ser.Data { collectorData[metric][i] += val } + } else if okData { + cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) } } } From 95689e3c99dedf386de873812718fa647d4f5048 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 07:05:29 +0100 Subject: [PATCH 07/10] Add API endpoint for getUsedNodes Needed by dynamic memory management for external ccms --- internal/api/api_test.go | 34 +++++++++++++++++++++++++ internal/api/job.go | 54 ++++++++++++++++++++++++++++++++++++++++ internal/api/rest.go | 5 ++-- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 4a7fc07c..8cbf95d7 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -455,4 +455,38 @@ func TestRestApi(t *testing.T) { if !ok { t.Fatal("subtest failed") } + + t.Run("GetUsedNodesNoRunning", func(t *testing.T) { + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"api"}, + AuthType: 0, + AuthSource: 2, + } + + req := httptest.NewRequest(http.MethodGet, "/jobs/used_nodes?ts=123456790", nil) + recorder := httptest.NewRecorder() + + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + + r.ServeHTTP(recorder, req.WithContext(ctx)) + response := recorder.Result() + if response.StatusCode != http.StatusOK { + t.Fatal(response.Status, recorder.Body.String()) + } + + var result api.GetUsedNodesAPIResponse + if err := json.NewDecoder(response.Body).Decode(&result); err != nil { + t.Fatal(err) + } + + if result.UsedNodes == nil { + t.Fatal("expected usedNodes to be non-nil") + } + + if len(result.UsedNodes) != 0 { + t.Fatalf("expected no used nodes for stopped jobs, got: %v", result.UsedNodes) + } + }) } diff --git a/internal/api/job.go b/internal/api/job.go index 1b1e05d6..64f6a92c 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -1021,3 +1021,57 @@ func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) { cclog.Errorf("Failed to encode response: %v", err) } } + +// GetUsedNodesAPIResponse model +type GetUsedNodesAPIResponse struct { + UsedNodes map[string][]string `json:"usedNodes"` // Map of cluster names to lists of used node hostnames +} + +// getUsedNodes godoc +// @summary Lists used nodes by cluster +// @tags Job query +// @description Get a map of cluster names to lists of unique hostnames that are currently in use by running jobs that started before the specified timestamp. +// @produce json +// @param ts query int true "Unix timestamp to filter jobs (jobs with start_time < ts)" +// @success 200 {object} api.GetUsedNodesAPIResponse "Map of cluster names to hostname lists" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/used_nodes [get] +func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); user != nil && + !user.HasRole(schema.RoleApi) { + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + return + } + + tsStr := r.URL.Query().Get("ts") + if tsStr == "" { + handleError(fmt.Errorf("missing required query parameter: ts"), http.StatusBadRequest, rw) + return + } + + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + handleError(fmt.Errorf("invalid timestamp format: %w", err), http.StatusBadRequest, rw) + return + } + + usedNodes, err := api.JobRepository.GetUsedNodes(ts) + if err != nil { + handleError(fmt.Errorf("failed to get used nodes: %w", err), http.StatusInternalServerError, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + payload := GetUsedNodesAPIResponse{ + UsedNodes: usedNodes, + } + + if err := json.NewEncoder(rw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} diff --git a/internal/api/rest.go b/internal/api/rest.go index c0fa7c2a..0d52742e 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -89,8 +89,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) } r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) - r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) + r.HandleFunc("/jobs/used_nodes", api.getUsedNodes).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete) r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) @@ -98,6 +97,8 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete) + r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) + r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete) From 98661aad15f1554400819c44b36714e1bc1f42db Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 10:41:44 +0100 Subject: [PATCH 08/10] Increase default GC frequency --- cmd/cc-backend/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index deefce34..3c70a960 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -399,7 +399,7 @@ func runServer(ctx context.Context) error { // Set GC percent if not configured if os.Getenv(envGOGC) == "" { - debug.SetGCPercent(25) + debug.SetGCPercent(15) } runtime.SystemdNotify(true, "running") From eb5aa9ad02eef1b73caddaf709d7cacb65f09a7e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 11:21:02 +0100 Subject: [PATCH 09/10] Disable explicit GC calls --- pkg/metricstore/checkpoint.go | 17 ++++++++--------- pkg/metricstore/metricstore.go | 7 +++---- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index c301058b..715566e4 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -43,7 +43,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -396,14 +395,14 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { } gcCounter++ - if gcCounter%GCTriggerInterval == 0 { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } + // if gcCounter%GCTriggerInterval == 0 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + // runtime.GC() + // } work <- [2]string{clusterDir.Name(), hostDir.Name()} } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 25f4547e..38ff9015 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -447,9 +447,9 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { } } - if freedEmergency > 0 { - debug.FreeOSMemory() - } + // if freedEmergency > 0 { + // debug.FreeOSMemory() + // } runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 @@ -460,7 +460,6 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } - } } }() From 0d857b49a25692e8e1259e7ed6400dce7da202ab Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 11:21:27 +0100 Subject: [PATCH 10/10] Disable explicit GC calls --- pkg/metricstore/metricstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 38ff9015..617e945e 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -165,7 +165,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W // previously active heap, a GC is triggered. // Forcing a GC here will set the "previously active heap" // to a minumum. - runtime.GC() + // runtime.GC() ctx, shutdown := context.WithCancel(context.Background())