Fix logic for findFiles() and keep archive worker

This commit is contained in:
Aditya Ujeniya
2026-01-15 20:27:11 +01:00
parent 40bff1eff9
commit 10a5c89a16
4 changed files with 57 additions and 16 deletions

View File

@@ -1,7 +1,9 @@
{
"main": {
"addr": "127.0.0.1:8080",
"apiAllowedIPs": ["*"]
"apiAllowedIPs": [
"*"
]
},
"cron": {
"commit-job-worker": "1m",
@@ -19,4 +21,4 @@
},
"retention-in-memory": "48h"
}
}
}

View File

@@ -21,12 +21,36 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// Worker for either Archiving or Deleting files
func Archiving(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Archive != nil {
// Run as Archiver
runWorker(wg, ctx,
Keys.Archive.ArchiveInterval,
"archiving",
Keys.Archive.RootDir,
Keys.Archive.DeleteInstead,
)
} else {
// Run as Deleter
runWorker(wg, ctx,
Keys.RetentionInMemory,
"deleting",
"",
true,
)
}
}
// runWorker takes simple values to configure what it does
func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) {
go func() {
defer wg.Done()
d, err := time.ParseDuration(Keys.Archive.ArchiveInterval)
d, err := time.ParseDuration(interval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
}
if d <= 0 {
return
@@ -41,14 +65,18 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
return
case <-ticker.C:
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir,
Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead)
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete)
if err != nil {
cclog.Errorf("[METRICSTORE]> archiving failed: %s", err.Error())
cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error())
} else {
cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n)
if delete && archiveDir == "" {
cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n)
} else {
cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n)
}
}
}
}

View File

@@ -730,13 +730,24 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece
return nums[a.Name()] < nums[b.Name()]
})
if len(nums) == 0 {
return nil, nil
}
filenames := make([]string, 0)
for i := range direntries {
e := direntries[i]
for i, e := range direntries {
ts1 := nums[e.Name()]
// Logic to look for files in forward or direction
// If logic: All files greater than or after
// the given timestamp will be selected
// Else If logic: All files less than or before
// the given timestamp will be selected
if findMoreRecentFiles && t <= ts1 {
filenames = append(filenames, e.Name())
} else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 {
filenames = append(filenames, e.Name())
}
if i == len(direntries)-1 {
continue
@@ -749,10 +760,6 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece
if ts1 < t && t < ts2 {
filenames = append(filenames, e.Name())
}
} else {
if ts2 < t {
filenames = append(filenames, e.Name())
}
}
}

View File

@@ -144,7 +144,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
dataStagingGoroutines := 1
archivingGoroutines := 1
totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines
totalGoroutines := retentionGoroutines +
checkpointingGoroutines +
dataStagingGoroutines +
archivingGoroutines
wg.Add(totalGoroutines)
Retention(wg, ctx)