mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-26 09:37:30 +01:00
Fix bugs in WAL journal pipeline
Entire-Checkpoint: 8fe0de4e6ac2
This commit is contained in:
@@ -281,6 +281,12 @@ func Shutdown() {
|
|||||||
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
|
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
|
// Signal producers to stop sending before closing channels,
|
||||||
|
// preventing send-on-closed-channel panics from in-flight NATS workers.
|
||||||
|
walShuttingDown.Store(true)
|
||||||
|
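// Brief, best-effort grace period for in-flight DecodeLine calls to complete.
// NOTE(review): a producer that already passed the walShuttingDown check but has
// not sent within 100ms can still hit a closed channel — the check and the send are not atomic.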
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
for _, ch := range walShardChs {
|
for _, ch := range walShardChs {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -95,6 +95,10 @@ var walNumShards int
|
|||||||
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
|
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
|
||||||
var walStagingWg sync.WaitGroup
|
var walStagingWg sync.WaitGroup
|
||||||
|
|
||||||
|
// walShuttingDown is set before closing shard channels to prevent
|
||||||
|
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||||
|
var walShuttingDown atomic.Bool
|
||||||
|
|
||||||
// WALMessage represents a single metric write to be appended to the WAL.
|
// WALMessage represents a single metric write to be appended to the WAL.
|
||||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||||
type WALMessage struct {
|
type WALMessage struct {
|
||||||
@@ -136,9 +140,9 @@ func walShardIndex(cluster, node string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SendWALMessage routes a WAL message to the appropriate shard channel.
|
// SendWALMessage routes a WAL message to the appropriate shard channel.
|
||||||
// Returns false if the channel is full (message dropped).
|
// Returns false if the shard channels are not set up (nil), shutdown is in
// progress, or the channel is full (message dropped).
|
||||||
func SendWALMessage(msg *WALMessage) bool {
|
func SendWALMessage(msg *WALMessage) bool {
|
||||||
if walShardChs == nil {
|
if walShardChs == nil || walShuttingDown.Load() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||||
@@ -320,20 +324,32 @@ func WaitForWALStagingDrain() {
|
|||||||
// RotateWALFiles sends rotation requests for the given host directories
|
// RotateWALFiles sends rotation requests for the given host directories
|
||||||
// and blocks until all rotations complete. Each request is routed to the
|
// and blocks until all rotations complete. Each request is routed to the
|
||||||
// shard that owns the host directory.
|
// shard that owns the host directory.
|
||||||
|
//
|
||||||
|
// If shutdown is in progress (WAL staging goroutines may have exited),
|
||||||
|
// rotation is skipped to avoid deadlocking on abandoned channels.
|
||||||
func RotateWALFiles(hostDirs []string) {
|
func RotateWALFiles(hostDirs []string) {
|
||||||
if walShardRotateChs == nil {
|
if walShardRotateChs == nil || walShuttingDown.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dones := make([]chan struct{}, len(hostDirs))
|
dones := make([]chan struct{}, 0, len(hostDirs))
|
||||||
for i, dir := range hostDirs {
|
for _, dir := range hostDirs {
|
||||||
dones[i] = make(chan struct{})
|
done := make(chan struct{})
|
||||||
// Extract cluster/node from hostDir to find the right shard.
|
|
||||||
// hostDir = rootDir/cluster/node
|
|
||||||
shard := walShardIndexFromDir(dir)
|
shard := walShardIndexFromDir(dir)
|
||||||
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
|
select {
|
||||||
|
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
||||||
|
dones = append(dones, done)
|
||||||
|
default:
|
||||||
|
// Channel full or goroutine not consuming — skip this host.
|
||||||
|
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, done := range dones {
|
for _, done := range dones {
|
||||||
<-done
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -357,78 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
|
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
||||||
// avoiding heap allocations by using a stack-allocated scratch buffer for
|
// then writes it to the bufio.Writer in a single call. This prevents partial
|
||||||
// the fixed-size header/trailer and computing CRC inline.
|
// records in the write buffer when the record fits in the buffer's free space; a record
// that straddles a flush can still be split if that flush fails (e.g. disk full).
|
||||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
||||||
// Compute payload size.
|
// Compute payload and total record size.
|
||||||
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
||||||
for _, s := range msg.Selector {
|
for _, s := range msg.Selector {
|
||||||
payloadSize += 1 + len(s)
|
payloadSize += 1 + len(s)
|
||||||
}
|
}
|
||||||
|
// Total: 8 (header) + payload + 4 (CRC).
|
||||||
|
totalSize := 8 + payloadSize + 4
|
||||||
|
|
||||||
// Write magic + payload length (8 bytes header).
|
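// Use a fixed 256-byte local array for typical small records, make() only for larger ones.
// NOTE(review): buf is passed to w.Write, so the array may escape to the heap — confirm with -gcflags=-m.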
|
||||||
var hdr [8]byte
|
var stackBuf [256]byte
|
||||||
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
|
var buf []byte
|
||||||
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
|
if totalSize <= len(stackBuf) {
|
||||||
if _, err := w.Write(hdr[:]); err != nil {
|
buf = stackBuf[:totalSize]
|
||||||
return err
|
} else {
|
||||||
|
buf = make([]byte, totalSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need to compute CRC over the payload as we write it.
|
// Header: magic + payload length.
|
||||||
crc := crc32.NewIEEE()
|
binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
|
||||||
|
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
|
||||||
|
|
||||||
|
// Payload starts at offset 8.
|
||||||
|
p := 8
|
||||||
|
|
||||||
// Timestamp (8 bytes).
|
// Timestamp (8 bytes).
|
||||||
var scratch [8]byte
|
binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
|
||||||
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
|
p += 8
|
||||||
crc.Write(scratch[:8])
|
|
||||||
if _, err := w.Write(scratch[:8]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metric name length (2 bytes) + metric name.
|
// Metric name length (2 bytes) + metric name.
|
||||||
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
|
binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
|
||||||
crc.Write(scratch[:2])
|
p += 2
|
||||||
if _, err := w.Write(scratch[:2]); err != nil {
|
p += copy(buf[p:], msg.MetricName)
|
||||||
return err
|
|
||||||
}
|
|
||||||
nameBytes := []byte(msg.MetricName)
|
|
||||||
crc.Write(nameBytes)
|
|
||||||
if _, err := w.Write(nameBytes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Selector count (1 byte).
|
// Selector count (1 byte).
|
||||||
scratch[0] = byte(len(msg.Selector))
|
buf[p] = byte(len(msg.Selector))
|
||||||
crc.Write(scratch[:1])
|
p++
|
||||||
if _, err := w.Write(scratch[:1]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Selectors (1-byte length + bytes each).
|
// Selectors (1-byte length + bytes each).
|
||||||
for _, sel := range msg.Selector {
|
for _, sel := range msg.Selector {
|
||||||
scratch[0] = byte(len(sel))
|
buf[p] = byte(len(sel))
|
||||||
crc.Write(scratch[:1])
|
p++
|
||||||
if _, err := w.Write(scratch[:1]); err != nil {
|
p += copy(buf[p:], sel)
|
||||||
return err
|
|
||||||
}
|
|
||||||
selBytes := []byte(sel)
|
|
||||||
crc.Write(selBytes)
|
|
||||||
if _, err := w.Write(selBytes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value (4 bytes, float32 bits).
|
// Value (4 bytes, float32 bits).
|
||||||
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
|
binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
|
||||||
crc.Write(scratch[:4])
|
p += 4
|
||||||
if _, err := w.Write(scratch[:4]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// CRC32 (4 bytes).
|
// CRC32 over payload (bytes 8..8+payloadSize).
|
||||||
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
|
crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
|
||||||
_, err := w.Write(scratch[:4])
|
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
||||||
|
|
||||||
|
// Single Write call to the buffered writer (not atomic: bufio may split it across a flush).
|
||||||
|
_, err := w.Write(buf)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user