4 Commits

Author SHA1 Message Date
dependabot[bot]
0d06b86e79 Bump the npm_and_yarn group across 1 directory with 1 update
Bumps the npm_and_yarn group with 1 update in the /web/frontend directory: [picomatch](https://github.com/micromatch/picomatch).


Updates `picomatch` from 4.0.3 to 4.0.4
- [Release notes](https://github.com/micromatch/picomatch/releases)
- [Changelog](https://github.com/micromatch/picomatch/blob/master/CHANGELOG.md)
- [Commits](https://github.com/micromatch/picomatch/compare/4.0.3...4.0.4)

Updates `picomatch` from 2.3.1 to 2.3.2
- [Release notes](https://github.com/micromatch/picomatch/releases)
- [Changelog](https://github.com/micromatch/picomatch/blob/master/CHANGELOG.md)
- [Commits](https://github.com/micromatch/picomatch/compare/2.3.1...2.3.2)

---
updated-dependencies:
- dependency-name: picomatch
  dependency-version: 4.0.4
  dependency-type: indirect
  dependency-group: npm_and_yarn
- dependency-name: picomatch
  dependency-version: 2.3.2
  dependency-type: indirect
  dependency-group: npm_and_yarn
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-25 23:26:34 +00:00
Jan Eitzinger
b7e133fbaf Merge pull request #536 from ClusterCockpit/hotfix
Hotfix
2026-03-25 07:07:49 +01:00
Jan Eitzinger
c3b6d93941 Merge pull request #535 from ClusterCockpit/hotfix
Hotfix
2026-03-24 19:01:49 +01:00
Jan Eitzinger
c13fd68aa9 Merge pull request #534 from ClusterCockpit/hotfix
feat: Add command line switch to trigger manual metricstore checkpoin…
2026-03-23 19:28:06 +01:00
12 changed files with 119 additions and 652 deletions

View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend
FRONTEND = ./web/frontend
VERSION = 1.5.3
VERSION = 1.5.2
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -1,4 +1,4 @@
# `cc-backend` version 1.5.3
# `cc-backend` version 1.5.2
Supports job archive version 3 and database version 11.
@@ -15,47 +15,6 @@ While we are confident that the memory issue with the metricstore cleanup move
policy is fixed, it is still recommended to use delete policy for cleanup.
This is also the default.
## Changes in 1.5.3
### Bug fixes
- **Double-ranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
- **Lineprotocol body parsing interrupted**: Switched from `ReadTimeout` to
`ReadHeaderTimeout` so that long-running metric submissions are no longer
cut off mid-stream.
- **Checkpoint archiving continues on error**: A single cluster's archiving
failure no longer aborts the entire cleanup operation. Errors are collected
and reported per cluster.
- **Parquet row group overflow**: Added periodic flush during checkpoint
archiving to prevent exceeding the parquet-go 32k column-write limit.
- **Removed metrics excluded from subcluster config**: Metrics removed from a
subcluster are no longer returned by `GetMetricConfigSubCluster`.
### MetricStore performance
- **WAL writer throughput**: Decoupled WAL file flushing from message processing
using a periodic 5-second batch flush (up to 4096 messages per cycle),
significantly increasing metric ingestion throughput.
- **Improved shutdown time**: HTTP shutdown timeout reduced; metricstore and
archiver now shut down concurrently. Overall shutdown deadline raised to
60 seconds.
### New features
- **Manual checkpoint cleanup flag**: New `-cleanup-checkpoints` CLI flag
triggers checkpoint cleanup without starting the server, useful for
maintenance windows or automated cleanup scripts.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
in the job classification tagger are now logged at debug level instead of
error level.
## Changes in 1.5.2
### Bug fixes

View File

@@ -407,27 +407,21 @@ func (s *Server) Start(ctx context.Context) error {
}
func (s *Server) Shutdown(ctx context.Context) {
shutdownStart := time.Now()
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
natsStart := time.Now()
nc := nats.GetClient()
if nc != nil {
nc.Close()
}
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
httpStart := time.Now()
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := s.server.Shutdown(shutdownCtx); err != nil {
cclog.Errorf("Server shutdown error: %v", err)
}
cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))
// Run metricstore and archiver shutdown concurrently.
// They are independent: metricstore writes .bin snapshots,
// archiver flushes pending job archives.
storeStart := time.Now()
done := make(chan struct{})
go func() {
defer close(done)
@@ -450,10 +444,7 @@ func (s *Server) Shutdown(ctx context.Context) {
select {
case <-done:
cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
case <-time.After(60 * time.Second):
cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
cclog.Warn("Shutdown deadline exceeded, forcing exit")
}
cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
}

View File

@@ -53,16 +53,6 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
return
}
// Account for checkpoint span: files named {from}.bin contain data up to
// from+checkpointInterval. Subtract the checkpoint interval so we don't
// delete files whose data still falls within the retention window.
checkpointSpan := 12 * time.Hour
if Keys.CheckpointInterval != "" {
if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
checkpointSpan = parsed
}
}
ticker := time.NewTicker(d)
defer ticker.Stop()
@@ -71,7 +61,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
case <-ctx.Done():
return
case <-ticker.C:
t := time.Now().Add(-d).Add(-checkpointSpan)
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)

View File

@@ -146,9 +146,7 @@ var (
// ErrDataDoesNotAlign indicates that aggregated data from child scopes
// does not align with the parent scope's expected timestamps/intervals.
ErrDataDoesNotAlignMissingFront error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data prior to start of the buffers)")
ErrDataDoesNotAlignMissingBack error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data after the end of the buffers)")
ErrDataDoesNotAlignDataLenMismatch error = errors.New("[METRICSTORE]> data from lower granularities does not align (collected data length is different than expected data length)")
ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
)
// buffer stores time-series data for a single metric at a specific hierarchical level.

View File

@@ -86,16 +86,14 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
// loaded data is re-persisted before old checkpoint files are cleaned up.
// Checkpoints are written every 12 hours (hardcoded).
//
// Format behaviour:
// - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = restoreFrom
lastCheckpoint = time.Now()
lastCheckpointMu.Unlock()
ms := GetMemoryStore()
@@ -339,35 +337,25 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
return ErrNoNewArchiveData
}
finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
tmpPath := finalPath + ".tmp"
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(dir, CheckpointDirPerms)
if err == nil {
f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
}
}
if err != nil {
return err
}
defer f.Close()
bw := bufio.NewWriter(f)
if err = json.NewEncoder(bw).Encode(cf); err != nil {
f.Close()
os.Remove(tmpPath)
return err
}
if err = bw.Flush(); err != nil {
f.Close()
os.Remove(tmpPath)
return err
}
f.Close()
return os.Rename(tmpPath, finalPath)
return bw.Flush()
}
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
@@ -482,7 +470,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
data: metric.Data[0:n:n],
prev: nil,
next: nil,
archived: false,
archived: true,
}
minfo, ok := m.Metrics[name]

View File

@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
ctx, shutdown := context.WithCancel(context.Background())
Retention(wg, ctx)
Checkpointing(wg, ctx, restoreFrom)
Checkpointing(wg, ctx)
CleanUp(wg, ctx)
WALStaging(wg, ctx)
MemoryUsageTracker(wg, ctx)
@@ -271,32 +271,19 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
//
// Note: This function blocks until the final checkpoint is written.
func Shutdown() {
totalStart := time.Now()
shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock()
if shutdownFunc != nil {
shutdownFunc()
}
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
if Keys.Checkpoints.FileFormat == "wal" {
// Signal producers to stop sending before closing channels,
// preventing send-on-closed-channel panics from in-flight NATS workers.
walShuttingDown.Store(true)
// Brief grace period for in-flight DecodeLine calls to complete.
time.Sleep(100 * time.Millisecond)
for _, ch := range walShardChs {
close(ch)
}
drainStart := time.Now()
WaitForWALStagingDrain()
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
}
cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
checkpointStart := time.Now()
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
var files int
var err error
@@ -307,16 +294,19 @@ func Shutdown() {
lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "wal" {
// WAL files are deleted per-host inside ToCheckpointWAL workers.
files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
var hostDirs []string
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
if err == nil {
RotateWALFilesAfterShutdown(hostDirs)
}
} else {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
}
if err != nil {
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
}
cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
}
// Retention starts a background goroutine that periodically frees old metric data.
@@ -712,16 +702,16 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
} else if from != cfrom || to != cto || len(data) != len(cdata) {
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
if missingfront != 0 {
return ErrDataDoesNotAlignMissingFront
return ErrDataDoesNotAlign
}
newlen := len(cdata) - missingback
if newlen < 1 {
return ErrDataDoesNotAlignMissingBack
return ErrDataDoesNotAlign
}
cdata = cdata[0:newlen]
if len(cdata) != len(data) {
return ErrDataDoesNotAlignDataLenMismatch
return ErrDataDoesNotAlign
}
from, to = cfrom, cto

View File

@@ -91,10 +91,8 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6
if n == 0 {
from, to = cfrom, cto
} else if from != cfrom {
return ErrDataDoesNotAlignMissingFront
} else if to != cto {
return ErrDataDoesNotAlignMissingBack
} else if from != cfrom || to != cto {
return ErrDataDoesNotAlign
}
samples += stats.Samples

View File

@@ -92,13 +92,6 @@ var walShardRotateChs []chan walRotateReq
// walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
var walStagingWg sync.WaitGroup
// walShuttingDown is set before closing shard channels to prevent
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct {
@@ -140,9 +133,9 @@ func walShardIndex(cluster, node string) int {
}
// SendWALMessage routes a WAL message to the appropriate shard channel.
// Returns false if the channel is full or shutdown is in progress.
// Returns false if the channel is full (message dropped).
func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() {
if walShardChs == nil {
return false
}
shard := walShardIndex(msg.Cluster, msg.Node)
@@ -178,9 +171,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i]
walStagingWg.Add(1)
wg.Go(func() {
defer walStagingWg.Done()
hostFiles := make(map[string]*walFileState)
defer func() {
@@ -264,6 +255,23 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
}
}
drain := func() {
for {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
flushDirty()
return
}
}
}
ticker := time.NewTicker(walFlushInterval)
defer ticker.Stop()
@@ -290,10 +298,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
for {
select {
case <-ctx.Done():
// On shutdown, skip draining buffered messages — a full binary
// checkpoint will be written from in-memory state, making
// buffered WAL records redundant.
flushDirty()
drain()
return
case msg, ok := <-msgCh:
if !ok {
@@ -314,42 +319,23 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
}
}
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
// Must be called after closing walShardChs to ensure all file handles are
// flushed and closed before checkpoint writes begin.
func WaitForWALStagingDrain() {
walStagingWg.Wait()
}
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory.
//
// If shutdown is in progress (WAL staging goroutines may have exited),
// rotation is skipped to avoid deadlocking on abandoned channels.
func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() {
if walShardRotateChs == nil {
return
}
dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs {
done := make(chan struct{})
dones := make([]chan struct{}, len(hostDirs))
for i, dir := range hostDirs {
dones[i] = make(chan struct{})
// Extract cluster/node from hostDir to find the right shard.
// hostDir = rootDir/cluster/node
shard := walShardIndexFromDir(dir)
select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
default:
// Channel full or goroutine not consuming — skip this host.
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
}
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
}
for _, done := range dones {
select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
return
}
<-done
}
}
@@ -373,64 +359,78 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
}
}
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
// avoiding heap allocations by using a stack-allocated scratch buffer for
// the fixed-size header/trailer and computing CRC inline.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload and total record size.
// Compute payload size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
payloadSize += 1 + len(s)
}
// Total: 8 (header) + payload + 4 (CRC).
totalSize := 8 + payloadSize + 4
// Use stack buffer for typical small records, heap-allocate only for large ones.
var stackBuf [256]byte
var buf []byte
if totalSize <= len(stackBuf) {
buf = stackBuf[:totalSize]
} else {
buf = make([]byte, totalSize)
// Write magic + payload length (8 bytes header).
var hdr [8]byte
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
if _, err := w.Write(hdr[:]); err != nil {
return err
}
// Header: magic + payload length.
binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
// Payload starts at offset 8.
p := 8
// We need to compute CRC over the payload as we write it.
crc := crc32.NewIEEE()
// Timestamp (8 bytes).
binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
p += 8
var scratch [8]byte
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
crc.Write(scratch[:8])
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
// Metric name length (2 bytes) + metric name.
binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
p += 2
p += copy(buf[p:], msg.MetricName)
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
crc.Write(scratch[:2])
if _, err := w.Write(scratch[:2]); err != nil {
return err
}
nameBytes := []byte(msg.MetricName)
crc.Write(nameBytes)
if _, err := w.Write(nameBytes); err != nil {
return err
}
// Selector count (1 byte).
buf[p] = byte(len(msg.Selector))
p++
scratch[0] = byte(len(msg.Selector))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
// Selectors (1-byte length + bytes each).
for _, sel := range msg.Selector {
buf[p] = byte(len(sel))
p++
p += copy(buf[p:], sel)
scratch[0] = byte(len(sel))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
selBytes := []byte(sel)
crc.Write(selBytes)
if _, err := w.Write(selBytes); err != nil {
return err
}
}
// Value (4 bytes, float32 bits).
binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
p += 4
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
crc.Write(scratch[:4])
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
// CRC32 over payload (bytes 8..8+payloadSize).
crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
_, err := w.Write(buf)
// CRC32 (4 bytes).
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
_, err := w.Write(scratch[:4])
return err
}
@@ -655,10 +655,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
selector []string
}
totalWork := len(levels)
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
n, errs, completed := int32(0), int32(0), int32(0)
n, errs := int32(0), int32(0)
var successDirs []string
var successMu sync.Mutex
@@ -666,22 +663,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
wg.Add(Keys.NumWorkers)
work := make(chan workItem, Keys.NumWorkers*2)
// Progress logging goroutine.
stopProgress := make(chan struct{})
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
case <-stopProgress:
return
}
}
}()
for range Keys.NumWorkers {
go func() {
defer wg.Done()
@@ -689,23 +670,16 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
if err != nil {
if err == ErrNoNewArchiveData {
atomic.AddInt32(&completed, 1)
continue
}
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock()
successDirs = append(successDirs, wi.hostDir)
successMu.Unlock()
}
atomic.AddInt32(&completed, 1)
}
}()
}
@@ -720,7 +694,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
}
close(work)
wg.Wait()
close(stopProgress)
if errs > 0 {
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)

