Remove obsolete Archive Migration Tool

This commit is contained in:
Jan Eitzinger 2024-11-14 19:09:56 +01:00
parent 1a87ed8210
commit 6056341525
Signed by: moebiusband
GPG Key ID: 2574BA29B90D6DD5
8 changed files with 0 additions and 1146 deletions

View File

@ -1,65 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// type Accelerator struct {
// ID string `json:"id"`
// Type string `json:"type"`
// Model string `json:"model"`
// }
// type Topology struct {
// Node []int `json:"node"`
// Socket [][]int `json:"socket"`
// MemoryDomain [][]int `json:"memoryDomain"`
// Die [][]int `json:"die"`
// Core [][]int `json:"core"`
// Accelerators []*Accelerator `json:"accelerators"`
// }
type SubCluster struct {
Name string `json:"name"`
Nodes string `json:"nodes"`
NumberOfNodes int `json:"numberOfNodes"`
ProcessorType string `json:"processorType"`
SocketsPerNode int `json:"socketsPerNode"`
CoresPerSocket int `json:"coresPerSocket"`
ThreadsPerCore int `json:"threadsPerCore"`
FlopRateScalar int `json:"flopRateScalar"`
FlopRateSimd int `json:"flopRateSimd"`
MemoryBandwidth int `json:"memoryBandwidth"`
Topology *schema.Topology `json:"topology"`
}
// type SubClusterConfig struct {
// Name string `json:"name"`
// Peak float64 `json:"peak"`
// Normal float64 `json:"normal"`
// Caution float64 `json:"caution"`
// Alert float64 `json:"alert"`
// }
type MetricConfig struct {
Name string `json:"name"`
Unit string `json:"unit"`
Scope schema.MetricScope `json:"scope"`
Aggregation string `json:"aggregation"`
Timestep int `json:"timestep"`
Peak float64 `json:"peak"`
Normal float64 `json:"normal"`
Caution float64 `json:"caution"`
Alert float64 `json:"alert"`
SubClusters []*schema.SubClusterConfig `json:"subClusters"`
}
type Cluster struct {
Name string `json:"name"`
MetricConfig []*MetricConfig `json:"metricConfig"`
SubClusters []*SubCluster `json:"subClusters"`
}

View File

@ -1,166 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"errors"
"fmt"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
var Clusters []*Cluster
var nodeLists map[string]map[string]archive.NodeList
func initClusterConfig() error {
Clusters = []*Cluster{}
nodeLists = map[string]map[string]archive.NodeList{}
for _, c := range ar.GetClusters() {
cluster, err := ar.LoadClusterCfg(c)
if err != nil {
return err
}
if len(cluster.Name) == 0 ||
len(cluster.MetricConfig) == 0 ||
len(cluster.SubClusters) == 0 {
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
}
for _, mc := range cluster.MetricConfig {
if len(mc.Name) == 0 {
return errors.New("cluster.metricConfig.name should not be empty")
}
if mc.Timestep < 1 {
return errors.New("cluster.metricConfig.timestep should not be smaller than one")
}
// For backwards compability...
if mc.Scope == "" {
mc.Scope = schema.MetricScopeNode
}
if !mc.Scope.Valid() {
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
}
}
Clusters = append(Clusters, cluster)
nodeLists[cluster.Name] = make(map[string]archive.NodeList)
for _, sc := range cluster.SubClusters {
if sc.Nodes == "" {
continue
}
nl, err := archive.ParseNodeList(sc.Nodes)
if err != nil {
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
}
nodeLists[cluster.Name][sc.Name] = nl
}
}
return nil
}
func GetCluster(cluster string) *Cluster {
for _, c := range Clusters {
if c.Name == cluster {
return c
}
}
return nil
}
func GetSubCluster(cluster, subcluster string) *SubCluster {
for _, c := range Clusters {
if c.Name == cluster {
for _, p := range c.SubClusters {
if p.Name == subcluster {
return p
}
}
}
}
return nil
}
func GetMetricConfig(cluster, metric string) *MetricConfig {
for _, c := range Clusters {
if c.Name == cluster {
for _, m := range c.MetricConfig {
if m.Name == metric {
return m
}
}
}
}
return nil
}
// AssignSubCluster sets the `job.subcluster` property of the job based
// on its cluster and resources.
func AssignSubCluster(job *BaseJob) error {
cluster := GetCluster(job.Cluster)
if cluster == nil {
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
}
if job.SubCluster != "" {
for _, sc := range cluster.SubClusters {
if sc.Name == job.SubCluster {
return nil
}
}
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
}
if len(job.Resources) == 0 {
return fmt.Errorf("job without any resources/hosts")
}
host0 := job.Resources[0].Hostname
for sc, nl := range nodeLists[job.Cluster] {
if nl != nil && nl.Contains(host0) {
job.SubCluster = sc
return nil
}
}
if cluster.SubClusters[0].Nodes == "" {
job.SubCluster = cluster.SubClusters[0].Name
return nil
}
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
}
func GetSubClusterByNode(cluster, hostname string) (string, error) {
for sc, nl := range nodeLists[cluster] {
if nl != nil && nl.Contains(hostname) {
return sc, nil
}
}
c := GetCluster(cluster)
if c == nil {
return "", fmt.Errorf("unkown cluster: %#v", cluster)
}
if c.SubClusters[0].Nodes == "" {
return c.SubClusters[0].Name, nil
}
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
}

