diff --git a/pkg/metricstore/buffer.go b/pkg/metricstore/buffer.go index 2d752006..8530f893 100644 --- a/pkg/metricstore/buffer.go +++ b/pkg/metricstore/buffer.go @@ -408,3 +408,12 @@ func (b *buffer) count() int64 { } return res } + +// bufferCount returns the number of buffer nodes in the linked list. +func (b *buffer) bufferCount() int64 { + var n int64 + for ; b != nil; b = b.prev { + n++ + } + return n +} diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 3b6be529..44743dbe 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -53,7 +53,7 @@ const ( DefaultMaxWorkers = 10 DefaultBufferCapacity = 512 DefaultGCTriggerInterval = 100 - DefaultMemoryUsageTrackerInterval = 1 * time.Hour + DefaultMemoryUsageTrackerInterval = 5 * time.Minute ) // Checkpoints configures periodic persistence of in-memory metric data. diff --git a/pkg/metricstore/level.go b/pkg/metricstore/level.go index 2b24a2ea..218ae8e6 100644 --- a/pkg/metricstore/level.go +++ b/pkg/metricstore/level.go @@ -200,17 +200,33 @@ func (l *Level) free(t int64) (int, error) { } } - for _, l := range l.children { - m, err := l.free(t) + for key, child := range l.children { + m, err := child.free(t) n += m if err != nil { return n, err } + if child.isEmpty() { + delete(l.children, key) + } } return n, nil } +// isEmpty returns true if this level has no metrics and no children. +func (l *Level) isEmpty() bool { + l.lock.RLock() + defer l.lock.RUnlock() + + for _, b := range l.metrics { + if b != nil { + return false + } + } + return len(l.children) == 0 +} + // forceFree removes the oldest buffer from each metric chain in the subtree. // // Unlike free(), which removes based on time threshold, this unconditionally removes @@ -278,6 +294,7 @@ func (l *Level) sizeInBytes() int64 { for _, b := range l.metrics { if b != nil { size += b.count() * int64(unsafe.Sizeof(schema.Float(0))) + size += b.bufferCount() * int64(unsafe.Sizeof(buffer{})) } } diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index ecae3df1..21f036ed 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -359,7 +359,8 @@ func DecodeLine(dec *lineprotocol.Decoder, } if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { - return err + cclog.Warnf("write error for host %s metric %s at ts %d: %s", host, string(st.metricBuf), time, err.Error()) + continue } } return nil diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index b6fbb51a..85eb0baf 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -384,14 +384,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() wg.Go(func() { - d := DefaultMemoryUsageTrackerInterval + normalInterval := DefaultMemoryUsageTrackerInterval + fastInterval := 30 * time.Second - if d <= 0 { + if normalInterval <= 0 { return } - ticker := time.NewTicker(d) + ticker := time.NewTicker(normalInterval) defer ticker.Stop() + currentInterval := normalInterval for { select { @@ -428,48 +430,59 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 - bufferPool.Clear() - cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n") - if actualMemoryGB > float64(Keys.MemoryCap) { cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) - const maxIterations = 100 + // Use progressive time-based Free with increasing threshold + // instead of ForceFree loop — fewer tree traversals, more effective + d, parseErr := time.ParseDuration(Keys.RetentionInMemory) + if parseErr != nil { + cclog.Errorf("[METRICSTORE]> cannot parse retention duration: %s", parseErr) + } else { + thresholds := []float64{0.75, 0.5, 0.25} + for _, fraction := range thresholds { + threshold := time.Now().Add(-time.Duration(float64(d) * fraction)) + freed, freeErr := ms.Free(nil, threshold.Unix()) + if freeErr != nil { + cclog.Errorf("[METRICSTORE]> error while freeing buffers at %.0f%% retention: %s", fraction*100, freeErr) + } + freedEmergency += freed - for i := range maxIterations { - if actualMemoryGB < float64(Keys.MemoryCap) { - break - } - - freed, err := ms.ForceFree() - if err != nil { - cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) - } - if freed == 0 { - cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) - break - } - freedEmergency += freed - - if i%10 == 0 && freedEmergency > 0 { + bufferPool.Clear() + runtime.GC() runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 + + if actualMemoryGB < float64(Keys.MemoryCap) { + break + } } } - // if freedEmergency > 0 { - // debug.FreeOSMemory() - // } + bufferPool.Clear() + debug.FreeOSMemory() runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 if actualMemoryGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> after emergency frees (%d buffers), memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) } else { cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } + + // Adaptive ticker: check more frequently when memory is high + memoryRatio := actualMemoryGB / float64(Keys.MemoryCap) + if memoryRatio > 0.8 && currentInterval != fastInterval { + ticker.Reset(fastInterval) + currentInterval = fastInterval + cclog.Infof("[METRICSTORE]> memory at %.0f%% of cap, switching to fast check interval (30s)", memoryRatio*100) + } else if memoryRatio <= 0.8 && currentInterval != normalInterval { + ticker.Reset(normalInterval) + currentInterval = normalInterval + cclog.Infof("[METRICSTORE]> memory at %.0f%% of cap, switching to normal check interval", memoryRatio*100) + } } } })