View File

@@ -1,381 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// binaryCheckpointReader reads .wal or .bin checkpoint files produced by the
// metricstore WAL/snapshot system and dumps their contents to a human-readable
// .txt file (same name as input, with .txt extension).
//
// Usage:
//
// go run ./tools/binaryCheckpointReader <file.wal|file.bin>
package main
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"strings"
"time"
)
// Magic numbers matching metricstore/walCheckpoint.go.
// Each value is written/read as a little-endian uint32 (see the binary.Read
// calls below) so a reader can validate it is looking at the expected format.
const (
	walFileMagic   = uint32(0xCC1DA701) // first 4 bytes of a .wal file
	walRecordMagic = uint32(0xCC1DA7A1) // prefix of every record inside a WAL file
	snapFileMagic  = uint32(0xCC5B0001) // first 4 bytes of a .bin binary snapshot
)
// main validates the command-line arguments, opens the input .wal or .bin
// file, and writes a human-readable dump to a sibling .txt file (same name,
// .txt extension). Exits with status 1 on any usage or I/O error.
//
// Fixes vs. original: the final w.Flush() error was ignored, so a failed
// flush (e.g. disk full) still printed the success message; the original
// `defer w.Flush()` was also dead code, since every path either calls
// os.Exit (which skips defers) or flushes explicitly.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: %s <file.wal|file.bin>\n", os.Args[0])
		os.Exit(1)
	}
	inputPath := os.Args[1]
	ext := strings.ToLower(filepath.Ext(inputPath))
	if ext != ".wal" && ext != ".bin" {
		fmt.Fprintf(os.Stderr, "Error: file must have .wal or .bin extension, got %q\n", ext)
		os.Exit(1)
	}
	f, err := os.Open(inputPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error opening %s: %v\n", inputPath, err)
		os.Exit(1)
	}
	defer f.Close()
	// Output file: replace extension with .txt
	outputPath := strings.TrimSuffix(inputPath, filepath.Ext(inputPath)) + ".txt"
	out, err := os.Create(outputPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error creating output %s: %v\n", outputPath, err)
		os.Exit(1)
	}
	defer out.Close()
	w := bufio.NewWriter(out)
	switch ext {
	case ".wal":
		err = dumpWAL(f, w)
	case ".bin":
		err = dumpBinarySnapshot(f, w)
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error reading %s: %v\n", inputPath, err)
		os.Exit(1)
	}
	// Flush explicitly and check the error: a failed flush would otherwise
	// leave a truncated dump while still reporting success.
	if err := w.Flush(); err != nil {
		fmt.Fprintf(os.Stderr, "Error writing output %s: %v\n", outputPath, err)
		os.Exit(1)
	}
	fmt.Printf("Output written to %s\n", outputPath)
}
// ---------- WAL reader ----------
// dumpWAL renders every record of an open WAL file into w in human-readable
// form. A clean EOF between records ends the dump normally; a malformed
// trailing record is reported inline and replay stops without returning an
// error (a truncated tail is the expected failure mode after a crash).
func dumpWAL(f *os.File, w *bufio.Writer) error {
	rd := bufio.NewReader(f)
	// The file must begin with the WAL file magic number.
	var fileMagic uint32
	switch err := binary.Read(rd, binary.LittleEndian, &fileMagic); {
	case err == io.EOF:
		fmt.Fprintln(w, "WAL file is empty (0 bytes).")
		return nil
	case err != nil:
		return fmt.Errorf("read file header: %w", err)
	case fileMagic != walFileMagic:
		return fmt.Errorf("invalid WAL file magic 0x%08X (expected 0x%08X)", fileMagic, walFileMagic)
	}
	fmt.Fprintf(w, "=== WAL File Dump ===\n")
	fmt.Fprintf(w, "File: %s\n", f.Name())
	fmt.Fprintf(w, "File Magic: 0x%08X (valid)\n\n", fileMagic)
	count := 0
	for {
		msg, err := readWALRecord(rd)
		if err != nil {
			// Report the broken record and stop; bytes after it are unreliable.
			fmt.Fprintf(w, "--- Record #%d: ERROR ---\n", count+1)
			fmt.Fprintf(w, " Error: %v\n", err)
			fmt.Fprintf(w, " (stopping replay — likely truncated trailing record)\n\n")
			break
		}
		if msg == nil {
			break // Clean EOF
		}
		count++
		fmt.Fprintf(w, "--- Record #%d ---\n", count)
		fmt.Fprintf(w, " Timestamp: %d (%s)\n", msg.Timestamp,
			time.Unix(msg.Timestamp, 0).UTC().Format(time.RFC3339))
		fmt.Fprintf(w, " Metric: %s\n", msg.MetricName)
		if len(msg.Selector) == 0 {
			fmt.Fprintf(w, " Selectors: (none)\n")
		} else {
			fmt.Fprintf(w, " Selectors: [%s]\n", strings.Join(msg.Selector, ", "))
		}
		fmt.Fprintf(w, " Value: %g\n\n", msg.Value)
	}
	fmt.Fprintf(w, "=== Total valid records: %d ===\n", count)
	return nil
}
// walMessage is the decoded form of a single WAL record: one metric sample
// written at one timestamp for one selector path. It mirrors the metricstore
// WALMessage type; cluster and node are not stored in the record (they are
// inferred from the file path — see the producer's comment in the WAL code).
type walMessage struct {
	MetricName string   // metric identifier (2-byte length prefix on the wire)
	Selector   []string // hierarchical selector segments (1-byte count, 1-byte lengths)
	Value      float32  // sample value, stored as raw float32 bits
	Timestamp  int64    // Unix seconds (first 8 bytes of the payload)
}
// readWALRecord decodes the next record from r. It returns (nil, nil) on a
// clean EOF at a record boundary, and an error for any malformed, oversized,
// truncated, or CRC-corrupted record.
func readWALRecord(r io.Reader) (*walMessage, error) {
	var recMagic uint32
	switch err := binary.Read(r, binary.LittleEndian, &recMagic); {
	case err == io.EOF:
		return nil, nil // clean end of file
	case err != nil:
		return nil, fmt.Errorf("read record magic: %w", err)
	case recMagic != walRecordMagic:
		return nil, fmt.Errorf("invalid record magic 0x%08X (expected 0x%08X)", recMagic, walRecordMagic)
	}
	var size uint32
	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
		return nil, fmt.Errorf("read payload length: %w", err)
	}
	// Sanity bound: reject absurd lengths before allocating a buffer.
	if size > 1<<20 {
		return nil, fmt.Errorf("record payload too large: %d bytes", size)
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("read payload: %w", err)
	}
	var wantCRC uint32
	if err := binary.Read(r, binary.LittleEndian, &wantCRC); err != nil {
		return nil, fmt.Errorf("read CRC: %w", err)
	}
	if crc32.ChecksumIEEE(body) != wantCRC {
		return nil, fmt.Errorf("CRC mismatch (truncated write or corruption)")
	}
	return parseWALPayload(body)
}
// parseWALPayload decodes a CRC-verified record payload into a walMessage.
// Layout: int64 timestamp, uint16-length-prefixed metric name, 1-byte
// selector count, then per selector a 1-byte length + bytes, and finally
// 4 bytes of float32 value bits. All integers are little-endian. Every
// variable-length field is bounds-checked against len(payload) before use,
// so a lying length byte yields an error rather than a panic.
func parseWALPayload(payload []byte) (*walMessage, error) {
	// Minimum size: timestamp + name length + selector count + value.
	if len(payload) < 8+2+1+4 {
		return nil, fmt.Errorf("payload too short: %d bytes", len(payload))
	}
	offset := 0
	// Timestamp (8 bytes).
	ts := int64(binary.LittleEndian.Uint64(payload[offset : offset+8]))
	offset += 8
	// Metric name (2-byte length + bytes).
	if offset+2 > len(payload) {
		return nil, fmt.Errorf("metric name length overflows payload")
	}
	mLen := int(binary.LittleEndian.Uint16(payload[offset : offset+2]))
	offset += 2
	if offset+mLen > len(payload) {
		return nil, fmt.Errorf("metric name overflows payload")
	}
	metricName := string(payload[offset : offset+mLen])
	offset += mLen
	// Selector count (1 byte).
	if offset >= len(payload) {
		return nil, fmt.Errorf("selector count overflows payload")
	}
	selCount := int(payload[offset])
	offset++
	selectors := make([]string, selCount)
	for i := range selCount {
		if offset >= len(payload) {
			return nil, fmt.Errorf("selector[%d] length overflows payload", i)
		}
		sLen := int(payload[offset])
		offset++
		if offset+sLen > len(payload) {
			return nil, fmt.Errorf("selector[%d] data overflows payload", i)
		}
		selectors[i] = string(payload[offset : offset+sLen])
		offset += sLen
	}
	// Value (4 bytes, float32 bits).
	if offset+4 > len(payload) {
		return nil, fmt.Errorf("value overflows payload")
	}
	bits := binary.LittleEndian.Uint32(payload[offset : offset+4])
	value := math.Float32frombits(bits)
	return &walMessage{
		MetricName: metricName,
		Timestamp:  ts,
		Selector:   selectors,
		Value:      value,
	}, nil
}
// ---------- Binary snapshot reader ----------
// dumpBinarySnapshot renders a .bin binary snapshot file into w: first the
// file header (magic plus the covered [from, to] time range), then the full
// level tree via dumpBinaryLevel.
func dumpBinarySnapshot(f *os.File, w *bufio.Writer) error {
	rd := bufio.NewReader(f)
	var fileMagic uint32
	if err := binary.Read(rd, binary.LittleEndian, &fileMagic); err != nil {
		return fmt.Errorf("read magic: %w", err)
	}
	if fileMagic != snapFileMagic {
		return fmt.Errorf("invalid snapshot magic 0x%08X (expected 0x%08X)", fileMagic, snapFileMagic)
	}
	// The header carries the time range covered by this snapshot.
	var from, to int64
	if err := binary.Read(rd, binary.LittleEndian, &from); err != nil {
		return fmt.Errorf("read from: %w", err)
	}
	if err := binary.Read(rd, binary.LittleEndian, &to); err != nil {
		return fmt.Errorf("read to: %w", err)
	}
	fmt.Fprintf(w, "=== Binary Snapshot Dump ===\n")
	fmt.Fprintf(w, "File: %s\n", f.Name())
	fmt.Fprintf(w, "Magic: 0x%08X (valid)\n", fileMagic)
	fmt.Fprintf(w, "From: %d (%s)\n", from, time.Unix(from, 0).UTC().Format(time.RFC3339))
	fmt.Fprintf(w, "To: %d (%s)\n\n", to, time.Unix(to, 0).UTC().Format(time.RFC3339))
	return dumpBinaryLevel(rd, w, 0)
}
// dumpBinaryLevel recursively renders one level of the snapshot tree into w.
// Wire format per level: uint32 metric count; per metric a 16-bit-length-
// prefixed name, int64 frequency (seconds), int64 start timestamp, uint32
// value count and that many raw float32 values; then a uint32 child count
// followed by (name, nested level) pairs. All integers little-endian.
// depth controls indentation; children recurse with depth+2 (two spaces
// per nesting level).
func dumpBinaryLevel(r io.Reader, w *bufio.Writer, depth int) error {
	indent := strings.Repeat(" ", depth)
	var numMetrics uint32
	if err := binary.Read(r, binary.LittleEndian, &numMetrics); err != nil {
		return fmt.Errorf("read num_metrics: %w", err)
	}
	if numMetrics > 0 {
		fmt.Fprintf(w, "%sMetrics (%d):\n", indent, numMetrics)
	}
	for i := range numMetrics {
		name, err := readString16(r)
		if err != nil {
			return fmt.Errorf("read metric name [%d]: %w", i, err)
		}
		var freq, start int64
		if err := binary.Read(r, binary.LittleEndian, &freq); err != nil {
			return fmt.Errorf("read frequency for %s: %w", name, err)
		}
		if err := binary.Read(r, binary.LittleEndian, &start); err != nil {
			return fmt.Errorf("read start for %s: %w", name, err)
		}
		var numValues uint32
		if err := binary.Read(r, binary.LittleEndian, &numValues); err != nil {
			return fmt.Errorf("read num_values for %s: %w", name, err)
		}
		startTime := time.Unix(start, 0).UTC()
		fmt.Fprintf(w, "%s [%s]\n", indent, name)
		fmt.Fprintf(w, "%s Frequency: %d s\n", indent, freq)
		fmt.Fprintf(w, "%s Start: %d (%s)\n", indent, start, startTime.Format(time.RFC3339))
		fmt.Fprintf(w, "%s Values (%d):", indent, numValues)
		if numValues == 0 {
			fmt.Fprintln(w, " (none)")
		} else {
			fmt.Fprintln(w)
			// Print values in rows of 10 for readability.
			for j := range numValues {
				var bits uint32
				if err := binary.Read(r, binary.LittleEndian, &bits); err != nil {
					return fmt.Errorf("read value[%d] for %s: %w", j, name, err)
				}
				val := math.Float32frombits(bits)
				if j%10 == 0 {
					if j > 0 {
						fmt.Fprintln(w)
					}
					// Print the timestamp for this row's first value.
					// Each value is freq seconds after the previous one.
					rowTS := start + int64(j)*freq
					fmt.Fprintf(w, "%s [%s] ", indent, time.Unix(rowTS, 0).UTC().Format("15:04:05"))
				}
				if math.IsNaN(float64(val)) {
					fmt.Fprintf(w, "NaN ")
				} else {
					fmt.Fprintf(w, "%g ", val)
				}
			}
			fmt.Fprintln(w)
		}
	}
	var numChildren uint32
	if err := binary.Read(r, binary.LittleEndian, &numChildren); err != nil {
		return fmt.Errorf("read num_children: %w", err)
	}
	if numChildren > 0 {
		fmt.Fprintf(w, "%sChildren (%d):\n", indent, numChildren)
	}
	for i := range numChildren {
		childName, err := readString16(r)
		if err != nil {
			return fmt.Errorf("read child name [%d]: %w", i, err)
		}
		fmt.Fprintf(w, "%s [%s]\n", indent, childName)
		if err := dumpBinaryLevel(r, w, depth+2); err != nil {
			return fmt.Errorf("read child %s: %w", childName, err)
		}
	}
	return nil
}
func readString16(r io.Reader) (string, error) {
var sLen uint16
if err := binary.Read(r, binary.LittleEndian, &sLen); err != nil {
return "", err
}
buf := make([]byte, sLen)
if _, err := io.ReadFull(r, buf); err != nil {
return "", err
}
return string(buf), nil
}

