Allow to combine job update queries

This commit is contained in:
Jan Eitzinger 2024-09-03 13:41:00 +02:00
parent 6568b6d723
commit f58efa2871
4 changed files with 46 additions and 44 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
) )
var ( var (
@ -53,17 +54,20 @@ func archivingWorker() {
continue continue
} }
if err := jobRepo.UpdateFootprint(jobMeta); err != nil { stmt := sq.Update("job").Where("job.id = ?", job.ID)
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue continue
} }
if err := jobRepo.UpdateEnergy(jobMeta); err != nil { if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue continue
} }
// Update the jobs database entry one last time: // Update the jobs database entry one last time:
if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) if err := jobRepo.Execute(stmt); err != nil {
log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
continue continue
} }
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))

View File

@ -488,7 +488,6 @@ func (r *JobRepository) UpdateDuration() error {
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
Where("job_state = running"). Where("job_state = running").
RunWith(r.stmtCache).Exec(); err != nil { RunWith(r.stmtCache).Exec(); err != nil {
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err return err
} }
@ -542,27 +541,29 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
return return
} }
// FIXME: Combine the next three queries into one providing the db statement as function argument! func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
func (r *JobRepository) MarkArchived(
jobMeta *schema.JobMeta,
monitoringStatus int32,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobMeta.JobID)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err return err
} }
return nil return nil
} }
func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { func (r *JobRepository) MarkArchived(
stmt sq.UpdateBuilder,
monitoringStatus int32,
) sq.UpdateBuilder {
return stmt.Set("monitoring_status", monitoringStatus)
}
func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil { if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error()) log.Errorf("cannot get subcluster: %s", err.Error())
return err return stmt, err
} }
energyFootprint := make(map[string]float64) energyFootprint := make(map[string]float64)
var totalEnergy float64 var totalEnergy float64
@ -586,26 +587,23 @@ func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error {
if rawFootprint, err = json.Marshal(energyFootprint); err != nil { if rawFootprint, err = json.Marshal(energyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID) log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID)
return err return stmt, err
} }
stmt := sq.Update("job"). stmt.Set("energy_footprint", rawFootprint).
Set("energy_footprint", rawFootprint). Set("energy", totalEnergy)
Set("energy", totalEnergy).
Where("job.id = ?", jobMeta.JobID)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { return stmt, nil
log.Warn("Error while updating job energy footprint")
return err
}
return nil
} }
func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil { if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error()) log.Errorf("cannot get subcluster: %s", err.Error())
return err return stmt, err
} }
footprint := make(map[string]float64) footprint := make(map[string]float64)
@ -624,15 +622,9 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
if rawFootprint, err = json.Marshal(footprint); err != nil { if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
return err return stmt, err
} }
stmt := sq.Update("job").Set("footprint", rawFootprint). stmt.Set("footprint", rawFootprint)
Where("job.id = ?", jobMeta.JobID) return stmt, nil
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while updating job footprint")
return err
}
return nil
} }

View File

@ -21,6 +21,6 @@ func RegisterUpdateDurationWorker() {
start := time.Now() start := time.Now()
log.Printf("Update duration started at %s", start.Format(time.RFC3339)) log.Printf("Update duration started at %s", start.Format(time.RFC3339))
jobRepo.UpdateDuration() jobRepo.UpdateDuration()
log.Print("Update duration is done and took %s", time.Since(start)) log.Printf("Update duration is done and took %s", time.Since(start))
})) }))
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
"github.com/go-co-op/gocron/v2" "github.com/go-co-op/gocron/v2"
) )
@ -22,8 +23,8 @@ func RegisterFootprintWorker() {
s.NewJob(gocron.DurationJob(d), s.NewJob(gocron.DurationJob(d),
gocron.NewTask( gocron.NewTask(
func() { func() {
t := time.Now() s := time.Now()
log.Printf("Update Footprints started at %s", t.Format(time.RFC3339)) log.Printf("Update Footprints started at %s", s.Format(time.RFC3339))
for _, cluster := range archive.Clusters { for _, cluster := range archive.Clusters {
jobs, err := jobRepo.FindRunningJobs(cluster.Name) jobs, err := jobRepo.FindRunningJobs(cluster.Name)
if err != nil { if err != nil {
@ -77,16 +78,21 @@ func RegisterFootprintWorker() {
} }
} }
if err := jobRepo.UpdateFootprint(jobMeta); err != nil { stmt := sq.Update("job").Where("job.id = ?", job.ID)
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue continue
} }
if err := jobRepo.UpdateEnergy(jobMeta); err != nil { if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue continue
} }
if err := jobRepo.Execute(stmt); err != nil {
log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
continue
}
} }
} }
log.Print("Update Footprints done") log.Printf("Update Footprints is done and took %s", time.Since(s))
})) }))
} }