perf: Increase throughput for WAL writers

Entire-Checkpoint: ddd40d290c56
This commit is contained in:
2026-03-24 06:53:12 +01:00
parent 3d94b0bf79
commit 0325d9e866

View File

@@ -69,6 +69,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -111,10 +112,16 @@ type walRotateReq struct {
// walFileState holds an open WAL file handle and buffered writer for one host directory. // walFileState holds an open WAL file handle and buffered writer for one host directory.
type walFileState struct { type walFileState struct {
f *os.File f *os.File
w *bufio.Writer w *bufio.Writer
dirty bool
} }
// walFlushInterval controls how often dirty WAL files are flushed to disk.
// Decoupling flushes from message processing lets the consumer run at memory
// speed, amortizing syscall overhead across many writes.
const walFlushInterval = 5 * time.Second
// walShardIndex computes which shard a message belongs to based on cluster+node. // walShardIndex computes which shard a message belongs to based on cluster+node.
// Uses FNV-1a hash for fast, well-distributed mapping. // Uses FNV-1a hash for fast, well-distributed mapping.
func walShardIndex(cluster, node string) int { func walShardIndex(cluster, node string) int {
@@ -222,6 +229,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if err := writeWALRecordDirect(ws.w, msg); err != nil { if err := writeWALRecordDirect(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
} }
ws.dirty = true
} }
processRotate := func(req walRotateReq) { processRotate := func(req walRotateReq) {
@@ -238,10 +246,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
close(req.done) close(req.done)
} }
flushAll := func() { flushDirty := func() {
for _, ws := range hostFiles { for _, ws := range hostFiles {
if ws.f != nil { if ws.dirty {
ws.w.Flush() ws.w.Flush()
ws.dirty = false
} }
} }
} }
@@ -257,12 +266,35 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
case req := <-rotateCh: case req := <-rotateCh:
processRotate(req) processRotate(req)
default: default:
flushAll() flushDirty()
return return
} }
} }
} }
ticker := time.NewTicker(walFlushInterval)
defer ticker.Stop()
// drainBatch processes up to 4096 pending messages without blocking.
// Returns false if the channel was closed.
drainBatch := func() bool {
for range 4096 {
select {
case msg, ok := <-msgCh:
if !ok {
flushDirty()
return false
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
return true
}
}
return true
}
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -273,23 +305,12 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
return return
} }
processMsg(msg) processMsg(msg)
if !drainBatch() {
// Drain up to 256 more messages without blocking to batch writes. return
for range 256 {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
goto flushed
}
} }
flushed: // No flush here — timer handles periodic flushing.
flushAll() case <-ticker.C:
flushDirty()
case req := <-rotateCh: case req := <-rotateCh:
processRotate(req) processRotate(req)
} }