diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 7b272e6..4de5032 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -57,6 +57,10 @@ func archivingWorker() { log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } + if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + continue + } // Update the jobs database entry one last time: if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) diff --git a/internal/repository/job.go b/internal/repository/job.go index 2c5eef8..1e552e1 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -500,7 +500,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 return } -// Stop updates the job with the database id jobId using the provided arguments. +// FIXME: Combine the next three queries into one providing the db statement as function argument! func (r *JobRepository) MarkArchived( jobMeta *schema.JobMeta, monitoringStatus int32, @@ -516,6 +516,49 @@ func (r *JobRepository) MarkArchived( return nil } +func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + energyFootprint := make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { + // FIXME: Check for unit conversions + if sc.MetricConfig[i].Energy == "power" { + energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration) + } else if sc.MetricConfig[i].Energy == "energy" { + // FIXME: Compute sum of energy metric + } + } + + energyFootprint[fp] = energy + totalEnergy += energy + } + + var rawFootprint []byte + + if rawFootprint, err = json.Marshal(energyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID) + return err + } + + stmt := sq.Update("job"). + Set("energy_footprint", rawFootprint). + Set("energy", totalEnergy). + Where("job.id = ?", jobMeta.JobID) + + if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { + log.Warn("Error while updating job energy footprint") + return err + } + return nil +} + func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { @@ -525,7 +568,13 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { footprint := make(map[string]float64) for _, fp := range sc.Footprint { - footprint[fp] = LoadJobStat(jobMeta, fp) + statType := "avg" + + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { + statType = sc.MetricConfig[i].Footprint + } + + footprint[fp] = LoadJobStat(jobMeta, fp, statType) } var rawFootprint []byte @@ -535,7 +584,8 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { return err } - stmt := sq.Update("job").Set("footprint", rawFootprint) + stmt := sq.Update("job").Set("footprint", rawFootprint). + Where("job.id = ?", jobMeta.JobID) if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { log.Warn("Error while updating job footprint") diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 5682144..ca05ca3 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -286,13 +286,17 @@ func (r *JobRepository) JobsStats( return stats, nil } -// FIXME: Make generic -func LoadJobStat(job *schema.JobMeta, metric string) float64 { +func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 { if stats, ok := job.Statistics[metric]; ok { - if metric == "mem_used" { - return stats.Max - } else { + switch statType { + case "avg": return stats.Avg + case "max": + return stats.Max + case "min": + return stats.Min + default: + log.Errorf("Unknown stat type %s", statType) } } diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 3dd8c64..5710d06 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -221,3 +221,13 @@ func GetSubClusterByNode(cluster, hostname string) (string, error) { return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname) } + +func MetricIndex(mc []schema.MetricConfig, name string) (int, error) { + for i, m := range mc { + if m.Name == name { + return i, nil + } + } + + return 0, fmt.Errorf("Unknown metric name %s", name) +}