5 Commits

Author SHA1 Message Date
Jan Eitzinger
5398246a61 Merge pull request #540 from ClusterCockpit/hotfix
Hotfix
2026-03-27 10:00:32 +01:00
Jan Eitzinger
b43c52f5b5 Merge pull request #538 from ClusterCockpit/hotfix
Hotfix
2026-03-26 08:01:31 +01:00
Jan Eitzinger
b7e133fbaf Merge pull request #536 from ClusterCockpit/hotfix
Hotfix
2026-03-25 07:07:49 +01:00
Jan Eitzinger
c3b6d93941 Merge pull request #535 from ClusterCockpit/hotfix
Hotfix
2026-03-24 19:01:49 +01:00
Jan Eitzinger
c13fd68aa9 Merge pull request #534 from ClusterCockpit/hotfix
feat: Add command line switch to trigger manual metricstore checkpoin…
2026-03-23 19:28:06 +01:00
8 changed files with 29 additions and 109 deletions

View File

@@ -19,16 +19,6 @@ This is also the default.
### Bug fixes ### Bug fixes
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
failed for some hosts, WAL files for successfully checkpointed hosts were not
rotated and the checkpoint timestamp was not advanced. Partial successes now
correctly advance the checkpoint and rotate WAL files for completed hosts.
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
a host, its `current.wal` file grew without limit until disk exhaustion. A new
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
Defaults to 0 (unlimited) for backward compatibility.
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a - **Doubleranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections. "less than equal" range selections.
@@ -60,14 +50,6 @@ This is also the default.
- **Explicit node state queries in node view**: Node health and scheduler state - **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information. are now fetched independently from metric data for fresher status information.
### New tools
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
Useful for debugging and inspecting checkpoint data. Usage:
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
### Logging improvements ### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors - **Reduced tagger log noise**: Missing metrics and expression evaluation errors

View File

@@ -3,12 +3,6 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore package metricstore
import ( import (
@@ -25,12 +19,8 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
) )
// CleanUp starts a background worker that periodically removes or archives // Worker for either Archiving or Deleting files
// checkpoint files older than the configured retention window.
//
// In "archive" mode, old checkpoint files are converted to Parquet and stored
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
// The cleanup interval equals Keys.RetentionInMemory.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) { func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" { if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet") cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")

View File

@@ -86,12 +86,11 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk. // Checkpointing starts a background worker that periodically saves metric data to disk.
// //
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours). // Checkpoints are written every 12 hours (hardcoded).
// //
// Format behaviour: // Format behaviour:
// - "json": Writes a JSON snapshot file per host every interval // - "json": Periodic checkpointing every checkpointInterval
// - "wal": Writes a binary snapshot file per host every interval, then rotates // - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
// the current.wal files for all successfully checkpointed hosts
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = time.Now() lastCheckpoint = time.Now()
@@ -127,15 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
} } else {
if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n) cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = now lastCheckpoint = now
@@ -143,8 +137,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts. // Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs) RotateWALFiles(hostDirs)
} }
walCheckpointActive.Store(false)
walDropped.Store(0)
} else { } else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil { if err != nil {

View File

@@ -59,14 +59,11 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data. // Checkpoints configures periodic persistence of in-memory metric data.
// //
// Fields: // Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" // - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing) // - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct { type Checkpoints struct {
FileFormat string `json:"file-format"` FileFormat string `json:"file-format"`
RootDir string `json:"directory"` RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
} }
// Debug provides development and profiling options. // Debug provides development and profiling options.

View File

