From d2b97c9e2f72fa80753cb52d557323af75a4809f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 10 Mar 2023 12:14:33 +0100 Subject: [PATCH] Continue with job-archive migration --- tools/sanitize-archive/fsBackend.go | 20 +++--- tools/sanitize-archive/json.go | 16 ++--- tools/sanitize-archive/main.go | 106 ++++++++++++++++++++++++++-- tools/sanitize-archive/metrics.go | 2 +- 4 files changed, 116 insertions(+), 28 deletions(-) diff --git a/tools/sanitize-archive/fsBackend.go b/tools/sanitize-archive/fsBackend.go index a3b4eff..a8f21b9 100644 --- a/tools/sanitize-archive/fsBackend.go +++ b/tools/sanitize-archive/fsBackend.go @@ -77,16 +77,6 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error { return nil } -func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) { - - b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) - if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - return &Cluster{}, err - } - return DecodeCluster(bytes.NewReader(b)) -} - func (fsa *FsArchive) Iter() <-chan *JobMeta { ch := make(chan *JobMeta) @@ -138,7 +128,15 @@ func (fsa *FsArchive) Iter() <-chan *JobMeta { return ch } -func (fsa *FsArchive) GetClusters() []string { +func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) { + b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) + if err != nil { + log.Errorf("fsBackend LoadClusterCfg()- %v", err) + return &Cluster{}, err + } + return DecodeCluster(bytes.NewReader(b)) +} +func (fsa *FsArchive) GetClusters() []string { return fsa.clusters } diff --git a/tools/sanitize-archive/json.go b/tools/sanitize-archive/json.go index a94b0dd..174e725 100644 --- a/tools/sanitize-archive/json.go +++ b/tools/sanitize-archive/json.go @@ -11,38 +11,34 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func DecodeJobData(r io.Reader, k string) (JobData, error) { +func DecodeJobData(r io.Reader) (*JobData, error) { var d JobData if err := json.NewDecoder(r).Decode(&d); err != nil { return nil, err } - return d, nil + return &d, nil } func DecodeJobMeta(r io.Reader) (*JobMeta, error) { var d JobMeta if err := json.NewDecoder(r).Decode(&d); err != nil { - return &d, err + return nil, err } - // Sanitize parameters - return &d, nil } func DecodeCluster(r io.Reader) (*Cluster, error) { var c Cluster if err := json.NewDecoder(r).Decode(&c); err != nil { - return &c, err + return nil, err } - // Sanitize parameters - return &c, nil } -func EncodeJobData(w io.Writer, d *JobData) error { +func EncodeJobData(w io.Writer, d *schema.JobData) error { // Sanitize parameters if err := json.NewEncoder(w).Encode(d); err != nil { return err @@ -51,7 +47,7 @@ func EncodeJobData(w io.Writer, d *JobData) error { return nil } -func EncodeJobMeta(w io.Writer, d *JobMeta) error { +func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error { // Sanitize parameters if err := json.NewEncoder(w).Encode(d); err != nil { return err diff --git a/tools/sanitize-archive/main.go b/tools/sanitize-archive/main.go index d8f8531..7ad7c59 100644 --- a/tools/sanitize-archive/main.go +++ b/tools/sanitize-archive/main.go @@ -5,6 +5,7 @@ package main import ( + "bufio" "encoding/json" "flag" "fmt" @@ -17,12 +18,88 @@ import ( var ar FsArchive +func loadJobData(filename string) (*JobData, error) { + + f, err := os.Open(filename) + if err != nil { + fmt.Errorf("fsBackend loadJobData()- %v", err) + return &JobData{}, err + } + defer f.Close() + + return DecodeJobData(bufio.NewReader(f)) +} + func deepCopyJobMeta(j *JobMeta) schema.JobMeta { var jn schema.JobMeta - jn.StartTime = j - jn.BaseJob = j.BaseJob + jn.StartTime = j.StartTime + jn.User = j.User + jn.Project = j.Project + jn.Cluster = j.Cluster + jn.SubCluster = j.SubCluster + jn.Partition = j.Partition + jn.ArrayJobId = j.ArrayJobId + jn.NumNodes = j.NumNodes + jn.NumHWThreads = j.NumHWThreads + jn.NumAcc = j.NumAcc + jn.Exclusive = j.Exclusive + jn.MonitoringStatus = j.MonitoringStatus + jn.SMT = j.SMT + jn.Duration = j.Duration + jn.Walltime = j.Walltime + jn.State = schema.JobState(j.State) + jn.Exclusive = j.Exclusive + jn.Exclusive = j.Exclusive + jn.Exclusive = j.Exclusive + for _, ro := range j.Resources { + var rn schema.Resource + rn.Hostname = ro.Hostname + rn.Configuration = ro.Configuration + hwt := make([]int, len(ro.HWThreads)) + copy(hwt, ro.HWThreads) + acc := make([]string, len(ro.Accelerators)) + copy(acc, ro.Accelerators) + jn.Resources = append(jn.Resources, &rn) + } + + for k, v := range j.MetaData { + jn.MetaData[k] = v + } + + return jn +} + +func deepCopyJobData(d *JobData) schema.JobData { + var dn schema.JobData + + for k, v := range *d { + for mk, mv := range v { + var mn schema.JobMetric + mn.Unit = units.ConvertUnitString(mv.Unit) + mn.Scope = mv.Scope + mn.Timestep = mv.Timestep + + for _, v := range mv.Series { + var sn schema.Series + sn.Hostname = v.Hostname + sn.Id = v.Id + sn.Statistics = &schema.MetricStatistics{ + Avg: v.Statistics.Avg, + Min: v.Statistics.Min, + Max: v.Statistics.Max} + + sn.Data = make([]schema.Float, len(v.Data)) + copy(sn.Data, v.Data) + mn.Series = append(mn.Series, sn) + } + + dn[k][mk] = &mn + } + } + + return dn } func deepCopyClusterConfig(co *Cluster) schema.Cluster { @@ -37,10 +114,6 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster { mcn.Scope = mco.Scope mcn.Aggregation = mco.Aggregation mcn.Timestep = mco.Timestep - mcn.Peak = mco.Peak - mcn.Normal = mco.Normal - mcn.Caution = mco.Caution - mcn.Alert = mco.Alert mcn.Unit = units.ConvertUnitString(mco.Unit) cn.MetricConfig = append(cn.MetricConfig, &mcn) } @@ -102,5 +175,26 @@ func main() { } jmn := deepCopyJobMeta(job) + if err := EncodeJobMeta(f, &jmn); err != nil { + log.Fatal(err) + } + if err := f.Close(); err != nil { + log.Fatal(err) + } + + f, err = os.Create(getPath(job, root, "data.json")) + if err != nil { + log.Fatal(err) + } + + sroot := fmt.Sprintf("%s/%s/", srcPath, job.Cluster) + jd, err := loadJobData(getPath(job, sroot, "data.json")) + jdn := deepCopyJobData(jd) + if err := EncodeJobData(f, &jdn); err != nil { + log.Fatal(err) + } + if err := f.Close(); err != nil { + log.Fatal(err) + } } } diff --git a/tools/sanitize-archive/metrics.go b/tools/sanitize-archive/metrics.go index 401f67e..e87332f 100644 --- a/tools/sanitize-archive/metrics.go +++ b/tools/sanitize-archive/metrics.go @@ -22,7 +22,7 @@ type Series struct { Hostname string `json:"hostname"` Id *int `json:"id,omitempty"` Statistics *MetricStatistics `json:"statistics"` - Data []Float `json:"data"` + Data []schema.Float `json:"data"` } type MetricStatistics struct {