diff --git a/configs/config-demo.json b/configs/config-demo.json index 7d36f19f..8cbd2ed6 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -16,8 +16,7 @@ "checkpoints": { "interval": "12h" }, - "retention-in-memory": "48h", + "retention-in-memory": "2m", "memory-cap": 100 } -} - +} \ No newline at end of file diff --git a/configs/config.json b/configs/config.json index ff224f29..a00db00e 100644 --- a/configs/config.json +++ b/configs/config.json @@ -5,13 +5,18 @@ "https-key-file": "/etc/letsencrypt/live/url/privkey.pem", "user": "clustercockpit", "group": "clustercockpit", - "apiAllowedIPs": ["*"], + "apiAllowedIPs": [ + "*" + ], "short-running-jobs-duration": 300, "enable-job-taggers": true, "resampling": { "minimumPoints": 600, "trigger": 180, - "resolutions": [240, 60] + "resolutions": [ + 240, + 60 + ] }, "apiSubjects": { "subjectJobEvent": "cc.job.event", @@ -47,10 +52,16 @@ }, "metric-store": { "checkpoints": { - "interval": "12h" + "interval": "12h", + "directory": "./var/checkpoints" }, "memory-cap": 100, "retention-in-memory": "48h", + "cleanup": { + "mode": "archive", + "interval": "48h", + "directory": "./var/archive" + }, "nats-subscriptions": [ { "subscribe-to": "hpc-nats", @@ -63,4 +74,4 @@ ] }, "ui-file": "ui-config.json" -} +} \ No newline at end of file diff --git a/internal/metricstore/archive.go b/internal/metricstore/archive.go index 68f88741..6abcb183 100644 --- a/internal/metricstore/archive.go +++ b/internal/metricstore/archive.go @@ -23,19 +23,23 @@ import ( // Worker for either Archiving or Deleting files -func Archiving(wg *sync.WaitGroup, ctx context.Context) { - if Keys.Archive != nil { +func CleanUp(wg *sync.WaitGroup, ctx context.Context) { + if Keys.Cleanup.Mode == "archive" { // Run as Archiver - runWorker(wg, ctx, - Keys.Archive.ArchiveInterval, + cleanUpWorker(wg, ctx, + Keys.Cleanup.Interval, "archiving", - Keys.Archive.RootDir, - Keys.Archive.DeleteInstead, + Keys.Cleanup.RootDir, + false, ) } else { + if Keys.Cleanup.Interval == "" { + Keys.Cleanup.Interval = Keys.RetentionInMemory + } + // Run as Deleter - runWorker(wg, ctx, - Keys.RetentionInMemory, + cleanUpWorker(wg, ctx, + Keys.Cleanup.Interval, "deleting", "", true, @@ -44,7 +48,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { } // runWorker takes simple values to configure what it does -func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) { +func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { go func() { defer wg.Done() @@ -67,12 +71,12 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st t := time.Now().Add(-d) cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete) + n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete) if err != nil { cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error()) } else { - if delete && archiveDir == "" { + if delete && cleanupDir == "" { cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n) } else { cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) @@ -85,9 +89,9 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st var ErrNoNewArchiveData error = errors.New("all data already archived") -// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`, -// deleting them from the `checkpointsDir`. -func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) { +// Delete or ZIP all checkpoint files older than `from` together and write them to the `cleanupDir`, +// deleting/moving them from the `checkpointsDir`. +func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error) { entries1, err := os.ReadDir(checkpointsDir) if err != nil { return 0, err @@ -107,7 +111,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns go func() { defer wg.Done() for workItem := range work { - m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead) + m, err := cleanupCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead) if err != nil { cclog.Errorf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error()) atomic.AddInt32(&errs, 1) @@ -125,7 +129,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns for _, de2 := range entries2 { cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name()) - adir := filepath.Join(archiveDir, de1.Name(), de2.Name()) + adir := filepath.Join(cleanupDir, de1.Name(), de2.Name()) work <- workItem{ adir: adir, cdir: cdir, cluster: de1.Name(), host: de2.Name(), @@ -146,8 +150,8 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns return int(n), nil } -// Helper function for `ArchiveCheckpoints`. -func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead bool) (int, error) { +// Helper function for `CleanupCheckpoints`. +func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead bool) (int, error) { entries, err := os.ReadDir(dir) if err != nil { return 0, err @@ -171,10 +175,10 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead return n, nil } - filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from)) + filename := filepath.Join(cleanupDir, fmt.Sprintf("%d.zip", from)) f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(archiveDir, CheckpointDirPerms) + err = os.MkdirAll(cleanupDir, CheckpointDirPerms) if err == nil { f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) } diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go index 34e536d0..44a24f7d 100644 --- a/internal/metricstore/config.go +++ b/internal/metricstore/config.go @@ -17,10 +17,10 @@ // │ ├─ FileFormat: "avro" or "json" // │ ├─ Interval: How often to save (e.g., "1h") // │ └─ RootDir: Checkpoint storage path -// ├─ Archive: Long-term storage configuration -// │ ├─ ArchiveInterval: How often to archive +// ├─ Cleanup: Long-term storage configuration +// │ ├─ Interval: How often to delete/archive // │ ├─ RootDir: Archive storage path -// │ └─ DeleteInstead: Delete old data instead of archiving +// │ └─ Mode: "delete" or "archive" // ├─ Debug: Development/debugging options // └─ Subscriptions: NATS topic subscriptions for metric ingestion // @@ -48,12 +48,13 @@ import ( ) const ( - DefaultMaxWorkers = 10 - DefaultBufferCapacity = 512 - DefaultGCTriggerInterval = 100 - DefaultAvroWorkers = 4 - DefaultCheckpointBufferMin = 3 - DefaultAvroCheckpointInterval = time.Minute + DefaultMaxWorkers = 10 + DefaultBufferCapacity = 512 + DefaultGCTriggerInterval = 100 + DefaultAvroWorkers = 4 + DefaultCheckpointBufferMin = 3 + DefaultAvroCheckpointInterval = time.Minute + DefaultMemoryUsageTrackerInterval = 1 * time.Hour ) // Checkpoints configures periodic persistence of in-memory metric data. @@ -86,10 +87,10 @@ type Debug struct { // - ArchiveInterval: Duration string (e.g., "24h") between archive operations // - RootDir: Filesystem path for archived data (created if missing) // - DeleteInstead: If true, delete old data instead of archiving (saves disk space) -type Archive struct { - ArchiveInterval string `json:"interval"` - RootDir string `json:"directory"` - DeleteInstead bool `json:"delete-instead"` +type Cleanup struct { + Interval string `json:"interval"` + RootDir string `json:"directory"` + Mode string `json:"mode"` } // Subscriptions defines NATS topics to subscribe to for metric ingestion. @@ -129,7 +130,7 @@ type MetricStoreConfig struct { MemoryCap int `json:"memory-cap"` Checkpoints Checkpoints `json:"checkpoints"` Debug *Debug `json:"debug"` - Archive *Archive `json:"archive"` + Cleanup *Cleanup `json:"cleanup"` Subscriptions *Subscriptions `json:"nats-subscriptions"` } @@ -142,6 +143,9 @@ var Keys MetricStoreConfig = MetricStoreConfig{ FileFormat: "avro", RootDir: "./var/checkpoints", }, + Cleanup: &Cleanup{ + Mode: "delete", + }, } // AggregationStrategy defines how to combine metric values across hierarchy levels. diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index 4677eca6..cb60fc37 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -32,20 +32,32 @@ const configSchema = `{ }, "required": ["interval"] }, - "archive": { - "description": "Configuration for archiving the already checkpointed files.", + "cleanup": { + "description": "Configuration for the cleanup process.", "type": "object", "properties": { + "mode": { + "description": "The operation mode (e.g., 'archive' or 'delete').", + "type": "string", + "enum": ["archive", "delete"] + }, "interval": { - "description": "Interval at which the checkpointed files should be archived.", + "description": "Interval at which the cleanup runs.", "type": "string" }, "directory": { - "description": "Specify the directy in which the archived files should be placed.", + "description": "Target directory for operations.", "type": "string" } }, - "required": ["interval", "directory"] + "if": { + "properties": { + "mode": { "const": "archive" } + } + }, + "then": { + "required": ["interval", "directory"] + } }, "retention-in-memory": { "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index b1fe065d..b016e725 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -200,7 +200,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { Retention(wg, ctx) Checkpointing(wg, ctx) - Archiving(wg, ctx) + CleanUp(wg, ctx) DataStaging(wg, ctx) MemoryUsageTracker(wg, ctx) @@ -386,7 +386,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() - d := 1 * time.Minute + d := DefaultMemoryUsageTrackerInterval if d <= 0 { return