diff --git a/cmd/cc-backend/cli.go b/cmd/cc-backend/cli.go index 47f534be..3896383a 100644 --- a/cmd/cc-backend/cli.go +++ b/cmd/cc-backend/cli.go @@ -11,7 +11,8 @@ import "flag" var ( flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, - flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool + flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB, + flagCleanupCheckpoints bool flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string ) @@ -28,6 +29,7 @@ func cliInit() { flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit") flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics") + flag.BoolVar(&flagCleanupCheckpoints, "cleanup-checkpoints", false, "Clean up old checkpoint files (delete or archive) based on retention settings, then exit") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: :[admin,support,manager,api,user]:") diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 12faeae5..f8f0a767 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -14,6 +14,7 @@ import ( "fmt" "os" "os/signal" + goruntime "runtime" "runtime/debug" "strings" "sync" @@ -536,6 +537,43 @@ func run() error { return err } + // Handle checkpoint cleanup + if flagCleanupCheckpoints { + mscfg := ccconf.GetPackageConfig("metric-store") + if mscfg == nil { + return fmt.Errorf("metric-store configuration required for checkpoint cleanup") + } + if err := json.Unmarshal(mscfg, &metricstore.Keys); err != nil { + return fmt.Errorf("decoding metric-store config: %w", err) + } + if metricstore.Keys.NumWorkers <= 0 { + metricstore.Keys.NumWorkers = min(goruntime.NumCPU()/2+1, metricstore.DefaultMaxWorkers) + } + + d, err := time.ParseDuration(metricstore.Keys.RetentionInMemory) + if err != nil { + return fmt.Errorf("parsing retention-in-memory: %w", err) + } + from := time.Now().Add(-d) + deleteMode := metricstore.Keys.Cleanup == nil || metricstore.Keys.Cleanup.Mode != "archive" + cleanupDir := "" + if !deleteMode { + cleanupDir = metricstore.Keys.Cleanup.RootDir + } + + cclog.Infof("Cleaning up checkpoints older than %s...", from.Format(time.RFC3339)) + n, err := metricstore.CleanupCheckpoints( + metricstore.Keys.Checkpoints.RootDir, cleanupDir, from.Unix(), deleteMode) + if err != nil { + return fmt.Errorf("checkpoint cleanup: %w", err) + } + if deleteMode { + cclog.Exitf("Cleanup done: %d checkpoint files deleted.", n) + } else { + cclog.Exitf("Cleanup done: %d checkpoint files archived to parquet.", n) + } + } + // Exit if start server is not requested if !flagServer { cclog.Exit("No errors, server flag not set. Exiting cc-backend.")