diff --git a/pkg/metricstore/buffer.go b/pkg/metricstore/buffer.go index 665d8012..f486e645 100644 --- a/pkg/metricstore/buffer.go +++ b/pkg/metricstore/buffer.go @@ -43,6 +43,7 @@ package metricstore import ( "errors" "sync" + "time" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -53,12 +54,102 @@ import ( // of data or reallocation needs to happen on writes. const BufferCap int = DefaultBufferCapacity -var bufferPool sync.Pool = sync.Pool{ - New: func() any { +// bufferPool is the global pool instance. +// It is initialized immediately when the package loads. +var bufferPool = NewPersistentBufferPool() + +type PersistentBufferPool struct { + pool []*buffer + mu sync.Mutex +} + +// NewPersistentBufferPool creates a dynamic pool for buffers. +func NewPersistentBufferPool() *PersistentBufferPool { + return &PersistentBufferPool{ + pool: make([]*buffer, 0), + } +} + +func (p *PersistentBufferPool) Get() *buffer { + p.mu.Lock() + defer p.mu.Unlock() + + n := len(p.pool) + if n == 0 { + // Pool is empty, allocate a new one return &buffer{ data: make([]schema.Float, 0, BufferCap), } - }, + } + + // Reuse existing buffer from the pool + b := p.pool[n-1] + p.pool[n-1] = nil // Avoid memory leak + p.pool = p.pool[:n-1] + return b +} + +func (p *PersistentBufferPool) Put(b *buffer) { + // Reset the buffer before putting it back + b.data = b.data[:0] + + p.mu.Lock() + defer p.mu.Unlock() + p.pool = append(p.pool, b) +} + +// GetSize returns the exact number of buffers currently sitting in the pool. +func (p *PersistentBufferPool) GetSize() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.pool) +} + +// Clear drains all buffers currently in the pool, allowing the GC to collect them. +func (p *PersistentBufferPool) Clear() { + p.mu.Lock() + defer p.mu.Unlock() + for i := range p.pool { + p.pool[i] = nil + } + p.pool = p.pool[:0] +} + +// Clean removes buffers from the pool whose lastUsed timestamp is older than the given cutoff.
+// The threshold is a Unix timestamp: buffers with lastUsed >= threshold are kept, older ones are dropped for the GC to collect. +func (p *PersistentBufferPool) Clean(threshold int64) { + p.mu.Lock() + defer p.mu.Unlock() + + // Filter in place + active := p.pool[:0] + for _, b := range p.pool { + if b.lastUsed >= threshold { + active = append(active, b) + } else { + // Buffer is older than the threshold, let it be collected by GC + } + } + + // Nullify the rest to prevent memory leaks + for i := len(active); i < len(p.pool); i++ { + p.pool[i] = nil + } + + p.pool = active +} + +// CleanAll removes all buffers from the pool. +func (p *PersistentBufferPool) CleanAll() { + p.mu.Lock() + defer p.mu.Unlock() + + // Nullify all buffers to prevent memory leaks + for i := range p.pool { + p.pool[i] = nil + } + + p.pool = p.pool[:0] } var ( @@ -94,10 +185,11 @@ type buffer struct { start int64 archived bool closed bool + lastUsed int64 } func newBuffer(ts, freq int64) *buffer { - b := bufferPool.Get().(*buffer) + b := bufferPool.Get() b.frequency = freq b.start = ts - (freq / 2) b.prev = nil @@ -240,6 +332,7 @@ func (b *buffer) free(t int64) (delme bool, n int) { if cap(b.prev.data) != BufferCap { b.prev.data = make([]schema.Float, 0, BufferCap) } + b.prev.lastUsed = time.Now().Unix() bufferPool.Put(b.prev) b.prev = nil } diff --git a/pkg/metricstore/level.go b/pkg/metricstore/level.go index 85c2ba7b..ef082579 100644 --- a/pkg/metricstore/level.go +++ b/pkg/metricstore/level.go @@ -42,6 +42,7 @@ package metricstore import ( "sync" + "time" "unsafe" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -192,6 +193,7 @@ func (l *Level) free(t int64) (int, error) { if cap(b.data) != BufferCap { b.data = make([]schema.Float, 0, BufferCap) } + b.lastUsed = time.Now().Unix() bufferPool.Put(b) l.metrics[i] = nil } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index d46c0d15..db3e4357 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -357,6 +357,9 @@ func Retention(wg 
*sync.WaitGroup, ctx context.Context) { } state.mu.Unlock() + + // Clean up the buffer pool + bufferPool.Clean(state.lastRetentionTime) } } }) @@ -425,6 +428,9 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 + bufferPool.CleanAll() + cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n") + if actualMemoryGB > float64(Keys.MemoryCap) { cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index eb1aff15..772fd7ea 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -464,3 +464,53 @@ func TestBufferHealthChecks(t *testing.T) { }) } } + +func TestBufferPoolClean(t *testing.T) { + // Use a fresh pool for testing + pool := NewPersistentBufferPool() + + now := time.Now().Unix() + + // Create some buffers and put them in the pool with different lastUsed times + b1 := &buffer{lastUsed: now - 3600, data: make([]schema.Float, 0)} // 1 hour ago + b2 := &buffer{lastUsed: now - 7200, data: make([]schema.Float, 0)} // 2 hours ago + b3 := &buffer{lastUsed: now - 180000, data: make([]schema.Float, 0)} // 50 hours ago + b4 := &buffer{lastUsed: now - 200000, data: make([]schema.Float, 0)} // 55 hours ago + b5 := &buffer{lastUsed: now, data: make([]schema.Float, 0)} + + pool.Put(b1) + pool.Put(b2) + pool.Put(b3) + pool.Put(b4) + pool.Put(b5) + + if pool.GetSize() != 5 { + t.Fatalf("Expected pool size 5, got %d", pool.GetSize()) + } + + // Clean buffers older than 48 hours + timeUpdate := time.Now().Add(-48 * time.Hour).Unix() + pool.Clean(timeUpdate) + + // Expected: b1, b2, b5 should remain. b3, b4 should be cleaned. 
+ if pool.GetSize() != 3 { + t.Fatalf("Expected pool size 3 after clean, got %d", pool.GetSize()) + } + + validBufs := map[int64]bool{ + b1.lastUsed: true, + b2.lastUsed: true, + b5.lastUsed: true, + } + + for i := 0; i < 3; i++ { + b := pool.Get() + if !validBufs[b.lastUsed] { + t.Errorf("Found unexpected buffer with lastUsed %d", b.lastUsed) + } + } + + if pool.GetSize() != 0 { + t.Fatalf("Expected pool to be empty, got %d", pool.GetSize()) + } +}