Merge pull request #538 from ClusterCockpit/hotfix

Hotfix
This commit is contained in:
Jan Eitzinger
2026-03-26 08:01:31 +01:00
committed by GitHub
7 changed files with 183 additions and 96 deletions

View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend TARGET = ./cc-backend
FRONTEND = ./web/frontend FRONTEND = ./web/frontend
VERSION = 1.5.2 VERSION = 1.5.3
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -1,4 +1,4 @@
# `cc-backend` version 1.5.2 # `cc-backend` version 1.5.3
Supports job archive version 3 and database version 11. Supports job archive version 3 and database version 11.
@@ -15,6 +15,47 @@ While we are confident that the memory issue with the metricstore cleanup move
policy is fixed, it is still recommended to use delete policy for cleanup. policy is fixed, it is still recommended to use delete policy for cleanup.
This is also the default. This is also the default.
## Changes in 1.5.3
### Bug fixes
- **Double-ranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
- **Lineprotocol body parsing interrupted**: Switched from `ReadTimeout` to
`ReadHeaderTimeout` so that long-running metric submissions are no longer
cut off mid-stream.
- **Checkpoint archiving continues on error**: A single cluster's archiving
failure no longer aborts the entire cleanup operation. Errors are collected
and reported per cluster.
- **Parquet row group overflow**: Added periodic flush during checkpoint
archiving to prevent exceeding the parquet-go 32k column-write limit.
- **Removed metrics excluded from subcluster config**: Metrics removed from a
subcluster are no longer returned by `GetMetricConfigSubCluster`.
### MetricStore performance
- **WAL writer throughput**: Decoupled WAL file flushing from message processing
using a periodic 5-second batch flush (up to 4096 messages per cycle),
significantly increasing metric ingestion throughput.
- **Improved shutdown time**: HTTP shutdown timeout reduced; metricstore and
archiver now shut down concurrently. Overall shutdown deadline raised to
60 seconds.
### New features
- **Manual checkpoint cleanup flag**: New `-cleanup-checkpoints` CLI flag
triggers checkpoint cleanup without starting the server, useful for
maintenance windows or automated cleanup scripts.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
in the job classification tagger are now logged at debug level instead of
error level.
## Changes in 1.5.2 ## Changes in 1.5.2
### Bug fixes ### Bug fixes

View File

@@ -407,21 +407,27 @@ func (s *Server) Start(ctx context.Context) error {
} }
func (s *Server) Shutdown(ctx context.Context) { func (s *Server) Shutdown(ctx context.Context) {
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) shutdownStart := time.Now()
defer cancel()
natsStart := time.Now()
nc := nats.GetClient() nc := nats.GetClient()
if nc != nil { if nc != nil {
nc.Close() nc.Close()
} }
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
httpStart := time.Now()
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := s.server.Shutdown(shutdownCtx); err != nil { if err := s.server.Shutdown(shutdownCtx); err != nil {
cclog.Errorf("Server shutdown error: %v", err) cclog.Errorf("Server shutdown error: %v", err)
} }
cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))
// Run metricstore and archiver shutdown concurrently. // Run metricstore and archiver shutdown concurrently.
// They are independent: metricstore writes .bin snapshots, // They are independent: metricstore writes .bin snapshots,
// archiver flushes pending job archives. // archiver flushes pending job archives.
storeStart := time.Now()
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
@@ -444,7 +450,10 @@ func (s *Server) Shutdown(ctx context.Context) {
select { select {
case <-done: case <-done:
cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
case <-time.After(60 * time.Second): case <-time.After(60 * time.Second):
cclog.Warn("Shutdown deadline exceeded, forcing exit") cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
} }
cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
} }

View File

