diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 3b92a3e0..3938ac82 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -3,6 +3,12 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements the cleanup (archiving or deletion) of old checkpoint files. +// +// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode +// it converts checkpoint files older than the retention window into per-cluster +// Parquet files and then deletes the originals. In "delete" mode it simply removes +// old checkpoint files. package metricstore import ( @@ -19,8 +25,12 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) -// Worker for either Archiving or Deleting files - +// CleanUp starts a background worker that periodically removes or archives +// checkpoint files older than the configured retention window. +// +// In "archive" mode, old checkpoint files are converted to Parquet and stored +// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed. +// The cleanup interval equals Keys.RetentionInMemory. func CleanUp(wg *sync.WaitGroup, ctx context.Context) { if Keys.Cleanup.Mode == "archive" { cclog.Info("[METRICSTORE]> enable archive cleanup to parquet") diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index d408ce72..fbf56a15 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -86,11 +86,12 @@ var ( // Checkpointing starts a background worker that periodically saves metric data to disk. // -// Checkpoints are written every 12 hours (hardcoded). +// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours). // // Format behaviour: -// - "json": Periodic checkpointing every checkpointInterval -// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval +// - "json": Writes a JSON snapshot file per host every interval +// - "wal": Writes a binary snapshot file per host every interval, then rotates +// the current.wal files for all successfully checkpointed hosts func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { lastCheckpointMu.Lock() lastCheckpoint = time.Now() diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 9ba69c55..845472b2 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -8,7 +8,7 @@ // // The package organizes metrics in a tree structure (cluster → host → component) and // provides concurrent read/write access to metric data with configurable aggregation strategies. -// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data, +// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data, // and enforcing retention policies. // // Key features: @@ -16,7 +16,7 @@ // - Hierarchical data organization (selectors) // - Concurrent checkpoint/archive workers // - Support for sum and average aggregation -// - NATS integration for metric ingestion +// - NATS integration for metric ingestion via InfluxDB line protocol package metricstore import ( @@ -113,7 +113,8 @@ type MemoryStore struct { // 6. Optionally subscribes to NATS for real-time metric ingestion // // Parameters: -// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig) +// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults +// - metrics: Map of metric names to their configurations (frequency and aggregation strategy) // - wg: WaitGroup that will be incremented for each background goroutine started // // The function will call cclog.Fatal on critical errors during initialization. diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 0d0b2c8e..b7c7ffb2 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -400,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int { return walShardIndex(cluster, node) } -// RotateWALFiles sends rotation requests for the given host directories -// and blocks until all rotations complete. +// RotateWALFilesAfterShutdown directly removes current.wal files for the given +// host directories. Used after shutdown, when WALStaging goroutines have already +// exited and the channel-based RotateWALFiles is no longer safe to call. func RotateWALFilesAfterShutdown(hostDirs []string) { for _, dir := range hostDirs { walPath := path.Join(dir, "current.wal")