From cc3d03bb5b78c26b3dd2e04279b6206ed3753034 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 28 Mar 2026 06:26:21 +0100 Subject: [PATCH] fix: Unbound growth of wal files in case of checkpointing error Entire-Checkpoint: 95a89a7127c5 --- ReleaseNotes.md | 10 ++++++++ pkg/metricstore/checkpoint.go | 3 ++- pkg/metricstore/config.go | 7 ++++-- pkg/metricstore/configSchema.go | 5 ++++ pkg/metricstore/walCheckpoint.go | 40 +++++++++++++++++++++++++++----- 5 files changed, 56 insertions(+), 9 deletions(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 20e66322..461df2a2 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -19,6 +19,16 @@ This is also the default. ### Bug fixes +- **WAL not rotated on partial checkpoint failure**: When binary checkpointing + failed for some hosts, WAL files for successfully checkpointed hosts were not + rotated and the checkpoint timestamp was not advanced. Partial successes now + correctly advance the checkpoint and rotate WAL files for completed hosts. +- **Unbounded WAL file growth**: If binary checkpointing consistently failed for + a host, its `current.wal` file grew without limit until disk exhaustion. A new + `max-wal-size` configuration option (in the `checkpoints` block) allows setting + a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated. + Defaults to 0 (unlimited) for backward compatibility. + - **Doubleranged filter fixes**: Range filters now correctly handle zero as a boundary value. Improved validation and UI text for "more than equal" and "less than equal" range selections. diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 3627a67b..2dcb5733 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -129,7 +129,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) - } else { + } + if n > 0 { cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n) lastCheckpointMu.Lock() lastCheckpoint = now diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index f6c56672..b24af844 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -59,11 +59,14 @@ const ( // Checkpoints configures periodic persistence of in-memory metric data. // // Fields: -// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" -// - RootDir: Filesystem path for checkpoint files (created if missing) +// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" +// - RootDir: Filesystem path for checkpoint files (created if missing) +// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default). +// When exceeded the WAL is force-rotated to prevent unbounded disk growth. type Checkpoints struct { FileFormat string `json:"file-format"` RootDir string `json:"directory"` + MaxWALSize int64 `json:"max-wal-size,omitempty"` } // Debug provides development and profiling options. diff --git a/pkg/metricstore/configSchema.go b/pkg/metricstore/configSchema.go index 00bfc4b7..3aeef4bd 100644 --- a/pkg/metricstore/configSchema.go +++ b/pkg/metricstore/configSchema.go @@ -24,6 +24,11 @@ const configSchema = `{ "directory": { "description": "Path in which the checkpointed files should be placed.", "type": "string" + }, + "max-wal-size": { + "description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).", + "type": "integer", + "minimum": 0 } } }, diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 01de32e9..35a5d6d5 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -122,6 +122,7 @@ type walFileState struct { f *os.File w *bufio.Writer dirty bool + size int64 // approximate bytes written (tracked from open + writes) } // walFlushInterval controls how often dirty WAL files are flushed to disk. @@ -214,7 +215,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { // Write file header magic if file is new (empty). info, err := f.Stat() - if err == nil && info.Size() == 0 { + var fileSize int64 + if err == nil { + fileSize = info.Size() + } + if err == nil && fileSize == 0 { var hdr [4]byte binary.LittleEndian.PutUint32(hdr[:], walFileMagic) if _, err := w.Write(hdr[:]); err != nil { @@ -222,9 +227,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { f.Close() return nil } + fileSize = 4 } - ws = &walFileState{f: f, w: w} + ws = &walFileState{f: f, w: w, size: fileSize} hostFiles[hostDir] = ws return ws } @@ -235,9 +241,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { if ws == nil { return } - if err := writeWALRecordDirect(ws.w, msg); err != nil { + + // Enforce max WAL size: force-rotate before writing if limit is exceeded. + // The in-memory store still holds the data; only crash-recovery coverage is lost. + if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize { + cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)", + hostDir, ws.size, maxSize) + ws.w.Flush() + ws.f.Close() + walPath := path.Join(hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) + } + delete(hostFiles, hostDir) + ws = getOrOpenWAL(hostDir) + if ws == nil { + return + } + } + + n, err := writeWALRecordDirect(ws.w, msg) + if err != nil { cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) } + ws.size += int64(n) ws.dirty = true } @@ -376,7 +403,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { // writeWALRecordDirect encodes a WAL record into a contiguous buffer first, // then writes it to the bufio.Writer in a single call. This prevents partial // records in the write buffer if a write error occurs mid-record (e.g. disk full). -func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { +// Returns the number of bytes written and any error. +func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) { // Compute payload and total record size. payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 for _, s := range msg.Selector { @@ -430,8 +458,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { binary.LittleEndian.PutUint32(buf[p:p+4], crc) // Single atomic write to the buffered writer. - _, err := w.Write(buf) - return err + n, err := w.Write(buf) + return n, err } // readWALRecord reads one WAL record from the reader.