From 752e19c27663a7e18974a0aad88f8a7f61fca7f6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:06:52 +0100 Subject: [PATCH 1/8] Pull out metric List build from metricstore Init --- cmd/cc-backend/main.go | 3 +- pkg/metricstore/avroHelper.go | 5 +- pkg/metricstore/config.go | 86 ++++++++++++++--------------- pkg/metricstore/metricstore.go | 38 ++++--------- pkg/metricstore/metricstore_test.go | 66 ---------------------- 5 files changed, 58 insertions(+), 140 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9ded95ba..deefce34 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -321,7 +321,8 @@ func runServer(ctx context.Context) error { haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { - metricstore.Init(mscfg, &wg) + metrics := metricstore.BuildMetricList() + metricstore.Init(mscfg, metrics, &wg) // Inject repository as NodeProvider to break import cycle ms := metricstore.GetMemoryStore() diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 985fdd78..36594e73 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -24,6 +24,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) @@ -39,7 +40,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return } // Process remaining message - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { continue } @@ -76,7 +77,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { } // Fetch the frequency of the metric from the global configuration - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { cclog.Errorf("Error fetching metric frequency: %s\n", err) continue diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 44a24f7d..69ee3563 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -45,6 +45,10 @@ package metricstore import ( "fmt" "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" ) const ( @@ -207,55 +211,51 @@ type MetricConfig struct { offset int } -// Metrics is the global map of metric configurations. -// -// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init() -// from cluster configuration and checkpoint restoration. Each MetricConfig.offset -// corresponds to the buffer slice index in Level.metrics. -var Metrics map[string]MetricConfig +func BuildMetricList() map[string]MetricConfig { + var metrics map[string]MetricConfig = make(map[string]MetricConfig) -// GetMetricFrequency retrieves the measurement interval for a metric. -// -// Parameters: -// - metricName: Metric name (e.g., "cpu_load") -// -// Returns: -// - int64: Frequency in seconds -// - error: Non-nil if metric not found in Metrics map -func GetMetricFrequency(metricName string) (int64, error) { - if metric, ok := Metrics[metricName]; ok { - return metric.Frequency, nil - } - return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) -} + addMetric := func(name string, metric MetricConfig) error { + if metrics == nil { + metrics = make(map[string]MetricConfig, 0) + } -// AddMetric registers a new metric or updates an existing one. -// -// If the metric already exists with a different frequency, uses the higher frequency -// (finer granularity). This handles cases where different clusters report the same -// metric at different intervals. -// -// Parameters: -// - name: Metric name (e.g., "cpu_load") -// - metric: Configuration (frequency, aggregation strategy) -// -// Returns: -// - error: Always nil (signature for future error handling) -func AddMetric(name string, metric MetricConfig) error { - if Metrics == nil { - Metrics = make(map[string]MetricConfig, 0) + if existingMetric, ok := metrics[name]; ok { + if existingMetric.Frequency != metric.Frequency { + if existingMetric.Frequency < metric.Frequency { + existingMetric.Frequency = metric.Frequency + metrics[name] = existingMetric + } + } + } else { + metrics[name] = metric + } + + return nil } - if existingMetric, ok := Metrics[name]; ok { - if existingMetric.Frequency != metric.Frequency { - if existingMetric.Frequency < metric.Frequency { - existingMetric.Frequency = metric.Frequency - Metrics[name] = existingMetric + // Helper function to add metric configuration + addMetricConfig := func(mc *schema.MetricConfig) { + agg, err := AssignAggregationStrategy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) + } + + addMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + for _, c := range archive.Clusters { + for _, mc := range c.MetricConfig { + addMetricConfig(mc) + } + + for _, sc := range c.SubClusters { + for _, mc := range sc.MetricConfig { + addMetricConfig(mc) } } - } else { - Metrics[name] = metric } - return nil + return metrics } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 0d1f19c9..6246f520 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -24,13 +24,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "runtime" "slices" "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -120,7 +120,7 @@ type MemoryStore struct { // // Note: Signal handling must be implemented by the caller. Call Shutdown() when // receiving termination signals to ensure checkpoint data is persisted. -func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { +func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) { startupTime := time.Now() if rawConfig != nil { @@ -138,33 +138,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) - // Helper function to add metric configuration - addMetricConfig := func(mc *schema.MetricConfig) { - agg, err := AssignAggregationStrategy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) - } - - for _, c := range archive.Clusters { - for _, mc := range c.MetricConfig { - addMetricConfig(mc) - } - - for _, sc := range c.SubClusters { - for _, mc := range sc.MetricConfig { - addMetricConfig(mc) - } - } - } - // Pass the config.MetricStoreKeys - InitMetrics(Metrics) + InitMetrics(metrics) ms := GetMemoryStore() @@ -279,6 +254,13 @@ func GetMemoryStore() *MemoryStore { return msInstance } +func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) { + if metric, ok := ms.Metrics[metricName]; ok { + return metric.Frequency, nil + } + return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) +} + // SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // This must be called during initialization to provide job state information // for selective buffer retention during Free operations. diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index fd7c963f..90cec2bd 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) { } } -func TestAddMetric(t *testing.T) { - // Reset Metrics before test - Metrics = make(map[string]MetricConfig) - - err := AddMetric("test_metric", MetricConfig{ - Frequency: 60, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if _, ok := Metrics["test_metric"]; !ok { - t.Error("AddMetric() did not add metric to Metrics map") - } - - // Test updating with higher frequency - err = AddMetric("test_metric", MetricConfig{ - Frequency: 120, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency) - } - - // Test updating with lower frequency (should not update) - err = AddMetric("test_metric", MetricConfig{ - Frequency: 30, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency) - } -} - -func TestGetMetricFrequency(t *testing.T) { - // Reset Metrics before test - Metrics = map[string]MetricConfig{ - "test_metric": { - Frequency: 60, - Aggregation: SumAggregation, - }, - } - - freq, err := GetMetricFrequency("test_metric") - if err != nil { - t.Errorf("GetMetricFrequency() error = %v", err) - } - if freq != 60 { - t.Errorf("GetMetricFrequency() = %d, want 60", freq) - } - - _, err = GetMetricFrequency("nonexistent") - if err == nil { - t.Error("GetMetricFrequency() expected error for nonexistent metric") - } -} - func TestBufferWrite(t *testing.T) { b := newBuffer(100, 10) From 55cb2cb6d647f26f280f24c8b76b5cd9a6c30a88 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:10:26 +0100 Subject: [PATCH 2/8] Prevent file not closed on error in avro checkpoint --- pkg/metricstore/avroCheckpoint.go | 52 ++++++++++++++++--------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go index aa14ce5a..14898186 100644 --- a/pkg/metricstore/avroCheckpoint.go +++ b/pkg/metricstore/avroCheckpoint.go @@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to open existing avro file: %v", err) } + defer f.Close() br := bufio.NewReader(f) @@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { } codec = reader.Codec() schema = codec.Schema() - - f.Close() } timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() @@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to compare read and generated schema: %v", err) } if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - - f.Close() - - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - - br := bufio.NewReader(f) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() + // Use closure to ensure file is closed even on error + err := func() error { + f2, err := os.Open(filePath) if err != nil { - return fmt.Errorf("failed to read record: %v", err) + return fmt.Errorf("failed to open Avro file: %v", err) + } + defer f2.Close() + + br := bufio.NewReader(f2) + + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) } - recordList = append(recordList, record.(map[string]any)) - } + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("failed to read record: %v", err) + } - f.Close() + recordList = append(recordList, record.(map[string]any)) + } + + return nil + }() + if err != nil { + return err + } err = os.Remove(filePath) if err != nil { @@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to append new avro file: %v", err) } + defer f.Close() // fmt.Printf("Codec : %#v\n", codec) @@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to append record: %v", err) } - f.Close() - return nil } From bbde91a1f9ba5cf061b661b2370106d32344e533 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 17:25:29 +0100 Subject: [PATCH 3/8] Move wg increment inside goroutines. Make GC calls less aggressive --- pkg/metricstore/archive.go | 1 + pkg/metricstore/avroHelper.go | 11 +++-- pkg/metricstore/checkpoint.go | 2 + pkg/metricstore/metricstore.go | 78 ++++++++++++++++------------------ 4 files changed, 44 insertions(+), 48 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 6abcb183..cab4c24f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { // runWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 36594e73..62827afd 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -15,15 +15,14 @@ import ( ) func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - // AvroPool is a pool of Avro writers. + wg.Add(1) go func() { - if Keys.Checkpoints.FileFormat == "json" { - wg.Done() // Mark this goroutine as done - return // Exit the goroutine - } - defer wg.Done() + if Keys.Checkpoints.FileFormat == "json" { + return + } + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index b90b1c22..c301058b 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -100,6 +100,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) @@ -139,6 +140,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } }() } else { + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 6246f520..63161814 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "runtime" + "runtime/debug" "slices" "sync" "time" @@ -168,20 +169,6 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W ctx, shutdown := context.WithCancel(context.Background()) - retentionGoroutines := 1 - checkpointingGoroutines := 1 - dataStagingGoroutines := 1 - archivingGoroutines := 1 - memoryUsageTracker := 1 - - totalGoroutines := retentionGoroutines + - checkpointingGoroutines + - dataStagingGoroutines + - archivingGoroutines + - memoryUsageTracker - - wg.Add(totalGoroutines) - Retention(wg, ctx) Checkpointing(wg, ctx) CleanUp(wg, ctx) @@ -325,6 +312,7 @@ func Shutdown() { func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.RetentionInMemory) @@ -370,9 +358,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { // MemoryUsageTracker starts a background goroutine that monitors memory usage. // -// This worker checks memory usage every minute and force-frees buffers if memory -// exceeds the configured cap. It protects against infinite loops by limiting -// iterations and forcing garbage collection between attempts. +// This worker checks memory usage periodically and force-frees buffers if memory +// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS +// after freeing buffers, avoiding aggressive GC that causes performance issues. // // Parameters: // - wg: WaitGroup to signal completion when context is cancelled @@ -382,6 +370,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d := DefaultMemoryUsageTrackerInterval @@ -398,62 +387,67 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - state.mu.RLock() - memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB) - freedTotal := 0 + freedExcluded := 0 + freedEmergency := 0 var err error - // First force-free all the checkpoints that were - if state.lastRetentionTime != 0 && state.selectorsExcluded { - freedTotal, err = ms.Free(nil, state.lastRetentionTime) + state.mu.RLock() + lastRetention := state.lastRetentionTime + selectorsExcluded := state.selectorsExcluded + state.mu.RUnlock() + + if lastRetention != 0 && selectorsExcluded { + freedExcluded, err = ms.Free(nil, lastRetention) if err != nil { cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err) } - // Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool) - runtime.GC() - runtime.GC() - - cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal) + if freedExcluded > 0 { + debug.FreeOSMemory() + cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded) + } } - state.mu.RUnlock() - memoryUsageGB = ms.SizeInGB() if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap) - cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") + cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap) const maxIterations = 100 - for range maxIterations { - memoryUsageGB = ms.SizeInGB() + for i := 0; i < maxIterations; i++ { if memoryUsageGB < float64(Keys.MemoryCap) { break } freed, err := ms.ForceFree() if err != nil { - cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) + cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) } if freed == 0 { - cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) break } - freedTotal += freed + freedEmergency += freed - runtime.GC() + if i%10 == 0 && freedEmergency > 0 { + memoryUsageGB = ms.SizeInGB() + } } + if freedEmergency > 0 { + debug.FreeOSMemory() + } + + memoryUsageGB = ms.SizeInGB() + if memoryUsageGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) + cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) } else { - cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal) - cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB) } } From 9d15a87c88b46ba519b599ddea5b619cb554a3fe Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 18:23:09 +0100 Subject: [PATCH 4/8] Take into account the real allocated heap memory in MemoryUsageTracker --- pkg/metricstore/metricstore.go | 42 +++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 63161814..25f4547e 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -358,9 +358,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { // MemoryUsageTracker starts a background goroutine that monitors memory usage. // -// This worker checks memory usage periodically and force-frees buffers if memory -// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS -// after freeing buffers, avoiding aggressive GC that causes performance issues. +// This worker checks actual process memory usage (via runtime.MemStats) periodically +// and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory() +// to return memory to the OS after freeing buffers, avoiding aggressive GC that causes +// performance issues. +// +// The tracker logs both actual memory usage (heap allocated) and metric data size for +// visibility into memory overhead from Go runtime structures and allocations. // // Parameters: // - wg: WaitGroup to signal completion when context is cancelled @@ -387,8 +391,11 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB) + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + actualMemoryGB := float64(mem.Alloc) / 1e9 + metricDataGB := ms.SizeInGB() + cclog.Infof("[METRICSTORE]> memory usage: %.2f GB actual (%.2f GB metric data)", actualMemoryGB, metricDataGB) freedExcluded := 0 freedEmergency := 0 @@ -411,15 +418,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { } } - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 - if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap) + if actualMemoryGB > float64(Keys.MemoryCap) { + cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) const maxIterations = 100 - for i := 0; i < maxIterations; i++ { - if memoryUsageGB < float64(Keys.MemoryCap) { + for i := range maxIterations { + if actualMemoryGB < float64(Keys.MemoryCap) { break } @@ -428,13 +436,14 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) } if freed == 0 { - cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) break } freedEmergency += freed if i%10 == 0 && freedEmergency > 0 { - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 } } @@ -442,12 +451,13 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { debug.FreeOSMemory() } - memoryUsageGB = ms.SizeInGB() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 - if memoryUsageGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap) + if actualMemoryGB >= float64(Keys.MemoryCap) { + cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) } else { - cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB) + cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } From 95689e3c99dedf386de873812718fa647d4f5048 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 07:05:29 +0100 Subject: [PATCH 5/8] Add API endpoint for getUsedNodes Needed by dynamic memory management for external ccms --- internal/api/api_test.go | 34 +++++++++++++++++++++++++ internal/api/job.go | 54 ++++++++++++++++++++++++++++++++++++++++ internal/api/rest.go | 5 ++-- 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 4a7fc07c..8cbf95d7 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -455,4 +455,38 @@ func TestRestApi(t *testing.T) { if !ok { t.Fatal("subtest failed") } + + t.Run("GetUsedNodesNoRunning", func(t *testing.T) { + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"api"}, + AuthType: 0, + AuthSource: 2, + } + + req := httptest.NewRequest(http.MethodGet, "/jobs/used_nodes?ts=123456790", nil) + recorder := httptest.NewRecorder() + + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + + r.ServeHTTP(recorder, req.WithContext(ctx)) + response := recorder.Result() + if response.StatusCode != http.StatusOK { + t.Fatal(response.Status, recorder.Body.String()) + } + + var result api.GetUsedNodesAPIResponse + if err := json.NewDecoder(response.Body).Decode(&result); err != nil { + t.Fatal(err) + } + + if result.UsedNodes == nil { + t.Fatal("expected usedNodes to be non-nil") + } + + if len(result.UsedNodes) != 0 { + t.Fatalf("expected no used nodes for stopped jobs, got: %v", result.UsedNodes) + } + }) } diff --git a/internal/api/job.go b/internal/api/job.go index 1b1e05d6..64f6a92c 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -1021,3 +1021,57 @@ func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) { cclog.Errorf("Failed to encode response: %v", err) } } + +// GetUsedNodesAPIResponse model +type GetUsedNodesAPIResponse struct { + UsedNodes map[string][]string `json:"usedNodes"` // Map of cluster names to lists of used node hostnames +} + +// getUsedNodes godoc +// @summary Lists used nodes by cluster +// @tags Job query +// @description Get a map of cluster names to lists of unique hostnames that are currently in use by running jobs that started before the specified timestamp. +// @produce json +// @param ts query int true "Unix timestamp to filter jobs (jobs with start_time < ts)" +// @success 200 {object} api.GetUsedNodesAPIResponse "Map of cluster names to hostname lists" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/used_nodes [get] +func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); user != nil && + !user.HasRole(schema.RoleApi) { + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + return + } + + tsStr := r.URL.Query().Get("ts") + if tsStr == "" { + handleError(fmt.Errorf("missing required query parameter: ts"), http.StatusBadRequest, rw) + return + } + + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + handleError(fmt.Errorf("invalid timestamp format: %w", err), http.StatusBadRequest, rw) + return + } + + usedNodes, err := api.JobRepository.GetUsedNodes(ts) + if err != nil { + handleError(fmt.Errorf("failed to get used nodes: %w", err), http.StatusInternalServerError, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + payload := GetUsedNodesAPIResponse{ + UsedNodes: usedNodes, + } + + if err := json.NewEncoder(rw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} diff --git a/internal/api/rest.go b/internal/api/rest.go index c0fa7c2a..0d52742e 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -89,8 +89,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) } r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) - r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) + r.HandleFunc("/jobs/used_nodes", api.getUsedNodes).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete) r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) @@ -98,6 +97,8 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete) + r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) + r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete) From 98661aad15f1554400819c44b36714e1bc1f42db Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 10:41:44 +0100 Subject: [PATCH 6/8] Increase default GC frequency --- cmd/cc-backend/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index deefce34..3c70a960 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -399,7 +399,7 @@ func runServer(ctx context.Context) error { // Set GC percent if not configured if os.Getenv(envGOGC) == "" { - debug.SetGCPercent(25) + debug.SetGCPercent(15) } runtime.SystemdNotify(true, "running") From eb5aa9ad02eef1b73caddaf709d7cacb65f09a7e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 11:21:02 +0100 Subject: [PATCH 7/8] Disable explicit GC calls --- pkg/metricstore/checkpoint.go | 17 ++++++++--------- pkg/metricstore/metricstore.go | 7 +++---- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index c301058b..715566e4 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -43,7 +43,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -396,14 +395,14 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { } gcCounter++ - if gcCounter%GCTriggerInterval == 0 { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } + // if gcCounter%GCTriggerInterval == 0 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + // runtime.GC() + // } work <- [2]string{clusterDir.Name(), hostDir.Name()} } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 25f4547e..38ff9015 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -447,9 +447,9 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { } } - if freedEmergency > 0 { - debug.FreeOSMemory() - } + // if freedEmergency > 0 { + // debug.FreeOSMemory() + // } runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 @@ -460,7 +460,6 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } - } } }() From 0d857b49a25692e8e1259e7ed6400dce7da202ab Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Jan 2026 11:21:27 +0100 Subject: [PATCH 8/8] Disable explicit GC calls --- pkg/metricstore/metricstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 38ff9015..617e945e 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -165,7 +165,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W // previously active heap, a GC is triggered. // Forcing a GC here will set the "previously active heap" // to a minumum. - runtime.GC() + // runtime.GC() ctx, shutdown := context.WithCancel(context.Background())