From b214e1755abaf03419a98f5844df1d6335d9d2ed Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 13 Mar 2026 09:05:24 +0100 Subject: [PATCH] Add buffered I/O to WAL writes and fix MemoryCap comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WAL writes now go through bufio.Writer instead of raw syscalls per record, reducing I/O overhead. Buffers are flushed on rotate, drain, and shutdown. Fixed misleading MemoryCap comment ("Max bytes" → "Max memory in GB"). Co-Authored-By: Claude Opus 4.6 Entire-Checkpoint: b38dc35e5334 --- pkg/metricstore/config.go | 17 ++++++++------- pkg/metricstore/walCheckpoint.go | 36 +++++++++++++++++++++----------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 44743dbe..f6c56672 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -113,7 +113,7 @@ type Subscriptions []struct { // Fields: // - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10)) // - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention -// - MemoryCap: Max bytes for buffer data (0 = unlimited); triggers forceFree when exceeded +// - MemoryCap: Max memory in GB for buffer data (0 = unlimited); triggers forceFree when exceeded // - Checkpoints: Periodic persistence configuration // - Debug: Development/profiling options (nil = disabled) // - Archive: Long-term storage configuration (nil = disabled) @@ -121,13 +121,14 @@ type Subscriptions []struct { type MetricStoreConfig struct { // Number of concurrent workers for checkpoint and archive operations. // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) - NumWorkers int `json:"num-workers"` - RetentionInMemory string `json:"retention-in-memory"` - MemoryCap int `json:"memory-cap"` - Checkpoints Checkpoints `json:"checkpoints"` - Debug *Debug `json:"debug"` - Cleanup *Cleanup `json:"cleanup"` - Subscriptions *Subscriptions `json:"nats-subscriptions"` + NumWorkers int `json:"num-workers"` + RetentionInMemory string `json:"retention-in-memory"` + CheckpointInterval string `json:"checkpoint-interval,omitempty"` + MemoryCap int `json:"memory-cap"` + Checkpoints Checkpoints `json:"checkpoints"` + Debug *Debug `json:"debug"` + Cleanup *Cleanup `json:"cleanup"` + Subscriptions *Subscriptions `json:"nats-subscriptions"` } // Keys is the global metricstore configuration instance. diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 07414d98..e8a6586f 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -106,9 +106,10 @@ type walRotateReq struct { done chan struct{} } -// walFileState holds an open WAL file handle for one host directory. +// walFileState holds an open WAL file handle and buffered writer for one host directory. type walFileState struct { f *os.File + w *bufio.Writer } // WALStaging starts a background goroutine that receives WALMessage items @@ -125,15 +126,16 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { defer func() { for _, ws := range hostFiles { if ws.f != nil { + ws.w.Flush() ws.f.Close() } } }() - getOrOpenWAL := func(hostDir string) *os.File { + getOrOpenWAL := func(hostDir string) *walFileState { ws, ok := hostFiles[hostDir] if ok { - return ws.f + return ws } if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { @@ -148,29 +150,32 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { return nil } + w := bufio.NewWriter(f) + // Write file header magic if file is new (empty). info, err := f.Stat() if err == nil && info.Size() == 0 { var hdr [4]byte binary.LittleEndian.PutUint32(hdr[:], walFileMagic) - if _, err := f.Write(hdr[:]); err != nil { + if _, err := w.Write(hdr[:]); err != nil { cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) f.Close() return nil } } - hostFiles[hostDir] = &walFileState{f: f} - return f + ws = &walFileState{f: f, w: w} + hostFiles[hostDir] = ws + return ws } processMsg := func(msg *WALMessage) { hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) - f := getOrOpenWAL(hostDir) - if f == nil { + ws := getOrOpenWAL(hostDir) + if ws == nil { return } - if err := writeWALRecord(f, msg); err != nil { + if err := writeWALRecord(ws.w, msg); err != nil { cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) } } @@ -178,6 +183,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { processRotate := func(req walRotateReq) { ws, ok := hostFiles[req.hostDir] if ok && ws.f != nil { + ws.w.Flush() ws.f.Close() walPath := path.Join(req.hostDir, "current.wal") if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { @@ -199,6 +205,12 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { case req := <-walRotateCh: processRotate(req) default: + // Flush all buffered writers after draining remaining messages. + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + } + } return } } @@ -282,9 +294,9 @@ func buildWALPayload(msg *WALMessage) []byte { return buf } -// writeWALRecord appends a binary WAL record to the file. +// writeWALRecord appends a binary WAL record to the writer. // Format: [4B magic][4B payload_len][payload][4B CRC32] -func writeWALRecord(f *os.File, msg *WALMessage) error { +func writeWALRecord(w io.Writer, msg *WALMessage) error { payload := buildWALPayload(msg) crc := crc32.ChecksumIEEE(payload) @@ -304,7 +316,7 @@ func writeWALRecord(f *os.File, msg *WALMessage) error { binary.LittleEndian.PutUint32(crcBytes[:], crc) record = append(record, crcBytes[:]...) - _, err := f.Write(record) + _, err := w.Write(record) return err }