From 0ce2fa2fbeb794e5840561f8d150c45242ac3cf2 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 27 Mar 2026 06:59:58 +0100 Subject: [PATCH] fix: checkpoint initialization gap on restarts Entire-Checkpoint: 3f4d366b037c --- pkg/metricstore/archive.go | 12 +++++++++++- pkg/metricstore/checkpoint.go | 30 +++++++++++++++++++++--------- pkg/metricstore/metricstore.go | 2 +- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 3b92a3e0..3d3e9eb3 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -53,6 +53,16 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod return } + // Account for checkpoint span: files named {from}.bin contain data up to + // from+checkpointInterval. Subtract the checkpoint interval so we don't + // delete files whose data still falls within the retention window. + checkpointSpan := 12 * time.Hour + if Keys.CheckpointInterval != "" { + if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil { + checkpointSpan = parsed + } + } + ticker := time.NewTicker(d) defer ticker.Stop() @@ -61,7 +71,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod case <-ctx.Done(): return case <-ticker.C: - t := time.Now().Add(-d) + t := time.Now().Add(-d).Add(-checkpointSpan) cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339)) n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 3627a67b..7878b283 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -86,14 +86,16 @@ var ( // Checkpointing starts a background worker that periodically saves metric data to disk. // -// Checkpoints are written every 12 hours (hardcoded). +// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup. +// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that +// loaded data is re-persisted before old checkpoint files are cleaned up. // // Format behaviour: // - "json": Periodic checkpointing every checkpointInterval // - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval -func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { +func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) { lastCheckpointMu.Lock() - lastCheckpoint = time.Now() + lastCheckpoint = restoreFrom lastCheckpointMu.Unlock() ms := GetMemoryStore() @@ -337,25 +339,35 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { return ErrNoNewArchiveData } - filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + finalPath := path.Join(dir, fmt.Sprintf("%d.json", from)) + tmpPath := finalPath + ".tmp" + + f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(dir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) } } if err != nil { return err } - defer f.Close() bw := bufio.NewWriter(f) if err = json.NewEncoder(bw).Encode(cf); err != nil { + f.Close() + os.Remove(tmpPath) return err } - return bw.Flush() + if err = bw.Flush(); err != nil { + f.Close() + os.Remove(tmpPath) + return err + } + f.Close() + + return os.Rename(tmpPath, finalPath) } // enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs. @@ -470,7 +482,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { data: metric.Data[0:n:n], prev: nil, next: nil, - archived: true, + archived: false, } minfo, ok := m.Metrics[name] diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 9ba69c55..61aa5800 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W ctx, shutdown := context.WithCancel(context.Background()) Retention(wg, ctx) - Checkpointing(wg, ctx) + Checkpointing(wg, ctx, restoreFrom) CleanUp(wg, ctx) WALStaging(wg, ctx) MemoryUsageTracker(wg, ctx)