fix: Shard WAL consumer for higher throughput

Entire-Checkpoint: e583b7b11439
This commit is contained in:
2026-03-18 06:32:14 +01:00
parent 50aed595cf
commit bf1a8a174e
3 changed files with 269 additions and 135 deletions

View File

@@ -361,13 +361,11 @@ func DecodeLine(dec *lineprotocol.Decoder,
Value: metric.Value,
Timestamp: time,
}
select {
case WALMessages <- msg:
default:
// WAL channel full — metric is written to memory store but not WAL.
if !SendWALMessage(msg) {
// WAL shard channel full — metric is written to memory store but not WAL.
// Next binary snapshot will capture it.
if dropped := walDropped.Add(1); dropped%10000 == 1 {
cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped)
cclog.Warnf("[METRICSTORE]> WAL shard channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped)
}
}
}

View File

@@ -277,7 +277,9 @@ func Shutdown() {
}
if Keys.Checkpoints.FileFormat == "wal" {
close(WALMessages)
for _, ch := range walShardChs {
close(ch)
}
}
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)

View File

@@ -61,6 +61,7 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"hash/fnv"
"io"
"math"
"os"
@@ -80,13 +81,15 @@ const (
snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic
)
// WALMessages is the channel for sending metric writes to the WAL staging goroutine.
// Buffered to allow burst writes without blocking the metric ingestion path.
var WALMessages = make(chan *WALMessage, 65536)
// walShardChs holds per-shard channels for WAL messages.
// Initialized by WALStaging; nil when WAL is not active.
var walShardChs []chan *WALMessage
// walRotateCh is used by the checkpoint goroutine to request WAL file rotation
// (close, delete, reopen) after a binary snapshot has been written.
var walRotateCh = make(chan walRotateReq, 256)
// walShardRotateChs holds per-shard channels for WAL rotation requests.
var walShardRotateChs []chan walRotateReq
// walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
@@ -112,162 +115,218 @@ type walFileState struct {
w *bufio.Writer
}
// WALStaging starts a background goroutine that receives WALMessage items
// and appends binary WAL records to per-host current.wal files.
// Also handles WAL rotation requests from the checkpoint goroutine.
// walShardIndex computes which shard a message belongs to based on cluster+node.
// Uses FNV-1a over "cluster\x00node" for fast, well-distributed mapping; the
// NUL separator keeps ("ab","c") and ("a","bc") in distinct hash classes.
//
// The modulo is taken in uint32 space BEFORE converting to int: on platforms
// where int is 32 bits, int(h.Sum32()) can be negative, and a negative
// remainder would panic when used as a slice index.
// Assumes walNumShards > 0 (set by WALStaging before any sender runs).
func walShardIndex(cluster, node string) int {
	h := fnv.New32a()
	h.Write([]byte(cluster))
	h.Write([]byte{0})
	h.Write([]byte(node))
	return int(h.Sum32() % uint32(walNumShards))
}
// SendWALMessage routes a WAL message to the shard channel that owns the
// message's cluster/node pair. The send is non-blocking: when sharding has
// not been initialized, or the owning shard's channel is at capacity, the
// message is dropped and false is returned.
func SendWALMessage(msg *WALMessage) bool {
	if walShardChs == nil {
		// WAL is not active (WALStaging never ran or format is json).
		return false
	}
	ch := walShardChs[walShardIndex(msg.Cluster, msg.Node)]
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}
// WALStaging starts N sharded background goroutines that receive WALMessage items
// and append binary WAL records to per-host current.wal files.
// Each shard owns a subset of hosts (determined by hash of cluster+node),
// parallelizing serialization and I/O while keeping per-host writes serial.
func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
wg.Go(func() {
if Keys.Checkpoints.FileFormat == "json" {
return
}
if Keys.Checkpoints.FileFormat == "json" {
return
}
hostFiles := make(map[string]*walFileState)
walNumShards = max(Keys.NumWorkers, 1)
chBufSize := max(65536/walNumShards, 1024)
defer func() {
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
ws.f.Close()
walShardChs = make([]chan *WALMessage, walNumShards)
walShardRotateChs = make([]chan walRotateReq, walNumShards)
for i := range walNumShards {
walShardChs[i] = make(chan *WALMessage, chBufSize)
walShardRotateChs[i] = make(chan walRotateReq, 64)
}
for i := range walNumShards {
msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i]
wg.Go(func() {
hostFiles := make(map[string]*walFileState)
defer func() {
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
ws.f.Close()
}
}
}
}()
}()
getOrOpenWAL := func(hostDir string) *walFileState {
ws, ok := hostFiles[hostDir]
if ok {
getOrOpenWAL := func(hostDir string) *walFileState {
ws, ok := hostFiles[hostDir]
if ok {
return ws
}
if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err)
return nil
}
walPath := path.Join(hostDir, "current.wal")
f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err)
return nil
}
w := bufio.NewWriter(f)
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
f.Close()
return nil
}
}
ws = &walFileState{f: f, w: w}
hostFiles[hostDir] = ws
return ws
}
if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err)
return nil
}
walPath := path.Join(hostDir, "current.wal")
f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err)
return nil
}
w := bufio.NewWriter(f)
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
f.Close()
return nil
}
}
ws = &walFileState{f: f, w: w}
hostFiles[hostDir] = ws
return ws
}
processMsg := func(msg *WALMessage) {
hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node)
ws := getOrOpenWAL(hostDir)
if ws == nil {
return
}
if err := writeWALRecord(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
}
processRotate := func(req walRotateReq) {
ws, ok := hostFiles[req.hostDir]
if ok && ws.f != nil {
ws.w.Flush()
ws.f.Close()
walPath := path.Join(req.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, req.hostDir)
}
close(req.done)
}
drain := func() {
for {
select {
case msg, ok := <-WALMessages:
if !ok {
return
}
processMsg(msg)
case req := <-walRotateCh:
processRotate(req)
default:
// Flush all buffered writers after draining remaining messages.
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
}
}
processMsg := func(msg *WALMessage) {
hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node)
ws := getOrOpenWAL(hostDir)
if ws == nil {
return
}
if err := writeWALRecordDirect(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
}
}
for {
select {
case <-ctx.Done():
drain()
return
case msg, ok := <-WALMessages:
if !ok {
return
}
processMsg(msg)
// Drain up to 256 more messages without blocking to batch writes.
for range 256 {
select {
case msg, ok := <-WALMessages:
if !ok {
return
}
processMsg(msg)
case req := <-walRotateCh:
processRotate(req)
default:
goto flushed
processRotate := func(req walRotateReq) {
ws, ok := hostFiles[req.hostDir]
if ok && ws.f != nil {
ws.w.Flush()
ws.f.Close()
walPath := path.Join(req.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, req.hostDir)
}
flushed:
// Flush all buffered writers after processing the batch.
close(req.done)
}
flushAll := func() {
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
}
}
case req := <-walRotateCh:
processRotate(req)
}
}
})
drain := func() {
for {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
flushAll()
return
}
}
}
for {
select {
case <-ctx.Done():
drain()
return
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
// Drain up to 256 more messages without blocking to batch writes.
for range 256 {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
goto flushed
}
}
flushed:
flushAll()
case req := <-rotateCh:
processRotate(req)
}
}
})
}
}
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory, so rotation is serialized with that
// shard's in-flight writes for the same host.
func RotateWALFiles(hostDirs []string) {
	if walShardRotateChs == nil {
		// WAL sharding not initialized (json format or staging never started).
		return
	}
	dones := make([]chan struct{}, len(hostDirs))
	for i, dir := range hostDirs {
		dones[i] = make(chan struct{})
		// Route to the owning shard, derived from the cluster/node components
		// of the path (hostDir = rootDir/cluster/node). Sending exactly once
		// here — the shard goroutine closes done when the rotation finishes.
		shard := walShardIndexFromDir(dir)
		walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
	}
	// Wait for every shard to acknowledge its rotations.
	for _, done := range dones {
		<-done
	}
}
// walShardIndexFromDir derives the owning shard for a host directory.
// The expected layout is rootDir/cluster/node: the last path component is
// the node, the one before it the cluster; both feed the shard hash.
func walShardIndexFromDir(hostDir string) int {
	node := path.Base(hostDir)
	cluster := path.Base(path.Dir(hostDir))
	return walShardIndex(cluster, node)
}
// RotateWALFilesAfterShutdown sends rotation requests for the given host
// directories and blocks until all rotations complete.
func RotateWALFilesAfterShutdown(hostDirs []string) {
@@ -279,6 +338,81 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
}
}
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
// avoiding heap allocations by using stack-allocated scratch buffers for the
// fixed-size header/trailer and folding the CRC in with crc32.Update — unlike
// crc32.NewIEEE, this allocates no hash.Hash32 object per record, which
// matters on the per-metric hot path.
//
// Record layout (all integers little-endian):
//
//	magic(4) | payloadLen(4) | payload | crc32(payload)(4)
//
// where payload = timestamp(8) | nameLen(2) | name | selectorCount(1) |
// {selLen(1) | selector}... | float32 value bits(4).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
	// Compute payload size up front so the length field can precede the data.
	payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
	for _, s := range msg.Selector {
		payloadSize += 1 + len(s)
	}
	// Write magic + payload length (8-byte record header, not CRC-covered).
	var hdr [8]byte
	binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
	binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	// emit writes b while folding it into the running payload CRC.
	var crc uint32
	emit := func(b []byte) error {
		crc = crc32.Update(crc, crc32.IEEETable, b)
		_, err := w.Write(b)
		return err
	}
	var scratch [8]byte
	// Timestamp (8 bytes).
	binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
	if err := emit(scratch[:8]); err != nil {
		return err
	}
	// Metric name length (2 bytes) + metric name.
	binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
	if err := emit(scratch[:2]); err != nil {
		return err
	}
	if err := emit([]byte(msg.MetricName)); err != nil {
		return err
	}
	// Selector count (1 byte).
	scratch[0] = byte(len(msg.Selector))
	if err := emit(scratch[:1]); err != nil {
		return err
	}
	// Selectors (1-byte length + bytes each).
	for _, sel := range msg.Selector {
		scratch[0] = byte(len(sel))
		if err := emit(scratch[:1]); err != nil {
			return err
		}
		if err := emit([]byte(sel)); err != nil {
			return err
		}
	}
	// Value (4 bytes, float32 bits).
	binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
	if err := emit(scratch[:4]); err != nil {
		return err
	}
	// Trailing CRC32 (4 bytes) over the payload only.
	binary.LittleEndian.PutUint32(scratch[:4], crc)
	_, err := w.Write(scratch[:4])
	return err
}
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
func buildWALPayload(msg *WALMessage) []byte {
size := 8 + 2 + len(msg.MetricName) + 1 + 4