From 10a5c89a1608a5c2b0f1b9a99d8e7debf037fe0e Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Thu, 15 Jan 2026 20:27:11 +0100 Subject: [PATCH] Fix logic for findFiles() and keep archive worker --- configs/config-demo.json | 6 +++-- internal/metricstore/archive.go | 42 ++++++++++++++++++++++++----- internal/metricstore/checkpoint.go | 19 ++++++++----- internal/metricstore/metricstore.go | 6 ++++- 4 files changed, 57 insertions(+), 16 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index e53fa8bd..23e9a6cb 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -1,7 +1,9 @@ { "main": { "addr": "127.0.0.1:8080", - "apiAllowedIPs": ["*"] + "apiAllowedIPs": [ + "*" + ] }, "cron": { "commit-job-worker": "1m", @@ -19,4 +21,4 @@ }, "retention-in-memory": "48h" } -} +} \ No newline at end of file diff --git a/internal/metricstore/archive.go b/internal/metricstore/archive.go index 78efadfe..68f88741 100644 --- a/internal/metricstore/archive.go +++ b/internal/metricstore/archive.go @@ -21,12 +21,36 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) +// Worker for either Archiving or Deleting files + func Archiving(wg *sync.WaitGroup, ctx context.Context) { + if Keys.Archive != nil { + // Run as Archiver + runWorker(wg, ctx, + Keys.Archive.ArchiveInterval, + "archiving", + Keys.Archive.RootDir, + Keys.Archive.DeleteInstead, + ) + } else { + // Run as Deleter + runWorker(wg, ctx, + Keys.RetentionInMemory, + "deleting", + "", + true, + ) + } +} + +// runWorker takes simple values to configure what it does +func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) { go func() { defer wg.Done() - d, err := time.ParseDuration(Keys.Archive.ArchiveInterval) + + d, err := time.ParseDuration(interval) if err != nil { - cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) + cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) } if d <= 0 { return @@ -41,14 +65,18 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { return case <-ticker.C: t := time.Now().Add(-d) - cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, - Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) + cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339)) + + n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete) if err != nil { - cclog.Errorf("[METRICSTORE]> archiving failed: %s", err.Error()) + cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error()) } else { - cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) + if delete && archiveDir == "" { + cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n) + } else { + cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) + } } } } diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go index b5511221..660b7a1f 100644 --- a/internal/metricstore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -730,13 +730,24 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece return nums[a.Name()] < nums[b.Name()] }) + if len(nums) == 0 { + return nil, nil + } + filenames := make([]string, 0) - for i := range direntries { - e := direntries[i] + + for i, e := range direntries { ts1 := nums[e.Name()] + // Logic to look for files in forward or direction + // If logic: All files greater than or after + // the given timestamp will be selected + // Else If logic: All files less than or before + // the given timestamp will be selected if findMoreRecentFiles && t <= ts1 { filenames = append(filenames, e.Name()) + } else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 { + filenames = append(filenames, e.Name()) } if i == len(direntries)-1 { continue @@ -749,10 +760,6 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece if ts1 < t && t < ts2 { filenames = append(filenames, e.Name()) } - } else { - if ts2 < t { - filenames = append(filenames, e.Name()) - } } } diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 0d7e5f45..1607a68c 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -144,7 +144,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { dataStagingGoroutines := 1 archivingGoroutines := 1 - totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines + totalGoroutines := retentionGoroutines + + checkpointingGoroutines + + dataStagingGoroutines + + archivingGoroutines + wg.Add(totalGoroutines) Retention(wg, ctx)