Refactor Archive to Cleanup and rename everything

Aditya Ujeniya
2026-01-16 14:09:50 +01:00
parent 1d62ee1e22
commit 1a41629535
6 changed files with 78 additions and 48 deletions

View File

@@ -16,8 +16,7 @@
     "checkpoints": {
       "interval": "12h"
     },
-    "retention-in-memory": "48h",
+    "retention-in-memory": "2m",
     "memory-cap": 100
   }
 }
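
The sample config's retention window is "2m" here. A quick sketch of how such interval strings behave, assuming they are parsed as Go duration strings with time.ParseDuration (the exact parsing helper used by the codebase is an assumption):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Check the duration strings that appear in the sample configs.
	for _, s := range []string{"12h", "48h", "2m"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			fmt.Println(s, "is not a valid duration:", err)
			continue
		}
		fmt.Printf("%s -> %v\n", s, d) // e.g. "2m" -> 2m0s
	}
}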

View File

@@ -5,13 +5,18 @@
   "https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
   "user": "clustercockpit",
   "group": "clustercockpit",
-  "apiAllowedIPs": ["*"],
+  "apiAllowedIPs": [
+    "*"
+  ],
   "short-running-jobs-duration": 300,
   "enable-job-taggers": true,
   "resampling": {
     "minimumPoints": 600,
     "trigger": 180,
-    "resolutions": [240, 60]
+    "resolutions": [
+      240,
+      60
+    ]
   },
   "apiSubjects": {
     "subjectJobEvent": "cc.job.event",
@@ -47,10 +52,16 @@
   },
   "metric-store": {
     "checkpoints": {
-      "interval": "12h"
+      "interval": "12h",
+      "directory": "./var/checkpoints"
     },
     "memory-cap": 100,
     "retention-in-memory": "48h",
+    "cleanup": {
+      "mode": "archive",
+      "interval": "48h",
+      "directory": "./var/archive"
+    },
     "nats-subscriptions": [
       {
         "subscribe-to": "hpc-nats",

View File

@@ -23,19 +23,23 @@ import (
 // Worker for either Archiving or Deleting files
-func Archiving(wg *sync.WaitGroup, ctx context.Context) {
-	if Keys.Archive != nil {
+func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
+	if Keys.Cleanup.Mode == "archive" {
 		// Run as Archiver
-		runWorker(wg, ctx,
-			Keys.Archive.ArchiveInterval,
+		cleanUpWorker(wg, ctx,
+			Keys.Cleanup.Interval,
 			"archiving",
-			Keys.Archive.RootDir,
-			Keys.Archive.DeleteInstead,
+			Keys.Cleanup.RootDir,
+			false,
 		)
 	} else {
+		if Keys.Cleanup.Interval == "" {
+			Keys.Cleanup.Interval = Keys.RetentionInMemory
+		}
 		// Run as Deleter
-		runWorker(wg, ctx,
-			Keys.RetentionInMemory,
+		cleanUpWorker(wg, ctx,
+			Keys.Cleanup.Interval,
 			"deleting",
 			"",
 			true,
@@ -44,7 +48,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
 }

 // runWorker takes simple values to configure what it does
-func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) {
+func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
 	go func() {
 		defer wg.Done()
@@ -67,12 +71,12 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st
 		t := time.Now().Add(-d)
 		cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
-		n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete)
+		n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
 		if err != nil {
 			cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error())
 		} else {
-			if delete && archiveDir == "" {
+			if delete && cleanupDir == "" {
 				cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n)
 			} else {
 				cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n)
@@ -85,9 +89,9 @@ func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode st
 var ErrNoNewArchiveData error = errors.New("all data already archived")

-// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
-// deleting them from the `checkpointsDir`.
-func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) {
+// Delete or ZIP all checkpoint files older than `from` together and write them to the `cleanupDir`,
+// deleting/moving them from the `checkpointsDir`.
+func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error) {
 	entries1, err := os.ReadDir(checkpointsDir)
 	if err != nil {
 		return 0, err
@@ -107,7 +111,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
 		go func() {
 			defer wg.Done()
 			for workItem := range work {
-				m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead)
+				m, err := cleanupCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead)
 				if err != nil {
 					cclog.Errorf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error())
 					atomic.AddInt32(&errs, 1)
@@ -125,7 +129,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
 		for _, de2 := range entries2 {
 			cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
-			adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
+			adir := filepath.Join(cleanupDir, de1.Name(), de2.Name())
 			work <- workItem{
 				adir: adir, cdir: cdir,
 				cluster: de1.Name(), host: de2.Name(),
@@ -146,8 +150,8 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns
 	return int(n), nil
 }

-// Helper function for `ArchiveCheckpoints`.
-func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead bool) (int, error) {
+// Helper function for `CleanupCheckpoints`.
+func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead bool) (int, error) {
 	entries, err := os.ReadDir(dir)
 	if err != nil {
 		return 0, err
@@ -171,10 +175,10 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead
 		return n, nil
 	}

-	filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from))
+	filename := filepath.Join(cleanupDir, fmt.Sprintf("%d.zip", from))
 	f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
 	if err != nil && os.IsNotExist(err) {
-		err = os.MkdirAll(archiveDir, CheckpointDirPerms)
+		err = os.MkdirAll(cleanupDir, CheckpointDirPerms)
 		if err == nil {
 			f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
 		}
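
Taken together, CleanupCheckpoints keeps the contract of the old ArchiveCheckpoints: walk the cluster/host directories under the checkpoint root, then zip-and-move or delete everything older than the cutoff. A one-off invocation might look like this sketch (the package name memorystore and the import path are assumptions; the signature matches the diff above):

package main

import (
	"fmt"
	"time"

	// Assumed import path for illustration; the real module path may differ.
	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
)

func main() {
	// Zip checkpoints older than 48h into ./var/archive (deleteInstead=false).
	cutoff := time.Now().Add(-48 * time.Hour).Unix()
	n, err := memorystore.CleanupCheckpoints("./var/checkpoints", "./var/archive", cutoff, false)
	if err != nil {
		fmt.Println("cleanup failed:", err)
		return
	}
	fmt.Printf("%d files archived\n", n)
}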

View File

@@ -17,10 +17,10 @@
 // │  ├─ FileFormat: "avro" or "json"
 // │  ├─ Interval: How often to save (e.g., "1h")
 // │  └─ RootDir: Checkpoint storage path
-// ├─ Archive: Long-term storage configuration
-// │  ├─ ArchiveInterval: How often to archive
+// ├─ Cleanup: Long-term storage configuration
+// │  ├─ Interval: How often to delete/archive
 // │  ├─ RootDir: Archive storage path
-// │  └─ DeleteInstead: Delete old data instead of archiving
+// │  └─ Mode: "delete" or "archive"
 // ├─ Debug: Development/debugging options
 // └─ Subscriptions: NATS topic subscriptions for metric ingestion
 //
@@ -48,12 +48,13 @@ import (
 )

 const (
 	DefaultMaxWorkers             = 10
 	DefaultBufferCapacity         = 512
 	DefaultGCTriggerInterval      = 100
 	DefaultAvroWorkers            = 4
 	DefaultCheckpointBufferMin    = 3
 	DefaultAvroCheckpointInterval = time.Minute
+	DefaultMemoryUsageTrackerInterval = 1 * time.Hour
 )

 // Checkpoints configures periodic persistence of in-memory metric data.
@@ -86,10 +87,10 @@ type Debug struct {
 // - ArchiveInterval: Duration string (e.g., "24h") between archive operations
 // - RootDir: Filesystem path for archived data (created if missing)
 // - DeleteInstead: If true, delete old data instead of archiving (saves disk space)
-type Archive struct {
-	ArchiveInterval string `json:"interval"`
-	RootDir         string `json:"directory"`
-	DeleteInstead   bool   `json:"delete-instead"`
+type Cleanup struct {
+	Interval string `json:"interval"`
+	RootDir  string `json:"directory"`
+	Mode     string `json:"mode"`
 }

 // Subscriptions defines NATS topics to subscribe to for metric ingestion.
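
Given the JSON tags on the new Cleanup struct, a config fragment decodes as in this sketch (the struct is copied here so the example is self-contained):

package main

import (
	"encoding/json"
	"fmt"
)

// Copy of the Cleanup struct from the diff above.
type Cleanup struct {
	Interval string `json:"interval"`
	RootDir  string `json:"directory"`
	Mode     string `json:"mode"`
}

func main() {
	raw := []byte(`{"mode": "archive", "interval": "48h", "directory": "./var/archive"}`)

	var c Cleanup
	if err := json.Unmarshal(raw, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // {Interval:48h RootDir:./var/archive Mode:archive}
}
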
@@ -129,7 +130,7 @@ type MetricStoreConfig struct {
 	MemoryCap     int            `json:"memory-cap"`
 	Checkpoints   Checkpoints    `json:"checkpoints"`
 	Debug         *Debug         `json:"debug"`
-	Archive       *Archive       `json:"archive"`
+	Cleanup       *Cleanup       `json:"cleanup"`
 	Subscriptions *Subscriptions `json:"nats-subscriptions"`
 }
@@ -142,6 +143,9 @@ var Keys MetricStoreConfig = MetricStoreConfig{
 		FileFormat: "avro",
 		RootDir:    "./var/checkpoints",
 	},
+	Cleanup: &Cleanup{
+		Mode: "delete",
+	},
 }

 // AggregationStrategy defines how to combine metric values across hierarchy levels.

View File

@@ -32,20 +32,32 @@ const configSchema = `{
     },
     "required": ["interval"]
   },
-  "archive": {
-    "description": "Configuration for archiving the already checkpointed files.",
+  "cleanup": {
+    "description": "Configuration for the cleanup process.",
     "type": "object",
     "properties": {
+      "mode": {
+        "description": "The operation mode (e.g., 'archive' or 'delete').",
+        "type": "string",
+        "enum": ["archive", "delete"]
+      },
       "interval": {
-        "description": "Interval at which the checkpointed files should be archived.",
+        "description": "Interval at which the cleanup runs.",
         "type": "string"
       },
       "directory": {
-        "description": "Specify the directy in which the archived files should be placed.",
+        "description": "Target directory for operations.",
         "type": "string"
       }
     },
-    "required": ["interval", "directory"]
+    "if": {
+      "properties": {
+        "mode": { "const": "archive" }
+      }
+    },
+    "then": {
+      "required": ["interval", "directory"]
+    }
   },
   "retention-in-memory": {
     "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",

View File

@@ -200,7 +200,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 	Retention(wg, ctx)
 	Checkpointing(wg, ctx)
-	Archiving(wg, ctx)
+	CleanUp(wg, ctx)
 	DataStaging(wg, ctx)
 	MemoryUsageTracker(wg, ctx)
@@ -386,7 +386,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
 	go func() {
 		defer wg.Done()

-		d := 1 * time.Minute
+		d := DefaultMemoryUsageTrackerInterval
 		if d <= 0 {
 			return