diff --git a/internal/taskmanager/nodestateRetentionService.go b/internal/taskmanager/nodestateRetentionService.go index 9a704502..b6306849 100644 --- a/internal/taskmanager/nodestateRetentionService.go +++ b/internal/taskmanager/nodestateRetentionService.go @@ -18,7 +18,7 @@ import ( func RegisterNodeStateRetentionDeleteService(ageHours int) { cclog.Info("Register node state retention delete service") - s.NewJob(gocron.DurationJob(1*time.Hour), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 0, 0))), gocron.NewTask( func() { cutoff := time.Now().Unix() - int64(ageHours*3600) @@ -32,8 +32,8 @@ func RegisterNodeStateRetentionDeleteService(ageHours int) { })) } -func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { - cclog.Info("Register node state retention parquet service") +func RegisterNodeStateRetentionMoveService(cfg *config.NodeStateRetention) { + cclog.Info("Register node state retention move service") maxFileSizeMB := cfg.MaxFileSizeMB if maxFileSizeMB <= 0 { @@ -63,11 +63,11 @@ func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { } if err != nil { - cclog.Errorf("NodeState parquet retention: failed to create target: %v", err) + cclog.Errorf("NodeState move retention: failed to create target: %v", err) return } - s.NewJob(gocron.DurationJob(1*time.Hour), + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(2, 30, 0))), gocron.NewTask( func() { cutoff := time.Now().Unix() - int64(ageHours*3600) @@ -75,14 +75,14 @@ func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { rows, err := nodeRepo.FindNodeStatesBefore(cutoff) if err != nil { - cclog.Errorf("NodeState parquet retention: error finding rows: %v", err) + cclog.Errorf("NodeState move retention: error finding rows: %v", err) return } if len(rows) == 0 { return } - cclog.Infof("NodeState parquet retention: archiving %d rows", len(rows)) + cclog.Infof("NodeState move retention: archiving %d rows", len(rows)) pw := pqarchive.NewNodeStateParquetWriter(target, maxFileSizeMB) for _, ns := range rows { @@ -100,21 +100,21 @@ func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { SubCluster: ns.SubCluster, } if err := pw.AddRow(row); err != nil { - cclog.Errorf("NodeState parquet retention: add row: %v", err) + cclog.Errorf("NodeState move retention: add row: %v", err) continue } } if err := pw.Close(); err != nil { - cclog.Errorf("NodeState parquet retention: close writer: %v", err) + cclog.Errorf("NodeState move retention: close writer: %v", err) return } cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff) if err != nil { - cclog.Errorf("NodeState parquet retention: error deleting rows: %v", err) + cclog.Errorf("NodeState move retention: error deleting rows: %v", err) } else { - cclog.Infof("NodeState parquet retention: deleted %d rows from db", cnt) + cclog.Infof("NodeState move retention: deleted %d rows from db", cnt) } })) } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 529395b5..b25b2a93 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -154,8 +154,8 @@ func initNodeStateRetention() { switch cfg.Policy { case "delete": RegisterNodeStateRetentionDeleteService(age) - case "parquet": - RegisterNodeStateRetentionParquetService(cfg) + case "move": + RegisterNodeStateRetentionMoveService(cfg) default: cclog.Warnf("Unknown nodestate-retention policy: %s", cfg.Policy) }