From 39ab12784c13db733e3bc3ce2e5c73d66c53bebd Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 13 Mar 2026 09:07:38 +0100 Subject: [PATCH] Make checkpointInterval an optional config option again. Also applies small fixes Entire-Checkpoint: c11d1a65fae4 --- pkg/metricstore/api.go | 2 +- pkg/metricstore/checkpoint.go | 11 ++++++++-- pkg/metricstore/configSchema.go | 4 ++++ pkg/metricstore/healthcheck.go | 5 ++++- pkg/metricstore/level.go | 36 +++++++++++++++++++-------------- pkg/metricstore/metricstore.go | 3 +++ pkg/metricstore/stats.go | 2 +- 7 files changed, 43 insertions(+), 20 deletions(-) diff --git a/pkg/metricstore/api.go b/pkg/metricstore/api.go index 21f8db0c..c091c6d0 100644 --- a/pkg/metricstore/api.go +++ b/pkg/metricstore/api.go @@ -127,7 +127,7 @@ func (data *APIMetricData) AddStats() { // This is commonly used for unit conversion (e.g., bytes to gigabytes). // Scaling by 0 or 1 is a no-op for performance reasons. func (data *APIMetricData) ScaleBy(f schema.Float) { - if f == 0 || f == 1 { + if f == 1 { return } diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index ba1f7ba0..3627a67b 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -100,8 +100,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { wg.Go(func() { - const checkpointInterval = 12 * time.Hour - d := checkpointInterval + d := 12 * time.Hour // default checkpoint interval + if Keys.CheckpointInterval != "" { + parsed, err := time.ParseDuration(Keys.CheckpointInterval) + if err != nil { + cclog.Errorf("[METRICSTORE]> invalid checkpoint-interval %q: %s, using default 12h", Keys.CheckpointInterval, err) + } else { + d = parsed + } + } ticker := time.NewTicker(d) defer ticker.Stop() diff --git a/pkg/metricstore/configSchema.go b/pkg/metricstore/configSchema.go index ed9bccaa..00bfc4b7 100644 --- a/pkg/metricstore/configSchema.go +++ b/pkg/metricstore/configSchema.go @@ -54,6 +54,10 @@ const configSchema = `{ 
"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", "type": "string" }, + "checkpoint-interval": { + "description": "Interval between checkpoints as a Go duration string (e.g., '12h', '6h', '30m'). Default is '12h'.", + "type": "string" + }, "memory-cap": { "description": "Upper memory capacity limit used by metricstore in GB", "type": "integer" diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index b3470a14..dc400f24 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -66,7 +66,10 @@ func (l *Level) collectMetricStatus(m *MemoryStore, expectedMetrics []string, he if degraded[metricName] { continue // already degraded, cannot improve } - mc := m.Metrics[metricName] + mc, ok := m.Metrics[metricName] + if !ok { + continue // unknown metric, will be reported as missing + } b := l.metrics[mc.offset] if b.bufferExists() { if !b.isBufferHealthy() { diff --git a/pkg/metricstore/level.go b/pkg/metricstore/level.go index 218ae8e6..1d360564 100644 --- a/pkg/metricstore/level.go +++ b/pkg/metricstore/level.go @@ -181,6 +181,14 @@ func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string // - int: Total number of buffers freed in this subtree // - error: Non-nil on failure (propagated from children) func (l *Level) free(t int64) (int, error) { + n, _, err := l.freeAndCheckEmpty(t) + return n, err +} + +// freeAndCheckEmpty performs free() and atomically checks if the level is empty +// while still holding its own lock, avoiding a TOCTOU race between free() and +// a separate isEmpty() call. 
+func (l *Level) freeAndCheckEmpty(t int64) (int, bool, error) { l.lock.Lock() defer l.lock.Unlock() @@ -201,30 +209,28 @@ func (l *Level) free(t int64) (int, error) { } for key, child := range l.children { - m, err := child.free(t) + m, empty, err := child.freeAndCheckEmpty(t) n += m if err != nil { - return n, err + return n, false, err } - if child.isEmpty() { + if empty { delete(l.children, key) } } - return n, nil -} - -// isEmpty returns true if this level has no metrics and no children. -func (l *Level) isEmpty() bool { - l.lock.RLock() - defer l.lock.RUnlock() - - for _, b := range l.metrics { - if b != nil { - return false + // Check emptiness while still holding the lock + empty := len(l.children) == 0 + if empty { + for _, b := range l.metrics { + if b != nil { + empty = false + break + } } } - return len(l.children) == 0 + + return n, empty, nil } // forceFree removes the oldest buffer from each metric chain in the subtree. diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 85eb0baf..84dad72b 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -754,6 +754,9 @@ func (m *MemoryStore) ForceFree() (int, error) { } func (m *MemoryStore) FreeAll() error { + m.root.lock.Lock() + defer m.root.lock.Unlock() + for k := range m.root.children { delete(m.root.children, k) } diff --git a/pkg/metricstore/stats.go b/pkg/metricstore/stats.go index 8f7886a3..2feec2ff 100644 --- a/pkg/metricstore/stats.go +++ b/pkg/metricstore/stats.go @@ -42,7 +42,7 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { if b == nil { break } - idx = 0 + idx = int((t - b.start) / b.frequency) } if t < b.start || idx >= len(b.data) {