Cleanup metricstore options

Entire-Checkpoint: 2f9a4e1c2e87
This commit is contained in:
2026-03-04 10:37:43 +01:00
parent 84fda9c8e2
commit 39635ea123
6 changed files with 27 additions and 53 deletions

View File

@@ -21,8 +21,6 @@
], ],
"metric-store": { "metric-store": {
"checkpoints": { "checkpoints": {
"file-format": "wal",
"interval": "12h"
}, },
"retention-in-memory": "48h", "retention-in-memory": "48h",
"memory-cap": 100 "memory-cap": 100

View File

@@ -74,14 +74,12 @@
], ],
"metric-store": { "metric-store": {
"checkpoints": { "checkpoints": {
"interval": "12h",
"directory": "./var/checkpoints" "directory": "./var/checkpoints"
}, },
"memory-cap": 100, "memory-cap": 100,
"retention-in-memory": "48h", "retention-in-memory": "48h",
"cleanup": { "cleanup": {
"mode": "archive", "mode": "archive",
"interval": "48h",
"directory": "./var/archive" "directory": "./var/archive"
}, },
"nats-subscriptions": [ "nats-subscriptions": [

View File

@@ -24,19 +24,15 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" { if Keys.Cleanup.Mode == "archive" {
// Run as Archiver // Run as Archiver
cleanUpWorker(wg, ctx, cleanUpWorker(wg, ctx,
Keys.Cleanup.Interval, Keys.RetentionInMemory,
"archiving", "archiving",
Keys.Cleanup.RootDir, Keys.Cleanup.RootDir,
false, false,
) )
} else { } else {
if Keys.Cleanup.Interval == "" {
Keys.Cleanup.Interval = Keys.RetentionInMemory
}
// Run as Deleter // Run as Deleter
cleanUpWorker(wg, ctx, cleanUpWorker(wg, ctx,
Keys.Cleanup.Interval, Keys.RetentionInMemory,
"deleting", "deleting",
"", "",
true, true,

View File

@@ -86,9 +86,11 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk. // Checkpointing starts a background worker that periodically saves metric data to disk.
// //
// Checkpoints are written every 12 hours (hardcoded).
//
// Format behaviour: // Format behaviour:
// - "json": Periodic checkpointing based on Keys.Checkpoints.Interval // - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation at Keys.Checkpoints.Interval // - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = time.Now() lastCheckpoint = time.Now()
@@ -98,14 +100,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
wg.Go(func() { wg.Go(func() {
d, err := time.ParseDuration(Keys.Checkpoints.Interval) const checkpointInterval = 12 * time.Hour
if err != nil { d := checkpointInterval
cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error())
}
if d <= 0 {
cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d)
return
}
ticker := time.NewTicker(d) ticker := time.NewTicker(d)
defer ticker.Stop() defer ticker.Stop()

View File

@@ -11,15 +11,13 @@
// //
// MetricStoreConfig (Keys) // MetricStoreConfig (Keys)
// ├─ NumWorkers: Parallel checkpoint/archive workers // ├─ NumWorkers: Parallel checkpoint/archive workers
// ├─ RetentionInMemory: How long to keep data in RAM // ├─ RetentionInMemory: How long to keep data in RAM (also used as cleanup interval)
// ├─ MemoryCap: Memory limit in bytes (triggers forceFree) // ├─ MemoryCap: Memory limit in bytes (triggers forceFree)
// ├─ Checkpoints: Persistence configuration // ├─ Checkpoints: Persistence configuration
// │ ├─ FileFormat: "json" or "wal" // │ ├─ FileFormat: "json" or "wal" (default: "wal")
// │ ├─ Interval: How often to save (e.g., "1h")
// │ └─ RootDir: Checkpoint storage path // │ └─ RootDir: Checkpoint storage path
// ├─ Cleanup: Long-term storage configuration // ├─ Cleanup: Long-term storage configuration (interval = RetentionInMemory)
// │ ├─ Interval: How often to delete/archive // │ ├─ RootDir: Archive storage path (archive mode only)
// │ ├─ RootDir: Archive storage path
// │ └─ Mode: "delete" or "archive" // │ └─ Mode: "delete" or "archive"
// ├─ Debug: Development/debugging options // ├─ Debug: Development/debugging options
// └─ Subscriptions: NATS topic subscriptions for metric ingestion // └─ Subscriptions: NATS topic subscriptions for metric ingestion
@@ -61,12 +59,10 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data. // Checkpoints configures periodic persistence of in-memory metric data.
// //
// Fields: // Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe) // - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves
// - RootDir: Filesystem path for checkpoint files (created if missing) // - RootDir: Filesystem path for checkpoint files (created if missing)
type Checkpoints struct { type Checkpoints struct {
FileFormat string `json:"file-format"` FileFormat string `json:"file-format"`
Interval string `json:"interval"`
RootDir string `json:"directory"` RootDir string `json:"directory"`
} }
@@ -80,18 +76,17 @@ type Debug struct {
EnableGops bool `json:"gops"` EnableGops bool `json:"gops"`
} }
// Archive configures long-term storage of old metric data. // Cleanup configures long-term storage of old metric data.
// //
// Data older than RetentionInMemory is archived to disk or deleted. // Data older than RetentionInMemory is archived to disk or deleted.
// The cleanup interval is always RetentionInMemory.
// //
// Fields: // Fields:
// - ArchiveInterval: Duration string (e.g., "24h") between archive operations // - RootDir: Filesystem path for archived data (used in "archive" mode)
// - RootDir: Filesystem path for archived data (created if missing) // - Mode: "delete" (discard old data) or "archive" (write to RootDir)
// - DeleteInstead: If true, delete old data instead of archiving (saves disk space)
type Cleanup struct { type Cleanup struct {
Interval string `json:"interval"` RootDir string `json:"directory"`
RootDir string `json:"directory"` Mode string `json:"mode"`
Mode string `json:"mode"`
} }
// Subscriptions defines NATS topics to subscribe to for metric ingestion. // Subscriptions defines NATS topics to subscribe to for metric ingestion.
@@ -141,7 +136,7 @@ type MetricStoreConfig struct {
// Accessed by Init(), Checkpointing(), and other lifecycle functions. // Accessed by Init(), Checkpointing(), and other lifecycle functions.
var Keys MetricStoreConfig = MetricStoreConfig{ var Keys MetricStoreConfig = MetricStoreConfig{
Checkpoints: Checkpoints{ Checkpoints: Checkpoints{
FileFormat: "json", FileFormat: "wal",
RootDir: "./var/checkpoints", RootDir: "./var/checkpoints",
}, },
Cleanup: &Cleanup{ Cleanup: &Cleanup{

View File

@@ -18,35 +18,26 @@ const configSchema = `{
"type": "object", "type": "object",
"properties": { "properties": {
"file-format": { "file-format": {
"description": "Specify the format for checkpoint files. Two variants: 'json' (human-readable, periodic) and 'wal' (binary snapshot + Write-Ahead Log, crash-safe). Default is 'json'.", "description": "Specify the format for checkpoint files. Two variants: 'json' (human-readable, periodic) and 'wal' (binary snapshot + Write-Ahead Log, crash-safe). Default is 'wal'.",
"type": "string"
},
"interval": {
"description": "Interval at which the metrics should be checkpointed.",
"type": "string" "type": "string"
}, },
"directory": { "directory": {
"description": "Path in which the checkpointed files should be placed.", "description": "Path in which the checkpointed files should be placed.",
"type": "string" "type": "string"
} }
}, }
"required": ["interval"]
}, },
"cleanup": { "cleanup": {
"description": "Configuration for the cleanup process.", "description": "Configuration for the cleanup process. The cleanup interval is always 'retention-in-memory'.",
"type": "object", "type": "object",
"properties": { "properties": {
"mode": { "mode": {
"description": "The operation mode (e.g., 'archive' or 'delete').", "description": "The operation mode (e.g., 'archive' or 'delete').",
"type": "string", "type": "string",
"enum": ["archive", "delete"] "enum": ["archive", "delete"]
},
"interval": {
"description": "Interval at which the cleanup runs.",
"type": "string"
}, },
"directory": { "directory": {
"description": "Target directory for operations.", "description": "Target directory for archive operations.",
"type": "string" "type": "string"
} }
}, },
@@ -56,7 +47,7 @@ const configSchema = `{
} }
}, },
"then": { "then": {
"required": ["interval", "directory"] "required": ["directory"]
} }
}, },
"retention-in-memory": { "retention-in-memory": {
@@ -86,5 +77,5 @@ const configSchema = `{
} }
} }
}, },
"required": ["checkpoints", "retention-in-memory", "memory-cap"] "required": ["retention-in-memory", "memory-cap"]
}` }`