From cdd45ce88b0a03cae4ea457c4bf092fcb77ffb7e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 16 Nov 2024 06:36:55 +0100 Subject: [PATCH] Fix importers and add Energy footprint to import --- internal/importer/handleImport.go | 32 ++++++++++++++++++++++++++++++- internal/importer/initDB.go | 29 ++++++++++++++++++++++++++++ internal/repository/jobCreate.go | 4 ++-- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index 153402a..01773a5 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/json" "fmt" + "math" "os" "strings" @@ -84,7 +85,8 @@ func HandleImportFlag(flag string) error { } name := fmt.Sprintf("%s_%s", fp, statType) - job.Footprint[fp] = repository.LoadJobStat(&job, name, statType) + + job.Footprint[name] = repository.LoadJobStat(&job, fp, statType) } job.RawFootprint, err = json.Marshal(job.Footprint) @@ -92,6 +94,34 @@ func HandleImportFlag(flag string) error { log.Warn("Error while marshaling job footprint") return err } + + job.EnergyFootprint = make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + // Note: For DB data, calculate and save as kWh + // Energy: Power (in Watts) * Time (in Seconds) + if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) + } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) + // Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits + energy = math.Round(((repository.LoadJobStat(&job, fp, "avg")*float64(job.Duration))/3600/1000)*100) / 100 + } + } else { + log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) + } + + job.EnergyFootprint[fp] = energy + totalEnergy += energy + } + + job.Energy = (math.Round(totalEnergy*100) / 100) + if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) + return err + } + job.RawResources, err = json.Marshal(job.Resources) if err != nil { log.Warn("Error while marshaling job resources") diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index afcde77..fa2ee6e 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -7,6 +7,7 @@ package importer import ( "encoding/json" "fmt" + "math" "strings" "time" @@ -70,6 +71,7 @@ func InitDB() error { log.Errorf("cannot get subcluster: %s", err.Error()) return err } + job.Footprint = make(map[string]float64) for _, fp := range sc.Footprint { @@ -90,6 +92,33 @@ func InitDB() error { return err } + job.EnergyFootprint = make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + // Note: For DB data, calculate and save as kWh + // Energy: Power (in Watts) * Time (in Seconds) + if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) + } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) + // Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits + energy = math.Round(((repository.LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600/1000)*100) / 100 + } + } else { + log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) + } + + job.EnergyFootprint[fp] = energy + totalEnergy += energy + } + + job.Energy = (math.Round(totalEnergy*100) / 100) + if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID) + return err + } + job.RawResources, err = json.Marshal(job.Resources) if err != nil { log.Errorf("repository initDB(): %v", err) diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 43c26c1..1b05b52 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -15,10 +15,10 @@ import ( const NamedJobInsert string = `INSERT INTO job ( job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data ) VALUES ( :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data );` func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {