diff --git a/internal/taskManager/commitJobService.go b/internal/taskManager/commitJobService.go index 87d2a3c..4f21c86 100644 --- a/internal/taskManager/commitJobService.go +++ b/internal/taskManager/commitJobService.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( @@ -19,7 +20,13 @@ func RegisterCommitJobService() { } else { frequency = "2m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterCommitJobService: %v", err) + return + } + cclog.Infof("register commitJob service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), diff --git a/internal/taskManager/compressionService.go b/internal/taskManager/compressionService.go index 0abe167..c2df852 100644 --- a/internal/taskManager/compressionService.go +++ b/internal/taskManager/compressionService.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( diff --git a/internal/taskManager/doc.go b/internal/taskManager/doc.go new file mode 100644 index 0000000..007192e --- /dev/null +++ b/internal/taskManager/doc.go @@ -0,0 +1,12 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package taskmanager provides a background task scheduler for the cc-backend. +// It manages various periodic tasks such as job archiving (retention), +// database compression, LDAP synchronization, and statistic updates. +// +// The package uses the gocron library to schedule tasks. Configuration +// for the tasks is provided via JSON configs passed to the Start function. +package taskmanager diff --git a/internal/taskManager/ldapSyncService.go b/internal/taskManager/ldapSyncService.go index 47aaf64..e410af9 100644 --- a/internal/taskManager/ldapSyncService.go +++ b/internal/taskManager/ldapSyncService.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( diff --git a/internal/taskManager/stopJobsExceedTime.go b/internal/taskManager/stopJobsExceedTime.go index d22f791..e59b3ae 100644 --- a/internal/taskManager/stopJobsExceedTime.go +++ b/internal/taskManager/stopJobsExceedTime.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index 44a1c22..57f2d88 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( @@ -16,13 +17,16 @@ import ( "github.com/go-co-op/gocron/v2" ) +// Retention defines the configuration for job retention policies. type Retention struct { - Policy string `json:"policy"` - Location string `json:"location"` - Age int `json:"age"` - IncludeDB bool `json:"includeDB"` + Policy string `json:"policy"` + Location string `json:"location"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + OmitTagged bool `json:"omitTagged"` } +// CronFrequency defines the execution intervals for various background workers. type CronFrequency struct { // Duration Update Worker [Defaults to '2m'] CommitJobWorker string `json:"commit-job-worker"` @@ -35,9 +39,12 @@ type CronFrequency struct { var ( s gocron.Scheduler jobRepo *repository.JobRepository - Keys CronFrequency + // Keys holds the configured frequencies for cron jobs. + Keys CronFrequency ) +// parseDuration parses a duration string and handles errors by logging them. +// It returns the duration and any error encountered. func parseDuration(s string) (time.Duration, error) { interval, err := time.ParseDuration(s) if err != nil { @@ -53,6 +60,8 @@ func parseDuration(s string) (time.Duration, error) { return interval, nil } +// Start initializes the task manager, parses configurations, and registers background tasks. +// It starts the gocron scheduler. func Start(cronCfg, archiveConfig json.RawMessage) { var err error jobRepo = repository.GetJobRepository() @@ -85,12 +94,14 @@ func Start(cronCfg, archiveConfig json.RawMessage) { case "delete": RegisterRetentionDeleteService( cfg.Retention.Age, - cfg.Retention.IncludeDB) + cfg.Retention.IncludeDB, + cfg.Retention.OmitTagged) case "move": RegisterRetentionMoveService( cfg.Retention.Age, cfg.Retention.IncludeDB, - cfg.Retention.Location) + cfg.Retention.Location, + cfg.Retention.OmitTagged) } if cfg.Compression > 0 { @@ -110,6 +121,9 @@ func Start(cronCfg, archiveConfig json.RawMessage) { s.Start() } +// Shutdown stops the task manager and its scheduler. func Shutdown() { - s.Shutdown() + if s != nil { + s.Shutdown() + } } diff --git a/internal/taskManager/taskManager_test.go b/internal/taskManager/taskManager_test.go new file mode 100644 index 0000000..3d15e96 --- /dev/null +++ b/internal/taskManager/taskManager_test.go @@ -0,0 +1,52 @@ +package taskmanager + +import ( + "encoding/json" + "testing" + "time" +) + +func TestParseDuration(t *testing.T) { + tests := []struct { + input string + expected time.Duration + wantErr bool + }{ + {"2m", 2 * time.Minute, false}, + {"1h", 1 * time.Hour, false}, + {"10s", 10 * time.Second, false}, + {"invalid", 0, true}, + {"", 0, true}, // time.ParseDuration returns error for empty string + {"0", 0, false}, + } + + for _, tt := range tests { + got, err := parseDuration(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parseDuration(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + continue + } + if got != tt.expected { + t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.expected) + } + } +} + +func TestCronFrequencyParsing(t *testing.T) { + jsonStr := `{"commit-job-worker": "10m", "duration-worker": "5m", "footprint-worker": "1h"}` + var keys CronFrequency + err := json.Unmarshal([]byte(jsonStr), &keys) + if err != nil { + t.Fatalf("Unmarshal failed: %v", err) + } + + if keys.CommitJobWorker != "10m" { + t.Errorf("Expected 10m, got %s", keys.CommitJobWorker) + } + if keys.DurationWorker != "5m" { + t.Errorf("Expected 5m, got %s", keys.DurationWorker) + } + if keys.FootprintWorker != "1h" { + t.Errorf("Expected 1h, got %s", keys.FootprintWorker) + } +} diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go index 43fbb54..9c52da7 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskManager/updateDurationService.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( @@ -18,7 +19,13 @@ func RegisterUpdateDurationWorker() { } else { frequency = "5m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterUpdateDurationWorker: %v", err) + return + } + cclog.Infof("Register Duration Update service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 89914e7..ae9512c 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package taskmanager import ( @@ -24,7 +25,13 @@ func RegisterFootprintWorker() { } else { frequency = "10m" } - d, _ := time.ParseDuration(frequency) + + d, err := parseDuration(frequency) + if err != nil { + cclog.Errorf("RegisterFootprintWorker: %v", err) + return + } + cclog.Infof("Register Footprint Update service with %s interval", frequency) s.NewJob(gocron.DurationJob(d), @@ -37,7 +44,7 @@ func RegisterFootprintWorker() { cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { - s_cluster := time.Now() + sCluster := time.Now() jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { continue @@ -63,7 +70,7 @@ func RegisterFootprintWorker() { cclog.Debugf("Prepare job %d", job.JobID) cl++ - s_job := time.Now() + sJob := time.Now() jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { @@ -112,7 +119,7 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - cclog.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) + cclog.Debugf("Job %d took %s", job.JobID, time.Since(sJob)) } t, err := jobRepo.TransactionInit() @@ -134,7 +141,7 @@ func RegisterFootprintWorker() { } jobRepo.TransactionEnd(t) } - cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster)) + cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(sCluster)) } cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) }))