fix: Fix metricstore memory explosion from broken emergency free and batch aborts

- Fix MemoryUsageTracker: remove premature bufferPool.Clear() that prevented
  mem.Alloc from decreasing, replace broken ForceFree loop (100 iterations
  with no GC) with progressive time-based Free at 75%/50%/25% retention,
  add bufferPool.Clear()+GC between steps so memory stats update correctly
- Enable debug.FreeOSMemory() after emergency freeing to return memory to OS
- Add adaptive ticker: 30s checks when memory >80% of cap, normal otherwise
- Reduce default memory check interval from 1h to 5min
- Don't abort entire NATS batch on single write error (out-of-order timestamp),
  log warning and continue processing remaining lines
- Prune empty levels from tree after free() to reduce overhead
- Include buffer struct overhead in sizeInBytes() for more accurate reporting

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: 7ce28627fc1d
This commit is contained in:
2026-03-13 07:57:35 +01:00
parent 126f65879a
commit 8234ad3126
5 changed files with 71 additions and 31 deletions

View File

@@ -408,3 +408,12 @@ func (b *buffer) count() int64 {
} }
return res return res
} }
// bufferCount returns the number of buffer nodes in the linked list.
func (b *buffer) bufferCount() int64 {
var n int64
for ; b != nil; b = b.prev {
n++
}
return n
}

View File

@@ -53,7 +53,7 @@ const (
DefaultMaxWorkers = 10 DefaultMaxWorkers = 10
DefaultBufferCapacity = 512 DefaultBufferCapacity = 512
DefaultGCTriggerInterval = 100 DefaultGCTriggerInterval = 100
DefaultMemoryUsageTrackerInterval = 1 * time.Hour DefaultMemoryUsageTrackerInterval = 5 * time.Minute
) )
// Checkpoints configures periodic persistence of in-memory metric data. // Checkpoints configures periodic persistence of in-memory metric data.

View File

@@ -200,17 +200,33 @@ func (l *Level) free(t int64) (int, error) {
} }
} }
for _, l := range l.children { for key, child := range l.children {
m, err := l.free(t) m, err := child.free(t)
n += m n += m
if err != nil { if err != nil {
return n, err return n, err
} }
if child.isEmpty() {
delete(l.children, key)
}
} }
return n, nil return n, nil
} }
// isEmpty returns true if this level has no metrics and no children.
func (l *Level) isEmpty() bool {
l.lock.RLock()
defer l.lock.RUnlock()
for _, b := range l.metrics {
if b != nil {
return false
}
}
return len(l.children) == 0
}
// forceFree removes the oldest buffer from each metric chain in the subtree. // forceFree removes the oldest buffer from each metric chain in the subtree.
// //
// Unlike free(), which removes based on time threshold, this unconditionally removes // Unlike free(), which removes based on time threshold, this unconditionally removes
@@ -278,6 +294,7 @@ func (l *Level) sizeInBytes() int64 {
for _, b := range l.metrics { for _, b := range l.metrics {
if b != nil { if b != nil {
size += b.count() * int64(unsafe.Sizeof(schema.Float(0))) size += b.count() * int64(unsafe.Sizeof(schema.Float(0)))
size += b.bufferCount() * int64(unsafe.Sizeof(buffer{}))
} }
} }

View File

@@ -359,7 +359,8 @@ func DecodeLine(dec *lineprotocol.Decoder,
} }
if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {
return err cclog.Warnf("write error for host %s metric %s at ts %d: %s", host, string(st.metricBuf), time, err.Error())
continue
} }
} }
return nil return nil

View File

@@ -384,14 +384,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Go(func() { wg.Go(func() {
d := DefaultMemoryUsageTrackerInterval normalInterval := DefaultMemoryUsageTrackerInterval
fastInterval := 30 * time.Second
if d <= 0 { if normalInterval <= 0 {
return return
} }
ticker := time.NewTicker(d) ticker := time.NewTicker(normalInterval)
defer ticker.Stop() defer ticker.Stop()
currentInterval := normalInterval
for { for {
select { select {
@@ -428,48 +430,59 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9 actualMemoryGB = float64(mem.Alloc) / 1e9
bufferPool.Clear()
cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n")
if actualMemoryGB > float64(Keys.MemoryCap) { if actualMemoryGB > float64(Keys.MemoryCap) {
cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap)
const maxIterations = 100 // Use progressive time-based Free with increasing threshold
// instead of ForceFree loop — fewer tree traversals, more effective
d, parseErr := time.ParseDuration(Keys.RetentionInMemory)
if parseErr != nil {
cclog.Errorf("[METRICSTORE]> cannot parse retention duration: %s", parseErr)
} else {
thresholds := []float64{0.75, 0.5, 0.25}
for _, fraction := range thresholds {
threshold := time.Now().Add(-time.Duration(float64(d) * fraction))
freed, freeErr := ms.Free(nil, threshold.Unix())
if freeErr != nil {
cclog.Errorf("[METRICSTORE]> error while freeing buffers at %.0f%% retention: %s", fraction*100, freeErr)
}
freedEmergency += freed
for i := range maxIterations { bufferPool.Clear()
if actualMemoryGB < float64(Keys.MemoryCap) { runtime.GC()
break
}
freed, err := ms.ForceFree()
if err != nil {
cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err)
}
if freed == 0 {
cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap)
break
}
freedEmergency += freed
if i%10 == 0 && freedEmergency > 0 {
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9 actualMemoryGB = float64(mem.Alloc) / 1e9
if actualMemoryGB < float64(Keys.MemoryCap) {
break
}
} }
} }
// if freedEmergency > 0 { bufferPool.Clear()
// debug.FreeOSMemory() debug.FreeOSMemory()
// }
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9 actualMemoryGB = float64(mem.Alloc) / 1e9
if actualMemoryGB >= float64(Keys.MemoryCap) { if actualMemoryGB >= float64(Keys.MemoryCap) {
cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) cclog.Errorf("[METRICSTORE]> after emergency frees (%d buffers), memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap)
} else { } else {
cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB)
} }
} }
// Adaptive ticker: check more frequently when memory is high
memoryRatio := actualMemoryGB / float64(Keys.MemoryCap)
if memoryRatio > 0.8 && currentInterval != fastInterval {
ticker.Reset(fastInterval)
currentInterval = fastInterval
cclog.Infof("[METRICSTORE]> memory at %.0f%% of cap, switching to fast check interval (30s)", memoryRatio*100)
} else if memoryRatio <= 0.8 && currentInterval != normalInterval {
ticker.Reset(normalInterval)
currentInterval = normalInterval
cclog.Infof("[METRICSTORE]> memory at %.0f%% of cap, switching to normal check interval", memoryRatio*100)
}
} }
} }
}) })