@@ -271,19 +271,32 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
// //
// Note: This function blocks until the final checkpoint is written. // Note: This function blocks until the final checkpoint is written.
func Shutdown() { func Shutdown() {
totalStart := time.Now()
shutdownFuncMu.Lock() shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock() defer shutdownFuncMu.Unlock()
if shutdownFunc != nil { if shutdownFunc != nil {
shutdownFunc() shutdownFunc()
} }
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
// Signal producers to stop sending before closing channels,
// preventing send-on-closed-channel panics from in-flight NATS workers.
walShuttingDown.Store(true)
// Brief grace period for in-flight DecodeLine calls to complete.
time.Sleep(100 * time.Millisecond)
for _, ch := range walShardChs { for _, ch := range walShardChs {
close(ch) close(ch)
} }
drainStart := time.Now()
WaitForWALStagingDrain()
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
} }
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
checkpointStart := time.Now()
var files int var files int
var err error var err error
@@ -294,19 +307,16 @@ func Shutdown() {
lastCheckpointMu.Unlock() lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
var hostDirs []string // WAL files are deleted per-host inside ToCheckpointWAL workers.
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
if err == nil {
RotateWALFilesAfterShutdown(hostDirs)
}
} else { } else {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
} }
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
} }
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
} }
// Retention starts a background goroutine that periodically frees old metric data. // Retention starts a background goroutine that periodically frees old metric data.

View File

