cc-backend/tools/archive-migration/main.go

// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sync"

	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/ClusterCockpit/cc-backend/pkg/units"
)
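
// Version is the archive version stamped into the destination archive's
// version.txt after migration.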
const Version = 1
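
// ar is the source job archive backend that is initialized from the source
// path and iterated to enumerate all jobs.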
var ar FsArchive
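
// loadJobData reads and decodes a data.json file from the old archive format.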
func loadJobData(filename string) (*JobData, error) {
f, err := os.Open(filename)
if err != nil {
return &JobData{}, fmt.Errorf("fsBackend loadJobData()- %v", err)
}
defer f.Close()
return DecodeJobData(bufio.NewReader(f))
}
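
// deepCopyJobMeta converts a legacy JobMeta into the current schema.JobMeta,
// copying the required fields, resources, metadata, tags and optional
// properties one by one.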
func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
var jn schema.JobMeta
//required properties
jn.JobID = j.JobID
jn.User = j.User
jn.Project = j.Project
jn.Cluster = j.Cluster
jn.SubCluster = j.SubCluster
jn.NumNodes = j.NumNodes
jn.Exclusive = j.Exclusive
jn.StartTime = j.StartTime
jn.State = schema.JobState(j.State)
jn.Duration = j.Duration
for _, ro := range j.Resources {
var rn schema.Resource
rn.Hostname = ro.Hostname
rn.Configuration = ro.Configuration
		// Copy the slices so the new resource does not alias the old job's memory.
		if ro.HWThreads != nil {
			hwt := make([]int, len(ro.HWThreads))
			copy(hwt, ro.HWThreads)
			rn.HWThreads = hwt
		}
		if ro.Accelerators != nil {
			acc := make([]string, len(ro.Accelerators))
			copy(acc, ro.Accelerators)
			rn.Accelerators = acc
		}

jn.Resources = append(jn.Resources, &rn)
}

	// Allocate the metadata map before copying into it; its zero value is nil.
	if j.MetaData != nil {
		jn.MetaData = make(map[string]string, len(j.MetaData))
		for k, v := range j.MetaData {
			jn.MetaData[k] = v
		}
	}

//optional properties
jn.Partition = j.Partition
jn.ArrayJobId = j.ArrayJobId
jn.NumHWThreads = j.NumHWThreads
jn.NumAcc = j.NumAcc
jn.MonitoringStatus = j.MonitoringStatus
jn.SMT = j.SMT
jn.Walltime = j.Walltime
for _, t := range j.Tags {
jn.Tags = append(jn.Tags, t)
}
return jn
}
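
// deepCopyJobData converts legacy JobData into schema.JobData, converting the
// metric unit strings and copying every series with its id, statistics and
// data points.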
func deepCopyJobData(d *JobData) schema.JobData {
var dn = make(schema.JobData)
for k, v := range *d {
for mk, mv := range v {
var mn schema.JobMetric
mn.Unit = units.ConvertUnitString(mv.Unit)
mn.Timestep = mv.Timestep
for _, v := range mv.Series {
var sn schema.Series
sn.Hostname = v.Hostname
if v.Id != nil {
var id = new(string)
*id = fmt.Sprint(*v.Id)
sn.Id = id
}
if v.Statistics != nil {
sn.Statistics = schema.MetricStatistics{
Avg: v.Statistics.Avg,
Min: v.Statistics.Min,
Max: v.Statistics.Max}
}
sn.Data = make([]schema.Float, len(v.Data))
copy(sn.Data, v.Data)
mn.Series = append(mn.Series, sn)
}
			// Only create the scope map once so multiple scopes per metric survive.
			if _, ok := dn[k]; !ok {
				dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
			}
			dn[k][mk] = &mn
}
}
return dn
}
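
// deepCopyClusterConfig converts a legacy Cluster configuration into
// schema.Cluster, defaulting missing node lists to "*", defaulting missing
// aggregations to "sum", and expressing flop rates and memory bandwidth as
// metric values with a "G" prefix.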
func deepCopyClusterConfig(co *Cluster) schema.Cluster {
var cn schema.Cluster
cn.Name = co.Name
for _, sco := range co.SubClusters {
var scn schema.SubCluster
scn.Name = sco.Name
if sco.Nodes == "" {
scn.Nodes = "*"
} else {
scn.Nodes = sco.Nodes
}
scn.ProcessorType = sco.ProcessorType
scn.SocketsPerNode = sco.SocketsPerNode
scn.CoresPerSocket = sco.CoresPerSocket
scn.ThreadsPerCore = sco.ThreadsPerCore
var prefix = new(string)
*prefix = "G"
scn.FlopRateScalar = schema.MetricValue{
Unit: schema.Unit{Base: "F/s", Prefix: prefix},
Value: float64(sco.FlopRateScalar)}
scn.FlopRateSimd = schema.MetricValue{
Unit: schema.Unit{Base: "F/s", Prefix: prefix},
Value: float64(sco.FlopRateSimd)}
scn.MemoryBandwidth = schema.MetricValue{
Unit: schema.Unit{Base: "B/s", Prefix: prefix},
Value: float64(sco.MemoryBandwidth)}
scn.Topology = *sco.Topology
cn.SubClusters = append(cn.SubClusters, &scn)
}
for _, mco := range co.MetricConfig {
var mcn schema.MetricConfig
mcn.Name = mco.Name
mcn.Scope = mco.Scope
if mco.Aggregation == "" {
fmt.Println("Property aggregation missing! Please review file!")
mcn.Aggregation = "sum"
} else {
mcn.Aggregation = mco.Aggregation
}
mcn.Timestep = mco.Timestep
mcn.Unit = units.ConvertUnitString(mco.Unit)
mcn.Peak = mco.Peak
mcn.Normal = mco.Normal
mcn.Caution = mco.Caution
mcn.Alert = mco.Alert
mcn.SubClusters = mco.SubClusters
cn.MetricConfig = append(cn.MetricConfig, &mcn)
}
return cn
}
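
// main drives the migration: it refuses to run on an already versioned source
// archive, writes the converted cluster configurations, converts every job's
// meta.json and data.json into the destination archive, and finally stamps the
// new archive with version.txt.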
func main() {
var srcPath string
var dstPath string
	flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
	flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new")
	flag.Parse()
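
	// A version.txt in the source archive means it was already migrated.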
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
log.Fatal("Archive version exists!")
}
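
	// Initialize the source archive backend on the old job archive.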
srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath)
err := ar.Init(json.RawMessage(srcConfig))
if err != nil {
log.Fatal(err)
}
err = initClusterConfig()
if err != nil {
log.Fatal(err)
}
// setup new job archive
err = os.Mkdir(dstPath, 0750)
if err != nil {
log.Fatal(err)
}
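
	// Convert and write the cluster.json of every cluster into the new archive.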
for _, c := range Clusters {
		path := filepath.Join(dstPath, c.Name)
fmt.Println(path)
err = os.Mkdir(path, 0750)
if err != nil {
log.Fatal(err)
}
cn := deepCopyClusterConfig(c)
		f, err := os.Create(filepath.Join(path, "cluster.json"))
if err != nil {
log.Fatal(err)
}
if err := EncodeCluster(f, &cn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
}

	// Convert every job concurrently, one goroutine per job.
	var wg sync.WaitGroup

	for job := range ar.Iter() {
// fmt.Printf("Job %d\n", job.JobID)
		job := job // capture the loop variable for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()

			path := getPath(job, dstPath, "meta.json")
			err := os.MkdirAll(filepath.Dir(path), 0750)
if err != nil {
log.Fatal(err)
}
f, err := os.Create(path)
if err != nil {
log.Fatal(err)
}
jmn := deepCopyJobMeta(job)
if err = EncodeJobMeta(f, &jmn); err != nil {
log.Fatal(err)
}
if err = f.Close(); err != nil {
log.Fatal(err)
}
f, err = os.Create(getPath(job, dstPath, "data.json"))
if err != nil {
log.Fatal(err)
}
var jd *JobData
jd, err = loadJobData(getPath(job, srcPath, "data.json"))
if err != nil {
log.Fatal(err)
}
jdn := deepCopyJobData(jd)
if err := EncodeJobData(f, &jdn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
}()
}

	// Wait for all job conversions to finish before stamping the new archive.
	wg.Wait()

	if err := os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644); err != nil {
		log.Fatal(err)
	}
}