From bbde91a1f9ba5cf061b661b2370106d32344e533 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Tue, 27 Jan 2026 17:25:29 +0100
Subject: [PATCH] Move WaitGroup increments into the worker start functions,
 before each goroutine launch. Make GC calls less aggressive

---
 pkg/metricstore/archive.go     |  1 +
 pkg/metricstore/avroHelper.go  | 11 +++--
 pkg/metricstore/checkpoint.go  |  2 +
 pkg/metricstore/metricstore.go | 78 ++++++++++++++++------------------
 4 files changed, 44 insertions(+), 48 deletions(-)

diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go
index 6abcb183..cab4c24f 100644
--- a/pkg/metricstore/archive.go
+++ b/pkg/metricstore/archive.go
@@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
 
 // runWorker takes simple values to configure what it does
 func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 
diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go
index 36594e73..62827afd 100644
--- a/pkg/metricstore/avroHelper.go
+++ b/pkg/metricstore/avroHelper.go
@@ -15,15 +15,14 @@ import (
 )
 
 func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
-	// AvroPool is a pool of Avro writers.
+	wg.Add(1)
 	go func() {
-		if Keys.Checkpoints.FileFormat == "json" {
-			wg.Done() // Mark this goroutine as done
-			return // Exit the goroutine
-		}
-		defer wg.Done()
+		defer wg.Done()
+
+		if Keys.Checkpoints.FileFormat == "json" {
+			return
+		}
+
 		ms := GetMemoryStore()
 		var avroLevel *AvroLevel
 		oldSelector := make([]string, 0)
diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go
index b90b1c22..c301058b 100644
--- a/pkg/metricstore/checkpoint.go
+++ b/pkg/metricstore/checkpoint.go
@@ -100,6 +100,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 	if Keys.Checkpoints.FileFormat == "json" {
 		ms := GetMemoryStore()
 
+		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			d, err := time.ParseDuration(Keys.Checkpoints.Interval)
@@ -139,6 +140,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 			}
 		}()
 	} else {
+		wg.Add(1)
 		go func() {
 			defer wg.Done()
 
diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go
index 6246f520..63161814 100644
--- a/pkg/metricstore/metricstore.go
+++ b/pkg/metricstore/metricstore.go
@@ -26,6 +26,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime"
+	"runtime/debug"
 	"slices"
 	"sync"
 	"time"
@@ -168,20 +169,6 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
 
 	ctx, shutdown := context.WithCancel(context.Background())
 
-	retentionGoroutines := 1
-	checkpointingGoroutines := 1
-	dataStagingGoroutines := 1
-	archivingGoroutines := 1
-	memoryUsageTracker := 1
-
-	totalGoroutines := retentionGoroutines +
-		checkpointingGoroutines +
-		dataStagingGoroutines +
-		archivingGoroutines +
-		memoryUsageTracker
-
-	wg.Add(totalGoroutines)
-
 	Retention(wg, ctx)
 	Checkpointing(wg, ctx)
 	CleanUp(wg, ctx)
@@ -325,6 +312,7 @@ func Shutdown() {
 func Retention(wg *sync.WaitGroup, ctx context.Context) {
 	ms := GetMemoryStore()
 
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		d, err := time.ParseDuration(Keys.RetentionInMemory)
@@ -370,9 +358,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 
 // MemoryUsageTracker starts a background goroutine that monitors memory usage.
 //
-// This worker checks memory usage every minute and force-frees buffers if memory
-// exceeds the configured cap. It protects against infinite loops by limiting
-// iterations and forcing garbage collection between attempts.
+// This worker checks memory usage periodically and force-frees buffers if memory
+// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS
+// after freeing buffers, avoiding aggressive GC that causes performance issues.
 //
 // Parameters:
 //   - wg: WaitGroup to signal completion when context is cancelled
@@ -382,6 +370,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
 	ms := GetMemoryStore()
 
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		d := DefaultMemoryUsageTrackerInterval
@@ -398,62 +387,67 @@
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			state.mu.RLock()
-
 			memoryUsageGB := ms.SizeInGB()
-			cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB)
+			cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB)
 
-			freedTotal := 0
+			freedExcluded := 0
+			freedEmergency := 0
 			var err error
 
-			// First force-free all the checkpoints that were
-			if state.lastRetentionTime != 0 && state.selectorsExcluded {
-				freedTotal, err = ms.Free(nil, state.lastRetentionTime)
+			state.mu.RLock()
+			lastRetention := state.lastRetentionTime
+			selectorsExcluded := state.selectorsExcluded
+			state.mu.RUnlock()
+
+			if lastRetention != 0 && selectorsExcluded {
+				freedExcluded, err = ms.Free(nil, lastRetention)
 				if err != nil {
 					cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err)
 				}
-				// Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool)
-				runtime.GC()
-				runtime.GC()
-
-				cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal)
+				if freedExcluded > 0 {
+					debug.FreeOSMemory()
+					cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded)
+				}
 			}
-			state.mu.RUnlock()
-
 			memoryUsageGB = ms.SizeInGB()
 			if memoryUsageGB > float64(Keys.MemoryCap) {
-				cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
-				cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
+				cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap)
 
 				const maxIterations = 100
-				for range maxIterations {
-					memoryUsageGB = ms.SizeInGB()
+				for i := 0; i < maxIterations; i++ {
 					if memoryUsageGB < float64(Keys.MemoryCap) {
 						break
 					}
 
 					freed, err := ms.ForceFree()
 					if err != nil {
-						cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err)
+						cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err)
 					}
 
 					if freed == 0 {
-						cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap)
+						cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap)
 						break
 					}
-					freedTotal += freed
+					freedEmergency += freed
 
-					runtime.GC()
+					if i%10 == 0 && freedEmergency > 0 {
+						memoryUsageGB = ms.SizeInGB()
+					}
 				}
 
+				if freedEmergency > 0 {
+					debug.FreeOSMemory()
+				}
+
+				memoryUsageGB = ms.SizeInGB()
+
 				if memoryUsageGB >= float64(Keys.MemoryCap) {
-					cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB)
+					cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap)
 				} else {
-					cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal)
-					cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
+					cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB)
 				}
 			}
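
Editor's note (appended after the diff, not part of the applied patch): the WaitGroup half of this change follows the standard Go pattern of letting each start function register the goroutine it launches. wg.Add(1) runs before the go statement (an Add inside the goroutine could race with wg.Wait), and defer wg.Done() is the goroutine's first statement so every exit path is counted exactly once, including early returns such as the json case in DataStaging. A minimal self-contained sketch of that pattern; startWorker and its ticker loop are illustrative names that do not appear in this repository:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startWorker registers the goroutine it launches: Add before the go
// statement, Done deferred inside. Callers no longer pre-count workers
// the way Init did before this patch.
func startWorker(wg *sync.WaitGroup, ctx context.Context, name string) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("%s: shutting down\n", name)
				return
			case <-ticker.C:
				// periodic work would run here
			}
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())

	startWorker(&wg, ctx, "retention")
	startWorker(&wg, ctx, "checkpointing")

	time.Sleep(50 * time.Millisecond)
	cancel()
	wg.Wait() // returns once both workers have observed ctx.Done()
}

The avroHelper.go hunk shows the payoff: with defer wg.Done() first, the early return for the json file format no longer needs its own manual wg.Done() call.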
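A second note on the GC half: the removed code called runtime.GC() after every freed buffer, plus twice after the retention pass, to drain a sync.Pool. The patch instead frees in batches and makes a single debug.FreeOSMemory() call, and only when something was actually freed, so idle ticks trigger no forced collection at all. FreeOSMemory runs one garbage collection and then attempts to return as much memory as possible to the operating system. A standalone sketch of that call, where a large unreferenced allocation stands in for the store's buffers and heapMiB is a hypothetical helper:

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

// heapMiB reports the live heap size in MiB via runtime.ReadMemStats
// (illustrative helper, not part of the metric store).
func heapMiB() uint64 {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapAlloc >> 20
}

func main() {
	// Stand-in for the store's buffers: allocate 256 MiB, then drop
	// the only reference.
	buf := make([]byte, 256<<20)
	buf[0] = 1
	fmt.Printf("heap with buffer live: ~%d MiB\n", heapMiB())
	buf = nil

	// One call after the batch of frees: forces a single GC and asks
	// the runtime to return freed pages to the OS, replacing the
	// removed per-iteration runtime.GC() calls.
	debug.FreeOSMemory()
	fmt.Printf("heap after FreeOSMemory: ~%d MiB\n", heapMiB())
}

The guards in the patch (freedExcluded > 0, freedEmergency > 0) matter because FreeOSMemory still forces a full collection; calling it unconditionally on every tick would reintroduce the overhead this commit removes.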