From 1ec41d8389e81e7c517960dda251ec6a8a53ad39 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 28 Feb 2026 19:34:33 +0100 Subject: [PATCH] Review and improve buffer pool implmentation. Add unit tests. --- pkg/metricstore/buffer.go | 75 ++-- pkg/metricstore/level.go | 9 +- pkg/metricstore/metricstore.go | 2 +- pkg/metricstore/metricstore_test.go | 520 ++++++++++++++++++++++++++++ 4 files changed, 566 insertions(+), 40 deletions(-) diff --git a/pkg/metricstore/buffer.go b/pkg/metricstore/buffer.go index f486e645..557a941c 100644 --- a/pkg/metricstore/buffer.go +++ b/pkg/metricstore/buffer.go @@ -54,6 +54,10 @@ import ( // of data or reallocation needs to happen on writes. const BufferCap int = DefaultBufferCapacity +// maxPoolSize caps the number of buffers held in the pool at any time. +// Prevents unbounded memory growth after large retention-cleanup bursts. +const maxPoolSize = 4096 + // BufferPool is the global instance. // It is initialized immediately when the package loads. var bufferPool = NewPersistentBufferPool() @@ -89,12 +93,18 @@ func (p *PersistentBufferPool) Get() *buffer { return b } +// Put returns b to the pool. The caller must set b.lastUsed = time.Now().Unix() +// before calling Put so that Clean() can evict idle entries correctly. func (p *PersistentBufferPool) Put(b *buffer) { // Reset the buffer before putting it back b.data = b.data[:0] p.mu.Lock() defer p.mu.Unlock() + if len(p.pool) >= maxPoolSize { + // Pool is full; drop the buffer and let GC collect it. + return + } p.pool = append(p.pool, b) } @@ -121,13 +131,11 @@ func (p *PersistentBufferPool) Clean(threshold int64) { p.mu.Lock() defer p.mu.Unlock() - // Filter in place + // Filter in place, retaining only buffers returned to the pool recently enough. active := p.pool[:0] for _, b := range p.pool { if b.lastUsed >= threshold { active = append(active, b) - } else { - // Buffer is older than the threshold, let it be collected by GC } } @@ -139,19 +147,6 @@ func (p *PersistentBufferPool) Clean(threshold int64) { p.pool = active } -// CleanAll removes all buffers from the pool. -func (p *PersistentBufferPool) CleanAll() { - p.mu.Lock() - defer p.mu.Unlock() - - // Nullify all buffers to prevent memory leaks - for i := range p.pool { - p.pool[i] = nil - } - - p.pool = p.pool[:0] -} - var ( // ErrNoData indicates no time-series data exists for the requested metric/level. ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") @@ -276,11 +271,13 @@ func (b *buffer) firstWrite() int64 { // // Panics if 'data' slice is too small to hold all values in [from, to). func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) { - if from < b.firstWrite() { - if b.prev != nil { - return b.prev.read(from, to, data) + // Walk back to the buffer that covers 'from', adjusting if we hit the oldest. + for from < b.firstWrite() { + if b.prev == nil { + from = b.firstWrite() + break } - from = b.firstWrite() + b = b.prev } i := 0 @@ -292,16 +289,17 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 break } b = b.next - idx = 0 + // Recalculate idx in the new buffer; a gap between buffers may exist. + idx = int((t - b.start) / b.frequency) } if idx >= len(b.data) { if b.next == nil || to <= b.next.start { break } - data[i] += schema.NaN + data[i] += schema.NaN // NaN + anything = NaN; propagates missing data } else if t < b.start { - data[i] += schema.NaN + data[i] += schema.NaN // gap before this buffer's first write } else { data[i] += b.data[idx] } @@ -359,11 +357,12 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) { // If the previous buffer signals it should be deleted: if delPrev { - // Clear links on the dying buffer to prevent leaks b.prev.next = nil - b.prev.data = nil // Release the underlying float slice immediately - - // Remove the link from the current buffer + if cap(b.prev.data) != BufferCap { + b.prev.data = make([]schema.Float, 0, BufferCap) + } + b.prev.lastUsed = time.Now().Unix() + bufferPool.Put(b.prev) b.prev = nil } return false, freed @@ -392,21 +391,27 @@ func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) erro return nil } - if err := b.prev.iterFromTo(from, to, callback); err != nil { - return err + // Collect overlapping buffers walking backwards (newest → oldest). + var matching []*buffer + for cur := b; cur != nil; cur = cur.prev { + if from <= cur.end() && cur.start <= to { + matching = append(matching, cur) + } } - if from <= b.end() && b.start <= to { - return callback(b) + // Invoke callback in chronological order (oldest → newest). + for i := len(matching) - 1; i >= 0; i-- { + if err := callback(matching[i]); err != nil { + return err + } } - return nil } func (b *buffer) count() int64 { - res := int64(len(b.data)) - if b.prev != nil { - res += b.prev.count() + var res int64 + for ; b != nil; b = b.prev { + res += int64(len(b.data)) } return res } diff --git a/pkg/metricstore/level.go b/pkg/metricstore/level.go index ef082579..2b24a2ea 100644 --- a/pkg/metricstore/level.go +++ b/pkg/metricstore/level.go @@ -238,12 +238,13 @@ func (l *Level) forceFree() (int, error) { // If delme is true, it means 'b' itself (the head) was the oldest // and needs to be removed from the slice. if delme { - // Nil out fields to ensure no hanging references - b.next = nil b.prev = nil - b.data = nil - + if cap(b.data) != BufferCap { + b.data = make([]schema.Float, 0, BufferCap) + } + b.lastUsed = time.Now().Unix() + bufferPool.Put(b) l.metrics[i] = nil } } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index db3e4357..b5b1a528 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -428,7 +428,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { runtime.ReadMemStats(&mem) actualMemoryGB = float64(mem.Alloc) / 1e9 - bufferPool.CleanAll() + bufferPool.Clear() cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n") if actualMemoryGB > float64(Keys.MemoryCap) { diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 772fd7ea..9087df2a 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -12,6 +12,526 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) +// ─── Buffer pool ───────────────────────────────────────────────────────────── + +// TestBufferPoolGetReuse verifies that Get() returns pooled buffers before +// allocating new ones, and that an empty pool allocates a fresh BufferCap buffer. +func TestBufferPoolGetReuse(t *testing.T) { + pool := NewPersistentBufferPool() + + original := &buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()} + pool.Put(original) + + reused := pool.Get() + if reused != original { + t.Error("Get() should return the previously pooled buffer") + } + if pool.GetSize() != 0 { + t.Errorf("pool size after Get() = %d, want 0", pool.GetSize()) + } + + // Empty pool must allocate a fresh buffer with the standard capacity. + fresh := pool.Get() + if fresh == nil { + t.Fatal("Get() from empty pool returned nil") + } + if cap(fresh.data) != BufferCap { + t.Errorf("fresh buffer cap = %d, want %d", cap(fresh.data), BufferCap) + } +} + +// TestBufferPoolClear verifies that Clear() drains all entries. +func TestBufferPoolClear(t *testing.T) { + pool := NewPersistentBufferPool() + for i := 0; i < 10; i++ { + pool.Put(&buffer{data: make([]schema.Float, 0), lastUsed: time.Now().Unix()}) + } + pool.Clear() + if pool.GetSize() != 0 { + t.Errorf("pool size after Clear() = %d, want 0", pool.GetSize()) + } +} + +// TestBufferPoolMaxSize verifies that Put() silently drops buffers once the +// pool reaches maxPoolSize, preventing unbounded memory growth. +func TestBufferPoolMaxSize(t *testing.T) { + pool := NewPersistentBufferPool() + for i := 0; i < maxPoolSize; i++ { + pool.Put(&buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()}) + } + if pool.GetSize() != maxPoolSize { + t.Fatalf("pool size = %d, want %d", pool.GetSize(), maxPoolSize) + } + + pool.Put(&buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()}) + if pool.GetSize() != maxPoolSize { + t.Errorf("pool size after overflow Put = %d, want %d (should not grow)", pool.GetSize(), maxPoolSize) + } +} + +// ─── Buffer helpers ─────────────────────────────────────────────────────────── + +// TestBufferEndFirstWrite verifies the end() and firstWrite() calculations. +func TestBufferEndFirstWrite(t *testing.T) { + // start=90, freq=10 → firstWrite = 90+5 = 95 + b := &buffer{data: make([]schema.Float, 4, BufferCap), frequency: 10, start: 90} + if fw := b.firstWrite(); fw != 95 { + t.Errorf("firstWrite() = %d, want 95", fw) + } + // end = firstWrite + len(data)*freq = 95 + 4*10 = 135 + if e := b.end(); e != 135 { + t.Errorf("end() = %d, want 135", e) + } +} + +// ─── Buffer write ───────────────────────────────────────────────────────────── + +// TestBufferWriteNaNFill verifies that skipped timestamps are filled with NaN. +func TestBufferWriteNaNFill(t *testing.T) { + b := newBuffer(100, 10) + b.write(100, schema.Float(1.0)) + // skip 110 and 120 + b.write(130, schema.Float(4.0)) + + if len(b.data) != 4 { + t.Fatalf("len(data) = %d, want 4 (1 value + 2 NaN + 1 value)", len(b.data)) + } + if b.data[0] != schema.Float(1.0) { + t.Errorf("data[0] = %v, want 1.0", b.data[0]) + } + if !b.data[1].IsNaN() { + t.Errorf("data[1] should be NaN (gap), got %v", b.data[1]) + } + if !b.data[2].IsNaN() { + t.Errorf("data[2] should be NaN (gap), got %v", b.data[2]) + } + if b.data[3] != schema.Float(4.0) { + t.Errorf("data[3] = %v, want 4.0", b.data[3]) + } +} + +// TestBufferWriteCapacityOverflow verifies that exceeding capacity creates and +// links a new buffer rather than panicking or silently dropping data. +func TestBufferWriteCapacityOverflow(t *testing.T) { + // Cap=2 so the third write must overflow into a new buffer. + b := &buffer{data: make([]schema.Float, 0, 2), frequency: 10, start: 95} + + nb, _ := b.write(100, schema.Float(1.0)) + nb, _ = nb.write(110, schema.Float(2.0)) + nb, err := nb.write(120, schema.Float(3.0)) + if err != nil { + t.Fatalf("write() error = %v", err) + } + if nb == b { + t.Fatal("write() should have returned a new buffer after overflow") + } + if nb.prev != b { + t.Error("new buffer should link back to old via prev") + } + if b.next != nb { + t.Error("old buffer should link forward to new via next") + } + if len(b.data) != 2 { + t.Errorf("old buffer len = %d, want 2 (full)", len(b.data)) + } + if nb.data[0] != schema.Float(3.0) { + t.Errorf("new buffer data[0] = %v, want 3.0", nb.data[0]) + } +} + +// TestBufferWriteOverwrite verifies that writing to an already-occupied index +// replaces the value rather than appending. +func TestBufferWriteOverwrite(t *testing.T) { + b := newBuffer(100, 10) + b.write(100, schema.Float(1.0)) + b.write(110, schema.Float(2.0)) + + // Overwrite the first slot. + b.write(100, schema.Float(99.0)) + if len(b.data) != 2 { + t.Errorf("len(data) after overwrite = %d, want 2 (no append)", len(b.data)) + } + if b.data[0] != schema.Float(99.0) { + t.Errorf("data[0] after overwrite = %v, want 99.0", b.data[0]) + } +} + +// ─── Buffer read ────────────────────────────────────────────────────────────── + +// TestBufferReadBeforeFirstWrite verifies that 'from' is clamped to firstWrite +// when the requested range starts before any data in the chain. +func TestBufferReadBeforeFirstWrite(t *testing.T) { + b := newBuffer(100, 10) // firstWrite = 100 + b.write(100, schema.Float(1.0)) + b.write(110, schema.Float(2.0)) + + data := make([]schema.Float, 10) + result, adjustedFrom, _, err := b.read(50, 120, data) + if err != nil { + t.Fatalf("read() error = %v", err) + } + if adjustedFrom != 100 { + t.Errorf("adjustedFrom = %d, want 100 (clamped to firstWrite)", adjustedFrom) + } + if len(result) != 2 { + t.Errorf("len(result) = %d, want 2", len(result)) + } +} + +// TestBufferReadChain verifies that read() traverses a multi-buffer chain and +// returns contiguous values from both buffers. +// +// The switch to b.next in read() triggers on idx >= cap(b.data), so b1 must +// be full (len == cap) for the loop to advance to b2 without producing NaN. +func TestBufferReadChain(t *testing.T) { + // b1: cap=3, covers t=100..120. b2: covers t=130..150. b2 is head. + b1 := &buffer{data: make([]schema.Float, 0, 3), frequency: 10, start: 95} + b1.data = append(b1.data, 1.0, 2.0, 3.0) // fills b1: len=cap=3 + + b2 := &buffer{data: make([]schema.Float, 0, 3), frequency: 10, start: 125} + b2.data = append(b2.data, 4.0, 5.0, 6.0) // t=130,140,150 + b2.prev = b1 + b1.next = b2 + + data := make([]schema.Float, 6) + result, from, to, err := b2.read(100, 160, data) + if err != nil { + t.Fatalf("read() error = %v", err) + } + if from != 100 || to != 160 { + t.Errorf("read() from/to = %d/%d, want 100/160", from, to) + } + if len(result) != 6 { + t.Fatalf("len(result) = %d, want 6", len(result)) + } + for i, want := range []schema.Float{1, 2, 3, 4, 5, 6} { + if result[i] != want { + t.Errorf("result[%d] = %v, want %v", i, result[i], want) + } + } +} + +// TestBufferReadIdxAfterSwitch is a regression test for the index recalculation +// bug after switching to b.next during a read. +// +// When both buffers share the same start time (can happen with checkpoint-loaded +// chains), the old code hardcoded idx=0 after the switch, causing reads at time t +// to return the wrong element from the next buffer. +func TestBufferReadIdxAfterSwitch(t *testing.T) { + // b1: cap=2, both buffers start at 0 (firstWrite=5). + // b1 carries t=5 and t=15; b2 carries t=5,15,25,35 with the same start. + // When reading reaches t=25 the loop overflows b1 (idx=2 >= cap=2) and + // switches to b2. The correct index in b2 is (25-0)/10=2 → b2.data[2]=30.0. + // The old code set idx=0 → b2.data[0]=10.0 (wrong). + b1 := &buffer{data: make([]schema.Float, 0, 2), frequency: 10, start: 0} + b1.data = append(b1.data, schema.Float(1.0), schema.Float(2.0)) // t=5, t=15 + + b2 := &buffer{data: make([]schema.Float, 0, 10), frequency: 10, start: 0} + b2.data = append(b2.data, + schema.Float(10.0), schema.Float(20.0), + schema.Float(30.0), schema.Float(40.0)) // t=5,15,25,35 + b2.prev = b1 + b1.next = b2 + + // from=0 triggers the walkback to b1 (from < b2.firstWrite=5). + // After clamping, the loop runs t=5,15,25,35. + data := make([]schema.Float, 4) + result, _, _, err := b2.read(0, 36, data) + if err != nil { + t.Fatalf("read() error = %v", err) + } + if len(result) < 3 { + t.Fatalf("len(result) = %d, want >= 3", len(result)) + } + if result[0] != schema.Float(1.0) { + t.Errorf("result[0] (t=5) = %v, want 1.0 (from b1)", result[0]) + } + if result[1] != schema.Float(2.0) { + t.Errorf("result[1] (t=15) = %v, want 2.0 (from b1)", result[1]) + } + // This is the critical assertion: old code returned 10.0 (b2.data[0]). + if result[2] != schema.Float(30.0) { + t.Errorf("result[2] (t=25) = %v, want 30.0 (idx recalculation fix)", result[2]) + } +} + +// TestBufferReadNaNValues verifies that NaN slots written to the buffer are +// returned as NaN during read. +func TestBufferReadNaNValues(t *testing.T) { + b := newBuffer(100, 10) + b.write(100, schema.Float(1.0)) + b.write(110, schema.NaN) + b.write(120, schema.Float(3.0)) + + data := make([]schema.Float, 3) + result, _, _, err := b.read(100, 130, data) + if err != nil { + t.Fatalf("read() error = %v", err) + } + if len(result) != 3 { + t.Fatalf("len(result) = %d, want 3", len(result)) + } + if result[0] != schema.Float(1.0) { + t.Errorf("result[0] = %v, want 1.0", result[0]) + } + if !result[1].IsNaN() { + t.Errorf("result[1] should be NaN, got %v", result[1]) + } + if result[2] != schema.Float(3.0) { + t.Errorf("result[2] = %v, want 3.0", result[2]) + } +} + +// TestBufferReadAccumulation verifies the += accumulation pattern used for +// aggregation: values are added to whatever was already in the data slice. +func TestBufferReadAccumulation(t *testing.T) { + b := newBuffer(100, 10) + b.write(100, schema.Float(3.0)) + b.write(110, schema.Float(5.0)) + + // Pre-populate data slice (simulates a second metric being summed in). + data := []schema.Float{2.0, 1.0, 0.0} + result, _, _, err := b.read(100, 120, data) + if err != nil { + t.Fatalf("read() error = %v", err) + } + // 2.0+3.0=5.0, 1.0+5.0=6.0 + if result[0] != schema.Float(5.0) { + t.Errorf("result[0] = %v, want 5.0 (2+3)", result[0]) + } + if result[1] != schema.Float(6.0) { + t.Errorf("result[1] = %v, want 6.0 (1+5)", result[1]) + } +} + +// ─── Buffer free ───────────────────────────────────────────────────────────── + +// newTestPool swaps out the package-level bufferPool for a fresh isolated one +// and returns a cleanup function that restores the original. +func newTestPool(t *testing.T) *PersistentBufferPool { + t.Helper() + pool := NewPersistentBufferPool() + saved := bufferPool + bufferPool = pool + t.Cleanup(func() { bufferPool = saved }) + return pool +} + +// TestBufferFreeRetention verifies that free() removes buffers whose entire +// time range falls before the retention threshold and returns them to the pool. +func TestBufferFreeRetention(t *testing.T) { + pool := newTestPool(t) + + // b1: firstWrite=5, end=25 b2: firstWrite=25, end=45 b3: firstWrite=45, end=65 + b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0} + b1.data = append(b1.data, 1.0, 2.0) + + b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20} + b2.data = append(b2.data, 3.0, 4.0) + b2.prev = b1 + b1.next = b2 + + b3 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 40} + b3.data = append(b3.data, 5.0, 6.0) + b3.prev = b2 + b2.next = b3 + + // Threshold=30: b1.end()=25 < 30 → freed; b2.end()=45 >= 30 → kept. + delme, n := b3.free(30) + if delme { + t.Error("head buffer b3 should not be marked for deletion") + } + if n != 1 { + t.Errorf("freed count = %d, want 1", n) + } + if b2.prev != nil { + t.Error("b1 should have been unlinked from b2.prev") + } + if b3.prev != b2 { + t.Error("b3 should still reference b2") + } + if pool.GetSize() != 1 { + t.Errorf("pool size = %d, want 1 (b1 returned)", pool.GetSize()) + } +} + +// TestBufferFreeAll verifies that free() removes all buffers and signals the +// caller to delete the head when the entire chain is older than the threshold. +func TestBufferFreeAll(t *testing.T) { + pool := newTestPool(t) + + b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0} + b1.data = append(b1.data, 1.0, 2.0) // end=25 + + b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20} + b2.data = append(b2.data, 3.0, 4.0) // end=45 + b2.prev = b1 + b1.next = b2 + + // Threshold=100 > both ends → both should be freed. + delme, n := b2.free(100) + if !delme { + t.Error("head buffer b2 should be marked for deletion when all data is stale") + } + if n != 2 { + t.Errorf("freed count = %d, want 2", n) + } + // b1 was freed inside free(); b2 is returned with delme=true for the caller. + if pool.GetSize() != 1 { + t.Errorf("pool size = %d, want 1 (b1 returned; b2 returned by caller)", pool.GetSize()) + } +} + +// ─── forceFreeOldest ───────────────────────────────────────────────────────── + +// TestForceFreeOldestPoolReturn verifies that forceFreeOldest() returns the +// freed buffer to the pool (regression: previously it was just dropped). +func TestForceFreeOldestPoolReturn(t *testing.T) { + pool := newTestPool(t) + + b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0} + b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20} + b3 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 40} + b1.data = append(b1.data, 1.0) + b2.data = append(b2.data, 2.0) + b3.data = append(b3.data, 3.0) + b2.prev = b1 + b1.next = b2 + b3.prev = b2 + b2.next = b3 + + delme, n := b3.forceFreeOldest() + if delme { + t.Error("head b3 should not be marked for deletion (chain has 3 buffers)") + } + if n != 1 { + t.Errorf("freed count = %d, want 1", n) + } + if b2.prev != nil { + t.Error("b1 should have been unlinked from b2.prev after forceFreeOldest") + } + if b3.prev != b2 { + t.Error("b3 should still link to b2") + } + if pool.GetSize() != 1 { + t.Errorf("pool size = %d, want 1 (b1 returned to pool)", pool.GetSize()) + } +} + +// TestForceFreeOldestSingleBuffer verifies that forceFreeOldest() returns +// delme=true when the buffer is the only one in the chain. +func TestForceFreeOldestSingleBuffer(t *testing.T) { + b := newBuffer(100, 10) + b.write(100, schema.Float(1.0)) + + delme, n := b.forceFreeOldest() + if !delme { + t.Error("single-buffer chain: expected delme=true (the buffer IS the oldest)") + } + if n != 1 { + t.Errorf("freed count = %d, want 1", n) + } +} + +// ─── iterFromTo ─────────────────────────────────────────────────────────────── + +// TestBufferIterFromToOrder verifies that iterFromTo invokes the callback in +// chronological order (oldest → newest). +func TestBufferIterFromToOrder(t *testing.T) { + // Each buffer has 2 data points so end() = firstWrite + 2*freq. + b1 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 0} // end=25 + b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 20} // end=45 + b3 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 40} // end=65 + b2.prev = b1 + b1.next = b2 + b3.prev = b2 + b2.next = b3 + + var order []*buffer + err := b3.iterFromTo(0, 100, func(b *buffer) error { + order = append(order, b) + return nil + }) + if err != nil { + t.Fatalf("iterFromTo() error = %v", err) + } + if len(order) != 3 { + t.Fatalf("callback count = %d, want 3", len(order)) + } + if order[0] != b1 || order[1] != b2 || order[2] != b3 { + t.Error("iterFromTo() did not call callbacks in chronological (oldest→newest) order") + } +} + +// TestBufferIterFromToFiltered verifies that iterFromTo only calls the callback +// for buffers whose time range overlaps [from, to]. +func TestBufferIterFromToFiltered(t *testing.T) { + // b1: end=25 b2: start=20, end=45 b3: start=40, end=65 + b1 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 0} + b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 20} + b3 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 40} + b2.prev = b1 + b1.next = b2 + b3.prev = b2 + b2.next = b3 + + // [30,50]: b1.end=25 < 30 → excluded; b2 and b3 overlap → included. + var visited []*buffer + b3.iterFromTo(30, 50, func(b *buffer) error { + visited = append(visited, b) + return nil + }) + if len(visited) != 2 { + t.Fatalf("visited count = %d, want 2 (b2 and b3)", len(visited)) + } + if visited[0] != b2 || visited[1] != b3 { + t.Errorf("visited = %v, want [b2, b3]", visited) + } +} + +// TestBufferIterFromToNilBuffer verifies that iterFromTo on a nil buffer is a +// safe no-op. +func TestBufferIterFromToNilBuffer(t *testing.T) { + var b *buffer + called := false + err := b.iterFromTo(0, 100, func(_ *buffer) error { + called = true + return nil + }) + if err != nil { + t.Errorf("iterFromTo(nil) error = %v, want nil", err) + } + if called { + t.Error("callback should not be called for a nil buffer") + } +} + +// ─── count ──────────────────────────────────────────────────────────────────── + +// TestBufferCount verifies that count() sums data-point lengths across the +// entire chain, including all prev links. +func TestBufferCount(t *testing.T) { + b1 := &buffer{data: make([]schema.Float, 3, BufferCap), frequency: 10, start: 0} + b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 35} + b3 := &buffer{data: make([]schema.Float, 5, BufferCap), frequency: 10, start: 60} + b2.prev = b1 + b1.next = b2 + b3.prev = b2 + b2.next = b3 + + if got := b3.count(); got != 10 { + t.Errorf("count() = %d, want 10 (3+2+5)", got) + } + + // Single buffer. + lone := &buffer{data: make([]schema.Float, 7, BufferCap)} + if got := lone.count(); got != 7 { + t.Errorf("count() single buffer = %d, want 7", got) + } +} + +// ─── Existing tests below ──────────────────────────────────────────────────── + func TestAssignAggregationStrategy(t *testing.T) { tests := []struct { name string