@@ -92,6 +92,13 @@ var walShardRotateChs []chan walRotateReq
// walNumShards stores the number of shards (set during WALStaging init). // walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int var walNumShards int
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
var walStagingWg sync.WaitGroup
// walShuttingDown is set before closing shard channels to prevent
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL. // WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path). // Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct { type WALMessage struct {
@@ -133,9 +140,9 @@ func walShardIndex(cluster, node string) int {
} }
// SendWALMessage routes a WAL message to the appropriate shard channel. // SendWALMessage routes a WAL message to the appropriate shard channel.
// Returns false if the channel is full (message dropped). // Returns false if the channel is full or shutdown is in progress.
func SendWALMessage(msg *WALMessage) bool { func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil { if walShardChs == nil || walShuttingDown.Load() {
return false return false
} }
shard := walShardIndex(msg.Cluster, msg.Node) shard := walShardIndex(msg.Cluster, msg.Node)
@@ -171,7 +178,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
msgCh := walShardChs[i] msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i] rotateCh := walShardRotateChs[i]
walStagingWg.Add(1)
wg.Go(func() { wg.Go(func() {
defer walStagingWg.Done()
hostFiles := make(map[string]*walFileState) hostFiles := make(map[string]*walFileState)
defer func() { defer func() {
@@ -255,23 +264,6 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
drain := func() {
for {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
flushDirty()
return
}
}
}
ticker := time.NewTicker(walFlushInterval) ticker := time.NewTicker(walFlushInterval)
defer ticker.Stop() defer ticker.Stop()
@@ -298,7 +290,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
drain() // On shutdown, skip draining buffered messages — a full binary
// checkpoint will be written from in-memory state, making
// buffered WAL records redundant.
flushDirty()
return return
case msg, ok := <-msgCh: case msg, ok := <-msgCh:
if !ok { if !ok {
@@ -319,23 +314,42 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
// Must be called after closing walShardChs to ensure all file handles are
// flushed and closed before checkpoint writes begin.
func WaitForWALStagingDrain() {
walStagingWg.Wait()
}
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. Each request is routed to the // and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory. // shard that owns the host directory.
//
// If shutdown is in progress (WAL staging goroutines may have exited),
// rotation is skipped to avoid deadlocking on abandoned channels.
func RotateWALFiles(hostDirs []string) { func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil { if walShardRotateChs == nil || walShuttingDown.Load() {
return return
} }
dones := make([]chan struct{}, len(hostDirs)) dones := make([]chan struct{}, 0, len(hostDirs))
for i, dir := range hostDirs { for _, dir := range hostDirs {
dones[i] = make(chan struct{}) done := make(chan struct{})
// Extract cluster/node from hostDir to find the right shard.
// hostDir = rootDir/cluster/node
shard := walShardIndexFromDir(dir) shard := walShardIndexFromDir(dir)
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
default:
// Channel full or goroutine not consuming — skip this host.
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
}
} }
for _, done := range dones { for _, done := range dones {
<-done select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
return
}
} }
} }
@@ -359,78 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
} }
} }
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, // writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// avoiding heap allocations by using a stack-allocated scratch buffer for // then writes it to the bufio.Writer in a single call. This prevents partial
// the fixed-size header/trailer and computing CRC inline. // records in the write buffer if a write error occurs mid-record (e.g. disk full).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload size. // Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector { for _, s := range msg.Selector {
payloadSize += 1 + len(s) payloadSize += 1 + len(s)
} }
// Total: 8 (header) + payload + 4 (CRC).
totalSize := 8 + payloadSize + 4
// Write magic + payload length (8 bytes header). // Use stack buffer for typical small records, heap-allocate only for large ones.
var hdr [8]byte var stackBuf [256]byte
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) var buf []byte
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) if totalSize <= len(stackBuf) {
if _, err := w.Write(hdr[:]); err != nil { buf = stackBuf[:totalSize]
return err } else {
buf = make([]byte, totalSize)
} }
// We need to compute CRC over the payload as we write it. // Header: magic + payload length.
crc := crc32.NewIEEE() binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
// Payload starts at offset 8.
p := 8
// Timestamp (8 bytes). // Timestamp (8 bytes).
var scratch [8]byte binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) p += 8
crc.Write(scratch[:8])
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
// Metric name length (2 bytes) + metric name. // Metric name length (2 bytes) + metric name.
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
crc.Write(scratch[:2]) p += 2
if _, err := w.Write(scratch[:2]); err != nil { p += copy(buf[p:], msg.MetricName)
return err
}
nameBytes := []byte(msg.MetricName)
crc.Write(nameBytes)
if _, err := w.Write(nameBytes); err != nil {
return err
}
// Selector count (1 byte). // Selector count (1 byte).
scratch[0] = byte(len(msg.Selector)) buf[p] = byte(len(msg.Selector))
crc.Write(scratch[:1]) p++
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
// Selectors (1-byte length + bytes each). // Selectors (1-byte length + bytes each).
for _, sel := range msg.Selector { for _, sel := range msg.Selector {
scratch[0] = byte(len(sel)) buf[p] = byte(len(sel))
crc.Write(scratch[:1]) p++
if _, err := w.Write(scratch[:1]); err != nil { p += copy(buf[p:], sel)
return err
}
selBytes := []byte(sel)
crc.Write(selBytes)
if _, err := w.Write(selBytes); err != nil {
return err
}
} }
// Value (4 bytes, float32 bits). // Value (4 bytes, float32 bits).
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
crc.Write(scratch[:4]) p += 4
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
// CRC32 (4 bytes). // CRC32 over payload (bytes 8..8+payloadSize).
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
_, err := w.Write(scratch[:4]) binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
_, err := w.Write(buf)
return err return err
} }
@@ -655,7 +655,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
selector []string selector []string
} }
n, errs := int32(0), int32(0) totalWork := len(levels)
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
n, errs, completed := int32(0), int32(0), int32(0)
var successDirs []string var successDirs []string
var successMu sync.Mutex var successMu sync.Mutex
@@ -663,6 +666,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
wg.Add(Keys.NumWorkers) wg.Add(Keys.NumWorkers)
work := make(chan workItem, Keys.NumWorkers*2) work := make(chan workItem, Keys.NumWorkers*2)
// Progress logging goroutine.
stopProgress := make(chan struct{})
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
case <-stopProgress:
return
}
}
}()
for range Keys.NumWorkers { for range Keys.NumWorkers {
go func() { go func() {
defer wg.Done() defer wg.Done()
@@ -670,16 +689,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m) err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
if err != nil { if err != nil {
if err == ErrNoNewArchiveData { if err == ErrNoNewArchiveData {
atomic.AddInt32(&completed, 1)
continue continue
} }
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err) cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
} else { } else {
atomic.AddInt32(&n, 1) atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock() successMu.Lock()
successDirs = append(successDirs, wi.hostDir) successDirs = append(successDirs, wi.hostDir)
successMu.Unlock() successMu.Unlock()
} }
atomic.AddInt32(&completed, 1)
} }
}() }()
} }
@@ -694,6 +720,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
} }
close(work) close(work)
wg.Wait() wg.Wait()
close(stopProgress)
if errs > 0 { if errs > 0 {
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n) return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)

View File

@@ -1,12 +1,12 @@
{ {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"lockfileVersion": 4, "lockfileVersion": 4,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@rollup/plugin-replace": "^6.0.3", "@rollup/plugin-replace": "^6.0.3",

View File

@@ -1,6 +1,6 @@
{ {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"build": "rollup -c", "build": "rollup -c",