From 03c65e06f6baf1b92690739caf377fea0001a521 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 08:46:47 +0100 Subject: [PATCH] Allow finer control for omit tagged jobs in retention policies --- cmd/cc-backend/main.go | 6 +- internal/api/job.go | 10 ++-- internal/repository/job.go | 30 +++++++--- internal/repository/job_test.go | 64 ++++++++++++++++++---- internal/taskmanager/compressionService.go | 4 +- internal/taskmanager/retentionService.go | 2 +- internal/taskmanager/taskManager.go | 4 +- pkg/archive/ConfigSchema.go | 5 ++ 8 files changed, 92 insertions(+), 33 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index fde95fd3..81d397d2 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -369,13 +369,11 @@ func runServer(ctx context.Context) error { errChan := make(chan error, 1) // Start HTTP server - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { if err := srv.Start(ctx); err != nil { errChan <- err } - }() + }) // Handle shutdown signals wg.Add(1) diff --git a/internal/api/job.go b/internal/api/job.go index 1322225b..62410001 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -904,11 +904,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { } // Check for omit-tagged query parameter - omitTagged := false + omitTagged := "none" if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" { - omitTagged, e = strconv.ParseBool(omitTaggedStr) - if e != nil { - handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw) + switch omitTaggedStr { + case "none", "all", "user": + omitTagged = omitTaggedStr + default: + handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw) return } } diff --git a/internal/repository/job.go b/internal/repository/job.go index a1cd9719..8055ca37 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 // // Parameters: // - startTime: Unix timestamp, jobs with start_time < this value will be deleted -// - omitTagged: If true, skip jobs that have associated tags (jobtag entries) +// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs, +// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns the count of deleted jobs or an error if the operation fails. -func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) { +func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) { var cnt int q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { @@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, var jobIds []int64 selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } rows, err := selectQuery.RunWith(r.DB).Query() @@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, qd := sq.Delete("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } _, err := qd.RunWith(r.DB).Exec() @@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error { // Parameters: // - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start) // - startTimeEnd: Unix timestamp for range end -// - omitTagged: If true, exclude jobs with associated tags +// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs, +// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns a slice of jobs or an error if the time range is invalid or query fails. -func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) { var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { @@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd) } - if omitTagged { + switch omitTagged { + case "all": query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC") diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index 9f4871fd..992251af 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) { // 1. Find a job to use (Find all jobs) // We use a large time range to ensure we get something if it exists - jobs, err := r.FindJobsBetween(0, 9999999999, false) + jobs, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) { targetJob := jobs[0] - // 2. Create a tag - tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano()) - tagID, err := r.CreateTag("testtype", tagName, "global") + // 2. Create an auto-tagger tag (type "app") + appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano()) + appTagID, err := r.CreateTag("app", appTagName, "global") if err != nil { t.Fatal(err) } - // 3. Link Tag (Manually to avoid archive dependency side-effects in unit test) - _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID) + // 3. Link auto-tagger tag to job + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID) if err != nil { t.Fatal(err) } - // 4. Search with omitTagged = false (Should find the job) - jobsFound, err := r.FindJobsBetween(0, 9999999999, false) + // 4. Search with omitTagged = "none" (Should find the job) + jobsFound, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) { } } if !found { - t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID) + t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID) } - // 5. Search with omitTagged = true (Should NOT find the job) - jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true) + // 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag) + jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all") if err != nil { t.Fatal(err) } for _, j := range jobsFiltered { if *j.ID == *targetJob.ID { - t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID) + t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID) + } + } + + // 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job + jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + found = false + for _, j := range jobsUserFilter { + if *j.ID == *targetJob.ID { + found = true + break + } + } + if !found { + t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID) + } + + // 7. Add a user-created tag (type "testtype") to the same job + userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano()) + userTagID, err := r.CreateTag("testtype", userTagName, "global") + if err != nil { + t.Fatal(err) + } + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID) + if err != nil { + t.Fatal(err) + } + + // 8. Now omitTagged = "user" should exclude the job (has a user-created tag) + jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + for _, j := range jobsUserFilter2 { + if *j.ID == *targetJob.ID { + t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID) } } } diff --git a/internal/taskmanager/compressionService.go b/internal/taskmanager/compressionService.go index ab01ce8f..353fcb65 100644 --- a/internal/taskmanager/compressionService.go +++ b/internal/taskmanager/compressionService.go @@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) { lastTime := ar.CompressLast(startTime) if startTime == lastTime { cclog.Info("Compression Service - Complete archive run") - jobs, err = jobRepo.FindJobsBetween(0, startTime, false) + jobs, err = jobRepo.FindJobsBetween(0, startTime, "none") } else { - jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false) + jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none") } if err != nil { diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index eda452e6..48e5c042 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target } // cleanupAfterTransfer removes jobs from archive and optionally from DB. -func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { +func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) { archive.GetHandle().CleanUp(jobs) if includeDB { diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index b25b2a93..d758ee52 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -26,8 +26,8 @@ type Retention struct { Policy string `json:"policy"` Format string `json:"format"` Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` + IncludeDB bool `json:"include-db"` + OmitTagged string `json:"omit-tagged"` TargetKind string `json:"target-kind"` TargetPath string `json:"target-path"` TargetEndpoint string `json:"target-endpoint"` diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index cb9b16bc..1c2b7fe1 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -68,6 +68,11 @@ var configSchema = ` "description": "Also remove jobs from database", "type": "boolean" }, + "omit-tagged": { + "description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)", + "type": "string", + "enum": ["none", "all", "user"] + }, "age": { "description": "Act on jobs with startTime older than age (in days)", "type": "integer"