Merge branch 'main' into feature/526-average-resample

This commit is contained in:
2026-06-17 06:30:54 +02:00
69 changed files with 5681 additions and 5268 deletions

View File

@@ -198,36 +198,19 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
metrics := make(map[string]*schema.Metric)
for _, c := range Clusters {
if c.Name == cluster {
for _, m := range c.MetricConfig {
for _, s := range m.SubClusters {
if s.Name == subcluster {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: s.Unit,
Peak: s.Peak,
Normal: s.Normal,
Caution: s.Caution,
Alert: s.Alert,
}
break
}
}
sc, err := GetSubCluster(cluster, subcluster)
if err != nil {
return metrics
}
_, ok := metrics[m.Name]
if !ok {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: m.Unit,
Peak: m.Peak,
Normal: m.Normal,
Caution: m.Caution,
Alert: m.Alert,
}
}
}
break
for _, m := range sc.MetricConfig {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: m.Unit,
Peak: m.Peak,
Normal: m.Normal,
Caution: m.Caution,
Alert: m.Alert,
}
}

View File

@@ -37,3 +37,27 @@ func TestClusterConfig(t *testing.T) {
// spew.Dump(archive.GlobalMetricList)
// t.Fail()
}
func TestGetMetricConfigSubClusterRespectsRemovedMetrics(t *testing.T) {
if err := archive.Init(json.RawMessage(`{"kind": "file","path": "testdata/archive"}`)); err != nil {
t.Fatal(err)
}
sc, err := archive.GetSubCluster("fritz", "spr2tb")
if err != nil {
t.Fatal(err)
}
metrics := archive.GetMetricConfigSubCluster("fritz", "spr2tb")
if len(metrics) != len(sc.MetricConfig) {
t.Fatalf("GetMetricConfigSubCluster() returned %d metrics, want %d", len(metrics), len(sc.MetricConfig))
}
if _, ok := metrics["flops_any"]; ok {
t.Fatalf("GetMetricConfigSubCluster() returned removed metric flops_any for subcluster spr2tb")
}
if _, ok := metrics["cpu_power"]; !ok {
t.Fatalf("GetMetricConfigSubCluster() missing active metric cpu_power for subcluster spr2tb")
}
}

View File

@@ -133,10 +133,10 @@ func (pt *prefixedTarget) WriteFile(name string, data []byte) error {
// ClusterAwareParquetWriter organizes Parquet output by cluster.
// Each cluster gets its own subdirectory with a cluster.json config file.
type ClusterAwareParquetWriter struct {
target ParquetTarget
maxSizeMB int
writers map[string]*ParquetWriter
clusterCfgs map[string]*schema.Cluster
target ParquetTarget
maxSizeMB int
writers map[string]*ParquetWriter
clusterCfgs map[string]*schema.Cluster
}
// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.

View File

@@ -79,7 +79,7 @@ type APIQueryResponse struct {
// - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"])
// - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"])
//
// If Aggregate is true, data from multiple type/subtype IDs will be aggregated according
// If Aggregate is true, data from multiple type/stype IDs will be aggregated according
// to the metric's aggregation strategy. Otherwise, separate results are returned for each combination.
type APIQuery struct {
Type *string `json:"type,omitempty"`
@@ -175,13 +175,13 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr
// This is the primary API for retrieving metric data from the memory store. It supports:
// - Individual queries via req.Queries
// - Batch queries for all nodes via req.ForAllNodes
// - Hierarchical selector construction (cluster → host → type → subtype)
// - Hierarchical selector construction (cluster → host → type → stype)
// - Optional statistics computation (avg, min, max)
// - Optional data scaling
// - Optional data padding with NaN values
//
// The function constructs selectors based on the query parameters and calls MemoryStore.Read()
// for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs,
// for each selector. If a query specifies Aggregate=false with multiple type/stype IDs,
// separate results are returned for each combination.
//
// Parameters:

View File

@@ -3,6 +3,12 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore
import (
@@ -11,6 +17,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
@@ -18,10 +25,15 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// Worker for either Archiving or Deleting files
// CleanUp starts a background worker that periodically removes or archives
// checkpoint files older than the configured retention window.
//
// In "archive" mode, old checkpoint files are converted to Parquet and stored
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
// The cleanup interval equals Keys.RetentionInMemory.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
// Run as Archiver
cleanUpWorker(wg, ctx,
Keys.RetentionInMemory,
@@ -43,7 +55,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Go(func() {
d, err := time.ParseDuration(interval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -99,8 +110,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
}
type workItem struct {
dir string
cluster, host string
dir string
cluster, host string
}
var wg sync.WaitGroup
@@ -181,6 +192,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
}
totalFiles := 0
var clusterErrors []string
for _, clusterEntry := range clusterEntries {
if !clusterEntry.IsDir() {
@@ -190,7 +202,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
cluster := clusterEntry.Name()
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
if err != nil {
return totalFiles, err
cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
}
// Workers load checkpoint files from disk; main thread writes to parquet.
@@ -255,7 +269,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
// Drain results channel to unblock workers
for range results {
}
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err)
cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
}
type deleteItem struct {
@@ -275,6 +291,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
break
}
}
// Flush once per host to keep row group count within parquet limits.
if writeErr == nil {
if err := writer.FlushRowGroup(); err != nil {
writeErr = err
}
}
}
// Always track files for deletion (even if write failed, we still drain)
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
@@ -285,7 +307,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
}
if errs > 0 {
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster)
clusterErrors = append(clusterErrors, cluster)
os.Remove(parquetFile)
continue
}
if writer.count == 0 {
@@ -296,7 +321,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
if writeErr != nil {
os.Remove(parquetFile)
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr)
cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error())
clusterErrors = append(clusterErrors, cluster)
continue
}
// Delete archived checkpoint files
@@ -316,5 +343,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
}
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
if len(clusterErrors) > 0 {
return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", "))
}
return totalFiles, nil
}