View File

@ -1,109 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"errors"
"io"
"math"
"strconv"
)
// A custom float type is used so that (Un)MarshalJSON and
// (Un)MarshalGQL can be overloaded and NaN/null can be used.
// The default behaviour of putting every nullable value behind
// a pointer has a bigger overhead.
type Float float64
var NaN Float = Float(math.NaN())
var nullAsBytes []byte = []byte("null")
func (f Float) IsNaN() bool {
return math.IsNaN(float64(f))
}
// NaN will be serialized to `null`.
func (f Float) MarshalJSON() ([]byte, error) {
if f.IsNaN() {
return nullAsBytes, nil
}
return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil
}
// `null` will be unserialized to NaN.
func (f *Float) UnmarshalJSON(input []byte) error {
s := string(input)
if s == "null" {
*f = NaN
return nil
}
val, err := strconv.ParseFloat(s, 64)
if err != nil {
return err
}
*f = Float(val)
return nil
}
// UnmarshalGQL implements the graphql.Unmarshaler interface.
func (f *Float) UnmarshalGQL(v interface{}) error {
f64, ok := v.(float64)
if !ok {
return errors.New("invalid Float scalar")
}
*f = Float(f64)
return nil
}
// MarshalGQL implements the graphql.Marshaler interface.
// NaN will be serialized to `null`.
func (f Float) MarshalGQL(w io.Writer) {
if f.IsNaN() {
w.Write(nullAsBytes)
} else {
w.Write(strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64))
}
}
// Only used via REST-API, not via GraphQL.
// This uses a lot less allocations per series,
// but it turns out that the performance increase
// from using this is not that big.
func (s *Series) MarshalJSON() ([]byte, error) {
buf := make([]byte, 0, 512+len(s.Data)*8)
buf = append(buf, `{"hostname":"`...)
buf = append(buf, s.Hostname...)
buf = append(buf, '"')
if s.Id != nil {
buf = append(buf, `,"id":`...)
buf = strconv.AppendInt(buf, int64(*s.Id), 10)
}
if s.Statistics != nil {
buf = append(buf, `,"statistics":{"min":`...)
buf = strconv.AppendFloat(buf, s.Statistics.Min, 'f', 2, 64)
buf = append(buf, `,"avg":`...)
buf = strconv.AppendFloat(buf, s.Statistics.Avg, 'f', 2, 64)
buf = append(buf, `,"max":`...)
buf = strconv.AppendFloat(buf, s.Statistics.Max, 'f', 2, 64)
buf = append(buf, '}')
}
buf = append(buf, `,"data":[`...)
for i := 0; i < len(s.Data); i++ {
if i != 0 {
buf = append(buf, ',')
}
if s.Data[i].IsNaN() {
buf = append(buf, `null`...)
} else {
buf = strconv.AppendFloat(buf, float64(s.Data[i]), 'f', 2, 32)
}
}
buf = append(buf, ']', '}')
return buf, nil
}

View File

