diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index 21f036ed..d7b96b91 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -53,7 +54,7 @@ func ReceiveNats(ms *MemoryStore, } var wg sync.WaitGroup - msgs := make(chan []byte, workers*2) + msgs := make(chan []byte, max(workers*256, 8192)) for _, sc := range *Keys.Subscriptions { clusterTag := sc.ClusterTag @@ -123,6 +124,10 @@ func reorder(buf, prefix []byte) []byte { } } +// walDropped counts WAL messages dropped due to a full channel. +// Logged periodically; not critical since binary snapshots capture the data. +var walDropped atomic.Int64 + // decodeState holds the per-call scratch buffers used by DecodeLine. // Instances are recycled via decodeStatePool to avoid repeated allocations // during high-throughput ingestion. @@ -348,7 +353,7 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() if Keys.Checkpoints.FileFormat == "wal" { - WALMessages <- &WALMessage{ + msg := &WALMessage{ MetricName: string(st.metricBuf), Cluster: cluster, Node: host, @@ -356,6 +361,15 @@ func DecodeLine(dec *lineprotocol.Decoder, Value: metric.Value, Timestamp: time, } + select { + case WALMessages <- msg: + default: + // WAL channel full — metric is written to memory store but not WAL. + // Next binary snapshot will capture it.
+ if dropped := walDropped.Add(1); dropped%10000 == 1 { + cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) + } + } } if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 84dad72b..cd74f481 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -184,7 +184,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { - err = ReceiveNats(ms, 1, ctx) + err = ReceiveNats(ms, Keys.NumWorkers, ctx) if err != nil { cclog.Fatal(err) } diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index e8a6586f..2131331e 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -82,7 +82,7 @@ const ( // WALMessages is the channel for sending metric writes to the WAL staging goroutine. // Buffered to allow burst writes without blocking the metric ingestion path. -var WALMessages = make(chan *WALMessage, 4096) +var WALMessages = make(chan *WALMessage, 65536) // walRotateCh is used by the checkpoint goroutine to request WAL file rotation // (close, delete, reopen) after a binary snapshot has been written. @@ -226,6 +226,28 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { return } processMsg(msg) + + // Drain up to 256 more messages without blocking to batch writes. + for range 256 { + select { + case msg, ok := <-WALMessages: + if !ok { + return + } + processMsg(msg) + case req := <-walRotateCh: + processRotate(req) + default: + goto flushed + } + } + flushed: + // Flush all buffered writers after processing the batch. + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + } + } case req := <-walRotateCh: processRotate(req) }