feat: Add total energy and energy footprint

This commit is contained in:
Jan Eitzinger 2024-08-29 07:26:49 +02:00
parent f305863616
commit 5b03cf826b
4 changed files with 76 additions and 8 deletions

View File

@ -57,6 +57,10 @@ func archivingWorker() {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue continue
} }
if err := jobRepo.UpdateEnergy(jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue
}
// Update the jobs database entry one last time: // Update the jobs database entry one last time:
if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())

View File

@ -500,7 +500,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
return return
} }
// Stop updates the job with the database id jobId using the provided arguments. // FIXME: Combine the next three queries into one providing the db statement as function argument!
func (r *JobRepository) MarkArchived( func (r *JobRepository) MarkArchived(
jobMeta *schema.JobMeta, jobMeta *schema.JobMeta,
monitoringStatus int32, monitoringStatus int32,
@ -516,6 +516,49 @@ func (r *JobRepository) MarkArchived(
return nil return nil
} }
func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return err
}
energyFootprint := make(map[string]float64)
var totalEnergy float64
var energy float64
for _, fp := range sc.EnergyFootprint {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
// FIXME: Check for unit conversions
if sc.MetricConfig[i].Energy == "power" {
energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration)
} else if sc.MetricConfig[i].Energy == "energy" {
// FIXME: Compute sum of energy metric
}
}
energyFootprint[fp] = energy
totalEnergy += energy
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(energyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID)
return err
}
stmt := sq.Update("job").
Set("energy_footprint", rawFootprint).
Set("energy", totalEnergy).
Where("job.id = ?", jobMeta.JobID)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while updating job energy footprint")
return err
}
return nil
}
func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil { if err != nil {
@ -525,7 +568,13 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
footprint := make(map[string]float64) footprint := make(map[string]float64)
for _, fp := range sc.Footprint { for _, fp := range sc.Footprint {
footprint[fp] = LoadJobStat(jobMeta, fp) statType := "avg"
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
statType = sc.MetricConfig[i].Footprint
}
footprint[fp] = LoadJobStat(jobMeta, fp, statType)
} }
var rawFootprint []byte var rawFootprint []byte
@ -535,7 +584,8 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
return err return err
} }
stmt := sq.Update("job").Set("footprint", rawFootprint) stmt := sq.Update("job").Set("footprint", rawFootprint).
Where("job.id = ?", jobMeta.JobID)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while updating job footprint") log.Warn("Error while updating job footprint")

View File

@ -286,13 +286,17 @@ func (r *JobRepository) JobsStats(
return stats, nil return stats, nil
} }
// FIXME: Make generic func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 {
func LoadJobStat(job *schema.JobMeta, metric string) float64 {
if stats, ok := job.Statistics[metric]; ok { if stats, ok := job.Statistics[metric]; ok {
if metric == "mem_used" { switch statType {
return stats.Max case "avg":
} else {
return stats.Avg return stats.Avg
case "max":
return stats.Max
case "min":
return stats.Min
default:
log.Errorf("Unknown stat type %s", statType)
} }
} }

View File

@ -221,3 +221,13 @@ func GetSubClusterByNode(cluster, hostname string) (string, error) {
return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname) return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname)
} }
func MetricIndex(mc []schema.MetricConfig, name string) (int, error) {
for i, m := range mc {
if m.Name == name {
return i, nil
}
}
return 0, fmt.Errorf("Unknown metric name %s", name)
}