From fc47b12fedd259e79bcf9f468e5cdcc8ee808890 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sun, 29 Mar 2026 11:13:39 +0200 Subject: [PATCH] fix: Pause WAL writes during binary checkpoint to prevent message drops WAL writes during checkpoint are redundant since the binary snapshot captures all in-memory data. Pausing eliminates channel saturation (1.4M+ dropped messages) caused by disk I/O contention between checkpoint writes and WAL staging. Also removes direct WAL file deletion in checkpoint workers that raced with the staging goroutine. Co-Authored-By: Claude Opus 4.6 Entire-Checkpoint: 34d698f40bac --- pkg/metricstore/checkpoint.go | 6 ++++++ pkg/metricstore/walCheckpoint.go | 13 ++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 2dcb5733..d408ce72 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -126,6 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) if Keys.Checkpoints.FileFormat == "wal" { + // Pause WAL writes: the binary snapshot captures all in-memory + // data, so WAL records written during checkpoint are redundant + // and would be deleted during rotation anyway. + walCheckpointActive.Store(true) n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) @@ -138,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { // Rotate WAL files for successfully checkpointed hosts. RotateWALFiles(hostDirs) } + walCheckpointActive.Store(false) + walDropped.Store(0) } else { n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 749ff756..0d0b2c8e 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup // SendWALMessage from sending on a closed channel (which panics in Go). var walShuttingDown atomic.Bool +// walCheckpointActive is set during binary checkpoint writes. +// While active, SendWALMessage skips sending (returns true) because the +// snapshot captures all in-memory data, making WAL writes redundant. +var walCheckpointActive atomic.Bool + // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). type WALMessage struct { @@ -146,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool { if walShardChs == nil || walShuttingDown.Load() { return false } + if walCheckpointActive.Load() { + return true // Data safe in memory; snapshot will capture it + } shard := walShardIndex(msg.Cluster, msg.Node) select { case walShardChs[shard] <- msg: @@ -727,11 +735,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) - // Delete WAL immediately after successful snapshot. - walPath := path.Join(wi.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err) - } successMu.Lock() successDirs = append(successDirs, wi.hostDir) successMu.Unlock()