@ -1,142 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
type FsArchiveConfig struct {
Path string `json:"path"`
}
type FsArchive struct {
path string
clusters []string
}
func getPath(
job *JobMeta,
rootPath string,
file string) string {
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
return filepath.Join(
rootPath,
job.Cluster,
lvl1, lvl2,
strconv.FormatInt(job.StartTime, 10), file)
}
func loadJobMeta(filename string) (*JobMeta, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend loadJobMeta()- %v", err)
return &JobMeta{}, err
}
defer f.Close()
return DecodeJobMeta(bufio.NewReader(f))
}
func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Errorf("fsBackend Init()- %v", err)
return err
}
if config.Path == "" {
err := fmt.Errorf("fsBackend Init()- empty path")
log.Errorf("fsBackend Init()- %v", err)
return err
}
fsa.path = config.Path
entries, err := os.ReadDir(fsa.path)
if err != nil {
log.Errorf("fsBackend Init()- %v", err)
return err
}
for _, de := range entries {
fsa.clusters = append(fsa.clusters, de.Name())
}
return nil
}
func (fsa *FsArchive) Iter() <-chan *JobMeta {
ch := make(chan *JobMeta)
go func() {
clustersDir, err := os.ReadDir(fsa.path)
if err != nil {
log.Fatalf("Reading clusters failed: %s", err.Error())
}
for _, clusterDir := range clustersDir {
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
for _, lvl1Dir := range lvl1Dirs {
if !lvl1Dir.IsDir() {
// Could be the cluster.json file
continue
}
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
for _, lvl2Dir := range lvl2Dirs {
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
startTimeDirs, err := os.ReadDir(dirpath)
if err != nil {
log.Fatalf("Reading jobs failed: %s", err.Error())
}
for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else {
ch <- job
}
}
}
}
}
}
close(ch)
}()
return ch
}
func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
return &Cluster{}, err
}
return DecodeCluster(bytes.NewReader(b))
}
func (fsa *FsArchive) GetClusters() []string {
return fsa.clusters
}

View File

@ -1,162 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"errors"
"fmt"
"io"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// Non-Swaggered Comment: BaseJob
// Non-Swaggered Comment: Common subset of Job and JobMeta. Use one of those, not this type directly.
type BaseJob struct {
// The unique identifier of a job
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job
State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
Tags []*schema.Tag `json:"tags"` // List of tags
RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes]
Resources []*Resource `json:"resources"` // Resources used by job
RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
MetaData map[string]string `json:"metaData"` // Additional information about the job
}
// Non-Swaggered Comment: Job
// Non-Swaggered Comment: This type is used as the GraphQL interface and using sqlx as a table row.
// Job model
// @Description Information of a HPC job.
type Job struct {
// The unique identifier of a job in the database
ID int64 `json:"id" db:"id"`
BaseJob
StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds
StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type
MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64
MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64
LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64
NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
}
// Non-Swaggered Comment: JobMeta
// Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more
// Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
// Non-Swaggered Comment: This is why there is this struct, which contains all fields from the regular job struct, but "overwrites"
// Non-Swaggered Comment: the StartTime field with one of type int64.
// Non-Swaggered Comment: ID *int64 `json:"id,omitempty"` >> never used in the job-archive, only available via REST-API
// JobMeta model
// @Description Meta data information of a HPC job.
type JobMeta struct {
// The unique identifier of a job in the database
ID *int64 `json:"id,omitempty"`
BaseJob
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
}
const (
MonitoringStatusDisabled int32 = 0
MonitoringStatusRunningOrArchiving int32 = 1
MonitoringStatusArchivingFailed int32 = 2
MonitoringStatusArchivingSuccessful int32 = 3
)
var JobDefaults BaseJob = BaseJob{
Exclusive: 1,
MonitoringStatus: MonitoringStatusRunningOrArchiving,
}
// JobStatistics model
// @Description Specification for job metric statistics.
type JobStatistics struct {
// Metric unit (see schema/unit.schema.json)
Unit string `json:"unit" example:"GHz"`
Avg float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average
Min float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum
Max float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum
}
// Tag model
// @Description Defines a tag using name and type.
type Tag struct {
// The unique DB identifier of a tag
ID int64 `json:"id" db:"id"`
Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
}
// Resource model
// @Description A resource used by a job
type Resource struct {
Hostname string `json:"hostname"` // Name of the host (= node)
HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids
Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids
Configuration string `json:"configuration,omitempty"` // The configuration options of the node
}
type JobState string
const (
JobStateRunning JobState = "running"
JobStateCompleted JobState = "completed"
JobStateFailed JobState = "failed"
JobStateCancelled JobState = "cancelled"
JobStateStopped JobState = "stopped"
JobStateTimeout JobState = "timeout"
JobStatePreempted JobState = "preempted"
JobStateOutOfMemory JobState = "out_of_memory"
)
func (e *JobState) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = JobState(str)
if !e.Valid() {
return errors.New("invalid job state")
}
return nil
}
func (e JobState) MarshalGQL(w io.Writer) {
fmt.Fprintf(w, "\"%s\"", e)
}
func (e JobState) Valid() bool {
return e == JobStateRunning ||
e == JobStateCompleted ||
e == JobStateFailed ||
e == JobStateCancelled ||
e == JobStateStopped ||
e == JobStateTimeout ||
e == JobStatePreempted ||
e == JobStateOutOfMemory
}

