mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-30 12:37:30 +02:00
Update metricstore documentation
Entire-Checkpoint: 99f20c1edd90
This commit is contained in:
@@ -3,6 +3,12 @@
|
|||||||
// Use of this source code is governed by an MIT-style
|
// Use of this source code is governed by an MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
|
||||||
|
//
|
||||||
|
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
|
||||||
|
// it converts checkpoint files older than the retention window into per-cluster
|
||||||
|
// Parquet files and then deletes the originals. In "delete" mode it simply removes
|
||||||
|
// old checkpoint files.
|
||||||
package metricstore
|
package metricstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -19,8 +25,12 @@ import (
|
|||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Worker for either Archiving or Deleting files
|
// CleanUp starts a background worker that periodically removes or archives
|
||||||
|
// checkpoint files older than the configured retention window.
|
||||||
|
//
|
||||||
|
// In "archive" mode, old checkpoint files are converted to Parquet and stored
|
||||||
|
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
|
||||||
|
// The cleanup interval equals Keys.RetentionInMemory.
|
||||||
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
if Keys.Cleanup.Mode == "archive" {
|
if Keys.Cleanup.Mode == "archive" {
|
||||||
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
||||||
|
|||||||
@@ -86,11 +86,12 @@ var (
|
|||||||
|
|
||||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||||
//
|
//
|
||||||
// Checkpoints are written every 12 hours (hardcoded).
|
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
|
||||||
//
|
//
|
||||||
// Format behaviour:
|
// Format behaviour:
|
||||||
// - "json": Periodic checkpointing every checkpointInterval
|
// - "json": Writes a JSON snapshot file per host every interval
|
||||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
// - "wal": Writes a binary snapshot file per host every interval, then rotates
|
||||||
|
// the current.wal files for all successfully checkpointed hosts
|
||||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
lastCheckpointMu.Lock()
|
lastCheckpointMu.Lock()
|
||||||
lastCheckpoint = time.Now()
|
lastCheckpoint = time.Now()
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
//
|
//
|
||||||
// The package organizes metrics in a tree structure (cluster → host → component) and
|
// The package organizes metrics in a tree structure (cluster → host → component) and
|
||||||
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
||||||
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
|
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
|
||||||
// and enforcing retention policies.
|
// and enforcing retention policies.
|
||||||
//
|
//
|
||||||
// Key features:
|
// Key features:
|
||||||
@@ -16,7 +16,7 @@
|
|||||||
// - Hierarchical data organization (selectors)
|
// - Hierarchical data organization (selectors)
|
||||||
// - Concurrent checkpoint/archive workers
|
// - Concurrent checkpoint/archive workers
|
||||||
// - Support for sum and average aggregation
|
// - Support for sum and average aggregation
|
||||||
// - NATS integration for metric ingestion
|
// - NATS integration for metric ingestion via InfluxDB line protocol
|
||||||
package metricstore
|
package metricstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -113,7 +113,8 @@ type MemoryStore struct {
|
|||||||
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
|
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
|
||||||
|
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
|
||||||
// - wg: WaitGroup that will be incremented for each background goroutine started
|
// - wg: WaitGroup that will be incremented for each background goroutine started
|
||||||
//
|
//
|
||||||
// The function will call cclog.Fatal on critical errors during initialization.
|
// The function will call cclog.Fatal on critical errors during initialization.
|
||||||
|
|||||||
@@ -400,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
|
|||||||
return walShardIndex(cluster, node)
|
return walShardIndex(cluster, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RotateWALFiles sends rotation requests for the given host directories
|
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
|
||||||
// and blocks until all rotations complete.
|
// host directories. Used after shutdown, when WALStaging goroutines have already
|
||||||
|
// exited and the channel-based RotateWALFiles is no longer safe to call.
|
||||||
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||||
for _, dir := range hostDirs {
|
for _, dir := range hostDirs {
|
||||||
walPath := path.Join(dir, "current.wal")
|
walPath := path.Join(dir, "current.wal")
|
||||||
|
|||||||
Reference in New Issue
Block a user