View File

@@ -1,12 +1,12 @@
{
"name": "cc-frontend",
"version": "1.5.3",
"version": "1.5.2",
"lockfileVersion": 4,
"requires": true,
"packages": {
"": {
"name": "cc-frontend",
"version": "1.5.3",
"version": "1.5.2",
"license": "MIT",
"dependencies": {
"@rollup/plugin-replace": "^6.0.3",
@@ -328,9 +328,6 @@
"cpu": [
"arm"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -344,9 +341,6 @@
"cpu": [
"arm"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -360,9 +354,6 @@
"cpu": [
"arm64"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -376,9 +367,6 @@
"cpu": [
"arm64"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -392,9 +380,6 @@
"cpu": [
"loong64"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -408,9 +393,6 @@
"cpu": [
"loong64"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -424,9 +406,6 @@
"cpu": [
"ppc64"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -440,9 +419,6 @@
"cpu": [
"ppc64"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -456,9 +432,6 @@
"cpu": [
"riscv64"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -472,9 +445,6 @@
"cpu": [
"riscv64"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -488,9 +458,6 @@
"cpu": [
"s390x"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -504,9 +471,6 @@
"cpu": [
"x64"
],
"libc": [
"glibc"
],
"license": "MIT",
"optional": true,
"os": [
@@ -520,9 +484,6 @@
"cpu": [
"x64"
],
"libc": [
"musl"
],
"license": "MIT",
"optional": true,
"os": [
@@ -998,9 +959,9 @@
"license": "MIT"
},
"node_modules/picomatch": {
"version": "4.0.3",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz",
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.4.tgz",
"integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==",
"license": "MIT",
"engines": {
"node": ">=12"
@@ -1134,9 +1095,9 @@
}
},
"node_modules/rollup-plugin-svelte/node_modules/picomatch": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.1.tgz",
"integrity": "sha512-JU3teHTNjmE2VCGFzuY8EXzCDVwEqB2a8fsIvwaStHhAWJEeVd1o1QD80CU6+ZdEXXSLbSsuLwJjkCBWqRQUVA==",
"version": "2.3.2",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-2.3.2.tgz",
"integrity": "sha512-V7+vQEJ06Z+c5tSye8S+nHUfI51xoXIXjHQ99cQtKUkQqqO1kO/KCJUfZXuB47h/YBlDhah2H3hdUGXn8ie0oA==",
"dev": true,
"license": "MIT",
"engines": {

View File

@@ -1,6 +1,6 @@
{
"name": "cc-frontend",
"version": "1.5.3",
"version": "1.5.2",
"license": "MIT",
"scripts": {
"build": "rollup -c",