View File

@ -1,66 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"encoding/json"
"io"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func DecodeJobData(r io.Reader) (*JobData, error) {
var d JobData
if err := json.NewDecoder(r).Decode(&d); err != nil {
return nil, err
}
return &d, nil
}
func DecodeJobMeta(r io.Reader) (*JobMeta, error) {
var d JobMeta
if err := json.NewDecoder(r).Decode(&d); err != nil {
return nil, err
}
return &d, nil
}
func DecodeCluster(r io.Reader) (*Cluster, error) {
var c Cluster
if err := json.NewDecoder(r).Decode(&c); err != nil {
return nil, err
}
return &c, nil
}
func EncodeJobData(w io.Writer, d *schema.JobData) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err
}
return nil
}
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil {
return err
}
return nil
}
func EncodeCluster(w io.Writer, c *schema.Cluster) error {
// Sanitize parameters
if err := json.NewEncoder(w).Encode(c); err != nil {
return err
}
return nil
}

View File

@ -1,371 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"encoding/json"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
ccunits "github.com/ClusterCockpit/cc-units"
)
const Version = 1
var ar FsArchive
var srcPath string
var dstPath string
func loadJobData(filename string) (*JobData, error) {
f, err := os.Open(filename)
if err != nil {
return &JobData{}, fmt.Errorf("fsBackend loadJobData()- %v", err)
}
defer f.Close()
return DecodeJobData(bufio.NewReader(f))
}
func ConvertUnitString(us string) schema.Unit {
var nu schema.Unit
if us == "CPI" ||
us == "IPC" ||
us == "load" ||
us == "" {
nu.Base = us
return nu
}
u := ccunits.NewUnit(us)
p := u.GetPrefix()
if p.Prefix() != "" {
prefix := p.Prefix()
nu.Prefix = prefix
}
m := u.GetMeasure()
d := u.GetUnitDenominator()
if d.Short() != "inval" {
nu.Base = fmt.Sprintf("%s/%s", m.Short(), d.Short())
} else {
nu.Base = m.Short()
}
return nu
}
func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
var jn schema.JobMeta
//required properties
jn.JobID = j.JobID
jn.User = j.User
jn.Project = j.Project
jn.Cluster = j.Cluster
jn.SubCluster = j.SubCluster
jn.NumNodes = j.NumNodes
jn.Exclusive = j.Exclusive
jn.StartTime = j.StartTime
jn.State = schema.JobState(j.State)
jn.Duration = j.Duration
for _, ro := range j.Resources {
var rn schema.Resource
rn.Hostname = ro.Hostname
rn.Configuration = ro.Configuration
hwt := make([]int, len(ro.HWThreads))
if ro.HWThreads != nil {
copy(hwt, ro.HWThreads)
}
rn.HWThreads = hwt
acc := make([]string, len(ro.Accelerators))
if ro.Accelerators != nil {
copy(acc, ro.Accelerators)
}
rn.Accelerators = acc
jn.Resources = append(jn.Resources, &rn)
}
jn.MetaData = make(map[string]string)
for k, v := range j.MetaData {
jn.MetaData[k] = v
}
jn.Statistics = make(map[string]schema.JobStatistics)
for k, v := range j.Statistics {
var sn schema.JobStatistics
sn.Avg = v.Avg
sn.Max = v.Max
sn.Min = v.Min
tmpUnit := ConvertUnitString(v.Unit)
if tmpUnit.Base == "inval" {
sn.Unit = schema.Unit{Base: ""}
} else {
sn.Unit = tmpUnit
}
jn.Statistics[k] = sn
}
//optional properties
jn.Partition = j.Partition
jn.ArrayJobId = j.ArrayJobId
jn.NumHWThreads = j.NumHWThreads
jn.NumAcc = j.NumAcc
jn.MonitoringStatus = j.MonitoringStatus
jn.SMT = j.SMT
jn.Walltime = j.Walltime
for _, t := range j.Tags {
jn.Tags = append(jn.Tags, t)
}
return jn
}
func deepCopyJobData(d *JobData, cluster string, subCluster string) *schema.JobData {
var dn = make(schema.JobData)
for k, v := range *d {
// fmt.Printf("Metric %s\n", k)
dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
for mk, mv := range v {
// fmt.Printf("Scope %s\n", mk)
var mn schema.JobMetric
tmpUnit := ConvertUnitString(mv.Unit)
if tmpUnit.Base == "inval" {
mn.Unit = schema.Unit{Base: ""}
} else {
mn.Unit = tmpUnit
}
mn.Timestep = mv.Timestep
for _, v := range mv.Series {
var sn schema.Series
sn.Hostname = v.Hostname
if v.Id != nil {
var id = new(string)
if mk == schema.MetricScopeAccelerator {
s := GetSubCluster(cluster, subCluster)
var err error
*id, err = s.Topology.GetAcceleratorID(*v.Id)
if err != nil {
log.Fatal(err)
}
} else {
*id = fmt.Sprint(*v.Id)
}
sn.Id = id
}
if v.Statistics != nil {
sn.Statistics = schema.MetricStatistics{
Avg: v.Statistics.Avg,
Min: v.Statistics.Min,
Max: v.Statistics.Max}
}
sn.Data = make([]schema.Float, len(v.Data))
copy(sn.Data, v.Data)
mn.Series = append(mn.Series, sn)
}
dn[k][mk] = &mn
}
// fmt.Printf("FINISH %s\n", k)
}
return &dn
}
func deepCopyClusterConfig(co *Cluster) schema.Cluster {
var cn schema.Cluster
cn.Name = co.Name
for _, sco := range co.SubClusters {
var scn schema.SubCluster
scn.Name = sco.Name
scn.Nodes = sco.Nodes
scn.ProcessorType = sco.ProcessorType
scn.SocketsPerNode = sco.SocketsPerNode
scn.CoresPerSocket = sco.CoresPerSocket
scn.ThreadsPerCore = sco.ThreadsPerCore
scn.FlopRateScalar = schema.MetricValue{
Unit: schema.Unit{Base: "F/s", Prefix: "G"},
Value: float64(sco.FlopRateScalar)}
scn.FlopRateSimd = schema.MetricValue{
Unit: schema.Unit{Base: "F/s", Prefix: "G"},
Value: float64(sco.FlopRateSimd)}
scn.MemoryBandwidth = schema.MetricValue{
Unit: schema.Unit{Base: "B/s", Prefix: "G"},
Value: float64(sco.MemoryBandwidth)}
scn.Topology = *sco.Topology
cn.SubClusters = append(cn.SubClusters, &scn)
}
for _, mco := range co.MetricConfig {
var mcn schema.MetricConfig
mcn.Name = mco.Name
mcn.Scope = mco.Scope
if mco.Aggregation == "" {
fmt.Println("cluster.json - Property aggregation missing! Please review file!")
mcn.Aggregation = "sum"
} else {
mcn.Aggregation = mco.Aggregation
}
mcn.Timestep = mco.Timestep
tmpUnit := ConvertUnitString(mco.Unit)
if tmpUnit.Base == "inval" {
mcn.Unit = schema.Unit{Base: ""}
} else {
mcn.Unit = tmpUnit
}
mcn.Peak = mco.Peak
mcn.Normal = mco.Normal
mcn.Caution = mco.Caution
mcn.Alert = mco.Alert
mcn.SubClusters = mco.SubClusters
cn.MetricConfig = append(cn.MetricConfig, &mcn)
}
return cn
}
func convertJob(job *JobMeta) {
// check if source data is available, otherwise skip job
src_data_path := getPath(job, srcPath, "data.json")
info, err := os.Stat(src_data_path)
if err != nil {
log.Fatal(err)
}
if info.Size() == 0 {
fmt.Printf("Skip path %s, filesize is 0 Bytes.", src_data_path)
return
}
path := getPath(job, dstPath, "meta.json")
err = os.MkdirAll(filepath.Dir(path), 0750)
if err != nil {
log.Fatal(err)
}
f, err := os.Create(path)
if err != nil {
log.Fatal(err)
}
jmn := deepCopyJobMeta(job)
if err = EncodeJobMeta(f, &jmn); err != nil {
log.Fatal(err)
}
if err = f.Close(); err != nil {
log.Fatal(err)
}
f, err = os.Create(getPath(job, dstPath, "data.json"))
if err != nil {
log.Fatal(err)
}
var jd *JobData
jd, err = loadJobData(src_data_path)
if err != nil {
log.Fatal(err)
}
jdn := deepCopyJobData(jd, job.Cluster, job.SubCluster)
if err := EncodeJobData(f, jdn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
}
func main() {
var flagLogLevel, flagConfigFile string
var flagLogDateTime, debug bool
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.BoolVar(&debug, "debug", false, "Set this flag to force sequential execution for debugging")
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&srcPath, "src", "./var/job-archive", "Specify the source job archive path")
flag.StringVar(&dstPath, "dst", "./var/job-archive-new", "Specify the destination job archive path")
flag.Parse()
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
log.Fatal("Archive version exists!")
}
log.Init(flagLogLevel, flagLogDateTime)
config.Init(flagConfigFile)
srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath)
err := ar.Init(json.RawMessage(srcConfig))
if err != nil {
log.Fatal(err)
}
err = initClusterConfig()
if err != nil {
log.Fatal(err)
}
// setup new job archive
err = os.Mkdir(dstPath, 0750)
if err != nil {
log.Fatal(err)
}
for _, c := range Clusters {
path := fmt.Sprintf("%s/%s", dstPath, c.Name)
fmt.Println(path)
err = os.Mkdir(path, 0750)
if err != nil {
log.Fatal(err)
}
cn := deepCopyClusterConfig(c)
f, err := os.Create(fmt.Sprintf("%s/%s/cluster.json", dstPath, c.Name))
if err != nil {
log.Fatal(err)
}
if err := EncodeCluster(f, &cn); err != nil {
log.Fatal(err)
}
if err := f.Close(); err != nil {
log.Fatal(err)
}
}
var wg sync.WaitGroup
for job := range ar.Iter() {
if debug {
fmt.Printf("Job %d\n", job.JobID)
convertJob(job)
} else {
job := job
wg.Add(1)
go func() {
defer wg.Done()
convertJob(job)
}()
}
}
wg.Wait()
os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644)
}