View File

@@ -146,7 +146,9 @@ var (
// ErrDataDoesNotAlign indicates that aggregated data from child scopes
// does not align with the parent scope's expected timestamps/intervals.
ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
ErrDataDoesNotAlignMissingFront error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data prior to start of the buffers)")
ErrDataDoesNotAlignMissingBack error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data after the end of the buffers)")
ErrDataDoesNotAlignDataLenMismatch error = errors.New("[METRICSTORE]> data from lower granularities does not align (collected data length is different than expected data length)")
)
// buffer stores time-series data for a single metric at a specific hierarchical level.

View File

@@ -86,11 +86,12 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// Checkpoints are written every 12 hours (hardcoded).
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
//
// Format behaviour:
// - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
// - "json": Writes a JSON snapshot file per host every interval
// - "wal": Writes a binary snapshot file per host every interval, then rotates
// the current.wal files for all successfully checkpointed hosts
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now()
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore()
wg.Go(func() {
d := 12 * time.Hour // default checkpoint interval
if Keys.CheckpointInterval != "" {
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
} else {
}
if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs)
}
walCheckpointActive.Store(false)
walDropped.Store(0)
} else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {

View File

@@ -59,11 +59,14 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data.
//
// Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
}
// Debug provides development and profiling options.

View File

@@ -24,6 +24,11 @@ const configSchema = `{
"directory": {
"description": "Path in which the checkpointed files should be placed.",
"type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
}
}
},

View File

@@ -6,11 +6,11 @@
// This file implements ingestion of InfluxDB line-protocol metric data received
// over NATS. Each line encodes one metric sample with the following structure:
//
// <measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,subtype=<s>][,stype-id=<id>] value=<v> [<timestamp>]
// <measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,stype=<s>][,stype-id=<id>] value=<v> [<timestamp>]
//
// The measurement name identifies the metric (e.g. "cpu_load"). Tags provide
// routing information (cluster, host) and optional sub-device selectors (type,
// subtype). Only one field is expected per line: "value".
// stype). Only one field is expected per line: "value".
//
// After decoding, each sample is:
// 1. Written to the in-memory store via ms.WriteToLevel.
@@ -22,6 +22,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
@@ -103,7 +104,7 @@ func ReceiveNats(ms *MemoryStore,
// reorder prepends prefix to buf in-place when buf has enough spare capacity,
// avoiding an allocation. Falls back to a regular append otherwise.
//
// It is used to assemble the "type<type-id>" and "subtype<stype-id>" selector
// It is used to assemble the "type<type-id>" and "stype<stype-id>" selector
// strings when the type tag arrives before the type-id tag in the line, so the
// two byte slices need to be concatenated in tag-declaration order regardless
// of wire order.
@@ -145,7 +146,7 @@ type decodeState struct {
// current line. Reset at the start of each line's tag-decode loop.
typeBuf []byte
// subTypeBuf accumulates the concatenated "subtype"+"stype-id" tag value.
// subTypeBuf accumulates the concatenated "stype"+"stype-id" tag value.
// Reset at the start of each line's tag-decode loop.
subTypeBuf []byte
@@ -186,7 +187,7 @@ var decodeStatePool = sync.Pool{
// - The Level pointer (host-level node in the metric tree) is cached across
// consecutive lines that share the same cluster+host pair to avoid
// repeated lock acquisitions on the root and cluster levels.
// - []byte→string conversions for type/subtype selectors are cached via
// - []byte→string conversions for type/stype selectors are cached via
// prevType*/prevSubType* fields because batches typically repeat the same
// sub-device identifiers.
// - Timestamp parsing tries Second precision first; if that fails it retries
@@ -196,6 +197,16 @@ var decodeStatePool = sync.Pool{
// When the checkpoint format is "wal" each successfully decoded sample is also
// sent to WALMessages so the WAL staging goroutine can persist it durably
// before the next binary snapshot.
// isValidPathComponent reports whether s is safe to use as a single filesystem
// path component (cluster or host name) when building checkpoint/WAL paths. It
// rejects path-traversal sequences and embedded separators. An empty string is
// allowed because a missing host tag is legitimate and does not escape the root.
func isValidPathComponent(s string) bool {
return !strings.Contains(s, "..") &&
!strings.Contains(s, "/") &&
!strings.Contains(s, "\\")
}
func DecodeLine(dec *lineprotocol.Decoder,
ms *MemoryStore,
clusterDefault string,
@@ -269,8 +280,8 @@ func DecodeLine(dec *lineprotocol.Decoder,
}
case "type-id":
st.typeBuf = append(st.typeBuf, val...)
case "subtype":
// We cannot be sure that the "subtype" tag comes before the "stype-id" tag:
case "stype":
// We cannot be sure that the "stype" tag comes before the "stype-id" tag:
if len(st.subTypeBuf) == 0 {
st.subTypeBuf = append(st.subTypeBuf, val...)
} else {
@@ -282,6 +293,17 @@ func DecodeLine(dec *lineprotocol.Decoder,
}
}
// cluster and host are taken verbatim from attacker-controllable line
// protocol tags and are later used as filesystem path components for the
// checkpoint/WAL files (path.Join(RootDir, cluster, host)). Reject
// path-traversal sequences so a malicious tag cannot escape the
// checkpoint root. Skip the offending sample; the rest of the batch is
// still processed.
if !isValidPathComponent(cluster) || !isValidPathComponent(host) {
cclog.Warnf("[METRICSTORE]> rejecting metric with invalid cluster/host tag (cluster=%q host=%q)", cluster, host)
continue
}
// If the cluster or host changed, the lvl was set to nil
if lvl == nil {
st.selector = st.selector[:2]
@@ -291,7 +313,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
}
// subtypes: cache []byte→string conversions; messages in a batch typically
// share the same type/subtype so the hit rate is very high.
// share the same type/stype so the hit rate is very high.
st.selector = st.selector[:0]
if len(st.typeBuf) > 0 {
if !bytes.Equal(st.typeBuf, st.prevTypeBytes) {

View File

@@ -8,7 +8,7 @@
//
// The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies.
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
// and enforcing retention policies.
//
// Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation
// - NATS integration for metric ingestion
// - NATS integration for metric ingestion via InfluxDB line protocol
package metricstore
import (
@@ -113,7 +113,8 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion
//
// Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - wg: WaitGroup that will be incremented for each background goroutine started
//
// The function will call cclog.Fatal on critical errors during initialization.
@@ -271,19 +272,37 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
//
// Note: This function blocks until the final checkpoint is written.
func Shutdown() {
totalStart := time.Now()
shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock()
if shutdownFunc != nil {
shutdownFunc()
if shutdownFunc == nil {
// Already shut down (or never initialized): nothing to do. This keeps
// Shutdown idempotent so it is safe to call from more than one path.
shutdownFuncMu.Unlock()
return
}
shutdownFunc()
shutdownFunc = nil
shutdownFuncMu.Unlock()
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
if Keys.Checkpoints.FileFormat == "wal" {
// Signal producers to stop sending before closing channels,
// preventing send-on-closed-channel panics from in-flight NATS workers.
walShuttingDown.Store(true)
// Brief grace period for in-flight DecodeLine calls to complete.
time.Sleep(100 * time.Millisecond)
for _, ch := range walShardChs {
close(ch)
}
drainStart := time.Now()
WaitForWALStagingDrain()
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
}
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
checkpointStart := time.Now()
var files int
var err error
@@ -294,19 +313,22 @@ func Shutdown() {
lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "wal" {
var hostDirs []string
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
if err == nil {
RotateWALFilesAfterShutdown(hostDirs)
}
var successDirs []string
files, successDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
// The final binary snapshot now captures all in-memory data for these
// hosts, making their current.wal redundant. The staging goroutines have
// already exited, so remove the WAL files directly (the channel-based
// RotateWALFiles is no longer safe to call). Without this, current.wal
// files survive shutdown and keep growing across restarts.
RotateWALFilesAfterShutdown(successDirs)
} else {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
}
if err != nil {
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
}
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
}
// Retention starts a background goroutine that periodically frees old metric data.
@@ -702,16 +724,16 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
} else if from != cfrom || to != cto || len(data) != len(cdata) {
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
if missingfront != 0 {
return ErrDataDoesNotAlign
return ErrDataDoesNotAlignMissingFront
}
newlen := len(cdata) - missingback
if newlen < 1 {
return ErrDataDoesNotAlign
return ErrDataDoesNotAlignMissingBack
}
cdata = cdata[0:newlen]
if len(cdata) != len(data) {
return ErrDataDoesNotAlign
return ErrDataDoesNotAlignDataLenMismatch
}
from, to = cfrom, cto

View File

@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
// writing metrics in sorted order without materializing all rows in memory.
// Produces one row group per call (typically one host's data).
// Call FlushRowGroup() after writing all checkpoint files for a host.
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
w.writeLevel(cf, cluster, hostname, scope, scopeID)
@@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
w.batch = w.batch[:0]
}
return nil
}
// FlushRowGroup flushes the current row group to the Parquet file.
// Should be called once per host after all checkpoint files for that host are written.
func (w *parquetArchiveWriter) FlushRowGroup() error {
if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err)
}
return nil
}

View File

@@ -91,8 +91,10 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6
if n == 0 {
from, to = cfrom, cto
} else if from != cfrom || to != cto {
return ErrDataDoesNotAlign
} else if from != cfrom {
return ErrDataDoesNotAlignMissingFront
} else if to != cto {
return ErrDataDoesNotAlignMissingBack
}
samples += stats.Samples

View File

@@ -69,6 +69,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -91,6 +92,18 @@ var walShardRotateChs []chan walRotateReq
// walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
var walStagingWg sync.WaitGroup
// walShuttingDown is set before closing shard channels to prevent
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct {
@@ -111,10 +124,17 @@ type walRotateReq struct {
// walFileState holds an open WAL file handle and buffered writer for one host directory.
type walFileState struct {
f *os.File
w *bufio.Writer
f *os.File
w *bufio.Writer
dirty bool
size int64 // approximate bytes written (tracked from open + writes)
}
// walFlushInterval controls how often dirty WAL files are flushed to disk.
// Decoupling flushes from message processing lets the consumer run at memory
// speed, amortizing syscall overhead across many writes.
const walFlushInterval = 1 * time.Second
// walShardIndex computes which shard a message belongs to based on cluster+node.
// Uses FNV-1a hash for fast, well-distributed mapping.
func walShardIndex(cluster, node string) int {
@@ -126,11 +146,14 @@ func walShardIndex(cluster, node string) int {
}
// SendWALMessage routes a WAL message to the appropriate shard channel.
// Returns false if the channel is full (message dropped).
// Returns false if the channel is full or shutdown is in progress.
func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil {
if walShardChs == nil || walShuttingDown.Load() {
return false
}
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node)
select {
case walShardChs[shard] <- msg:
@@ -164,7 +187,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i]
walStagingWg.Add(1)
wg.Go(func() {
defer walStagingWg.Done()
hostFiles := make(map[string]*walFileState)
defer func() {
@@ -198,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var fileSize int64
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
@@ -206,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close()
return nil
}
fileSize = 4
}
ws = &walFileState{f: f, w: w}
ws = &walFileState{f: f, w: w, size: fileSize}
hostFiles[hostDir] = ws
return ws
}
@@ -219,9 +249,31 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil {
return
}
if err := writeWALRecordDirect(ws.w, msg); err != nil {
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
ws.size += int64(n)
ws.dirty = true
}
processRotate := func(req walRotateReq) {
@@ -238,58 +290,57 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
close(req.done)
}
flushAll := func() {
flushDirty := func() {
for _, ws := range hostFiles {
if ws.f != nil {
if ws.dirty {
ws.w.Flush()
ws.dirty = false
}
}
}
drain := func() {
for {
ticker := time.NewTicker(walFlushInterval)
defer ticker.Stop()
// drainBatch processes up to 4096 pending messages without blocking.
// Returns false if the channel was closed.
drainBatch := func() bool {
for range 4096 {
select {
case msg, ok := <-msgCh:
if !ok {
return
flushDirty()
return false
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
flushAll()
return
return true
}
}
return true
}
for {
select {
case <-ctx.Done():
drain()
// On shutdown, skip draining buffered messages — a full binary
// checkpoint will be written from in-memory state, making
// buffered WAL records redundant.
flushDirty()
return
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
// Drain up to 256 more messages without blocking to batch writes.
for range 256 {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
goto flushed
}
if !drainBatch() {
return
}
flushed:
flushAll()
// No flush here — timer handles periodic flushing.
case <-ticker.C:
flushDirty()
case req := <-rotateCh:
processRotate(req)
}
@@ -298,23 +349,45 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
}
}
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
// Must be called after closing walShardChs to ensure all file handles are
// flushed and closed before checkpoint writes begin.
func WaitForWALStagingDrain() {
walStagingWg.Wait()
}
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory.
//
// If shutdown is in progress (WAL staging goroutines may have exited),
// rotation is skipped to avoid deadlocking on abandoned channels.
func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil {
if walShardRotateChs == nil || walShuttingDown.Load() {
return
}
dones := make([]chan struct{}, len(hostDirs))
for i, dir := range hostDirs {
dones[i] = make(chan struct{})
// Extract cluster/node from hostDir to find the right shard.
// hostDir = rootDir/cluster/node
deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs {
done := make(chan struct{})
shard := walShardIndexFromDir(dir)
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
case <-deadline:
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
len(hostDirs)-len(dones), len(hostDirs))
goto waitDones
}
}
waitDones:
for _, done := range dones {
<-done
select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
return
}
}
}
@@ -327,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node)
}
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete.
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
// host directories. Used after shutdown, when WALStaging goroutines have already
// exited and the channel-based RotateWALFiles is no longer safe to call.
func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal")
@@ -338,142 +412,66 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
}
}
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
// avoiding heap allocations by using a stack-allocated scratch buffer for
// the fixed-size header/trailer and computing CRC inline.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload size.
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
// Returns the number of bytes written and any error.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
payloadSize += 1 + len(s)
}
// Total: 8 (header) + payload + 4 (CRC).
totalSize := 8 + payloadSize + 4
// Write magic + payload length (8 bytes header).
var hdr [8]byte
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
if _, err := w.Write(hdr[:]); err != nil {
return err
// Use stack buffer for typical small records, heap-allocate only for large ones.
var stackBuf [256]byte
var buf []byte
if totalSize <= len(stackBuf) {
buf = stackBuf[:totalSize]
} else {
buf = make([]byte, totalSize)
}
// We need to compute CRC over the payload as we write it.
crc := crc32.NewIEEE()
// Header: magic + payload length.
binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
// Payload starts at offset 8.
p := 8
// Timestamp (8 bytes).
var scratch [8]byte
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
crc.Write(scratch[:8])
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
p += 8
// Metric name length (2 bytes) + metric name.
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
crc.Write(scratch[:2])
if _, err := w.Write(scratch[:2]); err != nil {
return err
}
nameBytes := []byte(msg.MetricName)
crc.Write(nameBytes)
if _, err := w.Write(nameBytes); err != nil {
return err
}
binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
p += 2
p += copy(buf[p:], msg.MetricName)
// Selector count (1 byte).
scratch[0] = byte(len(msg.Selector))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
buf[p] = byte(len(msg.Selector))
p++
// Selectors (1-byte length + bytes each).
for _, sel := range msg.Selector {
scratch[0] = byte(len(sel))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
selBytes := []byte(sel)
crc.Write(selBytes)
if _, err := w.Write(selBytes); err != nil {
return err
}
buf[p] = byte(len(sel))
p++
p += copy(buf[p:], sel)
}
// Value (4 bytes, float32 bits).
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
crc.Write(scratch[:4])
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
p += 4
// CRC32 (4 bytes).
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
_, err := w.Write(scratch[:4])
return err
}
// CRC32 over payload (bytes 8..8+payloadSize).
crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
func buildWALPayload(msg *WALMessage) []byte {
size := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
size += 1 + len(s)
}
buf := make([]byte, 0, size)
// Timestamp (8 bytes, little-endian int64)
var ts [8]byte
binary.LittleEndian.PutUint64(ts[:], uint64(msg.Timestamp))
buf = append(buf, ts[:]...)
// Metric name (2-byte length prefix + bytes)
var mLen [2]byte
binary.LittleEndian.PutUint16(mLen[:], uint16(len(msg.MetricName)))
buf = append(buf, mLen[:]...)
buf = append(buf, msg.MetricName...)
// Selector count (1 byte)
buf = append(buf, byte(len(msg.Selector)))
// Selectors (1-byte length prefix + bytes each)
for _, sel := range msg.Selector {
buf = append(buf, byte(len(sel)))
buf = append(buf, sel...)
}
// Value (4 bytes, float32 bit representation)
var val [4]byte
binary.LittleEndian.PutUint32(val[:], math.Float32bits(float32(msg.Value)))
buf = append(buf, val[:]...)
return buf
}
// writeWALRecord appends a binary WAL record to the writer.
// Format: [4B magic][4B payload_len][payload][4B CRC32]
func writeWALRecord(w io.Writer, msg *WALMessage) error {
payload := buildWALPayload(msg)
crc := crc32.ChecksumIEEE(payload)
record := make([]byte, 0, 4+4+len(payload)+4)
var magic [4]byte
binary.LittleEndian.PutUint32(magic[:], walRecordMagic)
record = append(record, magic[:]...)
var pLen [4]byte
binary.LittleEndian.PutUint32(pLen[:], uint32(len(payload)))
record = append(record, pLen[:]...)
record = append(record, payload...)
var crcBytes [4]byte
binary.LittleEndian.PutUint32(crcBytes[:], crc)
record = append(record, crcBytes[:]...)
_, err := w.Write(record)
return err
// Single atomic write to the buffered writer.
n, err := w.Write(buf)
return n, err
}
// readWALRecord reads one WAL record from the reader.
@@ -697,7 +695,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
selector []string
}
n, errs := int32(0), int32(0)
totalWork := len(levels)
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
n, errs, completed := int32(0), int32(0), int32(0)
var successDirs []string
var successMu sync.Mutex
@@ -705,6 +706,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
wg.Add(Keys.NumWorkers)
work := make(chan workItem, Keys.NumWorkers*2)
// Progress logging goroutine.
stopProgress := make(chan struct{})
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
case <-stopProgress:
return
}
}
}()
for range Keys.NumWorkers {
go func() {
defer wg.Done()
@@ -712,6 +729,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
if err != nil {
if err == ErrNoNewArchiveData {
atomic.AddInt32(&completed, 1)
continue
}
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
@@ -722,6 +740,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
successDirs = append(successDirs, wi.hostDir)
successMu.Unlock()
}
atomic.AddInt32(&completed, 1)
}
}()
}
@@ -736,6 +755,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
}
close(work)
wg.Wait()
close(stopProgress)
if errs > 0 {
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)