fix: Unbound growth of wal files in case of checkpointing error

Entire-Checkpoint: 95a89a7127c5
This commit is contained in:
2026-03-28 06:26:21 +01:00
parent ac0a4cc39a
commit cc3d03bb5b
5 changed files with 56 additions and 9 deletions

View File

@@ -129,7 +129,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
} else {
}
if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now

View File

@@ -59,11 +59,14 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data.
//
// Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
}
// Debug provides development and profiling options.

View File

@@ -24,6 +24,11 @@ const configSchema = `{
"directory": {
"description": "Path in which the checkpointed files should be placed.",
"type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
}
}
},

View File

@@ -122,6 +122,7 @@ type walFileState struct {
f *os.File
w *bufio.Writer
dirty bool
size int64 // approximate bytes written (tracked from open + writes)
}
// walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -214,7 +215,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var fileSize int64
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
@@ -222,9 +227,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close()
return nil
}
fileSize = 4
}
ws = &walFileState{f: f, w: w}
ws = &walFileState{f: f, w: w, size: fileSize}
hostFiles[hostDir] = ws
return ws
}
@@ -235,9 +241,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil {
return
}
if err := writeWALRecordDirect(ws.w, msg); err != nil {
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
ws.size += int64(n)
ws.dirty = true
}
@@ -376,7 +403,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Returns the number of bytes written and any error.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
@@ -430,8 +458,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
_, err := w.Write(buf)
return err
n, err := w.Write(buf)
return n, err
}
// readWALRecord reads one WAL record from the reader.