mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-21 07:17:30 +01:00
fix: metricstore NATS contention
Entire-Checkpoint: 7e68050cab59
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
@@ -53,7 +54,7 @@ func ReceiveNats(ms *MemoryStore,
|
|||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
msgs := make(chan []byte, workers*2)
|
msgs := make(chan []byte, max(workers*256, 8192))
|
||||||
|
|
||||||
for _, sc := range *Keys.Subscriptions {
|
for _, sc := range *Keys.Subscriptions {
|
||||||
clusterTag := sc.ClusterTag
|
clusterTag := sc.ClusterTag
|
||||||
@@ -123,6 +124,10 @@ func reorder(buf, prefix []byte) []byte {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// walDropped counts WAL messages dropped due to a full channel.
|
||||||
|
// Logged periodically; not critical since binary snapshots capture the data.
|
||||||
|
var walDropped atomic.Int64
|
||||||
|
|
||||||
// decodeState holds the per-call scratch buffers used by DecodeLine.
|
// decodeState holds the per-call scratch buffers used by DecodeLine.
|
||||||
// Instances are recycled via decodeStatePool to avoid repeated allocations
|
// Instances are recycled via decodeStatePool to avoid repeated allocations
|
||||||
// during high-throughput ingestion.
|
// during high-throughput ingestion.
|
||||||
@@ -348,7 +353,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
|
|||||||
time := t.Unix()
|
time := t.Unix()
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
WALMessages <- &WALMessage{
|
msg := &WALMessage{
|
||||||
MetricName: string(st.metricBuf),
|
MetricName: string(st.metricBuf),
|
||||||
Cluster: cluster,
|
Cluster: cluster,
|
||||||
Node: host,
|
Node: host,
|
||||||
@@ -356,6 +361,15 @@ func DecodeLine(dec *lineprotocol.Decoder,
|
|||||||
Value: metric.Value,
|
Value: metric.Value,
|
||||||
Timestamp: time,
|
Timestamp: time,
|
||||||
}
|
}
|
||||||
|
select {
|
||||||
|
case WALMessages <- msg:
|
||||||
|
default:
|
||||||
|
// WAL channel full — metric is written to memory store but not WAL.
|
||||||
|
// Next binary snapshot will capture it.
|
||||||
|
if dropped := walDropped.Add(1); dropped%10000 == 1 {
|
||||||
|
cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {
|
if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {
|
||||||
|
|||||||
@@ -184,7 +184,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
|
|||||||
shutdownFuncMu.Unlock()
|
shutdownFuncMu.Unlock()
|
||||||
|
|
||||||
if Keys.Subscriptions != nil {
|
if Keys.Subscriptions != nil {
|
||||||
err = ReceiveNats(ms, 1, ctx)
|
err = ReceiveNats(ms, Keys.NumWorkers, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Fatal(err)
|
cclog.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ const (
|
|||||||
|
|
||||||
// WALMessages is the channel for sending metric writes to the WAL staging goroutine.
|
// WALMessages is the channel for sending metric writes to the WAL staging goroutine.
|
||||||
// Buffered to allow burst writes without blocking the metric ingestion path.
|
// Buffered to allow burst writes without blocking the metric ingestion path.
|
||||||
var WALMessages = make(chan *WALMessage, 4096)
|
var WALMessages = make(chan *WALMessage, 65536)
|
||||||
|
|
||||||
// walRotateCh is used by the checkpoint goroutine to request WAL file rotation
|
// walRotateCh is used by the checkpoint goroutine to request WAL file rotation
|
||||||
// (close, delete, reopen) after a binary snapshot has been written.
|
// (close, delete, reopen) after a binary snapshot has been written.
|
||||||
@@ -226,6 +226,28 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
processMsg(msg)
|
processMsg(msg)
|
||||||
|
|
||||||
|
// Drain up to 256 more messages without blocking to batch writes.
|
||||||
|
for range 256 {
|
||||||
|
select {
|
||||||
|
case msg, ok := <-WALMessages:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processMsg(msg)
|
||||||
|
case req := <-walRotateCh:
|
||||||
|
processRotate(req)
|
||||||
|
default:
|
||||||
|
goto flushed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
flushed:
|
||||||
|
// Flush all buffered writers after processing the batch.
|
||||||
|
for _, ws := range hostFiles {
|
||||||
|
if ws.f != nil {
|
||||||
|
ws.w.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
case req := <-walRotateCh:
|
case req := <-walRotateCh:
|
||||||
processRotate(req)
|
processRotate(req)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user