mirror of https://github.com/ClusterCockpit/cc-backend
synced 2026-03-26 09:37:30 +01:00

Compare commits: dependabot...master (4 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b43c52f5b5 | |
| | 97d65a9e5c | |
| | e759810051 | |
| | b1884fda9d | |
Makefile (2 changes)
@@ -1,6 +1,6 @@
 TARGET = ./cc-backend
 FRONTEND = ./web/frontend
-VERSION = 1.5.2
+VERSION = 1.5.3
 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
 CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
 LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'
README.md
@@ -1,4 +1,4 @@
-# `cc-backend` version 1.5.2
+# `cc-backend` version 1.5.3

 Supports job archive version 3 and database version 11.

@@ -15,6 +15,47 @@ While we are confident that the memory issue with the metricstore cleanup move
 policy is fixed, it is still recommended to use delete policy for cleanup.
 This is also the default.

+## Changes in 1.5.3
+
+### Bug fixes
+
+- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
+  boundary value. Improved validation and UI text for "more than equal" and
+  "less than equal" range selections.
+- **Lineprotocol body parsing interrupted**: Switched from `ReadTimeout` to
+  `ReadHeaderTimeout` so that long-running metric submissions are no longer
+  cut off mid-stream.
+- **Checkpoint archiving continues on error**: A single cluster's archiving
+  failure no longer aborts the entire cleanup operation. Errors are collected
+  and reported per cluster.
+- **Parquet row group overflow**: Added periodic flush during checkpoint
+  archiving to prevent exceeding the parquet-go 32k column-write limit.
+- **Removed metrics excluded from subcluster config**: Metrics removed from a
+  subcluster are no longer returned by `GetMetricConfigSubCluster`.
+
+### MetricStore performance
+
+- **WAL writer throughput**: Decoupled WAL file flushing from message processing
+  using a periodic 5-second batch flush (up to 4096 messages per cycle),
+  significantly increasing metric ingestion throughput.
+- **Improved shutdown time**: HTTP shutdown timeout reduced; metricstore and
+  archiver now shut down concurrently. Overall shutdown deadline raised to
+  60 seconds.
+
+### New features
+
+- **Manual checkpoint cleanup flag**: New `-cleanup-checkpoints` CLI flag
+  triggers checkpoint cleanup without starting the server, useful for
+  maintenance windows or automated cleanup scripts.
+- **Explicit node state queries in node view**: Node health and scheduler state
+  are now fetched independently from metric data for fresher status information.
+
+### Logging improvements
+
+- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
+  in the job classification tagger are now logged at debug level instead of
+  error level.
+
 ## Changes in 1.5.2

 ### Bug fixes
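A note on the "Doubleranged filter fixes" entry above: the usual cause of zero being mishandled as a boundary is that a numeric zero value doubles as "filter unset". A minimal sketch of the fix idea in Go, using pointers for optional bounds; `IntRange` and its fields are hypothetical, not cc-backend's actual filter types:

```go
package main

import "fmt"

// IntRange is a hypothetical double-ranged filter: either bound may be absent.
type IntRange struct {
	From *int64 // nil means "no lower bound"; a pointer to 0 is a real bound
	To   *int64 // nil means "no upper bound"
}

// Matches reports whether v falls inside the (possibly half-open) range.
func (r IntRange) Matches(v int64) bool {
	if r.From != nil && v < *r.From {
		return false
	}
	if r.To != nil && v > *r.To {
		return false
	}
	return true
}

func main() {
	zero := int64(0)
	// "more than equal" 0: with a plain int64 field, a 0 bound would be
	// indistinguishable from "unset" and silently dropped.
	r := IntRange{From: &zero}
	fmt.Println(r.Matches(0))  // true
	fmt.Println(r.Matches(-5)) // false
}
```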
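The `ReadTimeout` to `ReadHeaderTimeout` switch works because `net/http`'s `ReadTimeout` covers reading the entire request, body included, while `ReadHeaderTimeout` only bounds header parsing. A minimal sketch using the standard library; the address, duration, and handler are illustrative, not cc-backend's actual configuration:

```go
package main

import (
	"io"
	"net/http"
	"time"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/write", func(w http.ResponseWriter, r *http.Request) {
		// A slow line-protocol upload may legitimately take longer than any
		// fixed ReadTimeout; only header reading should be deadline-bounded.
		_, _ = io.Copy(io.Discard, r.Body)
		w.WriteHeader(http.StatusOK)
	})

	srv := &http.Server{
		Addr: ":8080",
		// With ReadTimeout set instead, the connection would be closed
		// mid-body once the deadline passed, truncating long submissions.
		ReadHeaderTimeout: 10 * time.Second,
		Handler:           mux,
	}
	_ = srv.ListenAndServe()
}
```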
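For the parquet row group fix: ending a row group periodically keeps any single group under the writer's column-write limit. A sketch of that idea, assuming the `parquet-go` `GenericWriter` API (`Write` buffers rows, `Flush` ends the current row group); the `Record` type and the flush threshold are invented for illustration:

```go
package main

import (
	"os"

	"github.com/parquet-go/parquet-go"
)

type Record struct {
	Host  string  `parquet:"host"`
	Value float64 `parquet:"value"`
}

func writeRecords(path string, recs []Record) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	w := parquet.NewGenericWriter[Record](f)
	const flushEvery = 8192 // stay well under the 32k column-write limit

	for i := 0; i < len(recs); i += flushEvery {
		end := i + flushEvery
		if end > len(recs) {
			end = len(recs)
		}
		if _, err := w.Write(recs[i:end]); err != nil {
			return err
		}
		// Flush closes the current row group; later writes start a new one.
		if err := w.Flush(); err != nil {
			return err
		}
	}
	return w.Close()
}

func main() {
	_ = writeRecords("out.parquet", []Record{{Host: "n01", Value: 1.0}})
}
```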
@@ -407,21 +407,27 @@ func (s *Server) Start(ctx context.Context) error {
 }

 func (s *Server) Shutdown(ctx context.Context) {
-	shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-	defer cancel()
+	shutdownStart := time.Now()

+	natsStart := time.Now()
 	nc := nats.GetClient()
 	if nc != nil {
 		nc.Close()
 	}
+	cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))

+	httpStart := time.Now()
+	shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
 	if err := s.server.Shutdown(shutdownCtx); err != nil {
 		cclog.Errorf("Server shutdown error: %v", err)
 	}
+	cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))

+	// Run metricstore and archiver shutdown concurrently.
+	// They are independent: metricstore writes .bin snapshots,
+	// archiver flushes pending job archives.
+	storeStart := time.Now()
 	done := make(chan struct{})
 	go func() {
 		defer close(done)

@@ -444,7 +450,10 @@ func (s *Server) Shutdown(ctx context.Context) {
 	select {
 	case <-done:
+		cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
 	case <-time.After(60 * time.Second):
-		cclog.Warn("Shutdown deadline exceeded, forcing exit")
+		cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
 	}

+	cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
 }
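The two hunks above amount to a bounded concurrent shutdown: independent cleanup tasks run in parallel, and a `select` enforces the overall deadline. The same pattern reduced to a standalone sketch, with placeholder task bodies instead of cc-backend's actual shutdown calls:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	start := time.Now()
	done := make(chan struct{})

	// Run independent cleanup tasks concurrently; close(done) signals completion.
	go func() {
		defer close(done)
		var wg sync.WaitGroup
		wg.Add(2)
		go func() { defer wg.Done() /* e.g. metricstore checkpoint */ }()
		go func() { defer wg.Done() /* e.g. archiver flush */ }()
		wg.Wait()
	}()

	// Wait for completion, but never longer than the overall deadline.
	select {
	case <-done:
		fmt.Printf("shutdown completed in %v\n", time.Since(start))
	case <-time.After(60 * time.Second):
		fmt.Printf("deadline exceeded after %v, forcing exit\n", time.Since(start))
	}
}
```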
@@ -271,19 +271,32 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
 //
 // Note: This function blocks until the final checkpoint is written.
 func Shutdown() {
+	totalStart := time.Now()
+
 	shutdownFuncMu.Lock()
 	defer shutdownFuncMu.Unlock()
 	if shutdownFunc != nil {
 		shutdownFunc()
 	}
+	cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
+
+	if Keys.Checkpoints.FileFormat == "wal" {
+		// Signal producers to stop sending before closing channels,
+		// preventing send-on-closed-channel panics from in-flight NATS workers.
+		walShuttingDown.Store(true)
+		// Brief grace period for in-flight DecodeLine calls to complete.
+		time.Sleep(100 * time.Millisecond)
+
+		for _, ch := range walShardChs {
+			close(ch)
+		}
+		drainStart := time.Now()
+		WaitForWALStagingDrain()
+		cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
+	}

-	cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
+	cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
+	checkpointStart := time.Now()
 	var files int
 	var err error

@@ -294,19 +307,16 @@ func Shutdown() {
 	lastCheckpointMu.Unlock()

 	if Keys.Checkpoints.FileFormat == "wal" {
-		var hostDirs []string
-		files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
-		if err == nil {
-			RotateWALFilesAfterShutdown(hostDirs)
-		}
+		// WAL files are deleted per-host inside ToCheckpointWAL workers.
+		files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
 	} else {
 		files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
 	}

 	if err != nil {
-		cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
+		cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
 	}
-	cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
+	cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
 }

 // Retention starts a background goroutine that periodically frees old metric data.
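The ordering in this `Shutdown` path (set an atomic flag, pause briefly, close the channels, then wait on a `WaitGroup`) is the standard recipe for closing channels that have many producers. A reduced sketch of the same sequence; the names are illustrative, not the actual cc-backend symbols:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	shuttingDown atomic.Bool
	ch           = make(chan int, 16)
	consumers    sync.WaitGroup
)

// trySend mirrors the SendWALMessage guard: refuse new work instead of
// risking a panic by sending on a closed channel.
func trySend(v int) bool {
	if shuttingDown.Load() {
		return false
	}
	select {
	case ch <- v:
		return true
	default:
		return false // channel full: drop
	}
}

func main() {
	consumers.Add(1)
	go func() {
		defer consumers.Done()
		for v := range ch { // loop ends once ch is closed and drained
			_ = v
		}
	}()

	trySend(1)

	shuttingDown.Store(true)           // 1. tell producers to stop
	time.Sleep(100 * time.Millisecond) // 2. grace period for in-flight sends
	close(ch)                          // 3. closing is now safe (modulo the race the sleep papers over)
	consumers.Wait()                   // 4. wait for consumers to exit
	fmt.Println("clean shutdown")
}
```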
@@ -92,6 +92,13 @@ var walShardRotateChs []chan walRotateReq
 // walNumShards stores the number of shards (set during WALStaging init).
 var walNumShards int

+// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
+var walStagingWg sync.WaitGroup
+
+// walShuttingDown is set before closing shard channels to prevent
+// SendWALMessage from sending on a closed channel (which panics in Go).
+var walShuttingDown atomic.Bool
+
 // WALMessage represents a single metric write to be appended to the WAL.
 // Cluster and Node are NOT stored in the WAL record (inferred from file path).
 type WALMessage struct {

@@ -133,9 +140,9 @@ func walShardIndex(cluster, node string) int {
 }

 // SendWALMessage routes a WAL message to the appropriate shard channel.
-// Returns false if the channel is full (message dropped).
+// Returns false if the channel is full or shutdown is in progress.
 func SendWALMessage(msg *WALMessage) bool {
-	if walShardChs == nil {
+	if walShardChs == nil || walShuttingDown.Load() {
 		return false
 	}
 	shard := walShardIndex(msg.Cluster, msg.Node)

@@ -171,7 +178,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
 		msgCh := walShardChs[i]
 		rotateCh := walShardRotateChs[i]

+		walStagingWg.Add(1)
 		wg.Go(func() {
+			defer walStagingWg.Done()
 			hostFiles := make(map[string]*walFileState)

 			defer func() {

@@ -255,23 +264,6 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
 			}
 		}

-		drain := func() {
-			for {
-				select {
-				case msg, ok := <-msgCh:
-					if !ok {
-						return
-					}
-					processMsg(msg)
-				case req := <-rotateCh:
-					processRotate(req)
-				default:
-					flushDirty()
-					return
-				}
-			}
-		}
-
 		ticker := time.NewTicker(walFlushInterval)
 		defer ticker.Stop()

@@ -298,7 +290,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
 		for {
 			select {
 			case <-ctx.Done():
-				drain()
+				// On shutdown, skip draining buffered messages — a full binary
+				// checkpoint will be written from in-memory state, making
+				// buffered WAL records redundant.
+				flushDirty()
 				return
 			case msg, ok := <-msgCh:
 				if !ok {

@@ -319,23 +314,42 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
 	}
 }

+// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
+// Must be called after closing walShardChs to ensure all file handles are
+// flushed and closed before checkpoint writes begin.
+func WaitForWALStagingDrain() {
+	walStagingWg.Wait()
+}
+
 // RotateWALFiles sends rotation requests for the given host directories
 // and blocks until all rotations complete. Each request is routed to the
 // shard that owns the host directory.
+//
+// If shutdown is in progress (WAL staging goroutines may have exited),
+// rotation is skipped to avoid deadlocking on abandoned channels.
 func RotateWALFiles(hostDirs []string) {
-	if walShardRotateChs == nil {
+	if walShardRotateChs == nil || walShuttingDown.Load() {
 		return
 	}
-	dones := make([]chan struct{}, len(hostDirs))
-	for i, dir := range hostDirs {
-		dones[i] = make(chan struct{})
+	// Extract cluster/node from hostDir to find the right shard.
+	// hostDir = rootDir/cluster/node
+	dones := make([]chan struct{}, 0, len(hostDirs))
+	for _, dir := range hostDirs {
+		done := make(chan struct{})
 		shard := walShardIndexFromDir(dir)
-		walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
+		select {
+		case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
+			dones = append(dones, done)
+		default:
+			// Channel full or goroutine not consuming — skip this host.
+			cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
+		}
 	}
 	for _, done := range dones {
-		<-done
+		select {
+		case <-done:
+		case <-time.After(30 * time.Second):
+			cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
+			return
+		}
 	}
 }
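These staging hunks implement the changelog's "WAL writer throughput" item: writes go through a buffered writer and are flushed on a ticker rather than per message. A reduced sketch of ticker-batched flushing; the interval matches the changelog's 5 seconds, everything else is illustrative:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"time"
)

func main() {
	msgCh := make(chan string, 4096)         // bounded, as in the changelog
	w := bufio.NewWriter(os.Stdout)          // stands in for the per-host WAL file

	go func() {
		for i := 0; i < 10; i++ {
			msgCh <- fmt.Sprintf("metric %d\n", i)
		}
		close(msgCh)
	}()

	ticker := time.NewTicker(5 * time.Second) // flush interval from the changelog
	defer ticker.Stop()

	dirty := false
	for {
		select {
		case msg, ok := <-msgCh:
			if !ok {
				w.Flush() // final flush before exit
				return
			}
			w.WriteString(msg) // buffered: no syscall per message
			dirty = true
		case <-ticker.C:
			if dirty {
				w.Flush() // one flush per interval instead of per message
				dirty = false
			}
		}
	}
}
```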
@@ -359,78 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
 	}
 }

-// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
-// avoiding heap allocations by using a stack-allocated scratch buffer for
-// the fixed-size header/trailer and computing CRC inline.
+// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
+// then writes it to the bufio.Writer in a single call. This prevents partial
+// records in the write buffer if a write error occurs mid-record (e.g. disk full).
 func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
-	// Compute payload size.
+	// Compute payload and total record size.
 	payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
 	for _, s := range msg.Selector {
 		payloadSize += 1 + len(s)
 	}
+	// Total: 8 (header) + payload + 4 (CRC).
+	totalSize := 8 + payloadSize + 4

-	// Write magic + payload length (8 bytes header).
-	var hdr [8]byte
-	binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
-	binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
-	if _, err := w.Write(hdr[:]); err != nil {
-		return err
+	// Use stack buffer for typical small records, heap-allocate only for large ones.
+	var stackBuf [256]byte
+	var buf []byte
+	if totalSize <= len(stackBuf) {
+		buf = stackBuf[:totalSize]
+	} else {
+		buf = make([]byte, totalSize)
 	}

-	// We need to compute CRC over the payload as we write it.
-	crc := crc32.NewIEEE()
+	// Header: magic + payload length.
+	binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
+	binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
+
+	// Payload starts at offset 8.
+	p := 8

 	// Timestamp (8 bytes).
-	var scratch [8]byte
-	binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
-	crc.Write(scratch[:8])
-	if _, err := w.Write(scratch[:8]); err != nil {
-		return err
-	}
+	binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
+	p += 8

 	// Metric name length (2 bytes) + metric name.
-	binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
-	crc.Write(scratch[:2])
-	if _, err := w.Write(scratch[:2]); err != nil {
-		return err
-	}
-	nameBytes := []byte(msg.MetricName)
-	crc.Write(nameBytes)
-	if _, err := w.Write(nameBytes); err != nil {
-		return err
-	}
+	binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
+	p += 2
+	p += copy(buf[p:], msg.MetricName)

 	// Selector count (1 byte).
-	scratch[0] = byte(len(msg.Selector))
-	crc.Write(scratch[:1])
-	if _, err := w.Write(scratch[:1]); err != nil {
-		return err
-	}
+	buf[p] = byte(len(msg.Selector))
+	p++

 	// Selectors (1-byte length + bytes each).
 	for _, sel := range msg.Selector {
-		scratch[0] = byte(len(sel))
-		crc.Write(scratch[:1])
-		if _, err := w.Write(scratch[:1]); err != nil {
-			return err
-		}
-		selBytes := []byte(sel)
-		crc.Write(selBytes)
-		if _, err := w.Write(selBytes); err != nil {
-			return err
-		}
+		buf[p] = byte(len(sel))
+		p++
+		p += copy(buf[p:], sel)
 	}

 	// Value (4 bytes, float32 bits).
-	binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
-	crc.Write(scratch[:4])
-	if _, err := w.Write(scratch[:4]); err != nil {
-		return err
-	}
+	binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
+	p += 4

-	// CRC32 (4 bytes).
-	binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
-	_, err := w.Write(scratch[:4])
+	// CRC32 over payload (bytes 8..8+payloadSize).
+	crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
+	binary.LittleEndian.PutUint32(buf[p:p+4], crc)
+
+	// Single atomic write to the buffered writer.
+	_, err := w.Write(buf)
 	return err
 }
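For orientation, a decoder matching the record layout written above (magic, payload length, payload, CRC32 over the payload, all little-endian). cc-backend's actual reader and the real `walRecordMagic` value do not appear on this page, so this is a sketch derived solely from the writer, with bounds checks omitted:

```go
package main

import (
	"bufio"
	"encoding/binary"
	"errors"
	"hash/crc32"
	"io"
	"math"
)

const walRecordMagic = 0x57414C31 // placeholder; the real constant is defined elsewhere

type walRecord struct {
	Timestamp int64
	Metric    string
	Selector  []string
	Value     float64
}

// readWALRecord decodes one record in the layout used by writeWALRecordDirect:
// magic(4) | payloadLen(4) | payload | crc32(payload)(4).
func readWALRecord(r *bufio.Reader) (*walRecord, error) {
	var hdr [8]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	if binary.LittleEndian.Uint32(hdr[0:4]) != walRecordMagic {
		return nil, errors.New("bad record magic")
	}
	payload := make([]byte, binary.LittleEndian.Uint32(hdr[4:8]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	var crcBuf [4]byte
	if _, err := io.ReadFull(r, crcBuf[:]); err != nil {
		return nil, err
	}
	if crc32.ChecksumIEEE(payload) != binary.LittleEndian.Uint32(crcBuf[:]) {
		return nil, errors.New("CRC mismatch (truncated or corrupt record)")
	}

	rec := &walRecord{Timestamp: int64(binary.LittleEndian.Uint64(payload[0:8]))}
	p := 8
	nameLen := int(binary.LittleEndian.Uint16(payload[p : p+2]))
	p += 2
	rec.Metric = string(payload[p : p+nameLen])
	p += nameLen
	selCount := int(payload[p])
	p++
	for i := 0; i < selCount; i++ {
		selLen := int(payload[p])
		p++
		rec.Selector = append(rec.Selector, string(payload[p:p+selLen]))
		p += selLen
	}
	rec.Value = float64(math.Float32frombits(binary.LittleEndian.Uint32(payload[p : p+4])))
	return rec, nil
}

func main() {} // decoder sketch only; wiring it to a WAL file is left out
```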
@@ -655,7 +655,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
 		selector []string
 	}

-	n, errs := int32(0), int32(0)
+	totalWork := len(levels)
+	cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
+
+	n, errs, completed := int32(0), int32(0), int32(0)
 	var successDirs []string
 	var successMu sync.Mutex

@@ -663,6 +666,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
 	wg.Add(Keys.NumWorkers)
 	work := make(chan workItem, Keys.NumWorkers*2)

+	// Progress logging goroutine.
+	stopProgress := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(10 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
+					atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
+			case <-stopProgress:
+				return
+			}
+		}
+	}()
+
 	for range Keys.NumWorkers {
 		go func() {
 			defer wg.Done()

@@ -670,16 +689,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
 			err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
 			if err != nil {
 				if err == ErrNoNewArchiveData {
+					atomic.AddInt32(&completed, 1)
 					continue
 				}
 				cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
 				atomic.AddInt32(&errs, 1)
 			} else {
 				atomic.AddInt32(&n, 1)
+				// Delete WAL immediately after successful snapshot.
+				walPath := path.Join(wi.hostDir, "current.wal")
+				if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
+					cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
+				}
 				successMu.Lock()
 				successDirs = append(successDirs, wi.hostDir)
 				successMu.Unlock()
 			}
+			atomic.AddInt32(&completed, 1)
 		}
 	}()
 }

@@ -694,6 +720,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
 	}
 	close(work)
 	wg.Wait()
+	close(stopProgress)

 	if errs > 0 {
 		return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)
web/frontend/package-lock.json (generated, 4 changes)
@@ -1,12 +1,12 @@
 {
 	"name": "cc-frontend",
-	"version": "1.5.2",
+	"version": "1.5.3",
 	"lockfileVersion": 4,
 	"requires": true,
 	"packages": {
 		"": {
 			"name": "cc-frontend",
-			"version": "1.5.2",
+			"version": "1.5.3",
 			"license": "MIT",
 			"dependencies": {
 				"@rollup/plugin-replace": "^6.0.3",
web/frontend/package.json
@@ -1,6 +1,6 @@
 {
 	"name": "cc-frontend",
-	"version": "1.5.2",
+	"version": "1.5.3",
 	"license": "MIT",
 	"scripts": {
 		"build": "rollup -c",