diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index d7b96b91..caec82e9 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -361,13 +361,11 @@ func DecodeLine(dec *lineprotocol.Decoder, Value: metric.Value, Timestamp: time, } - select { - case WALMessages <- msg: - default: - // WAL channel full — metric is written to memory store but not WAL. + if !SendWALMessage(msg) { + // WAL shard channel full — metric is written to memory store but not WAL. // Next binary snapshot will capture it. if dropped := walDropped.Add(1); dropped%10000 == 1 { - cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) + cclog.Warnf("[METRICSTORE]> WAL shard channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) } } } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index cd74f481..83c78ab2 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -277,7 +277,9 @@ func Shutdown() { } if Keys.Checkpoints.FileFormat == "wal" { - close(WALMessages) + for _, ch := range walShardChs { + close(ch) + } } cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 2131331e..38585cf5 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -61,6 +61,7 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "hash/fnv" "io" "math" "os" @@ -80,13 +81,15 @@ const ( snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic ) -// WALMessages is the channel for sending metric writes to the WAL staging goroutine. -// Buffered to allow burst writes without blocking the metric ingestion path. -var WALMessages = make(chan *WALMessage, 65536) +// walShardChs holds per-shard channels for WAL messages. +// Initialized by WALStaging; nil when WAL is not active. +var walShardChs []chan *WALMessage -// walRotateCh is used by the checkpoint goroutine to request WAL file rotation -// (close, delete, reopen) after a binary snapshot has been written. -var walRotateCh = make(chan walRotateReq, 256) +// walShardRotateChs holds per-shard channels for WAL rotation requests. +var walShardRotateChs []chan walRotateReq + +// walNumShards stores the number of shards (set during WALStaging init). +var walNumShards int // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). @@ -112,162 +115,218 @@ type walFileState struct { w *bufio.Writer } -// WALStaging starts a background goroutine that receives WALMessage items -// and appends binary WAL records to per-host current.wal files. -// Also handles WAL rotation requests from the checkpoint goroutine. +// walShardIndex computes which shard a message belongs to based on cluster+node. +// Uses FNV-1a hash for fast, well-distributed mapping. +func walShardIndex(cluster, node string) int { + h := fnv.New32a() + h.Write([]byte(cluster)) + h.Write([]byte{0}) + h.Write([]byte(node)) + return int(h.Sum32()) % walNumShards +} + +// SendWALMessage routes a WAL message to the appropriate shard channel. +// Returns false if the channel is full (message dropped). +func SendWALMessage(msg *WALMessage) bool { + if walShardChs == nil { + return false + } + shard := walShardIndex(msg.Cluster, msg.Node) + select { + case walShardChs[shard] <- msg: + return true + default: + return false + } +} + +// WALStaging starts N sharded background goroutines that receive WALMessage items +// and append binary WAL records to per-host current.wal files. +// Each shard owns a subset of hosts (determined by hash of cluster+node), +// parallelizing serialization and I/O while keeping per-host writes serial. func WALStaging(wg *sync.WaitGroup, ctx context.Context) { - wg.Go(func() { - if Keys.Checkpoints.FileFormat == "json" { - return - } + if Keys.Checkpoints.FileFormat == "json" { + return + } - hostFiles := make(map[string]*walFileState) + walNumShards = max(Keys.NumWorkers, 1) + chBufSize := max(65536/walNumShards, 1024) - defer func() { - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() - ws.f.Close() + walShardChs = make([]chan *WALMessage, walNumShards) + walShardRotateChs = make([]chan walRotateReq, walNumShards) + + for i := range walNumShards { + walShardChs[i] = make(chan *WALMessage, chBufSize) + walShardRotateChs[i] = make(chan walRotateReq, 64) + } + + for i := range walNumShards { + msgCh := walShardChs[i] + rotateCh := walShardRotateChs[i] + + wg.Go(func() { + hostFiles := make(map[string]*walFileState) + + defer func() { + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + ws.f.Close() + } } - } - }() + }() - getOrOpenWAL := func(hostDir string) *walFileState { - ws, ok := hostFiles[hostDir] - if ok { + getOrOpenWAL := func(hostDir string) *walFileState { + ws, ok := hostFiles[hostDir] + if ok { + return ws + } + + if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) + return nil + } + + walPath := path.Join(hostDir, "current.wal") + f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) + if err != nil { + cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) + return nil + } + + w := bufio.NewWriter(f) + + // Write file header magic if file is new (empty). + info, err := f.Stat() + if err == nil && info.Size() == 0 { + var hdr [4]byte + binary.LittleEndian.PutUint32(hdr[:], walFileMagic) + if _, err := w.Write(hdr[:]); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) + f.Close() + return nil + } + } + + ws = &walFileState{f: f, w: w} + hostFiles[hostDir] = ws return ws } - if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) - return nil - } - - walPath := path.Join(hostDir, "current.wal") - f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) - if err != nil { - cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) - return nil - } - - w := bufio.NewWriter(f) - - // Write file header magic if file is new (empty). - info, err := f.Stat() - if err == nil && info.Size() == 0 { - var hdr [4]byte - binary.LittleEndian.PutUint32(hdr[:], walFileMagic) - if _, err := w.Write(hdr[:]); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) - f.Close() - return nil - } - } - - ws = &walFileState{f: f, w: w} - hostFiles[hostDir] = ws - return ws - } - - processMsg := func(msg *WALMessage) { - hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) - ws := getOrOpenWAL(hostDir) - if ws == nil { - return - } - if err := writeWALRecord(ws.w, msg); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) - } - } - - processRotate := func(req walRotateReq) { - ws, ok := hostFiles[req.hostDir] - if ok && ws.f != nil { - ws.w.Flush() - ws.f.Close() - walPath := path.Join(req.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) - } - delete(hostFiles, req.hostDir) - } - close(req.done) - } - - drain := func() { - for { - select { - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - case req := <-walRotateCh: - processRotate(req) - default: - // Flush all buffered writers after draining remaining messages. - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() - } - } + processMsg := func(msg *WALMessage) { + hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) + ws := getOrOpenWAL(hostDir) + if ws == nil { return } + if err := writeWALRecordDirect(ws.w, msg); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) + } } - } - for { - select { - case <-ctx.Done(): - drain() - return - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - - // Drain up to 256 more messages without blocking to batch writes. - for range 256 { - select { - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - case req := <-walRotateCh: - processRotate(req) - default: - goto flushed + processRotate := func(req walRotateReq) { + ws, ok := hostFiles[req.hostDir] + if ok && ws.f != nil { + ws.w.Flush() + ws.f.Close() + walPath := path.Join(req.hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) } + delete(hostFiles, req.hostDir) } - flushed: - // Flush all buffered writers after processing the batch. + close(req.done) + } + + flushAll := func() { for _, ws := range hostFiles { if ws.f != nil { ws.w.Flush() } } - case req := <-walRotateCh: - processRotate(req) } - } - }) + + drain := func() { + for { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + flushAll() + return + } + } + } + + for { + select { + case <-ctx.Done(): + drain() + return + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + + // Drain up to 256 more messages without blocking to batch writes. + for range 256 { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + goto flushed + } + } + flushed: + flushAll() + case req := <-rotateCh: + processRotate(req) + } + } + }) + } } // RotateWALFiles sends rotation requests for the given host directories -// and blocks until all rotations complete. +// and blocks until all rotations complete. Each request is routed to the +// shard that owns the host directory. func RotateWALFiles(hostDirs []string) { + if walShardRotateChs == nil { + return + } dones := make([]chan struct{}, len(hostDirs)) for i, dir := range hostDirs { dones[i] = make(chan struct{}) - walRotateCh <- walRotateReq{hostDir: dir, done: dones[i]} + // Extract cluster/node from hostDir to find the right shard. + // hostDir = rootDir/cluster/node + shard := walShardIndexFromDir(dir) + walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} } for _, done := range dones { <-done } } +// walShardIndexFromDir extracts cluster and node from a host directory path +// and returns the shard index. The path format is: rootDir/cluster/node +func walShardIndexFromDir(hostDir string) int { + // hostDir = .../cluster/node — extract last two path components + node := path.Base(hostDir) + cluster := path.Base(path.Dir(hostDir)) + return walShardIndex(cluster, node) +} + // RotateWALFiles sends rotation requests for the given host directories // and blocks until all rotations complete. func RotateWALFilesAfterShutdown(hostDirs []string) { @@ -279,6 +338,81 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { } } +// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, +// avoiding heap allocations by using a stack-allocated scratch buffer for +// the fixed-size header/trailer and computing CRC inline. +func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { + // Compute payload size. + payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 + for _, s := range msg.Selector { + payloadSize += 1 + len(s) + } + + // Write magic + payload length (8 bytes header). + var hdr [8]byte + binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) + binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + + // We need to compute CRC over the payload as we write it. + crc := crc32.NewIEEE() + + // Timestamp (8 bytes). + var scratch [8]byte + binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) + crc.Write(scratch[:8]) + if _, err := w.Write(scratch[:8]); err != nil { + return err + } + + // Metric name length (2 bytes) + metric name. + binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) + crc.Write(scratch[:2]) + if _, err := w.Write(scratch[:2]); err != nil { + return err + } + nameBytes := []byte(msg.MetricName) + crc.Write(nameBytes) + if _, err := w.Write(nameBytes); err != nil { + return err + } + + // Selector count (1 byte). + scratch[0] = byte(len(msg.Selector)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + + // Selectors (1-byte length + bytes each). + for _, sel := range msg.Selector { + scratch[0] = byte(len(sel)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + selBytes := []byte(sel) + crc.Write(selBytes) + if _, err := w.Write(selBytes); err != nil { + return err + } + } + + // Value (4 bytes, float32 bits). + binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) + crc.Write(scratch[:4]) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + // CRC32 (4 bytes). + binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) + _, err := w.Write(scratch[:4]) + return err +} + // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). func buildWALPayload(msg *WALMessage) []byte { size := 8 + 2 + len(msg.MetricName) + 1 + 4