diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 2dcb5733..d408ce72 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -126,6 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) if Keys.Checkpoints.FileFormat == "wal" { + // Pause WAL writes: records already covered by the snapshot would be + // deleted during rotation anyway. NOTE(review): writes arriving after + // `now` are NOT in this snapshot — verify the crash-durability window. + walCheckpointActive.Store(true) n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) @@ -138,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { // Rotate WAL files for successfully checkpointed hosts. RotateWALFiles(hostDirs) } + walCheckpointActive.Store(false) + walDropped.Store(0) } else { n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 749ff756..0d0b2c8e 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup // SendWALMessage from sending on a closed channel (which panics in Go). var walShuttingDown atomic.Bool +// walCheckpointActive is set during binary checkpoint writes. +// While active, SendWALMessage skips sending (returns true). NOTE(review): the +// in-flight snapshot only covers data up to its start time; later writes stay unlogged until the next checkpoint — verify. +var walCheckpointActive atomic.Bool +// WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). 
type WALMessage struct { @@ -146,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool { if walShardChs == nil || walShuttingDown.Load() { return false } + if walCheckpointActive.Load() { + return true // NOTE(review): in memory only — the running snapshot's range ends at checkpoint start; data arriving now stays unlogged until the next checkpoint + } shard := walShardIndex(msg.Cluster, msg.Node) select { case walShardChs[shard] <- msg: @@ -727,11 +735,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) - // Delete WAL immediately after successful snapshot. - walPath := path.Join(wi.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err) - } successMu.Lock() successDirs = append(successDirs, wi.hostDir) successMu.Unlock()