Allow finer control for omit tagged jobs in retention policies

This commit is contained in:
2026-02-23 08:46:47 +01:00
parent defa8fa994
commit 03c65e06f6
8 changed files with 92 additions and 33 deletions

View File

@@ -369,13 +369,11 @@ func runServer(ctx context.Context) error {
errChan := make(chan error, 1) errChan := make(chan error, 1)
// Start HTTP server // Start HTTP server
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
if err := srv.Start(ctx); err != nil { if err := srv.Start(ctx); err != nil {
errChan <- err errChan <- err
} }
}() })
// Handle shutdown signals // Handle shutdown signals
wg.Add(1) wg.Add(1)

View File

@@ -904,11 +904,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
} }
// Check for omit-tagged query parameter // Check for omit-tagged query parameter
omitTagged := false omitTagged := "none"
if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" { if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
omitTagged, e = strconv.ParseBool(omitTaggedStr) switch omitTaggedStr {
if e != nil { case "none", "all", "user":
handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw) omitTagged = omitTaggedStr
default:
handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw)
return return
} }
} }

View File

@@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
// //
// Parameters: // Parameters:
// - startTime: Unix timestamp, jobs with start_time < this value will be deleted // - startTime: Unix timestamp, jobs with start_time < this value will be deleted
// - omitTagged: If true, skip jobs that have associated tags (jobtag entries) // - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs,
// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
// //
// Returns the count of deleted jobs or an error if the operation fails. // Returns the count of deleted jobs or an error if the operation fails.
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) { func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) {
var cnt int var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
if omitTagged { switch omitTagged {
case "all":
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
} }
if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
@@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
var jobIds []int64 var jobIds []int64
selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime) selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
if omitTagged { switch omitTagged {
case "all":
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
} }
rows, err := selectQuery.RunWith(r.DB).Query() rows, err := selectQuery.RunWith(r.DB).Query()
@@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
qd := sq.Delete("job").Where("job.start_time < ?", startTime) qd := sq.Delete("job").Where("job.start_time < ?", startTime)
if omitTagged { switch omitTagged {
case "all":
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
} }
_, err := qd.RunWith(r.DB).Exec() _, err := qd.RunWith(r.DB).Exec()
@@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error {
// Parameters: // Parameters:
// - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start) // - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
// - startTimeEnd: Unix timestamp for range end // - startTimeEnd: Unix timestamp for range end
// - omitTagged: If true, exclude jobs with associated tags // - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs,
// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
// //
// Returns a slice of jobs or an error if the time range is invalid or query fails. // Returns a slice of jobs or an error if the time range is invalid or query fails.
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) { func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) {
var query sq.SelectBuilder var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
@@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd) query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
} }
if omitTagged { switch omitTagged {
case "all":
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
} }
query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC") query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")

View File

@@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) {
// 1. Find a job to use (Find all jobs) // 1. Find a job to use (Find all jobs)
// We use a large time range to ensure we get something if it exists // We use a large time range to ensure we get something if it exists
jobs, err := r.FindJobsBetween(0, 9999999999, false) jobs, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) {
targetJob := jobs[0] targetJob := jobs[0]
// 2. Create a tag // 2. Create an auto-tagger tag (type "app")
tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano()) appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano())
tagID, err := r.CreateTag("testtype", tagName, "global") appTagID, err := r.CreateTag("app", appTagName, "global")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// 3. Link Tag (Manually to avoid archive dependency side-effects in unit test) // 3. Link auto-tagger tag to job
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID) _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// 4. Search with omitTagged = false (Should find the job) // 4. Search with omitTagged = "none" (Should find the job)
jobsFound, err := r.FindJobsBetween(0, 9999999999, false) jobsFound, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) {
} }
} }
if !found { if !found {
t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID) t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID)
} }
// 5. Search with omitTagged = true (Should NOT find the job) // 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag)
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true) jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
for _, j := range jobsFiltered { for _, j := range jobsFiltered {
if *j.ID == *targetJob.ID { if *j.ID == *targetJob.ID {
t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID) t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID)
}
}
// 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job
jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user")
if err != nil {
t.Fatal(err)
}
found = false
for _, j := range jobsUserFilter {
if *j.ID == *targetJob.ID {
found = true
break
}
}
if !found {
t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID)
}
// 7. Add a user-created tag (type "testtype") to the same job
userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano())
userTagID, err := r.CreateTag("testtype", userTagName, "global")
if err != nil {
t.Fatal(err)
}
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID)
if err != nil {
t.Fatal(err)
}
// 8. Now omitTagged = "user" should exclude the job (has a user-created tag)
jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user")
if err != nil {
t.Fatal(err)
}
for _, j := range jobsUserFilter2 {
if *j.ID == *targetJob.ID {
t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID)
} }
} }
} }

View File

@@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
lastTime := ar.CompressLast(startTime) lastTime := ar.CompressLast(startTime)
if startTime == lastTime { if startTime == lastTime {
cclog.Info("Compression Service - Complete archive run") cclog.Info("Compression Service - Complete archive run")
jobs, err = jobRepo.FindJobsBetween(0, startTime, false) jobs, err = jobRepo.FindJobsBetween(0, startTime, "none")
} else { } else {
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false) jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none")
} }
if err != nil { if err != nil {

View File

@@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target
} }
// cleanupAfterTransfer removes jobs from archive and optionally from DB. // cleanupAfterTransfer removes jobs from archive and optionally from DB.
func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) {
archive.GetHandle().CleanUp(jobs) archive.GetHandle().CleanUp(jobs)
if includeDB { if includeDB {

View File

@@ -26,8 +26,8 @@ type Retention struct {
Policy string `json:"policy"` Policy string `json:"policy"`
Format string `json:"format"` Format string `json:"format"`
Age int `json:"age"` Age int `json:"age"`
IncludeDB bool `json:"includeDB"` IncludeDB bool `json:"include-db"`
OmitTagged bool `json:"omitTagged"` OmitTagged string `json:"omit-tagged"`
TargetKind string `json:"target-kind"` TargetKind string `json:"target-kind"`
TargetPath string `json:"target-path"` TargetPath string `json:"target-path"`
TargetEndpoint string `json:"target-endpoint"` TargetEndpoint string `json:"target-endpoint"`

View File

@@ -68,6 +68,11 @@ var configSchema = `
"description": "Also remove jobs from database", "description": "Also remove jobs from database",
"type": "boolean" "type": "boolean"
}, },
"omit-tagged": {
"description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)",
"type": "string",
"enum": ["none", "all", "user"]
},
"age": { "age": {
"description": "Act on jobs with startTime older than age (in days)", "description": "Act on jobs with startTime older than age (in days)",
"type": "integer" "type": "integer"