5 Commits

Author SHA1 Message Date
Jan Eitzinger
5398246a61 Merge pull request #540 from ClusterCockpit/hotfix
Hotfix
2026-03-27 10:00:32 +01:00
Jan Eitzinger
b43c52f5b5 Merge pull request #538 from ClusterCockpit/hotfix
Hotfix
2026-03-26 08:01:31 +01:00
Jan Eitzinger
b7e133fbaf Merge pull request #536 from ClusterCockpit/hotfix
Hotfix
2026-03-25 07:07:49 +01:00
Jan Eitzinger
c3b6d93941 Merge pull request #535 from ClusterCockpit/hotfix
Hotfix
2026-03-24 19:01:49 +01:00
Jan Eitzinger
c13fd68aa9 Merge pull request #534 from ClusterCockpit/hotfix
feat: Add command line switch to trigger manual metricstore checkpoin…
2026-03-23 19:28:06 +01:00
8 changed files with 29 additions and 109 deletions

View File

@@ -19,16 +19,6 @@ This is also the default.
### Bug fixes
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
failed for some hosts, WAL files for successfully checkpointed hosts were not
rotated and the checkpoint timestamp was not advanced. Partial successes now
correctly advance the checkpoint and rotate WAL files for completed hosts.
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
a host, its `current.wal` file grew without limit until disk exhaustion. A new
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
Defaults to 0 (unlimited) for backward compatibility.
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
@@ -60,14 +50,6 @@ This is also the default.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### New tools
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
Useful for debugging and inspecting checkpoint data. Usage:
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors

View File

@@ -3,12 +3,6 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore
import (
@@ -25,12 +19,8 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// CleanUp starts a background worker that periodically removes or archives
// checkpoint files older than the configured retention window.
//
// In "archive" mode, old checkpoint files are converted to Parquet and stored
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
// The cleanup interval equals Keys.RetentionInMemory.
// Worker for either Archiving or Deleting files
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")

View File

@@ -86,12 +86,11 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
// Checkpoints are written every 12 hours (hardcoded).
//
// Format behaviour:
// - "json": Writes a JSON snapshot file per host every interval
// - "wal": Writes a binary snapshot file per host every interval, then rotates
// the current.wal files for all successfully checkpointed hosts
// - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now()
@@ -127,15 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
}
if n > 0 {
} else {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
@@ -143,8 +137,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs)
}
walCheckpointActive.Store(false)
walDropped.Store(0)
} else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {

View File

@@ -61,12 +61,9 @@ const (
// Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
}
// Debug provides development and profiling options.

View File

@@ -24,11 +24,6 @@ const configSchema = `{
"directory": {
"description": "Path in which the checkpointed files should be placed.",
"type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
}
}
},

View File

@@ -8,7 +8,7 @@
//
// The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies.
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
// and enforcing retention policies.
//
// Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation
// - NATS integration for metric ingestion via InfluxDB line protocol
// - NATS integration for metric ingestion
package metricstore
import (
@@ -113,8 +113,7 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion
//
// Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
// - wg: WaitGroup that will be incremented for each background goroutine started
//
// The function will call cclog.Fatal on critical errors during initialization.

View File

@@ -99,11 +99,6 @@ var walStagingWg sync.WaitGroup
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct {
@@ -127,7 +122,6 @@ type walFileState struct {
f *os.File
w *bufio.Writer
dirty bool
size int64 // approximate bytes written (tracked from open + writes)
}
// walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -151,9 +145,6 @@ func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() {
return false
}
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node)
select {
case walShardChs[shard] <- msg:
@@ -223,11 +214,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty).
info, err := f.Stat()
var fileSize int64
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
if err == nil && info.Size() == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
@@ -235,10 +222,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close()
return nil
}
fileSize = 4
}
ws = &walFileState{f: f, w: w, size: fileSize}
ws = &walFileState{f: f, w: w}
hostFiles[hostDir] = ws
return ws
}
@@ -249,30 +235,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil {
return
}
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
if err := writeWALRecordDirect(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
ws.size += int64(n)
ws.dirty = true
}
@@ -366,7 +331,6 @@ func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() {
return
}
deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs {
done := make(chan struct{})
@@ -374,18 +338,16 @@ func RotateWALFiles(hostDirs []string) {
select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
case <-deadline:
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
len(hostDirs)-len(dones), len(hostDirs))
goto waitDones
default:
// Channel full or goroutine not consuming — skip this host.
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
}
}
waitDones:
for _, done := range dones {
select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
return
}
}
@@ -400,9 +362,8 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node)
}
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
// host directories. Used after shutdown, when WALStaging goroutines have already
// exited and the channel-based RotateWALFiles is no longer safe to call.
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete.
func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal")
@@ -415,8 +376,7 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
// Returns the number of bytes written and any error.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
@@ -470,8 +430,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
n, err := w.Write(buf)
return n, err
_, err := w.Write(buf)
return err
}
// readWALRecord reads one WAL record from the reader.
@@ -736,6 +696,11 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock()
successDirs = append(successDirs, wi.hostDir)
successMu.Unlock()

View File

@@ -57,7 +57,7 @@
let entries = $state([]);
let loading = $state(false);
let error = $state(null);
let timer = null;
let timer = $state(null);
function levelColor(priority) {
if (priority <= 2) return "danger";