Review and improve buffer pool implmentation. Add unit tests.

This commit is contained in:
2026-02-28 19:34:33 +01:00
parent 888d7fb235
commit 1ec41d8389
4 changed files with 566 additions and 40 deletions

View File

@@ -54,6 +54,10 @@ import (
// of data or reallocation needs to happen on writes.
const BufferCap int = DefaultBufferCapacity
// maxPoolSize caps the number of buffers held in the pool at any time.
// Prevents unbounded memory growth after large retention-cleanup bursts.
const maxPoolSize = 4096
// BufferPool is the global instance.
// It is initialized immediately when the package loads.
var bufferPool = NewPersistentBufferPool()
@@ -89,12 +93,18 @@ func (p *PersistentBufferPool) Get() *buffer {
return b
}
// Put returns b to the pool. The caller must set b.lastUsed = time.Now().Unix()
// before calling Put so that Clean() can evict idle entries correctly.
func (p *PersistentBufferPool) Put(b *buffer) {
// Reset the buffer before putting it back
b.data = b.data[:0]
p.mu.Lock()
defer p.mu.Unlock()
if len(p.pool) >= maxPoolSize {
// Pool is full; drop the buffer and let GC collect it.
return
}
p.pool = append(p.pool, b)
}
@@ -121,13 +131,11 @@ func (p *PersistentBufferPool) Clean(threshold int64) {
p.mu.Lock()
defer p.mu.Unlock()
// Filter in place
// Filter in place, retaining only buffers returned to the pool recently enough.
active := p.pool[:0]
for _, b := range p.pool {
if b.lastUsed >= threshold {
active = append(active, b)
} else {
// Buffer is older than the threshold, let it be collected by GC
}
}
@@ -139,19 +147,6 @@ func (p *PersistentBufferPool) Clean(threshold int64) {
p.pool = active
}
// CleanAll removes all buffers from the pool.
func (p *PersistentBufferPool) CleanAll() {
p.mu.Lock()
defer p.mu.Unlock()
// Nullify all buffers to prevent memory leaks
for i := range p.pool {
p.pool[i] = nil
}
p.pool = p.pool[:0]
}
var (
// ErrNoData indicates no time-series data exists for the requested metric/level.
ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level")
@@ -276,11 +271,13 @@ func (b *buffer) firstWrite() int64 {
//
// Panics if 'data' slice is too small to hold all values in [from, to).
func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) {
if from < b.firstWrite() {
if b.prev != nil {
return b.prev.read(from, to, data)
}
// Walk back to the buffer that covers 'from', adjusting if we hit the oldest.
for from < b.firstWrite() {
if b.prev == nil {
from = b.firstWrite()
break
}
b = b.prev
}
i := 0
@@ -292,16 +289,17 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6
break
}
b = b.next
idx = 0
// Recalculate idx in the new buffer; a gap between buffers may exist.
idx = int((t - b.start) / b.frequency)
}
if idx >= len(b.data) {
if b.next == nil || to <= b.next.start {
break
}
data[i] += schema.NaN
data[i] += schema.NaN // NaN + anything = NaN; propagates missing data
} else if t < b.start {
data[i] += schema.NaN
data[i] += schema.NaN // gap before this buffer's first write
} else {
data[i] += b.data[idx]
}
@@ -359,11 +357,12 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) {
// If the previous buffer signals it should be deleted:
if delPrev {
// Clear links on the dying buffer to prevent leaks
b.prev.next = nil
b.prev.data = nil // Release the underlying float slice immediately
// Remove the link from the current buffer
if cap(b.prev.data) != BufferCap {
b.prev.data = make([]schema.Float, 0, BufferCap)
}
b.prev.lastUsed = time.Now().Unix()
bufferPool.Put(b.prev)
b.prev = nil
}
return false, freed
@@ -392,21 +391,27 @@ func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) erro
return nil
}
if err := b.prev.iterFromTo(from, to, callback); err != nil {
// Collect overlapping buffers walking backwards (newest → oldest).
var matching []*buffer
for cur := b; cur != nil; cur = cur.prev {
if from <= cur.end() && cur.start <= to {
matching = append(matching, cur)
}
}
// Invoke callback in chronological order (oldest → newest).
for i := len(matching) - 1; i >= 0; i-- {
if err := callback(matching[i]); err != nil {
return err
}
if from <= b.end() && b.start <= to {
return callback(b)
}
return nil
}
func (b *buffer) count() int64 {
res := int64(len(b.data))
if b.prev != nil {
res += b.prev.count()
var res int64
for ; b != nil; b = b.prev {
res += int64(len(b.data))
}
return res
}

View File

@@ -238,12 +238,13 @@ func (l *Level) forceFree() (int, error) {
// If delme is true, it means 'b' itself (the head) was the oldest
// and needs to be removed from the slice.
if delme {
// Nil out fields to ensure no hanging references
b.next = nil
b.prev = nil
b.data = nil
if cap(b.data) != BufferCap {
b.data = make([]schema.Float, 0, BufferCap)
}
b.lastUsed = time.Now().Unix()
bufferPool.Put(b)
l.metrics[i] = nil
}
}

View File

@@ -428,7 +428,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9
bufferPool.CleanAll()
bufferPool.Clear()
cclog.Infof("[METRICSTORE]> Cleaned up bufferPool\n")
if actualMemoryGB > float64(Keys.MemoryCap) {

View File

@@ -12,6 +12,526 @@ import (
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// ─── Buffer pool ─────────────────────────────────────────────────────────────
// TestBufferPoolGetReuse verifies that Get() returns pooled buffers before
// allocating new ones, and that an empty pool allocates a fresh BufferCap buffer.
func TestBufferPoolGetReuse(t *testing.T) {
pool := NewPersistentBufferPool()
original := &buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()}
pool.Put(original)
reused := pool.Get()
if reused != original {
t.Error("Get() should return the previously pooled buffer")
}
if pool.GetSize() != 0 {
t.Errorf("pool size after Get() = %d, want 0", pool.GetSize())
}
// Empty pool must allocate a fresh buffer with the standard capacity.
fresh := pool.Get()
if fresh == nil {
t.Fatal("Get() from empty pool returned nil")
}
if cap(fresh.data) != BufferCap {
t.Errorf("fresh buffer cap = %d, want %d", cap(fresh.data), BufferCap)
}
}
// TestBufferPoolClear verifies that Clear() drains all entries.
func TestBufferPoolClear(t *testing.T) {
pool := NewPersistentBufferPool()
for i := 0; i < 10; i++ {
pool.Put(&buffer{data: make([]schema.Float, 0), lastUsed: time.Now().Unix()})
}
pool.Clear()
if pool.GetSize() != 0 {
t.Errorf("pool size after Clear() = %d, want 0", pool.GetSize())
}
}
// TestBufferPoolMaxSize verifies that Put() silently drops buffers once the
// pool reaches maxPoolSize, preventing unbounded memory growth.
func TestBufferPoolMaxSize(t *testing.T) {
pool := NewPersistentBufferPool()
for i := 0; i < maxPoolSize; i++ {
pool.Put(&buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()})
}
if pool.GetSize() != maxPoolSize {
t.Fatalf("pool size = %d, want %d", pool.GetSize(), maxPoolSize)
}
pool.Put(&buffer{data: make([]schema.Float, 0, BufferCap), lastUsed: time.Now().Unix()})
if pool.GetSize() != maxPoolSize {
t.Errorf("pool size after overflow Put = %d, want %d (should not grow)", pool.GetSize(), maxPoolSize)
}
}
// ─── Buffer helpers ───────────────────────────────────────────────────────────
// TestBufferEndFirstWrite verifies the end() and firstWrite() calculations.
func TestBufferEndFirstWrite(t *testing.T) {
// start=90, freq=10 → firstWrite = 90+5 = 95
b := &buffer{data: make([]schema.Float, 4, BufferCap), frequency: 10, start: 90}
if fw := b.firstWrite(); fw != 95 {
t.Errorf("firstWrite() = %d, want 95", fw)
}
// end = firstWrite + len(data)*freq = 95 + 4*10 = 135
if e := b.end(); e != 135 {
t.Errorf("end() = %d, want 135", e)
}
}
// ─── Buffer write ─────────────────────────────────────────────────────────────
// TestBufferWriteNaNFill verifies that skipped timestamps are filled with NaN.
func TestBufferWriteNaNFill(t *testing.T) {
b := newBuffer(100, 10)
b.write(100, schema.Float(1.0))
// skip 110 and 120
b.write(130, schema.Float(4.0))
if len(b.data) != 4 {
t.Fatalf("len(data) = %d, want 4 (1 value + 2 NaN + 1 value)", len(b.data))
}
if b.data[0] != schema.Float(1.0) {
t.Errorf("data[0] = %v, want 1.0", b.data[0])
}
if !b.data[1].IsNaN() {
t.Errorf("data[1] should be NaN (gap), got %v", b.data[1])
}
if !b.data[2].IsNaN() {
t.Errorf("data[2] should be NaN (gap), got %v", b.data[2])
}
if b.data[3] != schema.Float(4.0) {
t.Errorf("data[3] = %v, want 4.0", b.data[3])
}
}
// TestBufferWriteCapacityOverflow verifies that exceeding capacity creates and
// links a new buffer rather than panicking or silently dropping data.
func TestBufferWriteCapacityOverflow(t *testing.T) {
// Cap=2 so the third write must overflow into a new buffer.
b := &buffer{data: make([]schema.Float, 0, 2), frequency: 10, start: 95}
nb, _ := b.write(100, schema.Float(1.0))
nb, _ = nb.write(110, schema.Float(2.0))
nb, err := nb.write(120, schema.Float(3.0))
if err != nil {
t.Fatalf("write() error = %v", err)
}
if nb == b {
t.Fatal("write() should have returned a new buffer after overflow")
}
if nb.prev != b {
t.Error("new buffer should link back to old via prev")
}
if b.next != nb {
t.Error("old buffer should link forward to new via next")
}
if len(b.data) != 2 {
t.Errorf("old buffer len = %d, want 2 (full)", len(b.data))
}
if nb.data[0] != schema.Float(3.0) {
t.Errorf("new buffer data[0] = %v, want 3.0", nb.data[0])
}
}
// TestBufferWriteOverwrite verifies that writing to an already-occupied index
// replaces the value rather than appending.
func TestBufferWriteOverwrite(t *testing.T) {
b := newBuffer(100, 10)
b.write(100, schema.Float(1.0))
b.write(110, schema.Float(2.0))
// Overwrite the first slot.
b.write(100, schema.Float(99.0))
if len(b.data) != 2 {
t.Errorf("len(data) after overwrite = %d, want 2 (no append)", len(b.data))
}
if b.data[0] != schema.Float(99.0) {
t.Errorf("data[0] after overwrite = %v, want 99.0", b.data[0])
}
}
// ─── Buffer read ──────────────────────────────────────────────────────────────
// TestBufferReadBeforeFirstWrite verifies that 'from' is clamped to firstWrite
// when the requested range starts before any data in the chain.
func TestBufferReadBeforeFirstWrite(t *testing.T) {
b := newBuffer(100, 10) // firstWrite = 100
b.write(100, schema.Float(1.0))
b.write(110, schema.Float(2.0))
data := make([]schema.Float, 10)
result, adjustedFrom, _, err := b.read(50, 120, data)
if err != nil {
t.Fatalf("read() error = %v", err)
}
if adjustedFrom != 100 {
t.Errorf("adjustedFrom = %d, want 100 (clamped to firstWrite)", adjustedFrom)
}
if len(result) != 2 {
t.Errorf("len(result) = %d, want 2", len(result))
}
}
// TestBufferReadChain verifies that read() traverses a multi-buffer chain and
// returns contiguous values from both buffers.
//
// The switch to b.next in read() triggers on idx >= cap(b.data), so b1 must
// be full (len == cap) for the loop to advance to b2 without producing NaN.
func TestBufferReadChain(t *testing.T) {
// b1: cap=3, covers t=100..120. b2: covers t=130..150. b2 is head.
b1 := &buffer{data: make([]schema.Float, 0, 3), frequency: 10, start: 95}
b1.data = append(b1.data, 1.0, 2.0, 3.0) // fills b1: len=cap=3
b2 := &buffer{data: make([]schema.Float, 0, 3), frequency: 10, start: 125}
b2.data = append(b2.data, 4.0, 5.0, 6.0) // t=130,140,150
b2.prev = b1
b1.next = b2
data := make([]schema.Float, 6)
result, from, to, err := b2.read(100, 160, data)
if err != nil {
t.Fatalf("read() error = %v", err)
}
if from != 100 || to != 160 {
t.Errorf("read() from/to = %d/%d, want 100/160", from, to)
}
if len(result) != 6 {
t.Fatalf("len(result) = %d, want 6", len(result))
}
for i, want := range []schema.Float{1, 2, 3, 4, 5, 6} {
if result[i] != want {
t.Errorf("result[%d] = %v, want %v", i, result[i], want)
}
}
}
// TestBufferReadIdxAfterSwitch is a regression test for the index recalculation
// bug after switching to b.next during a read.
//
// When both buffers share the same start time (can happen with checkpoint-loaded
// chains), the old code hardcoded idx=0 after the switch, causing reads at time t
// to return the wrong element from the next buffer.
func TestBufferReadIdxAfterSwitch(t *testing.T) {
// b1: cap=2, both buffers start at 0 (firstWrite=5).
// b1 carries t=5 and t=15; b2 carries t=5,15,25,35 with the same start.
// When reading reaches t=25 the loop overflows b1 (idx=2 >= cap=2) and
// switches to b2. The correct index in b2 is (25-0)/10=2 → b2.data[2]=30.0.
// The old code set idx=0 → b2.data[0]=10.0 (wrong).
b1 := &buffer{data: make([]schema.Float, 0, 2), frequency: 10, start: 0}
b1.data = append(b1.data, schema.Float(1.0), schema.Float(2.0)) // t=5, t=15
b2 := &buffer{data: make([]schema.Float, 0, 10), frequency: 10, start: 0}
b2.data = append(b2.data,
schema.Float(10.0), schema.Float(20.0),
schema.Float(30.0), schema.Float(40.0)) // t=5,15,25,35
b2.prev = b1
b1.next = b2
// from=0 triggers the walkback to b1 (from < b2.firstWrite=5).
// After clamping, the loop runs t=5,15,25,35.
data := make([]schema.Float, 4)
result, _, _, err := b2.read(0, 36, data)
if err != nil {
t.Fatalf("read() error = %v", err)
}
if len(result) < 3 {
t.Fatalf("len(result) = %d, want >= 3", len(result))
}
if result[0] != schema.Float(1.0) {
t.Errorf("result[0] (t=5) = %v, want 1.0 (from b1)", result[0])
}
if result[1] != schema.Float(2.0) {
t.Errorf("result[1] (t=15) = %v, want 2.0 (from b1)", result[1])
}
// This is the critical assertion: old code returned 10.0 (b2.data[0]).
if result[2] != schema.Float(30.0) {
t.Errorf("result[2] (t=25) = %v, want 30.0 (idx recalculation fix)", result[2])
}
}
// TestBufferReadNaNValues verifies that NaN slots written to the buffer are
// returned as NaN during read.
func TestBufferReadNaNValues(t *testing.T) {
b := newBuffer(100, 10)
b.write(100, schema.Float(1.0))
b.write(110, schema.NaN)
b.write(120, schema.Float(3.0))
data := make([]schema.Float, 3)
result, _, _, err := b.read(100, 130, data)
if err != nil {
t.Fatalf("read() error = %v", err)
}
if len(result) != 3 {
t.Fatalf("len(result) = %d, want 3", len(result))
}
if result[0] != schema.Float(1.0) {
t.Errorf("result[0] = %v, want 1.0", result[0])
}
if !result[1].IsNaN() {
t.Errorf("result[1] should be NaN, got %v", result[1])
}
if result[2] != schema.Float(3.0) {
t.Errorf("result[2] = %v, want 3.0", result[2])
}
}
// TestBufferReadAccumulation verifies the += accumulation pattern used for
// aggregation: values are added to whatever was already in the data slice.
func TestBufferReadAccumulation(t *testing.T) {
b := newBuffer(100, 10)
b.write(100, schema.Float(3.0))
b.write(110, schema.Float(5.0))
// Pre-populate data slice (simulates a second metric being summed in).
data := []schema.Float{2.0, 1.0, 0.0}
result, _, _, err := b.read(100, 120, data)
if err != nil {
t.Fatalf("read() error = %v", err)
}
// 2.0+3.0=5.0, 1.0+5.0=6.0
if result[0] != schema.Float(5.0) {
t.Errorf("result[0] = %v, want 5.0 (2+3)", result[0])
}
if result[1] != schema.Float(6.0) {
t.Errorf("result[1] = %v, want 6.0 (1+5)", result[1])
}
}
// ─── Buffer free ─────────────────────────────────────────────────────────────
// newTestPool swaps out the package-level bufferPool for a fresh isolated one
// and returns a cleanup function that restores the original.
func newTestPool(t *testing.T) *PersistentBufferPool {
t.Helper()
pool := NewPersistentBufferPool()
saved := bufferPool
bufferPool = pool
t.Cleanup(func() { bufferPool = saved })
return pool
}
// TestBufferFreeRetention verifies that free() removes buffers whose entire
// time range falls before the retention threshold and returns them to the pool.
func TestBufferFreeRetention(t *testing.T) {
pool := newTestPool(t)
// b1: firstWrite=5, end=25 b2: firstWrite=25, end=45 b3: firstWrite=45, end=65
b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0}
b1.data = append(b1.data, 1.0, 2.0)
b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20}
b2.data = append(b2.data, 3.0, 4.0)
b2.prev = b1
b1.next = b2
b3 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 40}
b3.data = append(b3.data, 5.0, 6.0)
b3.prev = b2
b2.next = b3
// Threshold=30: b1.end()=25 < 30 → freed; b2.end()=45 >= 30 → kept.
delme, n := b3.free(30)
if delme {
t.Error("head buffer b3 should not be marked for deletion")
}
if n != 1 {
t.Errorf("freed count = %d, want 1", n)
}
if b2.prev != nil {
t.Error("b1 should have been unlinked from b2.prev")
}
if b3.prev != b2 {
t.Error("b3 should still reference b2")
}
if pool.GetSize() != 1 {
t.Errorf("pool size = %d, want 1 (b1 returned)", pool.GetSize())
}
}
// TestBufferFreeAll verifies that free() removes all buffers and signals the
// caller to delete the head when the entire chain is older than the threshold.
func TestBufferFreeAll(t *testing.T) {
pool := newTestPool(t)
b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0}
b1.data = append(b1.data, 1.0, 2.0) // end=25
b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20}
b2.data = append(b2.data, 3.0, 4.0) // end=45
b2.prev = b1
b1.next = b2
// Threshold=100 > both ends → both should be freed.
delme, n := b2.free(100)
if !delme {
t.Error("head buffer b2 should be marked for deletion when all data is stale")
}
if n != 2 {
t.Errorf("freed count = %d, want 2", n)
}
// b1 was freed inside free(); b2 is returned with delme=true for the caller.
if pool.GetSize() != 1 {
t.Errorf("pool size = %d, want 1 (b1 returned; b2 returned by caller)", pool.GetSize())
}
}
// ─── forceFreeOldest ─────────────────────────────────────────────────────────
// TestForceFreeOldestPoolReturn verifies that forceFreeOldest() returns the
// freed buffer to the pool (regression: previously it was just dropped).
func TestForceFreeOldestPoolReturn(t *testing.T) {
pool := newTestPool(t)
b1 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 0}
b2 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 20}
b3 := &buffer{data: make([]schema.Float, 0, BufferCap), frequency: 10, start: 40}
b1.data = append(b1.data, 1.0)
b2.data = append(b2.data, 2.0)
b3.data = append(b3.data, 3.0)
b2.prev = b1
b1.next = b2
b3.prev = b2
b2.next = b3
delme, n := b3.forceFreeOldest()
if delme {
t.Error("head b3 should not be marked for deletion (chain has 3 buffers)")
}
if n != 1 {
t.Errorf("freed count = %d, want 1", n)
}
if b2.prev != nil {
t.Error("b1 should have been unlinked from b2.prev after forceFreeOldest")
}
if b3.prev != b2 {
t.Error("b3 should still link to b2")
}
if pool.GetSize() != 1 {
t.Errorf("pool size = %d, want 1 (b1 returned to pool)", pool.GetSize())
}
}
// TestForceFreeOldestSingleBuffer verifies that forceFreeOldest() returns
// delme=true when the buffer is the only one in the chain.
func TestForceFreeOldestSingleBuffer(t *testing.T) {
b := newBuffer(100, 10)
b.write(100, schema.Float(1.0))
delme, n := b.forceFreeOldest()
if !delme {
t.Error("single-buffer chain: expected delme=true (the buffer IS the oldest)")
}
if n != 1 {
t.Errorf("freed count = %d, want 1", n)
}
}
// ─── iterFromTo ───────────────────────────────────────────────────────────────
// TestBufferIterFromToOrder verifies that iterFromTo invokes the callback in
// chronological order (oldest → newest).
func TestBufferIterFromToOrder(t *testing.T) {
// Each buffer has 2 data points so end() = firstWrite + 2*freq.
b1 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 0} // end=25
b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 20} // end=45
b3 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 40} // end=65
b2.prev = b1
b1.next = b2
b3.prev = b2
b2.next = b3
var order []*buffer
err := b3.iterFromTo(0, 100, func(b *buffer) error {
order = append(order, b)
return nil
})
if err != nil {
t.Fatalf("iterFromTo() error = %v", err)
}
if len(order) != 3 {
t.Fatalf("callback count = %d, want 3", len(order))
}
if order[0] != b1 || order[1] != b2 || order[2] != b3 {
t.Error("iterFromTo() did not call callbacks in chronological (oldest→newest) order")
}
}
// TestBufferIterFromToFiltered verifies that iterFromTo only calls the callback
// for buffers whose time range overlaps [from, to].
func TestBufferIterFromToFiltered(t *testing.T) {
// b1: end=25 b2: start=20, end=45 b3: start=40, end=65
b1 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 0}
b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 20}
b3 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 40}
b2.prev = b1
b1.next = b2
b3.prev = b2
b2.next = b3
// [30,50]: b1.end=25 < 30 → excluded; b2 and b3 overlap → included.
var visited []*buffer
b3.iterFromTo(30, 50, func(b *buffer) error {
visited = append(visited, b)
return nil
})
if len(visited) != 2 {
t.Fatalf("visited count = %d, want 2 (b2 and b3)", len(visited))
}
if visited[0] != b2 || visited[1] != b3 {
t.Errorf("visited = %v, want [b2, b3]", visited)
}
}
// TestBufferIterFromToNilBuffer verifies that iterFromTo on a nil buffer is a
// safe no-op.
func TestBufferIterFromToNilBuffer(t *testing.T) {
var b *buffer
called := false
err := b.iterFromTo(0, 100, func(_ *buffer) error {
called = true
return nil
})
if err != nil {
t.Errorf("iterFromTo(nil) error = %v, want nil", err)
}
if called {
t.Error("callback should not be called for a nil buffer")
}
}
// ─── count ────────────────────────────────────────────────────────────────────
// TestBufferCount verifies that count() sums data-point lengths across the
// entire chain, including all prev links.
func TestBufferCount(t *testing.T) {
b1 := &buffer{data: make([]schema.Float, 3, BufferCap), frequency: 10, start: 0}
b2 := &buffer{data: make([]schema.Float, 2, BufferCap), frequency: 10, start: 35}
b3 := &buffer{data: make([]schema.Float, 5, BufferCap), frequency: 10, start: 60}
b2.prev = b1
b1.next = b2
b3.prev = b2
b2.next = b3
if got := b3.count(); got != 10 {
t.Errorf("count() = %d, want 10 (3+2+5)", got)
}
// Single buffer.
lone := &buffer{data: make([]schema.Float, 7, BufferCap)}
if got := lone.count(); got != 7 {
t.Errorf("count() single buffer = %d, want 7", got)
}
}
// ─── Existing tests below ────────────────────────────────────────────────────
func TestAssignAggregationStrategy(t *testing.T) {
tests := []struct {
name string