@@ -24,11 +24,6 @@ const configSchema = `{
"directory": { "directory": {
"description": "Path in which the checkpointed files should be placed.", "description": "Path in which the checkpointed files should be placed.",
"type": "string" "type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
} }
} }
}, },

View File

@@ -8,7 +8,7 @@
// //
// The package organizes metrics in a tree structure (cluster → host → component) and // The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies. // provides concurrent read/write access to metric data with configurable aggregation strategies.
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data, // Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
// and enforcing retention policies. // and enforcing retention policies.
// //
// Key features: // Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors) // - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers // - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation // - Support for sum and average aggregation
// - NATS integration for metric ingestion via InfluxDB line protocol // - NATS integration for metric ingestion
package metricstore package metricstore
import ( import (
@@ -113,8 +113,7 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion // 6. Optionally subscribes to NATS for real-time metric ingestion
// //
// Parameters: // Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults // - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - wg: WaitGroup that will be incremented for each background goroutine started // - wg: WaitGroup that will be incremented for each background goroutine started
// //
// The function will call cclog.Fatal on critical errors during initialization. // The function will call cclog.Fatal on critical errors during initialization.

View File

@@ -99,11 +99,6 @@ var walStagingWg sync.WaitGroup
// SendWALMessage from sending on a closed channel (which panics in Go). // SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL. // WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path). // Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct { type WALMessage struct {
@@ -127,7 +122,6 @@ type walFileState struct {
f *os.File f *os.File
w *bufio.Writer w *bufio.Writer
dirty bool dirty bool
size int64 // approximate bytes written (tracked from open + writes)
} }
// walFlushInterval controls how often dirty WAL files are flushed to disk. // walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -151,9 +145,6 @@ func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() { if walShardChs == nil || walShuttingDown.Load() {
return false return false
} }
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node) shard := walShardIndex(msg.Cluster, msg.Node)
select { select {
case walShardChs[shard] <- msg: case walShardChs[shard] <- msg:
@@ -223,11 +214,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty). // Write file header magic if file is new (empty).
info, err := f.Stat() info, err := f.Stat()
var fileSize int64 if err == nil && info.Size() == 0 {
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
var hdr [4]byte var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic) binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil { if _, err := w.Write(hdr[:]); err != nil {
@@ -235,10 +222,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close() f.Close()
return nil return nil
} }
fileSize = 4
} }
ws = &walFileState{f: f, w: w, size: fileSize} ws = &walFileState{f: f, w: w}
hostFiles[hostDir] = ws hostFiles[hostDir] = ws
return ws return ws
} }
@@ -249,30 +235,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil { if ws == nil {
return return
} }
if err := writeWALRecordDirect(ws.w, msg); err != nil {
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
} }
ws.size += int64(n)
ws.dirty = true ws.dirty = true
} }
@@ -366,7 +331,6 @@ func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() { if walShardRotateChs == nil || walShuttingDown.Load() {
return return
} }
deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs)) dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs { for _, dir := range hostDirs {
done := make(chan struct{}) done := make(chan struct{})
@@ -374,18 +338,16 @@ func RotateWALFiles(hostDirs []string) {
select { select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}: case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done) dones = append(dones, done)
case <-deadline: default:
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining", // Channel full or goroutine not consuming — skip this host.
len(hostDirs)-len(dones), len(hostDirs)) cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
goto waitDones
} }
} }
waitDones:
for _, done := range dones { for _, done := range dones {
select { select {
case <-done: case <-done:
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing") cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
return return
} }
} }
@@ -400,9 +362,8 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node) return walShardIndex(cluster, node)
} }
// RotateWALFilesAfterShutdown directly removes current.wal files for the given // RotateWALFiles sends rotation requests for the given host directories
// host directories. Used after shutdown, when WALStaging goroutines have already // and blocks until all rotations complete.
// exited and the channel-based RotateWALFiles is no longer safe to call.
func RotateWALFilesAfterShutdown(hostDirs []string) { func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs { for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal") walPath := path.Join(dir, "current.wal")
@@ -415,8 +376,7 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first, // writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial // then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full). // records in the write buffer if a write error occurs mid-record (e.g. disk full).
// Returns the number of bytes written and any error. func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size. // Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector { for _, s := range msg.Selector {
@@ -470,8 +430,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
binary.LittleEndian.PutUint32(buf[p:p+4], crc) binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer. // Single atomic write to the buffered writer.
n, err := w.Write(buf) _, err := w.Write(buf)
return n, err return err
} }
// readWALRecord reads one WAL record from the reader. // readWALRecord reads one WAL record from the reader.
@@ -736,6 +696,11 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
} else { } else {
atomic.AddInt32(&n, 1) atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock() successMu.Lock()
successDirs = append(successDirs, wi.hostDir) successDirs = append(successDirs, wi.hostDir)
successMu.Unlock() successMu.Unlock()

View File

@@ -57,7 +57,7 @@
let entries = $state([]); let entries = $state([]);
let loading = $state(false); let loading = $state(false);
let error = $state(null); let error = $state(null);
let timer = null; let timer = $state(null);
function levelColor(priority) { function levelColor(priority) {
if (priority <= 2) return "danger"; if (priority <= 2) return "danger";