View File

@ -1,65 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type JobData map[string]map[schema.MetricScope]*JobMetric
type JobMetric struct {
Unit string `json:"unit"`
Scope schema.MetricScope `json:"scope"`
Timestep int `json:"timestep"`
Series []Series `json:"series"`
StatisticsSeries *StatsSeries `json:"statisticsSeries"`
}
type Series struct {
Hostname string `json:"hostname"`
Id *int `json:"id,omitempty"`
Statistics *MetricStatistics `json:"statistics"`
Data []schema.Float `json:"data"`
}
type MetricStatistics struct {
Avg float64 `json:"avg"`
Min float64 `json:"min"`
Max float64 `json:"max"`
}
type StatsSeries struct {
Mean []Float `json:"mean"`
Min []Float `json:"min"`
Max []Float `json:"max"`
Percentiles map[int][]Float `json:"percentiles,omitempty"`
}
// type MetricScope string
// const (
// MetricScopeInvalid MetricScope = "invalid_scope"
// MetricScopeNode MetricScope = "node"
// MetricScopeSocket MetricScope = "socket"
// MetricScopeMemoryDomain MetricScope = "memoryDomain"
// MetricScopeCore MetricScope = "core"
// MetricScopeHWThread MetricScope = "hwthread"
// MetricScopeAccelerator MetricScope = "accelerator"
// )
// var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
// MetricScopeNode: 10,
// MetricScopeSocket: 5,
// MetricScopeMemoryDomain: 3,
// MetricScopeCore: 2,
// MetricScopeHWThread: 1,
// MetricScopeAccelerator: 5, // Special/Randomly choosen
// MetricScopeInvalid: -1,
// }