From 32319adf72ab1030ab13cff432f6bf7a3e02f9f8 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Thu, 15 Jan 2026 21:29:21 +0100 Subject: [PATCH] Add Memory Tracker worker for CCMS --- configs/config-demo.json | 3 +- configs/config.json | 1 + internal/metricstore/buffer.go | 32 +++++++++++++ internal/metricstore/configSchema.go | 2 +- internal/metricstore/level.go | 36 +++++++++++++++ internal/metricstore/metricstore.go | 68 +++++++++++++++++++++++++++- 6 files changed, 139 insertions(+), 3 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index 23e9a6cb..18688378 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -19,6 +19,7 @@ "checkpoints": { "interval": "12h" }, - "retention-in-memory": "48h" + "retention-in-memory": "48h", + "memory-cap": 100 } } \ No newline at end of file diff --git a/configs/config.json b/configs/config.json index 90a6ba79..ff224f29 100644 --- a/configs/config.json +++ b/configs/config.json @@ -49,6 +49,7 @@ "checkpoints": { "interval": "12h" }, + "memory-cap": 100, "retention-in-memory": "48h", "nats-subscriptions": [ { diff --git a/internal/metricstore/buffer.go b/internal/metricstore/buffer.go index 94d3ce76..64109abb 100644 --- a/internal/metricstore/buffer.go +++ b/internal/metricstore/buffer.go @@ -164,6 +164,38 @@ func (b *buffer) free(t int64) (delme bool, n int) { return false, n } +// forceFreeOldest recursively finds the end of the linked list (the oldest buffer) +// and removes it. +// Returns: +// +// delme: true if 'b' itself is the oldest and should be removed by the caller +// n: the number of buffers freed (will be 1 or 0) +func (b *buffer) forceFreeOldest() (delme bool, n int) { + // If there is a previous buffer, recurse down to find the oldest + if b.prev != nil { + delPrev, freed := b.prev.forceFreeOldest() + + // If the previous buffer signals it should be deleted: + if delPrev { + // Unlink references + b.prev.next = nil + + // Return to pool if capacity matches + if cap(b.prev.data) == BufferCap { + bufferPool.Put(b.prev) + } + + // Remove the link from the current buffer + b.prev = nil + } + return false, freed + } + + // If b.prev is nil, THIS buffer is the oldest. + // We return true so the parent (or the Level loop) knows to delete reference to 'b'. + return true, 1 +} + // Call `callback` on every buffer that contains data in the range from `from` to `to`. func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { if b == nil { diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index c06bc6d9..4677eca6 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -73,5 +73,5 @@ const configSchema = `{ } } }, - "required": ["checkpoints", "retention-in-memory"] + "required": ["checkpoints", "retention-in-memory", "memory-cap"] }` diff --git a/internal/metricstore/level.go b/internal/metricstore/level.go index 87aeefc9..4960563a 100644 --- a/internal/metricstore/level.go +++ b/internal/metricstore/level.go @@ -124,6 +124,42 @@ func (l *Level) free(t int64) (int, error) { return n, nil } +func (l *Level) forceFree() (int, error) { + l.lock.Lock() + defer l.lock.Unlock() + + n := 0 + + // Iterate over metrics in the current level + for i, b := range l.metrics { + if b != nil { + // Attempt to free the oldest buffer in this chain + delme, freedCount := b.forceFreeOldest() + n += freedCount + + // If delme is true, it means 'b' itself (the head) was the oldest + // and needs to be removed from the slice. + if delme { + if cap(b.data) == BufferCap { + bufferPool.Put(b) + } + l.metrics[i] = nil + } + } + } + + // Recursively traverse children + for _, child := range l.children { + m, err := child.forceFree() + n += m + if err != nil { + return n, err + } + } + + return n, nil +} + func (l *Level) sizeInBytes() int64 { l.lock.RLock() defer l.lock.RUnlock() diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 1607a68c..07b9c38a 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -143,11 +143,13 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { checkpointingGoroutines := 1 dataStagingGoroutines := 1 archivingGoroutines := 1 + memoryUsageTracker := 1 totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + - archivingGoroutines + archivingGoroutines + + memoryUsageTracker wg.Add(totalGoroutines) @@ -155,6 +157,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { Checkpointing(wg, ctx) Archiving(wg, ctx) DataStaging(wg, ctx) + MemoryUsageTracker(wg, ctx) // Note: Signal handling has been removed from this function. // The caller is responsible for handling shutdown signals and calling @@ -280,6 +283,59 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { }() } +func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d := 1 * time.Minute + + if d <= 0 { + return + } + + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + memoryUsageGB := ms.SizeInGB() + cclog.Infof("[METRICSTORE]> current memory usage: %.2f\n", memoryUsageGB) + + if memoryUsageGB > float64(Keys.MemoryCap) { + cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d\n", Keys.MemoryCap) + cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") + + freedTotal := 0 + + for { + memoryUsageGB = ms.SizeInGB() + if memoryUsageGB < float64(Keys.MemoryCap) { + break + } + + freed, err := ms.ForceFree() + if err != nil { + cclog.Errorf("error while force-freeing the buffers: %s", err) + } + if freed == 0 { + cclog.Fatalf("0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f remains higher than the memory cap and there are no buffers left to force-free\n", freedTotal, memoryUsageGB) + } + freedTotal += freed + } + + cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) + cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f\n", memoryUsageGB) + } + + } + } + }() +} + func Free(ms *MemoryStore, t time.Time) (int, error) { // If no NodeProvider is configured, free all buffers older than t if ms.nodeProvider == nil { @@ -496,6 +552,12 @@ func (m *MemoryStore) Free(selector []string, t int64) (int, error) { return m.GetLevel(selector).free(t) } +// Free releases all buffers for the selected level and all its children that +// contain only values older than `t`. +func (m *MemoryStore) ForceFree() (int, error) { + return m.GetLevel(nil).forceFree() +} + func (m *MemoryStore) FreeAll() error { for k := range m.root.children { delete(m.root.children, k) @@ -508,6 +570,10 @@ func (m *MemoryStore) SizeInBytes() int64 { return m.root.sizeInBytes() } +func (m *MemoryStore) SizeInGB() float64 { + return float64(m.root.sizeInBytes()) / 1e9 +} + // ListChildren , given a selector, returns a list of all children of the level // selected. func (m *MemoryStore) ListChildren(selector []string) []string {