Add new bufferPool implementation

This commit is contained in:
Aditya Ujeniya
2026-02-27 14:44:32 +01:00
parent a418abc7d5
commit 07b989cb81
4 changed files with 155 additions and 4 deletions

View File

@@ -43,6 +43,7 @@ package metricstore
import (
"errors"
"sync"
"time"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
@@ -53,12 +54,102 @@ import (
// of data or reallocation needs to happen on writes.
// BufferCap is the capacity (in elements) pre-allocated for each buffer's
// data slice; taken from DefaultBufferCapacity so writes do not immediately
// trigger a reallocation.
const BufferCap int = DefaultBufferCapacity
var bufferPool sync.Pool = sync.Pool{
New: func() any {
// BufferPool is the global instance.
// It is initialized immediately when the package loads.
var bufferPool = NewPersistentBufferPool()
// PersistentBufferPool is a mutex-guarded, dynamically sized pool of
// *buffer values, replacing the earlier sync.Pool-based implementation.
type PersistentBufferPool struct {
	pool []*buffer  // idle buffers; Get reuses the last element first (LIFO)
	mu   sync.Mutex // guards pool
}
// NewPersistentBufferPool creates an empty, ready-to-use buffer pool.
func NewPersistentBufferPool() *PersistentBufferPool {
	p := new(PersistentBufferPool)
	p.pool = make([]*buffer, 0)
	return p
}
func (p *PersistentBufferPool) Get() *buffer {
p.mu.Lock()
defer p.mu.Unlock()
n := len(p.pool)
if n == 0 {
// Pool is empty, allocate a new one
return &buffer{
data: make([]schema.Float, 0, BufferCap),
}
},
}
// Reuse existing buffer from the pool
b := p.pool[n-1]
p.pool[n-1] = nil // Avoid memory leak
p.pool = p.pool[:n-1]
return b
}
// Put returns b to the pool for later reuse. The data slice is truncated
// to length zero before storing; its backing array (and capacity) is kept.
func (p *PersistentBufferPool) Put(b *buffer) {
	// Truncate so the next user of this buffer starts empty.
	b.data = b.data[:0]

	p.mu.Lock()
	p.pool = append(p.pool, b)
	p.mu.Unlock()
}
// GetSize reports how many buffers are currently sitting idle in the pool.
func (p *PersistentBufferPool) GetSize() int {
	p.mu.Lock()
	n := len(p.pool)
	p.mu.Unlock()
	return n
}
// Clear drains every buffer from the pool so the GC can collect them.
// The pool slice itself is truncated in place, keeping its capacity.
func (p *PersistentBufferPool) Clear() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Drop all references first so no buffer stays reachable through the
	// retained backing array.
	for i := 0; i < len(p.pool); i++ {
		p.pool[i] = nil
	}
	p.pool = p.pool[:0]
}
// Clean removes buffers from the pool whose lastUsed timestamp (Unix
// seconds) is older than threshold. Buffers with lastUsed >= threshold
// are kept; the rest are released to the garbage collector.
func (p *PersistentBufferPool) Clean(threshold int64) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// In-place filter: 'active' shares p.pool's backing array, so kept
	// buffers are compacted to the front without a new allocation.
	active := p.pool[:0]
	for _, b := range p.pool {
		if b.lastUsed >= threshold {
			active = append(active, b)
		}
	}

	// Nil out the tail slots so discarded buffers are not kept alive by
	// the shared backing array.
	for i := len(active); i < len(p.pool); i++ {
		p.pool[i] = nil
	}
	p.pool = active
}
// CleanAll removes every buffer from the pool, releasing them to the GC.
// NOTE(review): this is behaviorally identical to Clear; consider
// consolidating the two in a follow-up.
func (p *PersistentBufferPool) CleanAll() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Release every reference, then truncate while keeping capacity.
	for i := len(p.pool) - 1; i >= 0; i-- {
		p.pool[i] = nil
	}
	p.pool = p.pool[:0]
}
var (
@@ -94,10 +185,11 @@ type buffer struct {
start int64
archived bool
closed bool
lastUsed int64
}
func newBuffer(ts, freq int64) *buffer {
b := bufferPool.Get().(*buffer)
b := bufferPool.Get()
b.frequency = freq
b.start = ts - (freq / 2)
b.prev = nil
@@ -240,6 +332,7 @@ func (b *buffer) free(t int64) (delme bool, n int) {
if cap(b.prev.data) != BufferCap {
b.prev.data = make([]schema.Float, 0, BufferCap)
}
b.prev.lastUsed = time.Now().Unix()
bufferPool.Put(b.prev)
b.prev = nil
}

View File

@@ -42,6 +42,7 @@ package metricstore
import (
"sync"
"time"
"unsafe"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -192,6 +193,7 @@ func (l *Level) free(t int64) (int, error) {
if cap(b.data) != BufferCap {
b.data = make([]schema.Float, 0, BufferCap)
}
b.lastUsed = time.Now().Unix()
bufferPool.Put(b)
l.metrics[i] = nil
}

View File

@@ -357,6 +357,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
}
state.mu.Unlock()
// Clean up the buffer pool
bufferPool.Clean(state.lastRetentionTime)
}
}
})
@@ -425,6 +428,9 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9
bufferPool.CleanAll()
cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n")
if actualMemoryGB > float64(Keys.MemoryCap) {
cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap)

View File

@@ -464,3 +464,53 @@ func TestBufferHealthChecks(t *testing.T) {
})
}
}
// TestBufferPoolClean verifies that PersistentBufferPool.Clean keeps
// recently used buffers, drops stale ones, and that Get then drains the
// pool completely.
func TestBufferPoolClean(t *testing.T) {
	// Use a fresh pool so the package-level bufferPool is not disturbed.
	pool := NewPersistentBufferPool()

	now := time.Now().Unix()

	// Create some buffers with different lastUsed timestamps.
	b1 := &buffer{lastUsed: now - 3600, data: make([]schema.Float, 0)}   // 1 hour ago
	b2 := &buffer{lastUsed: now - 7200, data: make([]schema.Float, 0)}   // 2 hours ago
	b3 := &buffer{lastUsed: now - 180000, data: make([]schema.Float, 0)} // 50 hours ago
	b4 := &buffer{lastUsed: now - 200000, data: make([]schema.Float, 0)} // ~55 hours ago
	b5 := &buffer{lastUsed: now, data: make([]schema.Float, 0)}          // just now

	pool.Put(b1)
	pool.Put(b2)
	pool.Put(b3)
	pool.Put(b4)
	pool.Put(b5)

	if pool.GetSize() != 5 {
		t.Fatalf("Expected pool size 5, got %d", pool.GetSize())
	}

	// Clean buffers older than 48 hours. The cutoff must lie in the PAST
	// (now minus 48h); the original used Add(48*time.Hour), a future
	// timestamp, which would have evicted every buffer.
	cutoff := time.Now().Add(-48 * time.Hour).Unix()
	pool.Clean(cutoff)

	// Expected: b1, b2, b5 remain; b3, b4 are cleaned.
	if pool.GetSize() != 3 {
		t.Fatalf("Expected pool size 3 after clean, got %d", pool.GetSize())
	}

	validBufs := map[int64]bool{
		b1.lastUsed: true,
		b2.lastUsed: true,
		b5.lastUsed: true,
	}
	for i := 0; i < 3; i++ {
		b := pool.Get()
		if !validBufs[b.lastUsed] {
			t.Errorf("Found unexpected buffer with lastUsed %d", b.lastUsed)
		}
	}

	if pool.GetSize() != 0 {
		t.Fatalf("Expected pool to be empty, got %d", pool.GetSize())
	}
}