From 6661937fed0c6fc72f2247fab41054f8d33a55bf Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Wed, 5 Apr 2023 15:55:04 +0200
Subject: [PATCH] Adapt schema and fix bugs in migration

---
 pkg/schema/schemas/job-data.schema.json |  4 +-
 pkg/schema/schemas/job-meta.schema.json |  6 +--
 tools/archive-migration/main.go         | 56 +++++++++++++++++++++----
 3 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/pkg/schema/schemas/job-data.schema.json b/pkg/schema/schemas/job-data.schema.json
index d1ad1f1..a6e4811 100644
--- a/pkg/schema/schemas/job-data.schema.json
+++ b/pkg/schema/schemas/job-data.schema.json
@@ -86,8 +86,8 @@
       },
       "minProperties": 1
     },
-    "cpu_used": {
-      "description": "CPU active core utilization",
+    "cpu_user": {
+      "description": "CPU user active core utilization",
       "properties": {
         "node": {
           "$ref": "embedfs://job-metric-data.schema.json"
diff --git a/pkg/schema/schemas/job-meta.schema.json b/pkg/schema/schemas/job-meta.schema.json
index ad4c05b..1e9ad5c 100644
--- a/pkg/schema/schemas/job-meta.schema.json
+++ b/pkg/schema/schemas/job-meta.schema.json
@@ -193,8 +193,8 @@
       "description": "Instructions executed per cycle",
       "$ref": "embedfs://job-metric-statistics.schema.json"
     },
-    "cpu_used": {
-      "description": "CPU active core utilization",
+    "cpu_user": {
+      "description": "CPU user active core utilization",
       "$ref": "embedfs://job-metric-statistics.schema.json"
     },
     "flops_dp": {
@@ -326,7 +326,7 @@
     }
   },
   "required": [
-    "cpu_used",
+    "cpu_user",
     "mem_used",
     "flops_any",
     "mem_bw"
diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go
index a1dd58d..6a37f4a 100644
--- a/tools/archive-migration/main.go
+++ b/tools/archive-migration/main.go
@@ -13,6 +13,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/pkg/units"
@@ -52,21 +53,39 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 		var rn schema.Resource
 		rn.Hostname = ro.Hostname
 		rn.Configuration = ro.Configuration
+		hwt := make([]int, len(ro.HWThreads))
 		if ro.HWThreads != nil {
-			hwt := make([]int, len(ro.HWThreads))
 			copy(hwt, ro.HWThreads)
 		}
+		rn.HWThreads = hwt
+		acc := make([]string, len(ro.Accelerators))
 		if ro.Accelerators != nil {
-			acc := make([]string, len(ro.Accelerators))
 			copy(acc, ro.Accelerators)
 		}
+		rn.Accelerators = acc
 		jn.Resources = append(jn.Resources, &rn)
 	}
 
+	jn.MetaData = make(map[string]string)
 	for k, v := range j.MetaData {
 		jn.MetaData[k] = v
 	}
 
+	jn.Statistics = make(map[string]schema.JobStatistics)
+	for k, v := range j.Statistics {
+		var sn schema.JobStatistics
+		sn.Avg = v.Avg
+		sn.Max = v.Max
+		sn.Min = v.Min
+		tmpUnit := units.ConvertUnitString(v.Unit)
+		if tmpUnit.Base == "inval" {
+			sn.Unit = schema.Unit{Base: ""}
+		} else {
+			sn.Unit = tmpUnit
+		}
+		jn.Statistics[k] = sn
+	}
+
 	//optional properties
 	jn.Partition = j.Partition
 	jn.ArrayJobId = j.ArrayJobId
@@ -83,13 +102,23 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 	return jn
 }
 
-func deepCopyJobData(d *JobData) schema.JobData {
+func deepCopyJobData(d *JobData) *schema.JobData {
 	var dn = make(schema.JobData)
 
 	for k, v := range *d {
+		// fmt.Printf("Metric %s\n", k)
+		dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
+
 		for mk, mv := range v {
+			// fmt.Printf("Scope %s\n", mk)
 			var mn schema.JobMetric
-			mn.Unit = units.ConvertUnitString(mv.Unit)
+			tmpUnit := units.ConvertUnitString(mv.Unit)
+			if tmpUnit.Base == "inval" {
+				mn.Unit = schema.Unit{Base: ""}
+			} else {
+				mn.Unit = tmpUnit
+			}
+
 			mn.Timestep = mv.Timestep
 
 			for _, v := range mv.Series {
@@ -112,12 +141,12 @@ func deepCopyJobData(d *JobData) schema.JobData {
 				mn.Series = append(mn.Series, sn)
 			}
 
-			dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
 			dn[k][mk] = &mn
 		}
+		// fmt.Printf("FINISH %s\n", k)
 	}
 
-	return dn
+	return &dn
 }
 
 func deepCopyClusterConfig(co *Cluster) schema.Cluster {
@@ -162,12 +191,18 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster {
 			mcn.Aggregation = mco.Aggregation
 		}
 		mcn.Timestep = mco.Timestep
-		mcn.Unit = units.ConvertUnitString(mco.Unit)
+		tmpUnit := units.ConvertUnitString(mco.Unit)
+		if tmpUnit.Base == "inval" {
+			mcn.Unit = schema.Unit{Base: ""}
+		} else {
+			mcn.Unit = tmpUnit
+		}
 		mcn.Peak = mco.Peak
 		mcn.Normal = mco.Normal
 		mcn.Caution = mco.Caution
 		mcn.Alert = mco.Alert
 		mcn.SubClusters = mco.SubClusters
+
 		cn.MetricConfig = append(cn.MetricConfig, &mcn)
 	}
 
@@ -223,11 +258,15 @@ func main() {
 		}
 	}
 
+	var wg sync.WaitGroup
+
 	for job := range ar.Iter() {
 		// fmt.Printf("Job %d\n", job.JobID)
 		job := job
+		wg.Add(1)
 
 		go func() {
+			defer wg.Done()
 			path := getPath(job, dstPath, "meta.json")
 			err = os.MkdirAll(filepath.Dir(path), 0750)
 			if err != nil {
@@ -257,7 +296,7 @@ func main() {
 				log.Fatal(err)
 			}
 			jdn := deepCopyJobData(jd)
-			if err := EncodeJobData(f, &jdn); err != nil {
+			if err := EncodeJobData(f, jdn); err != nil {
 				log.Fatal(err)
 			}
 			if err := f.Close(); err != nil {
@@ -266,5 +305,6 @@ func main() {
 		}()
 	}
 
+	wg.Wait()
 	os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644)
 }
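
Notes on the fixes above. The sketches that follow are illustrative stand-ins written for this posting, not code from the repository.

1) The deepCopyJobData fix. Before this patch, dn[k] = make(map[schema.MetricScope]*schema.JobMetric) ran inside the scope loop, so every iteration replaced the freshly built map and only the last scope of each metric survived the migration. The patch hoists the allocation above the loop. A minimal, self-contained sketch of that bug class, with simplified placeholder types:

package main

import "fmt"

func main() {
	// metric -> scope -> value, mirroring the two-level map shape of schema.JobData
	src := map[string]map[string]int{
		"flops_any": {"node": 1, "socket": 2},
	}

	buggy := make(map[string]map[string]int)
	for metric, scopes := range src {
		for scope, v := range scopes {
			buggy[metric] = make(map[string]int) // bug: re-created per scope, earlier entries lost
			buggy[metric][scope] = v
		}
	}

	fixed := make(map[string]map[string]int)
	for metric, scopes := range src {
		fixed[metric] = make(map[string]int) // fix: allocated once per metric
		for scope, v := range scopes {
			fixed[metric][scope] = v
		}
	}

	fmt.Println(len(buggy["flops_any"]), len(fixed["flops_any"])) // prints: 1 2
}

The new jn.MetaData = make(map[string]string) line fixes the complementary mistake: writing into a map that was never allocated panics with "assignment to entry in nil map".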
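2) The unit fallback. The same normalization now appears at three call sites (job statistics, job data, cluster metric config): convert the legacy unit string, and if the result carries the base "inval", store an empty unit rather than the sentinel. That units.ConvertUnitString signals a parse failure this way is inferred from the hunks, not verified here. A hedged sketch of the pattern with stand-in types:

package main

import "fmt"

// Unit mirrors the visible shape of schema.Unit: at least a Base field.
type Unit struct{ Base string }

// convertUnitString stands in for units.ConvertUnitString; returning
// Base "inval" on parse failure is an assumption taken from the patch.
func convertUnitString(s string) Unit {
	if s == "GB/s" {
		return Unit{Base: "B/s"} // one made-up successful conversion
	}
	return Unit{Base: "inval"} // sentinel for "could not parse"
}

// normalizeUnit is the block the patch repeats inline three times:
// convert, then replace the sentinel with an empty (schema-valid) unit.
func normalizeUnit(legacy string) Unit {
	if u := convertUnitString(legacy); u.Base != "inval" {
		return u
	}
	return Unit{Base: ""}
}

func main() {
	fmt.Printf("%q %q\n", normalizeUnit("GB/s").Base, normalizeUnit("bogus").Base) // "B/s" ""
}

Since the block is copied verbatim three times, a shared helper along these lines would be a natural follow-up cleanup.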
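3) The shutdown race in main. Previously the per-job goroutines ran detached, so main could fall through the loop, write version.txt, and exit while jobs were still being converted. The patch pairs every goroutine with wg.Add(1) and defer wg.Done(), then blocks on wg.Wait() before writing the version file. A minimal sketch of the pattern (the job list is a placeholder; the real loop ranges over ar.Iter()):

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := []int{100, 101, 102} // placeholder job IDs

	var wg sync.WaitGroup
	for _, job := range jobs {
		job := job // per-iteration copy, as in the patch (needed before Go 1.22)
		wg.Add(1)  // register before spawning, never inside the goroutine
		go func() {
			defer wg.Done() // signals completion even on an early return
			fmt.Printf("migrating job %d\n", job)
		}()
	}

	wg.Wait() // every job is done; only now is writing version.txt safe
	fmt.Println("writing version.txt")
}

Two things the hunks leave open: the goroutines still assign to the err variable declared outside the closure (err = os.MkdirAll(...)), a write shared across goroutines, and one goroutine is spawned per job with no upper bound. Both look like candidates for a follow-up patch.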