diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 38585cf5..85e5047e 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -69,6 +69,7 @@ import ( "strings" "sync" "sync/atomic" + "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -111,10 +112,16 @@ type walRotateReq struct { // walFileState holds an open WAL file handle and buffered writer for one host directory. type walFileState struct { - f *os.File - w *bufio.Writer + f *os.File + w *bufio.Writer + dirty bool } +// walFlushInterval controls how often dirty WAL files are flushed to disk. +// Decoupling flushes from message processing lets the consumer run at memory +// speed, amortizing syscall overhead across many writes. +const walFlushInterval = 5 * time.Second + // walShardIndex computes which shard a message belongs to based on cluster+node. // Uses FNV-1a hash for fast, well-distributed mapping. func walShardIndex(cluster, node string) int { @@ -222,6 +229,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { if err := writeWALRecordDirect(ws.w, msg); err != nil { cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) } + ws.dirty = true } processRotate := func(req walRotateReq) { @@ -238,10 +246,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { close(req.done) } - flushAll := func() { + flushDirty := func() { for _, ws := range hostFiles { - if ws.f != nil { + if ws.dirty { ws.w.Flush() + ws.dirty = false } } } @@ -257,12 +266,35 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { case req := <-rotateCh: processRotate(req) default: - flushAll() + flushDirty() return } } } + ticker := time.NewTicker(walFlushInterval) + defer ticker.Stop() + + // drainBatch processes up to 4096 pending messages without blocking. + // Returns false if the channel was closed. + drainBatch := func() bool { + for range 4096 { + select { + case msg, ok := <-msgCh: + if !ok { + flushDirty() + return false + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + return true + } + } + return true + } + for { select { case <-ctx.Done(): @@ -273,23 +305,12 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { return } processMsg(msg) - - // Drain up to 256 more messages without blocking to batch writes. - for range 256 { - select { - case msg, ok := <-msgCh: - if !ok { - return - } - processMsg(msg) - case req := <-rotateCh: - processRotate(req) - default: - goto flushed - } + if !drainBatch() { + return } - flushed: - flushAll() + // No flush here — timer handles periodic flushing. + case <-ticker.C: + flushDirty() case req := <-rotateCh: processRotate(req) }