From 97d65a9e5c21321c134fbf99bb7cc91497889886 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 26 Mar 2026 07:25:36 +0100 Subject: [PATCH] Fix bugs in WAL journal pipeline Entire-Checkpoint: 8fe0de4e6ac2 --- pkg/metricstore/metricstore.go | 6 ++ pkg/metricstore/walCheckpoint.go | 124 ++++++++++++++++--------------- 2 files changed, 69 insertions(+), 61 deletions(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index e85116e5..e8bd6812 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -281,6 +281,12 @@ func Shutdown() { cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart)) if Keys.Checkpoints.FileFormat == "wal" { + // Signal producers to stop sending before closing channels, + // preventing send-on-closed-channel panics from in-flight NATS workers. + walShuttingDown.Store(true) + // Brief grace period for in-flight DecodeLine calls to complete. + time.Sleep(100 * time.Millisecond) + for _, ch := range walShardChs { close(ch) } diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 1736d754..f8121901 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -95,6 +95,10 @@ var walNumShards int // walStagingWg tracks WALStaging goroutine exits for shutdown synchronization. var walStagingWg sync.WaitGroup +// walShuttingDown is set before closing shard channels to prevent +// SendWALMessage from sending on a closed channel (which panics in Go). +var walShuttingDown atomic.Bool + // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). type WALMessage struct { @@ -136,9 +140,9 @@ func walShardIndex(cluster, node string) int { } // SendWALMessage routes a WAL message to the appropriate shard channel. -// Returns false if the channel is full (message dropped). +// Returns false if the channel is full or shutdown is in progress. func SendWALMessage(msg *WALMessage) bool { - if walShardChs == nil { + if walShardChs == nil || walShuttingDown.Load() { return false } shard := walShardIndex(msg.Cluster, msg.Node) @@ -320,20 +324,32 @@ func WaitForWALStagingDrain() { // RotateWALFiles sends rotation requests for the given host directories // and blocks until all rotations complete. Each request is routed to the // shard that owns the host directory. +// +// If shutdown is in progress (WAL staging goroutines may have exited), +// rotation is skipped to avoid deadlocking on abandoned channels. func RotateWALFiles(hostDirs []string) { - if walShardRotateChs == nil { + if walShardRotateChs == nil || walShuttingDown.Load() { return } - dones := make([]chan struct{}, len(hostDirs)) - for i, dir := range hostDirs { - dones[i] = make(chan struct{}) - // Extract cluster/node from hostDir to find the right shard. - // hostDir = rootDir/cluster/node + dones := make([]chan struct{}, 0, len(hostDirs)) + for _, dir := range hostDirs { + done := make(chan struct{}) shard := walShardIndexFromDir(dir) - walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} + select { + case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}: + dones = append(dones, done) + default: + // Channel full or goroutine not consuming — skip this host. + cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir) + } } for _, done := range dones { - <-done + select { + case <-done: + case <-time.After(30 * time.Second): + cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing") + return + } } } @@ -357,78 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { } } -// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, -// avoiding heap allocations by using a stack-allocated scratch buffer for -// the fixed-size header/trailer and computing CRC inline. +// writeWALRecordDirect encodes a WAL record into a contiguous buffer first, +// then writes it to the bufio.Writer in a single call. This prevents partial +// records in the write buffer if a write error occurs mid-record (e.g. disk full). func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { - // Compute payload size. + // Compute payload and total record size. payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 for _, s := range msg.Selector { payloadSize += 1 + len(s) } + // Total: 8 (header) + payload + 4 (CRC). + totalSize := 8 + payloadSize + 4 - // Write magic + payload length (8 bytes header). - var hdr [8]byte - binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) - binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) - if _, err := w.Write(hdr[:]); err != nil { - return err + // Use stack buffer for typical small records, heap-allocate only for large ones. + var stackBuf [256]byte + var buf []byte + if totalSize <= len(stackBuf) { + buf = stackBuf[:totalSize] + } else { + buf = make([]byte, totalSize) } - // We need to compute CRC over the payload as we write it. - crc := crc32.NewIEEE() + // Header: magic + payload length. + binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic) + binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize)) + + // Payload starts at offset 8. + p := 8 // Timestamp (8 bytes). - var scratch [8]byte - binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) - crc.Write(scratch[:8]) - if _, err := w.Write(scratch[:8]); err != nil { - return err - } + binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp)) + p += 8 // Metric name length (2 bytes) + metric name. - binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) - crc.Write(scratch[:2]) - if _, err := w.Write(scratch[:2]); err != nil { - return err - } - nameBytes := []byte(msg.MetricName) - crc.Write(nameBytes) - if _, err := w.Write(nameBytes); err != nil { - return err - } + binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName))) + p += 2 + p += copy(buf[p:], msg.MetricName) // Selector count (1 byte). - scratch[0] = byte(len(msg.Selector)) - crc.Write(scratch[:1]) - if _, err := w.Write(scratch[:1]); err != nil { - return err - } + buf[p] = byte(len(msg.Selector)) + p++ // Selectors (1-byte length + bytes each). for _, sel := range msg.Selector { - scratch[0] = byte(len(sel)) - crc.Write(scratch[:1]) - if _, err := w.Write(scratch[:1]); err != nil { - return err - } - selBytes := []byte(sel) - crc.Write(selBytes) - if _, err := w.Write(selBytes); err != nil { - return err - } + buf[p] = byte(len(sel)) + p++ + p += copy(buf[p:], sel) } // Value (4 bytes, float32 bits). - binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) - crc.Write(scratch[:4]) - if _, err := w.Write(scratch[:4]); err != nil { - return err - } + binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value))) + p += 4 - // CRC32 (4 bytes). - binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) - _, err := w.Write(scratch[:4]) + // CRC32 over payload (bytes 8..8+payloadSize). + crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize]) + binary.LittleEndian.PutUint32(buf[p:p+4], crc) + + // Single atomic write to the buffered writer. + _, err := w.Write(buf) return err }