mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-30 12:37:30 +02:00
Compare commits
5 Commits
hotfix
...
release/v1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5398246a61 | ||
|
|
b43c52f5b5 | ||
|
|
b7e133fbaf | ||
|
|
c3b6d93941 | ||
|
|
c13fd68aa9 |
@@ -19,16 +19,6 @@ This is also the default.
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
|
||||
failed for some hosts, WAL files for successfully checkpointed hosts were not
|
||||
rotated and the checkpoint timestamp was not advanced. Partial successes now
|
||||
correctly advance the checkpoint and rotate WAL files for completed hosts.
|
||||
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
|
||||
a host, its `current.wal` file grew without limit until disk exhaustion. A new
|
||||
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
|
||||
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
|
||||
Defaults to 0 (unlimited) for backward compatibility.
|
||||
|
||||
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
|
||||
boundary value. Improved validation and UI text for "more than equal" and
|
||||
"less than equal" range selections.
|
||||
@@ -60,14 +50,6 @@ This is also the default.
|
||||
- **Explicit node state queries in node view**: Node health and scheduler state
|
||||
are now fetched independently from metric data for fresher status information.
|
||||
|
||||
### New tools
|
||||
|
||||
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
|
||||
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
|
||||
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
|
||||
Useful for debugging and inspecting checkpoint data. Usage:
|
||||
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
|
||||
|
||||
### Logging improvements
|
||||
|
||||
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
|
||||
|
||||
@@ -3,12 +3,6 @@
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
|
||||
//
|
||||
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
|
||||
// it converts checkpoint files older than the retention window into per-cluster
|
||||
// Parquet files and then deletes the originals. In "delete" mode it simply removes
|
||||
// old checkpoint files.
|
||||
package metricstore
|
||||
|
||||
import (
|
||||
@@ -25,12 +19,8 @@ import (
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
)
|
||||
|
||||
// CleanUp starts a background worker that periodically removes or archives
|
||||
// checkpoint files older than the configured retention window.
|
||||
//
|
||||
// In "archive" mode, old checkpoint files are converted to Parquet and stored
|
||||
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
|
||||
// The cleanup interval equals Keys.RetentionInMemory.
|
||||
// Worker for either Archiving or Deleting files
|
||||
|
||||
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
||||
if Keys.Cleanup.Mode == "archive" {
|
||||
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
||||
|
||||
@@ -86,12 +86,11 @@ var (
|
||||
|
||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||
//
|
||||
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
|
||||
// Checkpoints are written every 12 hours (hardcoded).
|
||||
//
|
||||
// Format behaviour:
|
||||
// - "json": Writes a JSON snapshot file per host every interval
|
||||
// - "wal": Writes a binary snapshot file per host every interval, then rotates
|
||||
// the current.wal files for all successfully checkpointed hosts
|
||||
// - "json": Periodic checkpointing every checkpointInterval
|
||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
lastCheckpointMu.Lock()
|
||||
lastCheckpoint = time.Now()
|
||||
@@ -127,15 +126,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
||||
|
||||
if Keys.Checkpoints.FileFormat == "wal" {
|
||||
// Pause WAL writes: the binary snapshot captures all in-memory
|
||||
// data, so WAL records written during checkpoint are redundant
|
||||
// and would be deleted during rotation anyway.
|
||||
walCheckpointActive.Store(true)
|
||||
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
||||
}
|
||||
if n > 0 {
|
||||
} else {
|
||||
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
|
||||
lastCheckpointMu.Lock()
|
||||
lastCheckpoint = now
|
||||
@@ -143,8 +137,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
// Rotate WAL files for successfully checkpointed hosts.
|
||||
RotateWALFiles(hostDirs)
|
||||
}
|
||||
walCheckpointActive.Store(false)
|
||||
walDropped.Store(0)
|
||||
} else {
|
||||
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
|
||||
@@ -59,14 +59,11 @@ const (
|
||||
// Checkpoints configures periodic persistence of in-memory metric data.
|
||||
//
|
||||
// Fields:
|
||||
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
|
||||
// - RootDir: Filesystem path for checkpoint files (created if missing)
|
||||
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
|
||||
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
|
||||
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
|
||||
// - RootDir: Filesystem path for checkpoint files (created if missing)
|
||||
type Checkpoints struct {
|
||||
FileFormat string `json:"file-format"`
|
||||
RootDir string `json:"directory"`
|
||||
MaxWALSize int64 `json:"max-wal-size,omitempty"`
|
||||
}
|
||||
|
||||
// Debug provides development and profiling options.
|
||||
|
||||
@@ -24,11 +24,6 @@ const configSchema = `{
|
||||
"directory": {
|
||||
"description": "Path in which the checkpointed files should be placed.",
|
||||
"type": "string"
|
||||
},
|
||||
"max-wal-size": {
|
||||
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
//
|
||||
// The package organizes metrics in a tree structure (cluster → host → component) and
|
||||
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
||||
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
|
||||
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
|
||||
// and enforcing retention policies.
|
||||
//
|
||||
// Key features:
|
||||
@@ -16,7 +16,7 @@
|
||||
// - Hierarchical data organization (selectors)
|
||||
// - Concurrent checkpoint/archive workers
|
||||
// - Support for sum and average aggregation
|
||||
// - NATS integration for metric ingestion via InfluxDB line protocol
|
||||
// - NATS integration for metric ingestion
|
||||
package metricstore
|
||||
|
||||
import (
|
||||
@@ -113,8 +113,7 @@ type MemoryStore struct {
|
||||
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
||||
//
|
||||
// Parameters:
|
||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
|
||||
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
|
||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
|
||||
// - wg: WaitGroup that will be incremented for each background goroutine started
|
||||
//
|
||||
// The function will call cclog.Fatal on critical errors during initialization.
|
||||
|
||||
@@ -99,11 +99,6 @@ var walStagingWg sync.WaitGroup
|
||||
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||
var walShuttingDown atomic.Bool
|
||||
|
||||
// walCheckpointActive is set during binary checkpoint writes.
|
||||
// While active, SendWALMessage skips sending (returns true) because the
|
||||
// snapshot captures all in-memory data, making WAL writes redundant.
|
||||
var walCheckpointActive atomic.Bool
|
||||
|
||||
// WALMessage represents a single metric write to be appended to the WAL.
|
||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||
type WALMessage struct {
|
||||
@@ -127,7 +122,6 @@ type walFileState struct {
|
||||
f *os.File
|
||||
w *bufio.Writer
|
||||
dirty bool
|
||||
size int64 // approximate bytes written (tracked from open + writes)
|
||||
}
|
||||
|
||||
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
||||
@@ -151,9 +145,6 @@ func SendWALMessage(msg *WALMessage) bool {
|
||||
if walShardChs == nil || walShuttingDown.Load() {
|
||||
return false
|
||||
}
|
||||
if walCheckpointActive.Load() {
|
||||
return true // Data safe in memory; snapshot will capture it
|
||||
}
|
||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||
select {
|
||||
case walShardChs[shard] <- msg:
|
||||
@@ -223,11 +214,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
|
||||
// Write file header magic if file is new (empty).
|
||||
info, err := f.Stat()
|
||||
var fileSize int64
|
||||
if err == nil {
|
||||
fileSize = info.Size()
|
||||
}
|
||||
if err == nil && fileSize == 0 {
|
||||
if err == nil && info.Size() == 0 {
|
||||
var hdr [4]byte
|
||||
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
|
||||
if _, err := w.Write(hdr[:]); err != nil {
|
||||
@@ -235,10 +222,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
f.Close()
|
||||
return nil
|
||||
}
|
||||
fileSize = 4
|
||||
}
|
||||
|
||||
ws = &walFileState{f: f, w: w, size: fileSize}
|
||||
ws = &walFileState{f: f, w: w}
|
||||
hostFiles[hostDir] = ws
|
||||
return ws
|
||||
}
|
||||
@@ -249,30 +235,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
if ws == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
|
||||
// The in-memory store still holds the data; only crash-recovery coverage is lost.
|
||||
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
|
||||
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
|
||||
hostDir, ws.size, maxSize)
|
||||
ws.w.Flush()
|
||||
ws.f.Close()
|
||||
walPath := path.Join(hostDir, "current.wal")
|
||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
|
||||
}
|
||||
delete(hostFiles, hostDir)
|
||||
ws = getOrOpenWAL(hostDir)
|
||||
if ws == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n, err := writeWALRecordDirect(ws.w, msg)
|
||||
if err != nil {
|
||||
if err := writeWALRecordDirect(ws.w, msg); err != nil {
|
||||
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
||||
}
|
||||
ws.size += int64(n)
|
||||
ws.dirty = true
|
||||
}
|
||||
|
||||
@@ -366,7 +331,6 @@ func RotateWALFiles(hostDirs []string) {
|
||||
if walShardRotateChs == nil || walShuttingDown.Load() {
|
||||
return
|
||||
}
|
||||
deadline := time.After(2 * time.Minute)
|
||||
dones := make([]chan struct{}, 0, len(hostDirs))
|
||||
for _, dir := range hostDirs {
|
||||
done := make(chan struct{})
|
||||
@@ -374,18 +338,16 @@ func RotateWALFiles(hostDirs []string) {
|
||||
select {
|
||||
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
||||
dones = append(dones, done)
|
||||
case <-deadline:
|
||||
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
|
||||
len(hostDirs)-len(dones), len(hostDirs))
|
||||
goto waitDones
|
||||
default:
|
||||
// Channel full or goroutine not consuming — skip this host.
|
||||
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
|
||||
}
|
||||
}
|
||||
waitDones:
|
||||
for _, done := range dones {
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(30 * time.Second):
|
||||
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
|
||||
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -400,9 +362,8 @@ func walShardIndexFromDir(hostDir string) int {
|
||||
return walShardIndex(cluster, node)
|
||||
}
|
||||
|
||||
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
|
||||
// host directories. Used after shutdown, when WALStaging goroutines have already
|
||||
// exited and the channel-based RotateWALFiles is no longer safe to call.
|
||||
// RotateWALFiles sends rotation requests for the given host directories
|
||||
// and blocks until all rotations complete.
|
||||
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||
for _, dir := range hostDirs {
|
||||
walPath := path.Join(dir, "current.wal")
|
||||
@@ -415,8 +376,7 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
||||
// then writes it to the bufio.Writer in a single call. This prevents partial
|
||||
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
|
||||
// Returns the number of bytes written and any error.
|
||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
|
||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
||||
// Compute payload and total record size.
|
||||
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
||||
for _, s := range msg.Selector {
|
||||
@@ -470,8 +430,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
|
||||
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
||||
|
||||
// Single atomic write to the buffered writer.
|
||||
n, err := w.Write(buf)
|
||||
return n, err
|
||||
_, err := w.Write(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
// readWALRecord reads one WAL record from the reader.
|
||||
@@ -736,6 +696,11 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
||||
atomic.AddInt32(&errs, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&n, 1)
|
||||
// Delete WAL immediately after successful snapshot.
|
||||
walPath := path.Join(wi.hostDir, "current.wal")
|
||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
||||
}
|
||||
successMu.Lock()
|
||||
successDirs = append(successDirs, wi.hostDir)
|
||||
successMu.Unlock()
|
||||
|
||||
@@ -57,7 +57,7 @@
|
||||
let entries = $state([]);
|
||||
let loading = $state(false);
|
||||
let error = $state(null);
|
||||
let timer = null;
|
||||
let timer = $state(null);
|
||||
|
||||
function levelColor(priority) {
|
||||
if (priority <= 2) return "danger";
|
||||
|
||||
Reference in New Issue
Block a user