diff --git a/api/schema.graphqls b/api/schema.graphqls index ed94d42..4aba1d5 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -18,6 +18,7 @@ type Job { numNodes: Int! numHWThreads: Int! numAcc: Int! + energy: Float! SMT: Int! exclusive: Int! partition: String! @@ -28,6 +29,7 @@ type Job { resources: [Resource!]! concurrentJobs: JobLinkResultList footprint: [FootprintValue] + energyFootprint: [EnergyFootprintValue] metaData: Any userData: User } @@ -65,6 +67,12 @@ type FootprintValue { value: Float! } +type EnergyFootprintValue { + hardware: String! + metric: String! + value: Float! +} + type MetricValue { name: String unit: Unit! diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 3fd3649..b4556a8 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -74,6 +74,12 @@ type ComplexityRoot struct { Name func(childComplexity int) int } + EnergyFootprintValue struct { + Hardware func(childComplexity int) int + Metric func(childComplexity int) int + Value func(childComplexity int) int + } + FootprintValue struct { Name func(childComplexity int) int Stat func(childComplexity int) int @@ -108,6 +114,8 @@ type ComplexityRoot struct { Cluster func(childComplexity int) int ConcurrentJobs func(childComplexity int) int Duration func(childComplexity int) int + Energy func(childComplexity int) int + EnergyFootprint func(childComplexity int) int Exclusive func(childComplexity int) int Footprint func(childComplexity int) int ID func(childComplexity int) int @@ -349,6 +357,7 @@ type JobResolver interface { ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) Footprint(ctx context.Context, obj *schema.Job) ([]*model.FootprintValue, error) + EnergyFootprint(ctx context.Context, obj *schema.Job) ([]*model.EnergyFootprintValue, error) MetaData(ctx context.Context, obj *schema.Job) (any, error) UserData(ctx context.Context, obj *schema.Job) (*model.User, error) } @@ -469,6 +478,27 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Count.Name(childComplexity), true + case "EnergyFootprintValue.hardware": + if e.complexity.EnergyFootprintValue.Hardware == nil { + break + } + + return e.complexity.EnergyFootprintValue.Hardware(childComplexity), true + + case "EnergyFootprintValue.metric": + if e.complexity.EnergyFootprintValue.Metric == nil { + break + } + + return e.complexity.EnergyFootprintValue.Metric(childComplexity), true + + case "EnergyFootprintValue.value": + if e.complexity.EnergyFootprintValue.Value == nil { + break + } + + return e.complexity.EnergyFootprintValue.Value(childComplexity), true + case "FootprintValue.name": if e.complexity.FootprintValue.Name == nil { break @@ -595,6 +625,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Job.Duration(childComplexity), true + case "Job.energy": + if e.complexity.Job.Energy == nil { + break + } + + return e.complexity.Job.Energy(childComplexity), true + + case "Job.energyFootprint": + if e.complexity.Job.EnergyFootprint == nil { + break + } + + return e.complexity.Job.EnergyFootprint(childComplexity), true + case "Job.exclusive": if e.complexity.Job.Exclusive == nil { break @@ -1862,6 +1906,7 @@ type Job { numNodes: Int! numHWThreads: Int! numAcc: Int! + energy: Float! SMT: Int! exclusive: Int! partition: String! @@ -1872,6 +1917,7 @@ type Job { resources: [Resource!]! concurrentJobs: JobLinkResultList footprint: [FootprintValue] + energyFootprint: [EnergyFootprintValue] metaData: Any userData: User } @@ -1909,6 +1955,12 @@ type FootprintValue { value: Float! } +type EnergyFootprintValue { + hardware: String! + metric: String! + value: Float! +} + type MetricValue { name: String unit: Unit! @@ -3173,6 +3225,138 @@ func (ec *executionContext) fieldContext_Count_count(_ context.Context, field gr return fc, nil } +func (ec *executionContext) _EnergyFootprintValue_hardware(ctx context.Context, field graphql.CollectedField, obj *model.EnergyFootprintValue) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_EnergyFootprintValue_hardware(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Hardware, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_EnergyFootprintValue_hardware(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EnergyFootprintValue", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _EnergyFootprintValue_metric(ctx context.Context, field graphql.CollectedField, obj *model.EnergyFootprintValue) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_EnergyFootprintValue_metric(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Metric, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_EnergyFootprintValue_metric(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EnergyFootprintValue", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _EnergyFootprintValue_value(ctx context.Context, field graphql.CollectedField, obj *model.EnergyFootprintValue) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_EnergyFootprintValue_value(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Value, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(float64) + fc.Result = res + return ec.marshalNFloat2float64(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_EnergyFootprintValue_value(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "EnergyFootprintValue", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Float does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _FootprintValue_name(ctx context.Context, field graphql.CollectedField, obj *model.FootprintValue) (ret graphql.Marshaler) { fc, err := ec.fieldContext_FootprintValue_name(ctx, field) if err != nil { @@ -4340,6 +4524,50 @@ func (ec *executionContext) fieldContext_Job_numAcc(_ context.Context, field gra return fc, nil } +func (ec *executionContext) _Job_energy(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Job_energy(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Energy, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(float64) + fc.Result = res + return ec.marshalNFloat2float64(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Job_energy(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Job", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Float does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Job_SMT(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Job_SMT(ctx, field) if err != nil { @@ -4810,6 +5038,55 @@ func (ec *executionContext) fieldContext_Job_footprint(_ context.Context, field return fc, nil } +func (ec *executionContext) _Job_energyFootprint(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Job_energyFootprint(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Job().EnergyFootprint(rctx, obj) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*model.EnergyFootprintValue) + fc.Result = res + return ec.marshalOEnergyFootprintValue2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐEnergyFootprintValue(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Job_energyFootprint(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Job", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "hardware": + return ec.fieldContext_EnergyFootprintValue_hardware(ctx, field) + case "metric": + return ec.fieldContext_EnergyFootprintValue_metric(ctx, field) + case "value": + return ec.fieldContext_EnergyFootprintValue_value(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type EnergyFootprintValue", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Job_metaData(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Job_metaData(ctx, field) if err != nil { @@ -5518,6 +5795,8 @@ func (ec *executionContext) fieldContext_JobResultList_items(_ context.Context, return ec.fieldContext_Job_numHWThreads(ctx, field) case "numAcc": return ec.fieldContext_Job_numAcc(ctx, field) + case "energy": + return ec.fieldContext_Job_energy(ctx, field) case "SMT": return ec.fieldContext_Job_SMT(ctx, field) case "exclusive": @@ -5538,6 +5817,8 @@ func (ec *executionContext) fieldContext_JobResultList_items(_ context.Context, return ec.fieldContext_Job_concurrentJobs(ctx, field) case "footprint": return ec.fieldContext_Job_footprint(ctx, field) + case "energyFootprint": + return ec.fieldContext_Job_energyFootprint(ctx, field) case "metaData": return ec.fieldContext_Job_metaData(ctx, field) case "userData": @@ -8480,6 +8761,8 @@ func (ec *executionContext) fieldContext_Query_job(ctx context.Context, field gr return ec.fieldContext_Job_numHWThreads(ctx, field) case "numAcc": return ec.fieldContext_Job_numAcc(ctx, field) + case "energy": + return ec.fieldContext_Job_energy(ctx, field) case "SMT": return ec.fieldContext_Job_SMT(ctx, field) case "exclusive": @@ -8500,6 +8783,8 @@ func (ec *executionContext) fieldContext_Query_job(ctx context.Context, field gr return ec.fieldContext_Job_concurrentJobs(ctx, field) case "footprint": return ec.fieldContext_Job_footprint(ctx, field) + case "energyFootprint": + return ec.fieldContext_Job_energyFootprint(ctx, field) case "metaData": return ec.fieldContext_Job_metaData(ctx, field) case "userData": @@ -13740,6 +14025,55 @@ func (ec *executionContext) _Count(ctx context.Context, sel ast.SelectionSet, ob return out } +var energyFootprintValueImplementors = []string{"EnergyFootprintValue"} + +func (ec *executionContext) _EnergyFootprintValue(ctx context.Context, sel ast.SelectionSet, obj *model.EnergyFootprintValue) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, energyFootprintValueImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("EnergyFootprintValue") + case "hardware": + out.Values[i] = ec._EnergyFootprintValue_hardware(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "metric": + out.Values[i] = ec._EnergyFootprintValue_metric(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "value": + out.Values[i] = ec._EnergyFootprintValue_value(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var footprintValueImplementors = []string{"FootprintValue"} func (ec *executionContext) _FootprintValue(ctx context.Context, sel ast.SelectionSet, obj *model.FootprintValue) graphql.Marshaler { @@ -14048,6 +14382,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { atomic.AddUint32(&out.Invalids, 1) } + case "energy": + out.Values[i] = ec._Job_energy(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&out.Invalids, 1) + } case "SMT": out.Values[i] = ec._Job_SMT(ctx, field, obj) if out.Values[i] == graphql.Null { @@ -14184,6 +14523,39 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj continue } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "energyFootprint": + field := field + + innerFunc := func(ctx context.Context, _ *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Job_energyFootprint(ctx, field, obj) + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) case "metaData": field := field @@ -18120,6 +18492,54 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } +func (ec *executionContext) marshalOEnergyFootprintValue2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐEnergyFootprintValue(ctx context.Context, sel ast.SelectionSet, v []*model.EnergyFootprintValue) graphql.Marshaler { + if v == nil { + return graphql.Null + } + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalOEnergyFootprintValue2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐEnergyFootprintValue(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + return ret +} + +func (ec *executionContext) marshalOEnergyFootprintValue2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐEnergyFootprintValue(ctx context.Context, sel ast.SelectionSet, v *model.EnergyFootprintValue) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._EnergyFootprintValue(ctx, sel, v) +} + func (ec *executionContext) unmarshalOFloat2float64(ctx context.Context, v interface{}) (float64, error) { res, err := graphql.UnmarshalFloatContext(ctx, v) return res, graphql.ErrorOnPath(ctx, err) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 6c731a2..58389ab 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -16,6 +16,12 @@ type Count struct { Count int `json:"count"` } +type EnergyFootprintValue struct { + Hardware string `json:"hardware"` + Metric string `json:"metric"` + Value float64 `json:"value"` +} + type FloatRange struct { From float64 `json:"from"` To float64 `json:"to"` diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index ef17e1d..8c5ee0d 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "regexp" "slices" "strconv" "strings" @@ -71,6 +72,45 @@ func (r *jobResolver) Footprint(ctx context.Context, obj *schema.Job) ([]*model. return res, err } +// EnergyFootprint is the resolver for the energyFootprint field. +func (r *jobResolver) EnergyFootprint(ctx context.Context, obj *schema.Job) ([]*model.EnergyFootprintValue, error) { + rawEnergyFootprint, err := r.Repo.FetchEnergyFootprint(obj) + if err != nil { + log.Warn("Error while fetching job energy footprint data") + return nil, err + } + + res := []*model.EnergyFootprintValue{} + for name, value := range rawEnergyFootprint { + // Suboptimal: Nearly hardcoded metric name expectations + matchCpu := regexp.MustCompile(`cpu|Cpu|CPU`) + matchAcc := regexp.MustCompile(`acc|Acc|ACC`) + matchMem := regexp.MustCompile(`mem|Mem|MEM`) + matchCore := regexp.MustCompile(`core|Core|CORE`) + + hwType := "" + switch test := name; { // NOtice ';' for var declaration + case matchCpu.MatchString(test): + hwType = "CPU" + case matchAcc.MatchString(test): + hwType = "Accelerator" + case matchMem.MatchString(test): + hwType = "Memory" + case matchCore.MatchString(test): + hwType = "Core" + default: + hwType = "Hardware" + } + + res = append(res, &model.EnergyFootprintValue{ + Hardware: hwType, + Metric: name, + Value: value, + }) + } + return res, err +} + // MetaData is the resolver for the metaData field. func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (any, error) { return r.Repo.FetchMetadata(obj) diff --git a/internal/repository/job.go b/internal/repository/job.go index 440734e..16390f2 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "strconv" "sync" "time" @@ -52,7 +53,7 @@ func GetJobRepository() *JobRepository { var jobColumns []string = []string{ "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id", "job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", - "job.duration", "job.walltime", "job.resources", "job.footprint", // "job.meta_data", + "job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy", } func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { @@ -61,7 +62,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { if err := row.Scan( &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, - &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint /*&job.RawMetaData*/); err != nil { + &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil { log.Warnf("Error while scanning rows (Job): %v", err) return nil, err } @@ -245,6 +246,34 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err return job.Footprint, nil } +func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) { + start := time.Now() + cachekey := fmt.Sprintf("energyFootprint:%d", job.ID) + if cached := r.cache.Get(cachekey, nil); cached != nil { + job.EnergyFootprint = cached.(map[string]float64) + return job.EnergyFootprint, nil + } + + if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID). + RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil { + log.Warn("Error while scanning for job energy_footprint") + return nil, err + } + + if len(job.RawEnergyFootprint) == 0 { + return nil, nil + } + + if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil { + log.Warn("Error while unmarshaling raw energy footprint json") + return nil, err + } + + r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour) + log.Debugf("Timer FetchEnergyFootprint %s", time.Since(start)) + return job.EnergyFootprint, nil +} + func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { var cnt int q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) @@ -562,6 +591,7 @@ func (r *JobRepository) UpdateEnergy( stmt sq.UpdateBuilder, jobMeta *schema.JobMeta, ) (sq.UpdateBuilder, error) { + /* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */ sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) @@ -572,13 +602,17 @@ func (r *JobRepository) UpdateEnergy( var energy float64 for _, fp := range sc.EnergyFootprint { - if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { // FIXME: Check for unit conversions + // Energy: Watts * Time // Power: Energy / Time -> Correct labelling here? if sc.MetricConfig[i].Energy == "power" { - energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration) + // Unit: ( W * s ) / 3600 = Wh ; Rounded to 2 nearest digits + energy = math.Round(((LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600)*100) / 100 } else if sc.MetricConfig[i].Energy == "energy" { // This assumes the metric is of aggregation type sum } + } else { + log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) } energyFootprint[fp] = energy @@ -592,13 +626,14 @@ func (r *JobRepository) UpdateEnergy( return stmt, err } - return stmt.Set("energy_footprint", rawFootprint).Set("energy", totalEnergy), nil + return stmt.Set("energy_footprint", rawFootprint).Set("energy", (math.Round(totalEnergy*100) / 100)), nil } func (r *JobRepository) UpdateFootprint( stmt sq.UpdateBuilder, jobMeta *schema.JobMeta, ) (sq.UpdateBuilder, error) { + /* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */ sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 1dfdfc2..ae3da8f 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -78,6 +78,9 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType { func setupJobRoute(i InfoType, r *http.Request) InfoType { i["id"] = mux.Vars(r)["id"] + if config.Keys.EmissionConstant != 0 { + i["emission"] = config.Keys.EmissionConstant + } return i } diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 5710d06..fff32c9 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -75,7 +75,6 @@ func initClusterConfig() error { if !cfg.Remove { availability.SubClusters = append(availability.SubClusters, sc.Name) newMetric.Peak = cfg.Peak - newMetric.Peak = cfg.Peak newMetric.Normal = cfg.Normal newMetric.Caution = cfg.Caution newMetric.Alert = cfg.Alert @@ -229,5 +228,5 @@ func MetricIndex(mc []schema.MetricConfig, name string) (int, error) { } } - return 0, fmt.Errorf("Unknown metric name %s", name) + return 0, fmt.Errorf("unknown metric name %s", name) } diff --git a/pkg/schema/config.go b/pkg/schema/config.go index e2cb28c..10fb728 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -154,4 +154,8 @@ type ProgramConfig struct { // Array of Clusters Clusters []*ClusterConfig `json:"clusters"` + + // Energy Mix CO2 Emission Constant [g/kWh] + // If entered, displays estimated CO2 emission for job based on jobs totalEnergy + EmissionConstant int `json:"emission-constant"` } diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 73dd158..899d5af 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -36,6 +36,7 @@ import Metric from "./job/Metric.svelte"; import StatsTable from "./job/StatsTable.svelte"; import JobSummary from "./job/JobSummary.svelte"; + import EnergySummary from "./job/EnergySummary.svelte"; import ConcurrentJobs from "./generic/helper/ConcurrentJobs.svelte"; import PlotTable from "./generic/PlotTable.svelte"; import Roofline from "./generic/plots/Roofline.svelte"; @@ -70,7 +71,7 @@ const { query: initq } = init(` job(id: "${dbid}") { id, jobId, user, project, cluster, startTime, - duration, numNodes, numHWThreads, numAcc, + duration, numNodes, numHWThreads, numAcc, energy, SMT, exclusive, partition, subCluster, arrayJobId, monitoringStatus, state, walltime, tags { id, type, scope, name }, @@ -78,7 +79,8 @@ metaData, userData { name, email }, concurrentJobs { items { id, jobId }, count, listQuery }, - footprint { name, stat, value } + footprint { name, stat, value }, + energyFootprint { hardware, metric, value } } `); @@ -308,6 +310,14 @@ +{#if $initq?.data} + + + + + +{/if} + diff --git a/web/frontend/src/job.entrypoint.js b/web/frontend/src/job.entrypoint.js index 16714a5..78ce1d1 100644 --- a/web/frontend/src/job.entrypoint.js +++ b/web/frontend/src/job.entrypoint.js @@ -11,6 +11,7 @@ new Job({ }, context: new Map([ ['cc-config', clusterCockpitConfig], - ['resampling', resampleConfig] + ['resampling', resampleConfig], + ['emission', emission] ]) }) diff --git a/web/frontend/src/job/EnergySummary.svelte b/web/frontend/src/job/EnergySummary.svelte new file mode 100644 index 0000000..f22f984 --- /dev/null +++ b/web/frontend/src/job/EnergySummary.svelte @@ -0,0 +1,75 @@ + + + + + + + + {#each job.energyFootprint as efp} + +
{efp.hardware}: {efp.value} Wh ({efp.metric})
+ + Estimated energy consumption based on metric {efp.metric} and job runtime. + + {/each} + +
Total Energy: {job?.energy? job.energy : 0} Wh
+ + {#if carbonPerkWh} + +
Carbon Emission: {carbonMass} kg
+ + {/if} +
+
+
+ +Estimated total energy consumption of job. + + +{#if carbonPerkWh} + Estimated emission based on supplier energy mix and total energy consumption. + +{/if} + +