Merge branch 'import-data-sanitation' of https://github.com/ClusterCockpit/cc-backend into import-data-sanitation

Christoph Kluge 2023-04-06 11:38:12 +02:00
commit 323066cb30
3 changed files with 53 additions and 13 deletions

View File

@@ -86,8 +86,8 @@
       },
       "minProperties": 1
     },
-    "cpu_used": {
-      "description": "CPU active core utilization",
+    "cpu_user": {
+      "description": "CPU user active core utilization",
       "properties": {
         "node": {
           "$ref": "embedfs://job-metric-data.schema.json"

View File

@@ -193,8 +193,8 @@
       "description": "Instructions executed per cycle",
       "$ref": "embedfs://job-metric-statistics.schema.json"
     },
-    "cpu_used": {
-      "description": "CPU active core utilization",
+    "cpu_user": {
+      "description": "CPU user active core utilization",
       "$ref": "embedfs://job-metric-statistics.schema.json"
     },
     "flops_dp": {
@@ -326,7 +326,7 @@
     }
   },
   "required": [
-    "cpu_used",
+    "cpu_user",
     "mem_used",
     "flops_any",
     "mem_bw"

View File

@@ -13,6 +13,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/pkg/units"
@@ -52,21 +53,39 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 		var rn schema.Resource
 		rn.Hostname = ro.Hostname
 		rn.Configuration = ro.Configuration
+		hwt := make([]int, len(ro.HWThreads))
 		if ro.HWThreads != nil {
-			hwt := make([]int, len(ro.HWThreads))
 			copy(hwt, ro.HWThreads)
 		}
+		rn.HWThreads = hwt
+		acc := make([]string, len(ro.Accelerators))
 		if ro.Accelerators != nil {
-			acc := make([]string, len(ro.Accelerators))
 			copy(acc, ro.Accelerators)
 		}
+		rn.Accelerators = acc
 		jn.Resources = append(jn.Resources, &rn)
 	}
+	jn.MetaData = make(map[string]string)
 	for k, v := range j.MetaData {
 		jn.MetaData[k] = v
 	}
+	jn.Statistics = make(map[string]schema.JobStatistics)
+	for k, v := range j.Statistics {
+		var sn schema.JobStatistics
+		sn.Avg = v.Avg
+		sn.Max = v.Max
+		sn.Min = v.Min
+		tmpUnit := units.ConvertUnitString(v.Unit)
+		if tmpUnit.Base == "inval" {
+			sn.Unit = schema.Unit{Base: ""}
+		} else {
+			sn.Unit = tmpUnit
+		}
+		jn.Statistics[k] = sn
+	}
 	//optional properties
 	jn.Partition = j.Partition
 	jn.ArrayJobId = j.ArrayJobId
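
The reshuffled hwt/acc lines above fix a scoping bug: the slices were previously declared inside the if blocks, so rn.HWThreads and rn.Accelerators were never assigned. A standalone sketch of the corrected deep-copy pattern (copyInts is a hypothetical helper, not part of the commit); note len(nil) is 0, so the make call is safe even without the nil check:

package main

import "fmt"

// copyInts returns an independent copy of src, so later writes to the
// original cannot leak into the deep-copied job. For a nil src it
// returns an empty, non-nil slice, mirroring make([]int, len(nil)).
func copyInts(src []int) []int {
	dst := make([]int, len(src))
	copy(dst, src)
	return dst
}

func main() {
	orig := []int{0, 1, 2, 3}
	dup := copyInts(orig)
	orig[0] = 99
	fmt.Println(orig[0], dup[0]) // 99 0: the copy is independent
}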
@@ -83,13 +102,23 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 	return jn
 }
 
-func deepCopyJobData(d *JobData) schema.JobData {
+func deepCopyJobData(d *JobData) *schema.JobData {
 	var dn = make(schema.JobData)
 
 	for k, v := range *d {
+		// fmt.Printf("Metric %s\n", k)
+		dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
 		for mk, mv := range v {
+			// fmt.Printf("Scope %s\n", mk)
 			var mn schema.JobMetric
-			mn.Unit = units.ConvertUnitString(mv.Unit)
+			tmpUnit := units.ConvertUnitString(mv.Unit)
+			if tmpUnit.Base == "inval" {
+				mn.Unit = schema.Unit{Base: ""}
+			} else {
+				mn.Unit = tmpUnit
+			}
 			mn.Timestep = mv.Timestep
 
 			for _, v := range mv.Series {
@@ -112,12 +141,12 @@ func deepCopyJobData(d *JobData) schema.JobData {
 				mn.Series = append(mn.Series, sn)
 			}
 
-			dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
 			dn[k][mk] = &mn
 		}
+		// fmt.Printf("FINISH %s\n", k)
 	}
 
-	return dn
+	return &dn
 }
 
 func deepCopyClusterConfig(co *Cluster) schema.Cluster {
@@ -162,12 +191,18 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster {
 			mcn.Aggregation = mco.Aggregation
 		}
 		mcn.Timestep = mco.Timestep
-		mcn.Unit = units.ConvertUnitString(mco.Unit)
+		tmpUnit := units.ConvertUnitString(mco.Unit)
+		if tmpUnit.Base == "inval" {
+			mcn.Unit = schema.Unit{Base: ""}
+		} else {
+			mcn.Unit = tmpUnit
+		}
 		mcn.Peak = mco.Peak
 		mcn.Normal = mco.Normal
 		mcn.Caution = mco.Caution
 		mcn.Alert = mco.Alert
 		mcn.SubClusters = mco.SubClusters
 
 		cn.MetricConfig = append(cn.MetricConfig, &mcn)
 	}
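
The same "inval" fallback now appears three times (job statistics, job data, cluster metric config). A hedged sketch of how it could be factored into one helper; sanitizeUnit is a hypothetical name, and the sketch assumes, as the diff does, that units.ConvertUnitString returns a schema.Unit whose Base is "inval" when the input cannot be parsed:

package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/ClusterCockpit/cc-backend/pkg/units"
)

// sanitizeUnit converts a legacy unit string, dropping unparsable units
// instead of carrying the "inval" marker into the migrated archive.
func sanitizeUnit(raw string) schema.Unit {
	u := units.ConvertUnitString(raw)
	if u.Base == "inval" {
		return schema.Unit{Base: ""}
	}
	return u
}

func main() {
	fmt.Println(sanitizeUnit("MB/s"))    // parsed unit, passed through
	fmt.Println(sanitizeUnit("garbage")) // falls back to an empty base
}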
@@ -223,11 +258,15 @@ func main() {
 		}
 	}
 
+	var wg sync.WaitGroup
+
 	for job := range ar.Iter() {
 		// fmt.Printf("Job %d\n", job.JobID)
 		job := job
+		wg.Add(1)
 		go func() {
+			defer wg.Done()
 			path := getPath(job, dstPath, "meta.json")
 			err = os.MkdirAll(filepath.Dir(path), 0750)
 			if err != nil {
@@ -257,7 +296,7 @@ func main() {
 				log.Fatal(err)
 			}
 			jdn := deepCopyJobData(jd)
-			if err := EncodeJobData(f, &jdn); err != nil {
+			if err := EncodeJobData(f, jdn); err != nil {
 				log.Fatal(err)
 			}
 			if err := f.Close(); err != nil {
@@ -266,5 +305,6 @@ func main() {
 		}()
 	}
 
+	wg.Wait()
 	os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644)
 }
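
The sync import, wg.Add(1)/defer wg.Done(), and the final wg.Wait() turn the per-job loop into a fan-out/fan-in: version.txt is only written once every migration goroutine has finished. A minimal, self-contained sketch of the pattern (the integer loop stands in for ar.Iter()):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	for job := 0; job < 4; job++ {
		job := job // capture the loop variable, as the commit does
		wg.Add(1)
		go func() {
			defer wg.Done() // signal completion on every exit path
			fmt.Printf("migrating job %d\n", job)
		}()
	}

	wg.Wait() // all jobs done; now it is safe to write version.txt
	fmt.Println("writing version.txt")
}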