This commit is contained in:
2025-12-15 13:50:05 +01:00
parent 104fd1576a
commit 987cc40318
10 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/repository"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// RegisterCommitJobService schedules the periodic job-cache sync.
//
// The interval is read from Keys.CommitJobWorker and falls back to "2m" when
// that key is empty. Every run calls jobRepo.SyncJobs and hands the returned
// jobs to repository.CallJobStartHooks. If the interval cannot be parsed, or
// the scheduler rejects the job, the error is logged and nothing is
// registered.
func RegisterCommitJobService() {
	frequency := Keys.CommitJobWorker
	if frequency == "" {
		frequency = "2m"
	}

	d, err := parseDuration(frequency)
	if err != nil {
		cclog.Errorf("RegisterCommitJobService: %v", err)
		return
	}

	cclog.Infof("register commitJob service with %s interval", frequency)

	_, err = s.NewJob(gocron.DurationJob(d),
		gocron.NewTask(
			func() {
				start := time.Now()
				cclog.Debugf("jobcache sync started at %s\n", start.Format(time.RFC3339))

				jobs, err := jobRepo.SyncJobs()
				if err != nil {
					// Report the failure instead of dropping it; the hooks are
					// still invoked for whatever SyncJobs returned, as before.
					cclog.Errorf("jobcache sync failed: %v", err)
				}
				repository.CallJobStartHooks(jobs)

				cclog.Debugf("jobcache sync and job callbacks are done and took %s\n", time.Since(start))
			}))
	if err != nil {
		// NewJob validates the job definition (e.g. a zero interval is
		// rejected), so a failure here means the service is NOT running.
		cclog.Errorf("RegisterCommitJobService: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,42 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/go-co-op/gocron/v2"
)
// RegisterCompressionService schedules a daily task (at 05:00) that compresses
// archived jobs older than compressOlderThan days.
//
// Each run computes the cut-off timestamp (now minus compressOlderThan days)
// and passes it to the archive's CompressLast. If CompressLast returns the
// cut-off itself, every job before the cut-off is selected ("complete archive
// run"); otherwise only jobs between the returned timestamp and the cut-off
// are selected. The selected jobs are handed to the archive's Compress.
// NOTE(review): CompressLast presumably returns the cut-off of the previous
// run — confirm against the archive package.
func RegisterCompressionService(compressOlderThan int) {
	cclog.Info("Register compression service")

	_, err := s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))),
		gocron.NewTask(
			func() {
				ar := archive.GetHandle()
				// Widen before multiplying so the product cannot overflow a 32-bit int.
				startTime := time.Now().Unix() - int64(compressOlderThan)*24*3600
				lastTime := ar.CompressLast(startTime)

				from := lastTime
				if startTime == lastTime {
					cclog.Info("Compression Service - Complete archive run")
					from = 0
				}

				var jobs []*schema.Job
				jobs, err := jobRepo.FindJobsBetween(from, startTime, false)
				if err != nil {
					cclog.Warnf("Error while looking for compression jobs: %v", err)
					// Nothing reliable to compress; skip this run.
					return
				}
				ar.Compress(jobs)
			}))
	if err != nil {
		cclog.Errorf("RegisterCompressionService: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,12 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package taskmanager provides a background task scheduler for the cc-backend.
// It manages periodic tasks such as job retention (deleting or moving old
// jobs), job archive compression, LDAP synchronization, stopping jobs that
// exceed their walltime, committing cached jobs, and job duration and
// footprint updates.
//
// The package uses the gocron library to schedule tasks. Configuration
// for the tasks is provided via JSON configs passed to the Start function.
package taskmanager

View File

@@ -0,0 +1,37 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// RegisterLdapSyncService schedules a recurring LDAP synchronization.
//
// ds is a duration string (as accepted by time.ParseDuration) giving the sync
// interval. If it cannot be parsed, or the scheduler rejects the job, the
// error is logged and no job is registered. Each run calls Sync on the LDAP
// authenticator of the auth instance obtained at registration time.
func RegisterLdapSyncService(ds string) {
	interval, err := parseDuration(ds)
	if err != nil {
		// parseDuration already logged the offending input; add the reason.
		cclog.Warnf("RegisterLdapSyncService: %v", err)
		return
	}

	// Named authHandle so the auth package is not shadowed.
	authHandle := auth.GetAuthInstance()

	cclog.Info("Register LDAP sync service")
	_, err = s.NewJob(gocron.DurationJob(interval),
		gocron.NewTask(
			func() {
				t := time.Now()
				cclog.Infof("ldap sync started at %s", t.Format(time.RFC3339))
				if err := authHandle.LdapAuth.Sync(); err != nil {
					cclog.Errorf("ldap sync failed: %s", err.Error())
					// Do not report success after a failed sync.
					return
				}
				cclog.Print("ldap sync done")
			}))
	if err != nil {
		cclog.Errorf("RegisterLdapSyncService: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,68 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// RegisterRetentionDeleteService schedules a daily task (at 04:00) that removes
// old jobs from the job archive and, optionally, from the database.
//
// Parameters:
//   - age: retention age in days; the cut-off timestamp is now minus age days.
//     An age of 0 makes the cut-off "now".
//   - includeDB: when true, jobs before the cut-off are also deleted from the
//     database, followed by a database optimization.
//   - omitTagged: forwarded to jobRepo.FindJobsBetween (presumably excludes
//     tagged jobs from the selection — confirm against the repository).
//
// If the job lookup fails, the run is aborted before anything is deleted, so
// database rows are not removed while their archive entries are left behind.
//
// NOTE(review): DeleteJobsBefore is not given omitTagged, so with includeDB
// the database deletion may cover jobs the archive clean-up skipped — verify.
func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) {
	cclog.Info("Register retention delete service")

	_, err := s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))),
		gocron.NewTask(
			func() {
				// Widen before multiplying so the product cannot overflow a 32-bit int.
				startTime := time.Now().Unix() - int64(age)*24*3600

				jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
				if err != nil {
					cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
					return
				}
				archive.GetHandle().CleanUp(jobs)

				if !includeDB {
					return
				}

				cnt, err := jobRepo.DeleteJobsBefore(startTime)
				if err != nil {
					cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error())
				} else {
					cclog.Infof("Retention: Removed %d jobs from db", cnt)
				}
				if err = jobRepo.Optimize(); err != nil {
					cclog.Errorf("Error occured in db optimization: %s", err.Error())
				}
			}))
	if err != nil {
		cclog.Errorf("RegisterRetentionDeleteService: could not schedule job: %v", err)
	}
}
// RegisterRetentionMoveService schedules a daily task (at 04:00) that moves old
// jobs of the job archive to another location and, optionally, deletes them
// from the database.
//
// Parameters:
//   - age: retention age in days; the cut-off timestamp is now minus age days.
//     An age of 0 makes the cut-off "now".
//   - includeDB: when true, jobs before the cut-off are also deleted from the
//     database, followed by a database optimization.
//   - location: target passed to the archive's Move together with the jobs.
//   - omitTagged: forwarded to jobRepo.FindJobsBetween (presumably excludes
//     tagged jobs from the selection — confirm against the repository).
//
// If the job lookup fails, the run is aborted before anything is moved or
// deleted, so database rows are not removed for jobs that were never moved.
//
// NOTE(review): DeleteJobsBefore is not given omitTagged, so with includeDB
// the database deletion may cover jobs the archive move skipped — verify.
func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) {
	cclog.Info("Register retention move service")

	_, err := s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))),
		gocron.NewTask(
			func() {
				// Widen before multiplying so the product cannot overflow a 32-bit int.
				startTime := time.Now().Unix() - int64(age)*24*3600

				jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
				if err != nil {
					cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
					return
				}
				archive.GetHandle().Move(jobs, location)

				if !includeDB {
					return
				}

				cnt, err := jobRepo.DeleteJobsBefore(startTime)
				if err != nil {
					cclog.Errorf("Error while deleting retention jobs from db: %v", err)
				} else {
					cclog.Infof("Retention: Removed %d jobs from db", cnt)
				}
				if err = jobRepo.Optimize(); err != nil {
					cclog.Errorf("Error occured in db optimization: %v", err)
				}
			}))
	if err != nil {
		cclog.Errorf("RegisterRetentionMoveService: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,28 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"runtime"
"github.com/ClusterCockpit/cc-backend/internal/config"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// RegisterStopJobsExceedTime schedules a daily task (at 03:00) that calls
// jobRepo.StopJobsExceedingWalltimeBy with the configured
// config.Keys.StopJobsExceedingWalltime value and then triggers a garbage
// collection. A scheduling failure is logged; the service is then not active.
func RegisterStopJobsExceedTime() {
	cclog.Info("Register undead jobs service")

	_, err := s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(3, 0, 0))),
		gocron.NewTask(
			func() {
				err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
				if err != nil {
					cclog.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
				}
				runtime.GC()
			}))
	if err != nil {
		cclog.Errorf("RegisterStopJobsExceedTime: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,129 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"bytes"
"encoding/json"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// Retention is the JSON-configurable retention policy for old jobs.
// Start reads it from the "retention" key of the archive configuration.
type Retention struct {
	// Policy selects the retention action; Start registers a service for
	// "delete" and "move" and ignores every other value.
	Policy string `json:"policy"`
	// Location is the target passed to the move service (policy "move").
	Location string `json:"location"`
	// Age is the retention age in days.
	Age int `json:"age"`
	// IncludeDB controls whether jobs are also removed from the database.
	// Start presets it to true before decoding the configuration.
	IncludeDB bool `json:"includeDB"`
	// OmitTagged is forwarded to the job lookup of the retention services.
	OmitTagged bool `json:"omitTagged"`
}
// CronFrequency holds the run intervals of the interval-driven background
// workers. Each value is a duration string such as "2m"; an empty value makes
// the corresponding worker fall back to its default.
type CronFrequency struct {
	// CommitJobWorker is the interval of the commit-job (job-cache sync)
	// worker. Defaults to "2m".
	CommitJobWorker string `json:"commit-job-worker"`
	// DurationWorker is the interval of the job duration update worker.
	// Defaults to "5m".
	DurationWorker string `json:"duration-worker"`
	// FootprintWorker is the interval of the metric footprint update worker.
	// Defaults to "10m".
	FootprintWorker string `json:"footprint-worker"`
}
var (
s gocron.Scheduler
jobRepo *repository.JobRepository
// Keys holds the configured frequencies for cron jobs.
Keys CronFrequency
)
// parseDuration converts a duration string such as "5m" into a time.Duration
// using time.ParseDuration.
//
// On a parse failure it logs a warning naming the rejected input and returns
// 0 together with the error. A successfully parsed zero duration is returned
// without error, but an info message is logged.
func parseDuration(s string) (time.Duration, error) {
	interval, err := time.ParseDuration(s)
	switch {
	case err != nil:
		cclog.Warnf("Could not parse duration for sync interval: %v",
			s)
		return 0, err
	case interval == 0:
		cclog.Info("TaskManager: Sync interval is zero")
	}
	return interval, nil
}
// Start initializes the task manager, parses configurations, and registers
// background tasks. It starts the gocron scheduler.
//
// Parameters:
//   - cronCfg: JSON for CronFrequency (worker intervals). Unknown fields are
//     rejected. May be empty, in which case the worker defaults apply.
//   - archiveConfig: JSON whose "retention" and "compression" keys configure
//     the retention and compression services. May be empty.
//
// Decoding errors are logged and do not stop start-up; the affected settings
// keep their zero/default values. The walltime, retention, compression, and
// LDAP sync services are registered only when configured; the footprint,
// duration, and commit-job workers are always registered.
func Start(cronCfg, archiveConfig json.RawMessage) {
	var err error
	jobRepo = repository.GetJobRepository()
	s, err = gocron.NewScheduler()
	if err != nil {
		cclog.Abortf("Taskmanager Start: Could not create gocron scheduler.\nError: %s\n", err.Error())
		// Defensive: never continue with an unusable scheduler, whether or
		// not Abortf terminates the process.
		return
	}

	if config.Keys.StopJobsExceedingWalltime > 0 {
		RegisterStopJobsExceedTime()
	}

	// An absent cron section is not an error; decoding empty input would
	// only produce a spurious EOF message.
	if len(cronCfg) > 0 {
		dec := json.NewDecoder(bytes.NewReader(cronCfg))
		dec.DisallowUnknownFields()
		if err := dec.Decode(&Keys); err != nil {
			cclog.Errorf("error while decoding cron config: %v", err)
		}
	}

	var cfg struct {
		Retention   Retention `json:"retention"`
		Compression int       `json:"compression"`
	}
	// Database rows are included in retention unless the config says otherwise.
	cfg.Retention.IncludeDB = true

	if len(archiveConfig) > 0 {
		if err := json.Unmarshal(archiveConfig, &cfg); err != nil {
			cclog.Warnf("Error while unmarshaling raw config json: %v", err)
		}
	}

	switch cfg.Retention.Policy {
	case "delete":
		RegisterRetentionDeleteService(
			cfg.Retention.Age,
			cfg.Retention.IncludeDB,
			cfg.Retention.OmitTagged)
	case "move":
		RegisterRetentionMoveService(
			cfg.Retention.Age,
			cfg.Retention.IncludeDB,
			cfg.Retention.Location,
			cfg.Retention.OmitTagged)
	}

	if cfg.Compression > 0 {
		RegisterCompressionService(cfg.Compression)
	}

	lc := auth.Keys.LdapConfig
	if lc != nil && lc.SyncInterval != "" {
		RegisterLdapSyncService(lc.SyncInterval)
	}

	RegisterFootprintWorker()
	RegisterUpdateDurationWorker()
	RegisterCommitJobService()

	s.Start()
}
// Shutdown stops the task manager and its scheduler. It is a no-op when the
// scheduler was never created (Start not called). An error reported by the
// scheduler while shutting down is logged.
func Shutdown() {
	if s == nil {
		return
	}
	if err := s.Shutdown(); err != nil {
		cclog.Errorf("Taskmanager Shutdown: %v", err)
	}
}

View File

@@ -0,0 +1,52 @@
package taskmanager
import (
"encoding/json"
"testing"
"time"
)
// TestParseDuration checks parseDuration against valid, invalid, empty, and
// zero duration strings.
func TestParseDuration(t *testing.T) {
	type testCase struct {
		input    string
		expected time.Duration
		wantErr  bool
	}

	cases := []testCase{
		{input: "2m", expected: 2 * time.Minute},
		{input: "1h", expected: 1 * time.Hour},
		{input: "10s", expected: 10 * time.Second},
		{input: "invalid", wantErr: true},
		// time.ParseDuration returns an error for the empty string.
		{input: "", wantErr: true},
		{input: "0"},
	}

	for _, tc := range cases {
		got, err := parseDuration(tc.input)
		failed := err != nil

		switch {
		case failed != tc.wantErr:
			// The returned value is only meaningful when the error
			// expectation holds, so it is not compared in this case.
			t.Errorf("parseDuration(%q) error = %v, wantErr %v", tc.input, err, tc.wantErr)
		case got != tc.expected:
			t.Errorf("parseDuration(%q) = %v, want %v", tc.input, got, tc.expected)
		}
	}
}
// TestCronFrequencyParsing verifies that the JSON keys of the cron
// configuration are decoded into the matching CronFrequency fields.
func TestCronFrequencyParsing(t *testing.T) {
	const payload = `{"commit-job-worker": "10m", "duration-worker": "5m", "footprint-worker": "1h"}`

	var parsed CronFrequency
	if err := json.Unmarshal([]byte(payload), &parsed); err != nil {
		t.Fatalf("Unmarshal failed: %v", err)
	}

	checks := []struct {
		want string
		got  string
	}{
		{want: "10m", got: parsed.CommitJobWorker},
		{want: "5m", got: parsed.DurationWorker},
		{want: "1h", got: parsed.FootprintWorker},
	}
	for _, c := range checks {
		if c.got != c.want {
			t.Errorf("Expected %s, got %s", c.want, c.got)
		}
	}
}

View File

@@ -0,0 +1,39 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/go-co-op/gocron/v2"
)
// RegisterUpdateDurationWorker schedules the periodic job duration update.
//
// The interval is read from Keys.DurationWorker and falls back to "5m" when
// that key is empty. Every run calls jobRepo.UpdateDuration and logs start
// time and elapsed time. If the interval cannot be parsed, or the scheduler
// rejects the job, the error is logged and nothing is registered.
func RegisterUpdateDurationWorker() {
	frequency := Keys.DurationWorker
	if frequency == "" {
		frequency = "5m"
	}

	d, err := parseDuration(frequency)
	if err != nil {
		cclog.Errorf("RegisterUpdateDurationWorker: %v", err)
		return
	}

	cclog.Infof("Register Duration Update service with %s interval", frequency)

	_, err = s.NewJob(gocron.DurationJob(d),
		gocron.NewTask(
			func() {
				start := time.Now()
				cclog.Infof("Update duration started at %s", start.Format(time.RFC3339))
				jobRepo.UpdateDuration()
				cclog.Infof("Update duration is done and took %s", time.Since(start))
			}))
	if err != nil {
		// NewJob validates the job definition (e.g. a zero interval is
		// rejected), so a failure here means the worker is NOT running.
		cclog.Errorf("RegisterUpdateDurationWorker: could not schedule job: %v", err)
	}
}

View File

@@ -0,0 +1,148 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"context"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
sq "github.com/Masterminds/squirrel"
"github.com/go-co-op/gocron/v2"
)
// RegisterFootprintWorker schedules the periodic footprint update for running
// jobs.
//
// The interval is read from Keys.FootprintWorker and falls back to "10m" when
// that key is empty. If the interval cannot be parsed, or the scheduler
// rejects the job, the error is logged and nothing is registered.
//
// Each run iterates over archive.Clusters and, per cluster:
//  1. looks up the running jobs and the names of all configured metrics,
//  2. loads per-host statistics for every job from the cluster's metric data
//     repository and aggregates them per metric (avg: sum over the job's
//     hosts divided by the job's node count; min/max: extremes over the
//     hosts that reported data), rounded to two digits,
//  3. builds one UPDATE statement per job via jobRepo.UpdateFootprint,
//  4. executes all statements of the cluster in one repository transaction.
//
// Jobs whose statistics cannot be loaded, or whose statement cannot be built,
// are counted as skipped and do not block the remaining jobs.
func RegisterFootprintWorker() {
	frequency := Keys.FootprintWorker
	if frequency == "" {
		frequency = "10m"
	}

	d, err := parseDuration(frequency)
	if err != nil {
		cclog.Errorf("RegisterFootprintWorker: %v", err)
		return
	}

	cclog.Infof("Register Footprint Update service with %s interval", frequency)

	_, err = s.NewJob(gocron.DurationJob(d),
		gocron.NewTask(
			func() {
				// start is deliberately not called s: that name is the
				// package-level scheduler.
				start := time.Now()
				c := 0  // statements added to a transaction
				ce := 0 // jobs/statements skipped because of an error
				cl := 0 // jobs considered
				cclog.Infof("Update Footprints started at %s", start.Format(time.RFC3339))

				for _, cluster := range archive.Clusters {
					sCluster := time.Now()
					jobs, err := jobRepo.FindRunningJobs(cluster.Name)
					if err != nil {
						cclog.Errorf("error while looking for running jobs of cluster '%s': %v", cluster.Name, err)
						continue
					}
					// NOTE: Additional Subcluster Loop Could Allow For Limited List Of Footprint-Metrics Only.
					//       - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs
					//       - Would Require Review of 'updateFootprint' Usage (Logic Could Possibly Be Included Here Completely)
					metricConfigs := archive.GetCluster(cluster.Name).MetricConfig
					allMetrics := make([]string, 0, len(metricConfigs))
					for _, mc := range metricConfigs {
						allMetrics = append(allMetrics, mc.Name)
					}

					repo, err := metricdata.GetMetricDataRepo(cluster.Name)
					if err != nil {
						cclog.Errorf("no metric data repository configured for '%s'", cluster.Name)
						continue
					}

					pendingStatements := []sq.UpdateBuilder{}

					for _, job := range jobs {
						cclog.Debugf("Prepare job %d", job.JobID)
						cl++

						sJob := time.Now()

						jobStats, err := repo.LoadStats(job, allMetrics, context.Background())
						if err != nil {
							cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
							ce++
							continue
						}

						job.Statistics = make(map[string]schema.JobStatistics)

						for _, metric := range allMetrics {
							avg, minVal, maxVal := 0.0, 0.0, 0.0
							hosts := 0 // hosts of this job that reported data for the metric

							// JobStats[Metric1:[Hostname1:[Stats], Hostname2:[Stats], ...], Metric2[...] ...]
							if data, ok := jobStats[metric]; ok {
								for _, res := range job.Resources {
									hostStats, ok := data[res.Hostname]
									if !ok {
										continue
									}
									avg += hostStats.Avg
									if hosts == 0 {
										// Seed the extremes from the first host; seeding
										// with 0 would pin Min at 0 for positive metrics
										// and Max at 0 for negative ones.
										minVal, maxVal = hostStats.Min, hostStats.Max
									} else {
										minVal = math.Min(minVal, hostStats.Min)
										maxVal = math.Max(maxVal, hostStats.Max)
									}
									hosts++
								}
							}

							// Average over the job's node count; fall back to the
							// number of reporting hosts so a zero node count cannot
							// produce NaN or Inf.
							nodes := float64(job.NumNodes)
							if nodes <= 0 {
								nodes = float64(hosts)
							}
							if nodes > 0 {
								avg /= nodes
							}

							unit := archive.GetMetricConfig(job.Cluster, metric).Unit

							// Add values rounded to 2 digits: repo.LoadStats may return unrounded
							job.Statistics[metric] = schema.JobStatistics{
								Unit: schema.Unit{
									Prefix: unit.Prefix,
									Base:   unit.Base,
								},
								Avg: (math.Round(avg*100) / 100),
								Min: (math.Round(minVal*100) / 100),
								Max: (math.Round(maxVal*100) / 100),
							}
						}

						// Build Statement per Job, Add to Pending Array
						stmt := sq.Update("job")
						stmt, err = jobRepo.UpdateFootprint(stmt, job)
						if err != nil {
							cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error())
							ce++
							continue
						}
						stmt = stmt.Where("job.id = ?", job.ID)

						pendingStatements = append(pendingStatements, stmt)
						cclog.Debugf("Job %d took %s", job.JobID, time.Since(sJob))
					}

					t, err := jobRepo.TransactionInit()
					if err != nil {
						cclog.Errorf("failed TransactionInit %v", err)
						cclog.Errorf("skipped %d transactions for cluster %s", len(pendingStatements), cluster.Name)
						ce += len(pendingStatements)
					} else {
						for _, ps := range pendingStatements {
							query, args, err := ps.ToSql()
							if err != nil {
								cclog.Errorf("failed in ToSQL conversion: %v", err)
								ce++
							} else {
								// args...: Footprint-JSON, Energyfootprint-JSON, TotalEnergy, JobID
								jobRepo.TransactionAdd(t, query, args...)
								c++
							}
						}
						jobRepo.TransactionEnd(t)
					}
					cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(sCluster))
				}
				cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(start))
			}))
	if err != nil {
		// NewJob validates the job definition (e.g. a zero interval is
		// rejected), so a failure here means the worker is NOT running.
		cclog.Errorf("RegisterFootprintWorker: could not schedule job: %v", err)
	}
}