Move wg increment inside goroutines. Make GC calls less aggressive

2026-01-27 17:25:29 +01:00
parent 55cb2cb6d6
commit bbde91a1f9
4 changed files with 44 additions and 48 deletions
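For context on the WaitGroup half of the change: each function that starts a background worker now calls wg.Add(1) itself, immediately before the go statement, and the goroutine defers wg.Done() as its first statement so every return path is covered. The caller then no longer needs the pre-counted totalGoroutines bookkeeping that this commit removes from Init. A minimal standalone sketch of the pattern, with made-up names (startWorker, "tick" output), not code from this repository:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startWorker registers itself with the WaitGroup before launching the
// goroutine, so callers never have to pre-count how many workers exist.
func startWorker(wg *sync.WaitGroup, ctx context.Context, name string) {
	wg.Add(1) // increment here, not in the caller
	go func() {
		defer wg.Done() // first statement: every return path marks the goroutine done
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println(name, "tick")
			}
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	startWorker(&wg, ctx, "retention")
	startWorker(&wg, ctx, "checkpointing")
	time.Sleep(300 * time.Millisecond)
	cancel()
	wg.Wait() // blocks until every started worker has called Done
}
```

Note that wg.Add(1) sits before the go statement rather than inside the goroutine; adding inside the goroutine could race with wg.Wait() in the caller. Deferring wg.Done() first is also what the DataStaging change below relies on for its early return.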

View File

@@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
 // runWorker takes simple values to configure what it does
 func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
+	wg.Add(1)
 	go func() {
 		defer wg.Done()

View File

@@ -15,15 +15,14 @@ import (
 )

 func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
-	// AvroPool is a pool of Avro writers.
+	wg.Add(1)
 	go func() {
-		if Keys.Checkpoints.FileFormat == "json" {
-			wg.Done() // Mark this goroutine as done
-			return    // Exit the goroutine
-		}
 		defer wg.Done()
+		if Keys.Checkpoints.FileFormat == "json" {
+			return
+		}

 		ms := GetMemoryStore()
 		var avroLevel *AvroLevel
 		oldSelector := make([]string, 0)

View File

@@ -100,6 +100,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 	if Keys.Checkpoints.FileFormat == "json" {
 		ms := GetMemoryStore()
+		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			d, err := time.ParseDuration(Keys.Checkpoints.Interval)
@@ -139,6 +140,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 			}
 		}()
 	} else {
+		wg.Add(1)
 		go func() {
 			defer wg.Done()

View File

@@ -26,6 +26,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"runtime" "runtime"
"runtime/debug"
"slices" "slices"
"sync" "sync"
"time" "time"
@@ -168,20 +169,6 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
 	ctx, shutdown := context.WithCancel(context.Background())

-	retentionGoroutines := 1
-	checkpointingGoroutines := 1
-	dataStagingGoroutines := 1
-	archivingGoroutines := 1
-	memoryUsageTracker := 1
-
-	totalGoroutines := retentionGoroutines +
-		checkpointingGoroutines +
-		dataStagingGoroutines +
-		archivingGoroutines +
-		memoryUsageTracker
-
-	wg.Add(totalGoroutines)
-
 	Retention(wg, ctx)
 	Checkpointing(wg, ctx)
 	CleanUp(wg, ctx)
@@ -325,6 +312,7 @@ func Shutdown() {
 func Retention(wg *sync.WaitGroup, ctx context.Context) {
 	ms := GetMemoryStore()
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		d, err := time.ParseDuration(Keys.RetentionInMemory)
@@ -370,9 +358,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 // MemoryUsageTracker starts a background goroutine that monitors memory usage.
 //
-// This worker checks memory usage every minute and force-frees buffers if memory
-// exceeds the configured cap. It protects against infinite loops by limiting
-// iterations and forcing garbage collection between attempts.
+// This worker checks memory usage periodically and force-frees buffers if memory
+// exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS
+// after freeing buffers, avoiding aggressive GC that causes performance issues.
 //
 // Parameters:
 //   - wg: WaitGroup to signal completion when context is cancelled
@@ -382,6 +370,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
 	ms := GetMemoryStore()
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		d := DefaultMemoryUsageTrackerInterval
@@ -398,62 +387,67 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				state.mu.RLock()
 				memoryUsageGB := ms.SizeInGB()
-				cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB)
-				freedTotal := 0
+				cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB", memoryUsageGB)
+				freedExcluded := 0
+				freedEmergency := 0
 				var err error
-				// First force-free all the checkpoints that were
-				if state.lastRetentionTime != 0 && state.selectorsExcluded {
-					freedTotal, err = ms.Free(nil, state.lastRetentionTime)
+				state.mu.RLock()
+				lastRetention := state.lastRetentionTime
+				selectorsExcluded := state.selectorsExcluded
+				state.mu.RUnlock()
+
+				if lastRetention != 0 && selectorsExcluded {
+					freedExcluded, err = ms.Free(nil, lastRetention)
 					if err != nil {
 						cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err)
 					}
-					// Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool)
-					runtime.GC()
-					runtime.GC()
-
-					cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal)
-				}
-				state.mu.RUnlock()
+					if freedExcluded > 0 {
+						debug.FreeOSMemory()
+						cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded)
+					}
+				}

 				memoryUsageGB = ms.SizeInGB()
 				if memoryUsageGB > float64(Keys.MemoryCap) {
-					cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
-					cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
+					cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", memoryUsageGB, Keys.MemoryCap)
 					const maxIterations = 100
-					for range maxIterations {
-						memoryUsageGB = ms.SizeInGB()
+					for i := 0; i < maxIterations; i++ {
 						if memoryUsageGB < float64(Keys.MemoryCap) {
 							break
 						}
 						freed, err := ms.ForceFree()
 						if err != nil {
-							cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err)
+							cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err)
 						}
 						if freed == 0 {
-							cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap)
+							cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap)
 							break
 						}
-						freedTotal += freed
-						runtime.GC()
+						freedEmergency += freed
+						if i%10 == 0 && freedEmergency > 0 {
+							memoryUsageGB = ms.SizeInGB()
+						}
 					}
+					if freedEmergency > 0 {
+						debug.FreeOSMemory()
+					}
+					memoryUsageGB = ms.SizeInGB()
 					if memoryUsageGB >= float64(Keys.MemoryCap) {
-						cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB)
+						cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, memoryUsageGB, Keys.MemoryCap)
 					} else {
-						cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal)
-						cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
+						cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, memoryUsageGB)
 					}
 				}
 			}
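On the GC half of the change, both calls force a collection, so the gain comes from doing it less often and getting more out of each call: the old code ran runtime.GC() twice after freeing the excluded buffers and once per emergency-freeing iteration, while the new code runs a single debug.FreeOSMemory() per freeing pass. FreeOSMemory also triggers a collection, but additionally returns freed pages to the operating system, which is the effect that matters when enforcing Keys.MemoryCap against observed memory usage. A standalone sketch of the difference (illustrative only, not code from this repository):

```go
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

// allocate builds some garbage so the effect of freeing is visible.
func allocate(n int) [][]byte {
	buf := make([][]byte, n)
	for i := range buf {
		buf[i] = make([]byte, 1<<20) // 1 MiB each
	}
	return buf
}

func printMem(label string) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("%-12s HeapAlloc=%d MiB HeapReleased=%d MiB\n",
		label, m.HeapAlloc>>20, m.HeapReleased>>20)
}

func main() {
	buf := allocate(256)
	printMem("allocated")
	_ = buf
	buf = nil // drop the references so the buffers become collectable

	// Old approach in the diff: force a collection after every freeing step.
	// This reclaims heap objects but does not immediately hand pages back to
	// the OS; the background scavenger does that only gradually.
	runtime.GC()
	printMem("runtime.GC")

	// New approach: one debug.FreeOSMemory() per freeing pass. It also runs a
	// collection and then returns as much memory as possible to the OS.
	debug.FreeOSMemory()
	printMem("FreeOSMemory")
}
```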