diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 4de5032..628e36e 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -12,6 +12,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + sq "github.com/Masterminds/squirrel" ) var ( @@ -53,17 +54,20 @@ func archivingWorker() { continue } - if err := jobRepo.UpdateFootprint(jobMeta); err != nil { + stmt := sq.Update("job").Where("job.id = ?", job.ID) + + if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } - if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } // Update the jobs database entry one last time: - if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { - log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) + stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) + if err := jobRepo.Execute(stmt); err != nil { + log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) continue } log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) diff --git a/internal/repository/job.go b/internal/repository/job.go index 01dc0af..e5e2569 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -488,7 +488,6 @@ func (r *JobRepository) UpdateDuration() error { Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). Where("job_state = running"). RunWith(r.stmtCache).Exec(); err != nil { - log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) return err } @@ -542,27 +541,29 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 return } -// FIXME: Combine the next three queries into one providing the db statement as function argument! -func (r *JobRepository) MarkArchived( - jobMeta *schema.JobMeta, - monitoringStatus int32, -) error { - stmt := sq.Update("job"). - Set("monitoring_status", monitoringStatus). - Where("job.id = ?", jobMeta.JobID) - +func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error { if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while marking job as archived") return err } + return nil } -func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { +func (r *JobRepository) MarkArchived( + stmt sq.UpdateBuilder, + monitoringStatus int32, +) sq.UpdateBuilder { + return stmt.Set("monitoring_status", monitoringStatus) +} + +func (r *JobRepository) UpdateEnergy( + stmt sq.UpdateBuilder, + jobMeta *schema.JobMeta, +) (sq.UpdateBuilder, error) { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) - return err + return stmt, err } energyFootprint := make(map[string]float64) var totalEnergy float64 @@ -586,26 +587,23 @@ func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { if rawFootprint, err = json.Marshal(energyFootprint); err != nil { log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID) - return err + return stmt, err } - stmt := sq.Update("job"). - Set("energy_footprint", rawFootprint). - Set("energy", totalEnergy). - Where("job.id = ?", jobMeta.JobID) + stmt.Set("energy_footprint", rawFootprint). + Set("energy", totalEnergy) - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while updating job energy footprint") - return err - } - return nil + return stmt, nil } -func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { +func (r *JobRepository) UpdateFootprint( + stmt sq.UpdateBuilder, + jobMeta *schema.JobMeta, +) (sq.UpdateBuilder, error) { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) - return err + return stmt, err } footprint := make(map[string]float64) @@ -624,15 +622,9 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { if rawFootprint, err = json.Marshal(footprint); err != nil { log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) - return err + return stmt, err } - stmt := sq.Update("job").Set("footprint", rawFootprint). - Where("job.id = ?", jobMeta.JobID) - - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while updating job footprint") - return err - } - return nil + stmt.Set("footprint", rawFootprint) + return stmt, nil } diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go index afc1045..6023547 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskManager/updateDurationService.go @@ -21,6 +21,6 @@ func RegisterUpdateDurationWorker() { start := time.Now() log.Printf("Update duration started at %s", start.Format(time.RFC3339)) jobRepo.UpdateDuration() - log.Print("Update duration is done and took %s", time.Since(start)) + log.Printf("Update duration is done and took %s", time.Since(start)) })) } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index ff76e25..510c73e 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -13,6 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + sq "github.com/Masterminds/squirrel" "github.com/go-co-op/gocron/v2" ) @@ -22,8 +23,8 @@ func RegisterFootprintWorker() { s.NewJob(gocron.DurationJob(d), gocron.NewTask( func() { - t := time.Now() - log.Printf("Update Footprints started at %s", t.Format(time.RFC3339)) + s := time.Now() + log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { @@ -77,16 +78,21 @@ func RegisterFootprintWorker() { } } - if err := jobRepo.UpdateFootprint(jobMeta); err != nil { + stmt := sq.Update("job").Where("job.id = ?", job.ID) + if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } - if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } + if err := jobRepo.Execute(stmt); err != nil { + log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + continue + } } } - log.Print("Update Footprints done") + log.Printf("Update Footprints is done and took %s", time.Since(s)) })) }