feat: display energy usage in job view

- optional emission constant config line added
This commit is contained in:
Christoph Kluge
2024-09-27 13:45:44 +02:00
parent f53fc088ec
commit 48225662b1
11 changed files with 611 additions and 10 deletions

View File

@@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"sync"
"time"
@@ -52,7 +53,7 @@ func GetJobRepository() *JobRepository {
var jobColumns []string = []string{
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"job.duration", "job.walltime", "job.resources", "job.footprint", // "job.meta_data",
"job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy",
}
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
@@ -61,7 +62,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint /*&job.RawMetaData*/); err != nil {
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
log.Warnf("Error while scanning rows (Job): %v", err)
return nil, err
}
@@ -245,6 +246,34 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
return job.Footprint, nil
}
func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) {
start := time.Now()
cachekey := fmt.Sprintf("energyFootprint:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.EnergyFootprint = cached.(map[string]float64)
return job.EnergyFootprint, nil
}
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
log.Warn("Error while scanning for job energy_footprint")
return nil, err
}
if len(job.RawEnergyFootprint) == 0 {
return nil, nil
}
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
log.Warn("Error while unmarshaling raw energy footprint json")
return nil, err
}
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)
log.Debugf("Timer FetchEnergyFootprint %s", time.Since(start))
return job.EnergyFootprint, nil
}
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
@@ -562,6 +591,7 @@ func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
@@ -572,13 +602,17 @@ func (r *JobRepository) UpdateEnergy(
var energy float64
for _, fp := range sc.EnergyFootprint {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
// FIXME: Check for unit conversions
// Energy: Watts * Time // Power: Energy / Time -> Correct labelling here?
if sc.MetricConfig[i].Energy == "power" {
energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration)
// Unit: ( W * s ) / 3600 = Wh ; Rounded to 2 nearest digits
energy = math.Round(((LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600)*100) / 100
} else if sc.MetricConfig[i].Energy == "energy" {
// This assumes the metric is of aggregation type sum
}
} else {
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
}
energyFootprint[fp] = energy
@@ -592,13 +626,14 @@ func (r *JobRepository) UpdateEnergy(
return stmt, err
}
return stmt.Set("energy_footprint", rawFootprint).Set("energy", totalEnergy), nil
return stmt.Set("energy_footprint", rawFootprint).Set("energy", (math.Round(totalEnergy*100) / 100)), nil
}
func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())