This commit is contained in:
2025-12-15 13:44:50 +01:00
parent 72ce3954b4
commit 104fd1576a
9 changed files with 117 additions and 15 deletions

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (
@@ -19,7 +20,13 @@ func RegisterCommitJobService() {
} else { } else {
frequency = "2m" frequency = "2m"
} }
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterCommitJobService: %v", err)
return
}
cclog.Infof("register commitJob service with %s interval", frequency) cclog.Infof("register commitJob service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d), s.NewJob(gocron.DurationJob(d),

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (

View File

@@ -0,0 +1,12 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package taskmanager provides a background task scheduler for the cc-backend.
// It manages various periodic tasks such as job archiving (retention),
// database compression, LDAP synchronization, and statistic updates.
//
// The package uses the gocron library to schedule tasks. Configuration
// for the tasks is provided via JSON configs passed to the Start function.
package taskmanager

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (
@@ -16,13 +17,16 @@ import (
"github.com/go-co-op/gocron/v2" "github.com/go-co-op/gocron/v2"
) )
// Retention defines the configuration for job retention policies.
type Retention struct { type Retention struct {
Policy string `json:"policy"` Policy string `json:"policy"`
Location string `json:"location"` Location string `json:"location"`
Age int `json:"age"` Age int `json:"age"`
IncludeDB bool `json:"includeDB"` IncludeDB bool `json:"includeDB"`
OmitTagged bool `json:"omitTagged"`
} }
// CronFrequency defines the execution intervals for various background workers.
type CronFrequency struct { type CronFrequency struct {
// Duration Update Worker [Defaults to '2m'] // Duration Update Worker [Defaults to '2m']
CommitJobWorker string `json:"commit-job-worker"` CommitJobWorker string `json:"commit-job-worker"`
@@ -35,9 +39,12 @@ type CronFrequency struct {
var ( var (
s gocron.Scheduler s gocron.Scheduler
jobRepo *repository.JobRepository jobRepo *repository.JobRepository
// Keys holds the configured frequencies for cron jobs.
Keys CronFrequency Keys CronFrequency
) )
// parseDuration parses a duration string and handles errors by logging them.
// It returns the duration and any error encountered.
func parseDuration(s string) (time.Duration, error) { func parseDuration(s string) (time.Duration, error) {
interval, err := time.ParseDuration(s) interval, err := time.ParseDuration(s)
if err != nil { if err != nil {
@@ -53,6 +60,8 @@ func parseDuration(s string) (time.Duration, error) {
return interval, nil return interval, nil
} }
// Start initializes the task manager, parses configurations, and registers background tasks.
// It starts the gocron scheduler.
func Start(cronCfg, archiveConfig json.RawMessage) { func Start(cronCfg, archiveConfig json.RawMessage) {
var err error var err error
jobRepo = repository.GetJobRepository() jobRepo = repository.GetJobRepository()
@@ -85,12 +94,14 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
case "delete": case "delete":
RegisterRetentionDeleteService( RegisterRetentionDeleteService(
cfg.Retention.Age, cfg.Retention.Age,
cfg.Retention.IncludeDB) cfg.Retention.IncludeDB,
cfg.Retention.OmitTagged)
case "move": case "move":
RegisterRetentionMoveService( RegisterRetentionMoveService(
cfg.Retention.Age, cfg.Retention.Age,
cfg.Retention.IncludeDB, cfg.Retention.IncludeDB,
cfg.Retention.Location) cfg.Retention.Location,
cfg.Retention.OmitTagged)
} }
if cfg.Compression > 0 { if cfg.Compression > 0 {
@@ -110,6 +121,9 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
s.Start() s.Start()
} }
// Shutdown stops the task manager and its scheduler.
func Shutdown() { func Shutdown() {
if s != nil {
s.Shutdown() s.Shutdown()
} }
}

View File

@@ -0,0 +1,52 @@
package taskmanager
import (
"encoding/json"
"testing"
"time"
)
func TestParseDuration(t *testing.T) {
tests := []struct {
input string
expected time.Duration
wantErr bool
}{
{"2m", 2 * time.Minute, false},
{"1h", 1 * time.Hour, false},
{"10s", 10 * time.Second, false},
{"invalid", 0, true},
{"", 0, true}, // time.ParseDuration returns error for empty string
{"0", 0, false},
}
for _, tt := range tests {
got, err := parseDuration(tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("parseDuration(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr)
continue
}
if got != tt.expected {
t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.expected)
}
}
}
func TestCronFrequencyParsing(t *testing.T) {
jsonStr := `{"commit-job-worker": "10m", "duration-worker": "5m", "footprint-worker": "1h"}`
var keys CronFrequency
err := json.Unmarshal([]byte(jsonStr), &keys)
if err != nil {
t.Fatalf("Unmarshal failed: %v", err)
}
if keys.CommitJobWorker != "10m" {
t.Errorf("Expected 10m, got %s", keys.CommitJobWorker)
}
if keys.DurationWorker != "5m" {
t.Errorf("Expected 5m, got %s", keys.DurationWorker)
}
if keys.FootprintWorker != "1h" {
t.Errorf("Expected 1h, got %s", keys.FootprintWorker)
}
}

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (
@@ -18,7 +19,13 @@ func RegisterUpdateDurationWorker() {
} else { } else {
frequency = "5m" frequency = "5m"
} }
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterUpdateDurationWorker: %v", err)
return
}
cclog.Infof("Register Duration Update service with %s interval", frequency) cclog.Infof("Register Duration Update service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d), s.NewJob(gocron.DurationJob(d),

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package taskmanager package taskmanager
import ( import (
@@ -24,7 +25,13 @@ func RegisterFootprintWorker() {
} else { } else {
frequency = "10m" frequency = "10m"
} }
d, _ := time.ParseDuration(frequency)
d, err := parseDuration(frequency)
if err != nil {
cclog.Errorf("RegisterFootprintWorker: %v", err)
return
}
cclog.Infof("Register Footprint Update service with %s interval", frequency) cclog.Infof("Register Footprint Update service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d), s.NewJob(gocron.DurationJob(d),
@@ -37,7 +44,7 @@ func RegisterFootprintWorker() {
cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339)) cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339))
for _, cluster := range archive.Clusters { for _, cluster := range archive.Clusters {
s_cluster := time.Now() sCluster := time.Now()
jobs, err := jobRepo.FindRunningJobs(cluster.Name) jobs, err := jobRepo.FindRunningJobs(cluster.Name)
if err != nil { if err != nil {
continue continue
@@ -63,7 +70,7 @@ func RegisterFootprintWorker() {
cclog.Debugf("Prepare job %d", job.JobID) cclog.Debugf("Prepare job %d", job.JobID)
cl++ cl++
s_job := time.Now() sJob := time.Now()
jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) jobStats, err := repo.LoadStats(job, allMetrics, context.Background())
if err != nil { if err != nil {
@@ -112,7 +119,7 @@ func RegisterFootprintWorker() {
stmt = stmt.Where("job.id = ?", job.ID) stmt = stmt.Where("job.id = ?", job.ID)
pendingStatements = append(pendingStatements, stmt) pendingStatements = append(pendingStatements, stmt)
cclog.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) cclog.Debugf("Job %d took %s", job.JobID, time.Since(sJob))
} }
t, err := jobRepo.TransactionInit() t, err := jobRepo.TransactionInit()
@@ -134,7 +141,7 @@ func RegisterFootprintWorker() {
} }
jobRepo.TransactionEnd(t) jobRepo.TransactionEnd(t)
} }
cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster)) cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(sCluster))
} }
cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s))
})) }))