feat: Add command line switch to trigger manual metricstore checkpoint cleanup

Entire-Checkpoint: 29b9d52db89c
This commit is contained in:
2026-03-23 07:58:35 +01:00
parent 192c94a78d
commit 45f329e5fb
2 changed files with 41 additions and 1 deletions

View File

@@ -11,7 +11,8 @@ import "flag"
var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB,
flagCleanupCheckpoints bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
@@ -28,6 +29,7 @@ func cliInit() {
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
flag.BoolVar(&flagCleanupCheckpoints, "cleanup-checkpoints", false, "Clean up old checkpoint files (delete or archive) based on retention settings, then exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")

View File

@@ -14,6 +14,7 @@ import (
"fmt"
"os"
"os/signal"
goruntime "runtime"
"runtime/debug"
"strings"
"sync"
@@ -536,6 +537,43 @@ func run() error {
return err
}
// Handle checkpoint cleanup
if flagCleanupCheckpoints {
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg == nil {
return fmt.Errorf("metric-store configuration required for checkpoint cleanup")
}
if err := json.Unmarshal(mscfg, &metricstore.Keys); err != nil {
return fmt.Errorf("decoding metric-store config: %w", err)
}
if metricstore.Keys.NumWorkers <= 0 {
metricstore.Keys.NumWorkers = min(goruntime.NumCPU()/2+1, metricstore.DefaultMaxWorkers)
}
d, err := time.ParseDuration(metricstore.Keys.RetentionInMemory)
if err != nil {
return fmt.Errorf("parsing retention-in-memory: %w", err)
}
from := time.Now().Add(-d)
deleteMode := metricstore.Keys.Cleanup == nil || metricstore.Keys.Cleanup.Mode != "archive"
cleanupDir := ""
if !deleteMode {
cleanupDir = metricstore.Keys.Cleanup.RootDir
}
cclog.Infof("Cleaning up checkpoints older than %s...", from.Format(time.RFC3339))
n, err := metricstore.CleanupCheckpoints(
metricstore.Keys.Checkpoints.RootDir, cleanupDir, from.Unix(), deleteMode)
if err != nil {
return fmt.Errorf("checkpoint cleanup: %w", err)
}
if deleteMode {
cclog.Exitf("Cleanup done: %d checkpoint files deleted.", n)
} else {
cclog.Exitf("Cleanup done: %d checkpoint files archived to parquet.", n)
}
}
// Exit if start server is not requested
if !flagServer {
cclog.Exit("No errors, server flag not set. Exiting cc-backend.")