Fine tune schema and job archive migration

This commit is contained in:
Jan Eitzinger 2023-04-06 18:09:36 +02:00
parent 6661937fed
commit d858868901
6 changed files with 35 additions and 15 deletions

View File

@ -172,11 +172,11 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
return &schema.Cluster{}, err
}
if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
// if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
}
// }
return DecodeCluster(bytes.NewReader(b))
}

View File

@ -4,7 +4,10 @@
// license that can be found in the LICENSE file.
package schema
import "strconv"
import (
"fmt"
"strconv"
)
type Accelerator struct {
ID string `json:"id"`
@ -157,6 +160,15 @@ func (topo *Topology) GetMemoryDomainsFromHWThreads(
return memDoms, exclusive
}
// Temporary fix to convert back from int id to string id for accelerators
func (topo *Topology) GetAcceleratorID(id int) (string, error) {
if id < len(topo.Accelerators) {
return topo.Accelerators[id].ID, nil
} else {
return "", fmt.Errorf("Index %d out of range", id)
}
}
func (topo *Topology) GetAcceleratorIDs() ([]int, error) {
accels := make([]int, 0)
for _, accel := range topo.Accelerators {

View File

@ -479,7 +479,7 @@
]
},
"required": [
"cpu_used",
"cpu_user",
"mem_used",
"flops_any",
"mem_bw",

View File

@ -15,7 +15,6 @@
"F/s",
"CPI",
"IPC",
"load",
"Hz",
"W",
"°C",

View File

@ -23,6 +23,7 @@ func main() {
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
config.Init(flagConfigFile)
config.Keys.Validate = true
if err := archive.Init(json.RawMessage(archiveCfg)); err != nil {
log.Fatal(err)

View File

@ -102,7 +102,7 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
return jn
}
func deepCopyJobData(d *JobData) *schema.JobData {
func deepCopyJobData(d *JobData, cluster string, subCluster string) *schema.JobData {
var dn = make(schema.JobData)
for k, v := range *d {
@ -126,7 +126,19 @@ func deepCopyJobData(d *JobData) *schema.JobData {
sn.Hostname = v.Hostname
if v.Id != nil {
var id = new(string)
*id = fmt.Sprint(*v.Id)
if mk == schema.MetricScopeAccelerator {
s := GetSubCluster(cluster, subCluster)
var err error
*id, err = s.Topology.GetAcceleratorID(*v.Id)
if err != nil {
log.Fatal(err)
}
} else {
*id = fmt.Sprint(*v.Id)
}
sn.Id = id
}
if v.Statistics != nil {
@ -156,11 +168,7 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster {
for _, sco := range co.SubClusters {
var scn schema.SubCluster
scn.Name = sco.Name
if sco.Nodes == "" {
scn.Nodes = "*"
} else {
scn.Nodes = sco.Nodes
}
scn.Nodes = sco.Nodes
scn.ProcessorType = sco.ProcessorType
scn.SocketsPerNode = sco.SocketsPerNode
scn.CoresPerSocket = sco.CoresPerSocket
@ -295,7 +303,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
jdn := deepCopyJobData(jd)
jdn := deepCopyJobData(jd, job.Cluster, job.SubCluster)
if err := EncodeJobData(f, jdn); err != nil {
log.Fatal(err)
}