mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-30 12:37:30 +02:00
fix: Pause WAL writes during binary checkpoint to prevent message drops
WAL writes during checkpoint are redundant since the binary snapshot captures all in-memory data. Pausing eliminates channel saturation (1.4M+ dropped messages) caused by disk I/O contention between checkpoint writes and WAL staging. Also removes direct WAL file deletion in checkpoint workers that raced with the staging goroutine. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Entire-Checkpoint: 34d698f40bac
This commit is contained in:
@@ -126,6 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
|
// Pause WAL writes: the binary snapshot captures all in-memory
|
||||||
|
// data, so WAL records written during checkpoint are redundant
|
||||||
|
// and would be deleted during rotation anyway.
|
||||||
|
walCheckpointActive.Store(true)
|
||||||
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
||||||
@@ -138,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
// Rotate WAL files for successfully checkpointed hosts.
|
// Rotate WAL files for successfully checkpointed hosts.
|
||||||
RotateWALFiles(hostDirs)
|
RotateWALFiles(hostDirs)
|
||||||
}
|
}
|
||||||
|
walCheckpointActive.Store(false)
|
||||||
|
walDropped.Store(0)
|
||||||
} else {
|
} else {
|
||||||
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
|
|||||||
// SendWALMessage from sending on a closed channel (which panics in Go).
|
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||||
var walShuttingDown atomic.Bool
|
var walShuttingDown atomic.Bool
|
||||||
|
|
||||||
|
// walCheckpointActive is set during binary checkpoint writes.
|
||||||
|
// While active, SendWALMessage skips sending (returns true) because the
|
||||||
|
// snapshot captures all in-memory data, making WAL writes redundant.
|
||||||
|
var walCheckpointActive atomic.Bool
|
||||||
|
|
||||||
// WALMessage represents a single metric write to be appended to the WAL.
|
// WALMessage represents a single metric write to be appended to the WAL.
|
||||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||||
type WALMessage struct {
|
type WALMessage struct {
|
||||||
@@ -146,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
|
|||||||
if walShardChs == nil || walShuttingDown.Load() {
|
if walShardChs == nil || walShuttingDown.Load() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
if walCheckpointActive.Load() {
|
||||||
|
return true // Data safe in memory; snapshot will capture it
|
||||||
|
}
|
||||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||||
select {
|
select {
|
||||||
case walShardChs[shard] <- msg:
|
case walShardChs[shard] <- msg:
|
||||||
@@ -727,11 +735,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
atomic.AddInt32(&errs, 1)
|
atomic.AddInt32(&errs, 1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&n, 1)
|
atomic.AddInt32(&n, 1)
|
||||||
// Delete WAL immediately after successful snapshot.
|
|
||||||
walPath := path.Join(wi.hostDir, "current.wal")
|
|
||||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
|
||||||
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
|
||||||
}
|
|
||||||
successMu.Lock()
|
successMu.Lock()
|
||||||
successDirs = append(successDirs, wi.hostDir)
|
successDirs = append(successDirs, wi.hostDir)
|
||||||
successMu.Unlock()
|
successMu.Unlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user