Improve retention

Add db cleanup
Fixes #103
This commit is contained in:
Jan Eitzinger 2023-05-11 09:40:13 +02:00
parent cc634dd155
commit 19e3ba7290
4 changed files with 42 additions and 8 deletions

View File

@ -443,11 +443,24 @@ func main() {
switch cfg.Retention.Policy { switch cfg.Retention.Policy {
case "delete": case "delete":
s.Every(1).Day().At("4:00").Do(func() { s.Every(1).Day().At("4:00").Do(func() {
jobs, err := jobRepo.FindJobsOlderThan(cfg.Retention.Age) startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
jobs, err := jobRepo.FindJobsBefore(startTime)
if err != nil { if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error()) log.Warnf("Error while looking for retention jobs: %s", err.Error())
} }
archive.GetHandle().CleanUp(jobs) archive.GetHandle().CleanUp(jobs)
if cfg.Retention.IncludeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
if err != nil {
log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
} else {
log.Infof("Retention: Removed %d jobs from db", cnt)
}
if err = jobRepo.Optimize(); err != nil {
log.Errorf("Error occured in db optimization: %s", err.Error())
}
}
}) })
case "move": case "move":
log.Warn("Retention policy move not implemented") log.Warn("Retention policy move not implemented")
@ -455,7 +468,8 @@ func main() {
if cfg.Compression > 0 { if cfg.Compression > 0 {
s.Every(1).Day().At("5:00").Do(func() { s.Every(1).Day().At("5:00").Do(func() {
jobs, err := jobRepo.FindJobsOlderThan(cfg.Compression) startTime := time.Now().Unix() - int64(cfg.Compression*24*3600)
jobs, err := jobRepo.FindJobsBefore(startTime)
if err != nil { if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error()) log.Warnf("Error while looking for retention jobs: %s", err.Error())
} }

View File

@ -96,6 +96,21 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
return job, nil return job, nil
} }
func (r *JobRepository) Optimize() error {
var err error
switch r.driver {
case "sqlite3":
if _, err = r.DB.Exec(`VACUUM`); err != nil {
return err
}
case "mysql":
log.Info("Optimize currently not supported for mysql driver")
}
return nil
}
func (r *JobRepository) Flush() error { func (r *JobRepository) Flush() error {
var err error var err error
@ -707,10 +722,10 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil return nil
} }
func (r *JobRepository) FindJobsOlderThan(days int) ([]*schema.Job, error) { func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf("job.start_time < %d", query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
time.Now().Unix()-int64(days*24*3600))) "job.start_time < %d", startTime))
sql, args, err := query.ToSql() sql, args, err := query.ToSql()
if err != nil { if err != nil {

View File

@ -58,9 +58,10 @@ type ClusterConfig struct {
} }
type Retention struct { type Retention struct {
Age int `json:"age"` Age int `json:"age"`
Policy string `json:"policy"` IncludeDB bool `json:"includeDB"`
Location string `json:"location"` Policy string `json:"policy"`
Location string `json:"location"`
} }
// Format of the configuration (file). See below for the defaults. // Format of the configuration (file). See below for the defaults.

View File

@ -73,6 +73,10 @@
"move" "move"
] ]
}, },
"includeDB": {
"description": "Also remove jobs from database",
"type": "boolean"
},
"age": { "age": {
"description": "Act on jobs with startTime older than age (in days)", "description": "Act on jobs with startTime older than age (in days)",
"type": "integer" "type": "integer"