From 39c09f8565fe0f63f808416ad018fea3e6453d13 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 10:03:38 +0200 Subject: [PATCH] Introduce job duration update task --- internal/repository/job.go | 17 +++++++++++- internal/taskManager/taskManager.go | 1 + internal/taskManager/updateDurationService.go | 26 +++++++++++++++++++ ...ntService.go => updateFootprintService.go} | 0 4 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 internal/taskManager/updateDurationService.go rename internal/taskManager/{footprintService.go => updateFootprintService.go} (100%) diff --git a/internal/repository/job.go b/internal/repository/job.go index 7cfe4fd..01dc0af 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -205,7 +205,10 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er return err } - if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil { + if _, err = sq.Update("job"). + Set("meta_data", job.RawMetaData). + Where("job.id = ?", job.ID). + RunWith(r.stmtCache).Exec(); err != nil { log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) return err } @@ -480,6 +483,18 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { return jobs, nil } +func (r *JobRepository) UpdateDuration() error { + if _, err := sq.Update("job"). + Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). + Where("job_state = running"). + RunWith(r.stmtCache).Exec(); err != nil { + log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) + return err + } + + return nil +} + func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { var query sq.SelectBuilder diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index b31a1a1..101fc4a 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -80,6 +80,7 @@ func Start() { } RegisterFootprintWorker() + RegisterUpdateDurationWorker() s.Start() } diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go new file mode 100644 index 0000000..afc1045 --- /dev/null +++ b/internal/taskManager/updateDurationService.go @@ -0,0 +1,26 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package taskManager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/go-co-op/gocron/v2" +) + +func RegisterUpdateDurationWorker() { + log.Info("Register duration update service") + + d, _ := time.ParseDuration("5m") + s.NewJob(gocron.DurationJob(d), + gocron.NewTask( + func() { + start := time.Now() + log.Printf("Update duration started at %s", start.Format(time.RFC3339)) + jobRepo.UpdateDuration() + log.Print("Update duration is done and took %s", time.Since(start)) + })) +} diff --git a/internal/taskManager/footprintService.go b/internal/taskManager/updateFootprintService.go similarity index 100% rename from internal/taskManager/footprintService.go rename to internal/taskManager/updateFootprintService.go