Continue with job-archive migration

This commit is contained in:
Jan Eitzinger 2023-03-10 12:14:33 +01:00
parent 97f391deff
commit d2b97c9e2f
4 changed files with 116 additions and 28 deletions

View File

@@ -77,16 +77,6 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
return nil
}
// LoadClusterCfg reads <archive-root>/<name>/cluster.json and decodes it
// into a Cluster configuration.
//
// On any read error it logs the failure and returns a nil *Cluster together
// with the error; decoding errors are propagated from DecodeCluster.
func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) {
	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
	if err != nil {
		log.Errorf("fsBackend LoadClusterCfg()- %v", err)
		// Return nil instead of &Cluster{}: the value is meaningless when
		// err != nil, and a non-nil result invites use of an empty config.
		return nil, err
	}
	return DecodeCluster(bytes.NewReader(b))
}
func (fsa *FsArchive) Iter() <-chan *JobMeta {
ch := make(chan *JobMeta)
@@ -138,7 +128,15 @@ func (fsa *FsArchive) Iter() <-chan *JobMeta {
return ch
}
func (fsa *FsArchive) GetClusters() []string {
// LoadClusterCfg reads <archive-root>/<name>/cluster.json and decodes it
// into a Cluster configuration.
//
// On any read error it logs the failure and returns a nil *Cluster together
// with the error; decoding errors are propagated from DecodeCluster.
func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) {
	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
	if err != nil {
		log.Errorf("fsBackend LoadClusterCfg()- %v", err)
		// Return nil instead of &Cluster{}: the value is meaningless when
		// err != nil, and a non-nil result invites use of an empty config.
		return nil, err
	}
	return DecodeCluster(bytes.NewReader(b))
}
// GetClusters returns the archive's cluster name list (fsa.clusters).
// NOTE(review): presumably populated during Init — confirm against the
// Init implementation, which is outside this view.
func (fsa *FsArchive) GetClusters() []string {
	return fsa.clusters
}

View File

@@ -11,38 +11,34 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func DecodeJobData(r io.Reader, k string) (JobData, error) {
func DecodeJobData(r io.Reader) (*JobData, error) {
var d JobData
if err := json.NewDecoder(r).Decode(&d); err != nil {
return nil, err
}
return d, nil
return &d, nil
}
func DecodeJobMeta(r io.Reader) (*JobMeta, error) {
var d JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil {
return &d, err
return nil, err
}
// Sanitize parameters
return &d, nil
}
func DecodeCluster(r io.Reader) (*Cluster, error) {
var c Cluster
if err := json.NewDecoder(r).Decode(&c); err != nil {
return &c, err
return nil, err
}
// Sanitize parameters
return &c, nil
}
func EncodeJobData(w io.Writer, d *JobData) error {
func EncodeJobData(w io.Writer, d *schema.JobData) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err
@@ -51,7 +47,7 @@ func EncodeJobData(w io.Writer, d *JobData) error {
return nil
}
func EncodeJobMeta(w io.Writer, d *JobMeta) error {
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err

View File

@@ -5,6 +5,7 @@
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
@@ -17,12 +18,88 @@ import (
var ar FsArchive
// loadJobData opens the metric data file at filename and decodes it into
// a JobData structure via DecodeJobData.
//
// Returns nil and a wrapped error if the file cannot be opened.
func loadJobData(filename string) (*JobData, error) {
	f, err := os.Open(filename)
	if err != nil {
		// The original built this message with fmt.Errorf and DISCARDED the
		// result (a go vet error); return the wrapped error instead so the
		// caller actually sees the context. Also return nil rather than a
		// useless &JobData{}.
		return nil, fmt.Errorf("fsBackend loadJobData()- %w", err)
	}
	defer f.Close()
	return DecodeJobData(bufio.NewReader(f))
}
// deepCopyJobMeta converts an archive JobMeta into the current schema.JobMeta,
// copying every scalar field and duplicating slices and maps so the result
// shares no mutable backing storage with the input.
func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
	var jn schema.JobMeta

	// Removed the stray `jn.StartTime = j` line (assigned the whole pointer
	// to a scalar field) and three duplicate `jn.Exclusive` assignments.
	jn.BaseJob = j.BaseJob
	jn.StartTime = j.StartTime
	jn.User = j.User
	jn.Project = j.Project
	jn.Cluster = j.Cluster
	jn.SubCluster = j.SubCluster
	jn.Partition = j.Partition
	jn.ArrayJobId = j.ArrayJobId
	jn.NumNodes = j.NumNodes
	jn.NumHWThreads = j.NumHWThreads
	jn.NumAcc = j.NumAcc
	jn.Exclusive = j.Exclusive
	jn.MonitoringStatus = j.MonitoringStatus
	jn.SMT = j.SMT
	jn.Duration = j.Duration
	jn.Walltime = j.Walltime
	jn.State = schema.JobState(j.State)

	for _, ro := range j.Resources {
		var rn schema.Resource
		rn.Hostname = ro.Hostname
		rn.Configuration = ro.Configuration
		// Copy the slices so the new resource does not alias the old arrays.
		// The original built hwt/acc but never attached them to rn, silently
		// dropping HWThreads and Accelerators from every resource.
		hwt := make([]int, len(ro.HWThreads))
		copy(hwt, ro.HWThreads)
		rn.HWThreads = hwt
		acc := make([]string, len(ro.Accelerators))
		copy(acc, ro.Accelerators)
		rn.Accelerators = acc
		jn.Resources = append(jn.Resources, &rn)
	}

	// jn.MetaData starts as a nil map; the original wrote into it, which
	// panics on the first key. Allocate before copying.
	// NOTE(review): assumes MetaData is map[string]string — confirm against
	// the schema package.
	if len(j.MetaData) > 0 {
		jn.MetaData = make(map[string]string, len(j.MetaData))
		for k, v := range j.MetaData {
			jn.MetaData[k] = v
		}
	}

	return jn
}
// deepCopyJobData converts archive JobData into the current schema.JobData,
// converting each metric's unit string and duplicating all series data so
// the result shares no backing storage with the input.
func deepCopyJobData(d *JobData) schema.JobData {
	// The original declared `var dn schema.JobData` (a nil map) and wrote
	// dn[k][mk] without ever allocating the outer or inner maps — a
	// guaranteed panic on the first metric. Allocate both levels.
	dn := make(schema.JobData, len(*d))
	for k, v := range *d {
		// NOTE(review): assumes schema.JobData is
		// map[string]map[schema.MetricScope]*schema.JobMetric — confirm.
		mm := make(map[schema.MetricScope]*schema.JobMetric, len(v))
		for mk, mv := range v {
			var mn schema.JobMetric
			mn.Unit = units.ConvertUnitString(mv.Unit)
			mn.Scope = mv.Scope
			mn.Timestep = mv.Timestep
			for _, so := range mv.Series {
				var sn schema.Series
				sn.Hostname = so.Hostname
				sn.Id = so.Id
				// Guard against a missing statistics block instead of
				// dereferencing a possibly-nil pointer.
				if so.Statistics != nil {
					sn.Statistics = &schema.MetricStatistics{
						Avg: so.Statistics.Avg,
						Min: so.Statistics.Min,
						Max: so.Statistics.Max}
				}
				sn.Data = make([]schema.Float, len(so.Data))
				copy(sn.Data, so.Data)
				mn.Series = append(mn.Series, sn)
			}
			mm[mk] = &mn
		}
		dn[k] = mm
	}
	return dn
}
func deepCopyClusterConfig(co *Cluster) schema.Cluster {
@@ -37,10 +114,6 @@ func deepCopyClusterConfig(co *Cluster) schema.Cluster {
mcn.Scope = mco.Scope
mcn.Aggregation = mco.Aggregation
mcn.Timestep = mco.Timestep
mcn.Peak = mco.Peak
mcn.Normal = mco.Normal
mcn.Caution = mco.Caution
mcn.Alert = mco.Alert
mcn.Unit = units.ConvertUnitString(mco.Unit)
cn.MetricConfig = append(cn.MetricConfig, &mcn)
}
@@ -102,5 +175,26 @@ func main() {
}
jmn := deepCopyJobMeta(job)
if err := EncodeJobMeta(f, &jmn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
f, err = os.Create(getPath(job, root, "data.json"))
if err != nil {
log.Fatal(err)
}
sroot := fmt.Sprintf("%s/%s/", srcPath, job.Cluster)
jd, err := loadJobData(getPath(job, sroot, "data.json"))
jdn := deepCopyJobData(jd)
if err := EncodeJobData(f, &jdn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
}
}

View File

@@ -22,7 +22,7 @@ type Series struct {
Hostname string `json:"hostname"`
Id *int `json:"id,omitempty"`
Statistics *MetricStatistics `json:"statistics"`
Data []Float `json:"data"`
Data []schema.Float `json:"data"`
}
type MetricStatistics struct {