mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-24 18:39:06 +01:00
commit
2bb1b78ba4
@ -1,56 +1,67 @@
|
||||
{
|
||||
"addr": "127.0.0.1:8080",
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"jwts": {
|
||||
"max-age": "2000h"
|
||||
},
|
||||
"clusters": [
|
||||
{
|
||||
"name": "fritz",
|
||||
"metricDataRepository": {
|
||||
"kind": "cc-metric-store",
|
||||
"url": "http://localhost:8082",
|
||||
"token": ""
|
||||
},
|
||||
"filterRanges": {
|
||||
"numNodes": {
|
||||
"from": 1,
|
||||
"to": 64
|
||||
},
|
||||
"duration": {
|
||||
"from": 0,
|
||||
"to": 86400
|
||||
},
|
||||
"startTime": {
|
||||
"from": "2022-01-01T00:00:00Z",
|
||||
"to": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "alex",
|
||||
"metricDataRepository": {
|
||||
"kind": "cc-metric-store",
|
||||
"url": "http://localhost:8082",
|
||||
"token": ""
|
||||
},
|
||||
"filterRanges": {
|
||||
"numNodes": {
|
||||
"from": 1,
|
||||
"to": 64
|
||||
},
|
||||
"duration": {
|
||||
"from": 0,
|
||||
"to": 86400
|
||||
},
|
||||
"startTime": {
|
||||
"from": "2022-01-01T00:00:00Z",
|
||||
"to": null
|
||||
}
|
||||
}
|
||||
}
|
||||
"addr": "127.0.0.1:8080",
|
||||
"short-running-jobs-duration": 300,
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"jwts": {
|
||||
"max-age": "2000h"
|
||||
},
|
||||
"enable-resampling": {
|
||||
"trigger": 30,
|
||||
"resolutions": [
|
||||
600,
|
||||
300,
|
||||
120,
|
||||
60
|
||||
]
|
||||
},
|
||||
"emission-constant": 317,
|
||||
"clusters": [
|
||||
{
|
||||
"name": "fritz",
|
||||
"metricDataRepository": {
|
||||
"kind": "cc-metric-store",
|
||||
"url": "http://localhost:8082",
|
||||
"token": ""
|
||||
},
|
||||
"filterRanges": {
|
||||
"numNodes": {
|
||||
"from": 1,
|
||||
"to": 64
|
||||
},
|
||||
"duration": {
|
||||
"from": 0,
|
||||
"to": 86400
|
||||
},
|
||||
"startTime": {
|
||||
"from": "2022-01-01T00:00:00Z",
|
||||
"to": null
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "alex",
|
||||
"metricDataRepository": {
|
||||
"kind": "cc-metric-store",
|
||||
"url": "http://localhost:8082",
|
||||
"token": ""
|
||||
},
|
||||
"filterRanges": {
|
||||
"numNodes": {
|
||||
"from": 1,
|
||||
"to": 64
|
||||
},
|
||||
"duration": {
|
||||
"from": 0,
|
||||
"to": 86400
|
||||
},
|
||||
"startTime": {
|
||||
"from": "2022-01-01T00:00:00Z",
|
||||
"to": null
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ func setup(t *testing.T) *api.RestApi {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
@ -84,7 +85,8 @@ func HandleImportFlag(flag string) error {
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("%s_%s", fp, statType)
|
||||
job.Footprint[fp] = repository.LoadJobStat(&job, name, statType)
|
||||
|
||||
job.Footprint[name] = repository.LoadJobStat(&job, fp, statType)
|
||||
}
|
||||
|
||||
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||
@ -92,6 +94,34 @@ func HandleImportFlag(flag string) error {
|
||||
log.Warn("Error while marshaling job footprint")
|
||||
return err
|
||||
}
|
||||
|
||||
job.EnergyFootprint = make(map[string]float64)
|
||||
var totalEnergy float64
|
||||
var energy float64
|
||||
|
||||
for _, fp := range sc.EnergyFootprint {
|
||||
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
|
||||
// Note: For DB data, calculate and save as kWh
|
||||
// Energy: Power (in Watts) * Time (in Seconds)
|
||||
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
|
||||
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
|
||||
// Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits
|
||||
energy = math.Round(((repository.LoadJobStat(&job, fp, "avg")*float64(job.Duration))/3600/1000)*100) / 100
|
||||
}
|
||||
} else {
|
||||
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID)
|
||||
}
|
||||
|
||||
job.EnergyFootprint[fp] = energy
|
||||
totalEnergy += energy
|
||||
}
|
||||
|
||||
job.Energy = (math.Round(totalEnergy*100) / 100)
|
||||
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
|
||||
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Warn("Error while marshaling job resources")
|
||||
|
@ -82,7 +82,7 @@ func setup(t *testing.T) *repository.JobRepository {
|
||||
if err := os.Mkdir(jobarchive, 0777); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil {
|
||||
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fritzArchive := filepath.Join(tmpdir, "job-archive", "fritz")
|
||||
|
@ -7,6 +7,7 @@ package importer
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -70,6 +71,7 @@ func InitDB() error {
|
||||
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
job.Footprint = make(map[string]float64)
|
||||
|
||||
for _, fp := range sc.Footprint {
|
||||
@ -81,7 +83,7 @@ func InitDB() error {
|
||||
|
||||
name := fmt.Sprintf("%s_%s", fp, statType)
|
||||
|
||||
job.Footprint[fp] = repository.LoadJobStat(jobMeta, name, statType)
|
||||
job.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType)
|
||||
}
|
||||
|
||||
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||
@ -90,6 +92,33 @@ func InitDB() error {
|
||||
return err
|
||||
}
|
||||
|
||||
job.EnergyFootprint = make(map[string]float64)
|
||||
var totalEnergy float64
|
||||
var energy float64
|
||||
|
||||
for _, fp := range sc.EnergyFootprint {
|
||||
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
|
||||
// Note: For DB data, calculate and save as kWh
|
||||
// Energy: Power (in Watts) * Time (in Seconds)
|
||||
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
|
||||
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
|
||||
// Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits
|
||||
energy = math.Round(((repository.LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600/1000)*100) / 100
|
||||
}
|
||||
} else {
|
||||
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
|
||||
}
|
||||
|
||||
job.EnergyFootprint[fp] = energy
|
||||
totalEnergy += energy
|
||||
}
|
||||
|
||||
job.Energy = (math.Round(totalEnergy*100) / 100)
|
||||
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
|
||||
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Errorf("repository initDB(): %v", err)
|
||||
|
@ -15,10 +15,10 @@ import (
|
||||
|
||||
const NamedJobInsert string = `INSERT INTO job (
|
||||
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data
|
||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
|
||||
) VALUES (
|
||||
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data
|
||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
|
||||
);`
|
||||
|
||||
func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
const Version uint64 = 1
|
||||
const Version uint64 = 2
|
||||
|
||||
type ArchiveBackend interface {
|
||||
Init(rawConfig json.RawMessage) (uint64, error)
|
||||
|
@ -48,7 +48,7 @@ func TestInit(t *testing.T) {
|
||||
if fsa.path != "testdata/archive" {
|
||||
t.Fail()
|
||||
}
|
||||
if version != 1 {
|
||||
if version != 2 {
|
||||
t.Fail()
|
||||
}
|
||||
if len(fsa.clusters) != 3 || fsa.clusters[1] != "emmy" {
|
||||
|
2
pkg/archive/testdata/archive/version.txt
vendored
2
pkg/archive/testdata/archive/version.txt
vendored
@ -1 +1 @@
|
||||
1
|
||||
2
|
||||
|
@ -1,65 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// type Accelerator struct {
|
||||
// ID string `json:"id"`
|
||||
// Type string `json:"type"`
|
||||
// Model string `json:"model"`
|
||||
// }
|
||||
|
||||
// type Topology struct {
|
||||
// Node []int `json:"node"`
|
||||
// Socket [][]int `json:"socket"`
|
||||
// MemoryDomain [][]int `json:"memoryDomain"`
|
||||
// Die [][]int `json:"die"`
|
||||
// Core [][]int `json:"core"`
|
||||
// Accelerators []*Accelerator `json:"accelerators"`
|
||||
// }
|
||||
|
||||
type SubCluster struct {
|
||||
Name string `json:"name"`
|
||||
Nodes string `json:"nodes"`
|
||||
NumberOfNodes int `json:"numberOfNodes"`
|
||||
ProcessorType string `json:"processorType"`
|
||||
SocketsPerNode int `json:"socketsPerNode"`
|
||||
CoresPerSocket int `json:"coresPerSocket"`
|
||||
ThreadsPerCore int `json:"threadsPerCore"`
|
||||
FlopRateScalar int `json:"flopRateScalar"`
|
||||
FlopRateSimd int `json:"flopRateSimd"`
|
||||
MemoryBandwidth int `json:"memoryBandwidth"`
|
||||
Topology *schema.Topology `json:"topology"`
|
||||
}
|
||||
|
||||
// type SubClusterConfig struct {
|
||||
// Name string `json:"name"`
|
||||
// Peak float64 `json:"peak"`
|
||||
// Normal float64 `json:"normal"`
|
||||
// Caution float64 `json:"caution"`
|
||||
// Alert float64 `json:"alert"`
|
||||
// }
|
||||
|
||||
type MetricConfig struct {
|
||||
Name string `json:"name"`
|
||||
Unit string `json:"unit"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
Aggregation string `json:"aggregation"`
|
||||
Timestep int `json:"timestep"`
|
||||
Peak float64 `json:"peak"`
|
||||
Normal float64 `json:"normal"`
|
||||
Caution float64 `json:"caution"`
|
||||
Alert float64 `json:"alert"`
|
||||
SubClusters []*schema.SubClusterConfig `json:"subClusters"`
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
Name string `json:"name"`
|
||||
MetricConfig []*MetricConfig `json:"metricConfig"`
|
||||
SubClusters []*SubCluster `json:"subClusters"`
|
||||
}
|
@ -1,166 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
var Clusters []*Cluster
|
||||
var nodeLists map[string]map[string]archive.NodeList
|
||||
|
||||
func initClusterConfig() error {
|
||||
|
||||
Clusters = []*Cluster{}
|
||||
nodeLists = map[string]map[string]archive.NodeList{}
|
||||
|
||||
for _, c := range ar.GetClusters() {
|
||||
|
||||
cluster, err := ar.LoadClusterCfg(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(cluster.Name) == 0 ||
|
||||
len(cluster.MetricConfig) == 0 ||
|
||||
len(cluster.SubClusters) == 0 {
|
||||
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
|
||||
}
|
||||
|
||||
for _, mc := range cluster.MetricConfig {
|
||||
if len(mc.Name) == 0 {
|
||||
return errors.New("cluster.metricConfig.name should not be empty")
|
||||
}
|
||||
if mc.Timestep < 1 {
|
||||
return errors.New("cluster.metricConfig.timestep should not be smaller than one")
|
||||
}
|
||||
|
||||
// For backwards compability...
|
||||
if mc.Scope == "" {
|
||||
mc.Scope = schema.MetricScopeNode
|
||||
}
|
||||
if !mc.Scope.Valid() {
|
||||
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
|
||||
}
|
||||
}
|
||||
|
||||
Clusters = append(Clusters, cluster)
|
||||
|
||||
nodeLists[cluster.Name] = make(map[string]archive.NodeList)
|
||||
for _, sc := range cluster.SubClusters {
|
||||
if sc.Nodes == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
nl, err := archive.ParseNodeList(sc.Nodes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
|
||||
}
|
||||
nodeLists[cluster.Name][sc.Name] = nl
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCluster(cluster string) *Cluster {
|
||||
|
||||
for _, c := range Clusters {
|
||||
if c.Name == cluster {
|
||||
return c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetSubCluster(cluster, subcluster string) *SubCluster {
|
||||
|
||||
for _, c := range Clusters {
|
||||
if c.Name == cluster {
|
||||
for _, p := range c.SubClusters {
|
||||
if p.Name == subcluster {
|
||||
return p
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetMetricConfig(cluster, metric string) *MetricConfig {
|
||||
|
||||
for _, c := range Clusters {
|
||||
if c.Name == cluster {
|
||||
for _, m := range c.MetricConfig {
|
||||
if m.Name == metric {
|
||||
return m
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AssignSubCluster sets the `job.subcluster` property of the job based
|
||||
// on its cluster and resources.
|
||||
func AssignSubCluster(job *BaseJob) error {
|
||||
|
||||
cluster := GetCluster(job.Cluster)
|
||||
if cluster == nil {
|
||||
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
|
||||
}
|
||||
|
||||
if job.SubCluster != "" {
|
||||
for _, sc := range cluster.SubClusters {
|
||||
if sc.Name == job.SubCluster {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
|
||||
}
|
||||
|
||||
if len(job.Resources) == 0 {
|
||||
return fmt.Errorf("job without any resources/hosts")
|
||||
}
|
||||
|
||||
host0 := job.Resources[0].Hostname
|
||||
for sc, nl := range nodeLists[job.Cluster] {
|
||||
if nl != nil && nl.Contains(host0) {
|
||||
job.SubCluster = sc
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if cluster.SubClusters[0].Nodes == "" {
|
||||
job.SubCluster = cluster.SubClusters[0].Name
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
|
||||
}
|
||||
|
||||
func GetSubClusterByNode(cluster, hostname string) (string, error) {
|
||||
|
||||
for sc, nl := range nodeLists[cluster] {
|
||||
if nl != nil && nl.Contains(hostname) {
|
||||
return sc, nil
|
||||
}
|
||||
}
|
||||
|
||||
c := GetCluster(cluster)
|
||||
if c == nil {
|
||||
return "", fmt.Errorf("unkown cluster: %#v", cluster)
|
||||
}
|
||||
|
||||
if c.SubClusters[0].Nodes == "" {
|
||||
return c.SubClusters[0].Name, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
|
||||
}
|
@ -1,109 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"math"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Float is a float64 whose NaN value is written as `null` in JSON and
// GraphQL output and read back from `null`.
//
// A custom float type is used so that (Un)MarshalJSON and
// (Un)MarshalGQL can be overloaded and NaN/null can be used.
// The default behaviour of putting every nullable value behind
// a pointer has a bigger overhead.
type Float float64

// NaN is the Float value that stands for a missing (`null`) value.
var NaN Float = Float(math.NaN())

var nullAsBytes []byte = []byte("null")

// IsNaN reports whether f is NaN.
func (f Float) IsNaN() bool {
	return math.IsNaN(float64(f))
}

// appendText appends the wire form of f to dst: `null` for NaN and ±Inf
// (neither has a JSON number representation), otherwise the value in fixed
// notation with two decimals.
func (f Float) appendText(dst []byte) []byte {
	if f.IsNaN() || math.IsInf(float64(f), 0) {
		return append(dst, nullAsBytes...)
	}
	return strconv.AppendFloat(dst, float64(f), 'f', 2, 64)
}

// MarshalJSON implements json.Marshaler.
// NaN (and ±Inf) will be serialized to `null`, everything else with two
// decimals.
func (f Float) MarshalJSON() ([]byte, error) {
	return f.appendText(make([]byte, 0, 10)), nil
}

// UnmarshalJSON implements json.Unmarshaler.
// `null` will be unserialized to NaN; any other input must be accepted by
// strconv.ParseFloat, whose error is returned otherwise.
func (f *Float) UnmarshalJSON(input []byte) error {
	s := string(input)
	if s == "null" {
		*f = NaN
		return nil
	}

	val, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return err
	}
	*f = Float(val)
	return nil
}

// UnmarshalGQL implements the graphql.Unmarshaler interface.
// Only float64 inputs are accepted.
func (f *Float) UnmarshalGQL(v interface{}) error {
	f64, ok := v.(float64)
	if !ok {
		return errors.New("invalid Float scalar")
	}

	*f = Float(f64)
	return nil
}

// MarshalGQL implements the graphql.Marshaler interface.
// NaN (and ±Inf) will be serialized to `null`. The method has no error
// return, so a failed write cannot be reported.
func (f Float) MarshalGQL(w io.Writer) {
	w.Write(f.appendText(make([]byte, 0, 10)))
}
|
||||
|
||||
// Only used via REST-API, not via GraphQL.
|
||||
// This uses a lot less allocations per series,
|
||||
// but it turns out that the performance increase
|
||||
// from using this is not that big.
|
||||
func (s *Series) MarshalJSON() ([]byte, error) {
|
||||
buf := make([]byte, 0, 512+len(s.Data)*8)
|
||||
buf = append(buf, `{"hostname":"`...)
|
||||
buf = append(buf, s.Hostname...)
|
||||
buf = append(buf, '"')
|
||||
if s.Id != nil {
|
||||
buf = append(buf, `,"id":`...)
|
||||
buf = strconv.AppendInt(buf, int64(*s.Id), 10)
|
||||
}
|
||||
if s.Statistics != nil {
|
||||
buf = append(buf, `,"statistics":{"min":`...)
|
||||
buf = strconv.AppendFloat(buf, s.Statistics.Min, 'f', 2, 64)
|
||||
buf = append(buf, `,"avg":`...)
|
||||
buf = strconv.AppendFloat(buf, s.Statistics.Avg, 'f', 2, 64)
|
||||
buf = append(buf, `,"max":`...)
|
||||
buf = strconv.AppendFloat(buf, s.Statistics.Max, 'f', 2, 64)
|
||||
buf = append(buf, '}')
|
||||
}
|
||||
buf = append(buf, `,"data":[`...)
|
||||
for i := 0; i < len(s.Data); i++ {
|
||||
if i != 0 {
|
||||
buf = append(buf, ',')
|
||||
}
|
||||
|
||||
if s.Data[i].IsNaN() {
|
||||
buf = append(buf, `null`...)
|
||||
} else {
|
||||
buf = strconv.AppendFloat(buf, float64(s.Data[i]), 'f', 2, 32)
|
||||
}
|
||||
}
|
||||
buf = append(buf, ']', '}')
|
||||
return buf, nil
|
||||
}
|
@ -1,142 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
// FsArchiveConfig is the JSON configuration of the file system archive
// backend, as unmarshaled by FsArchive.Init.
type FsArchiveConfig struct {
	Path string `json:"path"` // Root directory of the archive; Init rejects an empty value
}
|
||||
|
||||
// FsArchive is a job archive stored in a directory tree on the local file
// system. Both fields are filled in by Init.
type FsArchive struct {
	path     string   // Root directory of the archive
	clusters []string // Names of the entries found directly below path
}
|
||||
|
||||
func getPath(
|
||||
job *JobMeta,
|
||||
rootPath string,
|
||||
file string) string {
|
||||
|
||||
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
||||
return filepath.Join(
|
||||
rootPath,
|
||||
job.Cluster,
|
||||
lvl1, lvl2,
|
||||
strconv.FormatInt(job.StartTime, 10), file)
|
||||
}
|
||||
|
||||
func loadJobMeta(filename string) (*JobMeta, error) {
|
||||
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend loadJobMeta()- %v", err)
|
||||
return &JobMeta{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return DecodeJobMeta(bufio.NewReader(f))
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
|
||||
|
||||
var config FsArchiveConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Errorf("fsBackend Init()- %v", err)
|
||||
return err
|
||||
}
|
||||
if config.Path == "" {
|
||||
err := fmt.Errorf("fsBackend Init()- empty path")
|
||||
log.Errorf("fsBackend Init()- %v", err)
|
||||
return err
|
||||
}
|
||||
fsa.path = config.Path
|
||||
|
||||
entries, err := os.ReadDir(fsa.path)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend Init()- %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, de := range entries {
|
||||
fsa.clusters = append(fsa.clusters, de.Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) Iter() <-chan *JobMeta {
|
||||
|
||||
ch := make(chan *JobMeta)
|
||||
go func() {
|
||||
clustersDir, err := os.ReadDir(fsa.path)
|
||||
if err != nil {
|
||||
log.Fatalf("Reading clusters failed: %s", err.Error())
|
||||
}
|
||||
|
||||
for _, clusterDir := range clustersDir {
|
||||
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
|
||||
if err != nil {
|
||||
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||
}
|
||||
|
||||
for _, lvl1Dir := range lvl1Dirs {
|
||||
if !lvl1Dir.IsDir() {
|
||||
// Could be the cluster.json file
|
||||
continue
|
||||
}
|
||||
|
||||
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
|
||||
if err != nil {
|
||||
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||
}
|
||||
|
||||
for _, lvl2Dir := range lvl2Dirs {
|
||||
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
||||
startTimeDirs, err := os.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||
}
|
||||
|
||||
for _, startTimeDir := range startTimeDirs {
|
||||
if startTimeDir.IsDir() {
|
||||
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
||||
if err != nil {
|
||||
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
||||
} else {
|
||||
ch <- job
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) {
|
||||
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
return &Cluster{}, err
|
||||
}
|
||||
return DecodeCluster(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func (fsa *FsArchive) GetClusters() []string {
|
||||
return fsa.clusters
|
||||
}
|
@ -1,162 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// Non-Swaggered Comment: BaseJob
|
||||
// Non-Swaggered Comment: Common subset of Job and JobMeta. Use one of those, not this type directly.
|
||||
|
||||
type BaseJob struct {
|
||||
// The unique identifier of a job
|
||||
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
|
||||
User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
|
||||
Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
|
||||
Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
|
||||
SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
|
||||
Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
|
||||
ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
|
||||
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
|
||||
NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
|
||||
NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
|
||||
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
|
||||
MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
|
||||
SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job
|
||||
State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
|
||||
Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
|
||||
Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
|
||||
Tags []*schema.Tag `json:"tags"` // List of tags
|
||||
RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes]
|
||||
Resources []*Resource `json:"resources"` // Resources used by job
|
||||
RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
|
||||
MetaData map[string]string `json:"metaData"` // Additional information about the job
|
||||
}
|
||||
|
||||
// Non-Swaggered Comment: Job
|
||||
// Non-Swaggered Comment: This type is used as the GraphQL interface and using sqlx as a table row.
|
||||
|
||||
// Job model
|
||||
// @Description Information of a HPC job.
|
||||
type Job struct {
|
||||
// The unique identifier of a job in the database
|
||||
ID int64 `json:"id" db:"id"`
|
||||
BaseJob
|
||||
StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds
|
||||
StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type
|
||||
MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64
|
||||
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64
|
||||
MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64
|
||||
LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64
|
||||
NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
|
||||
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
|
||||
FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
|
||||
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
|
||||
}
|
||||
|
||||
// Non-Swaggered Comment: JobMeta
|
||||
// Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more
|
||||
// Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
|
||||
// Non-Swaggered Comment: This is why there is this struct, which contains all fields from the regular job struct, but "overwrites"
|
||||
// Non-Swaggered Comment: the StartTime field with one of type int64.
|
||||
// Non-Swaggered Comment: ID *int64 `json:"id,omitempty"` >> never used in the job-archive, only available via REST-API
|
||||
|
||||
// JobMeta model
|
||||
// @Description Meta data information of a HPC job.
|
||||
type JobMeta struct {
|
||||
// The unique identifier of a job in the database
|
||||
ID *int64 `json:"id,omitempty"`
|
||||
BaseJob
|
||||
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
|
||||
Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
|
||||
}
|
||||
|
||||
// Values of BaseJob.MonitoringStatus: the state of the monitoring system
// during the job run.
const (
	MonitoringStatusDisabled            int32 = 0
	MonitoringStatusRunningOrArchiving  int32 = 1
	MonitoringStatusArchivingFailed     int32 = 2
	MonitoringStatusArchivingSuccessful int32 = 3
)
|
||||
|
||||
var JobDefaults BaseJob = BaseJob{
|
||||
Exclusive: 1,
|
||||
MonitoringStatus: MonitoringStatusRunningOrArchiving,
|
||||
}
|
||||
|
||||
// JobStatistics model
// @Description Specification for job metric statistics.
type JobStatistics struct {
	// Metric unit (see schema/unit.schema.json)
	Unit string  `json:"unit" example:"GHz"`
	Avg  float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average
	Min  float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum
	Max  float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum
}
|
||||
|
||||
// Tag model
// @Description Defines a tag using name and type.
type Tag struct {
	// The unique DB identifier of a tag
	ID   int64  `json:"id" db:"id"`
	Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
	Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
}
|
||||
|
||||
// Resource model
// @Description A resource used by a job
type Resource struct {
	Hostname      string   `json:"hostname"` // Name of the host (= node)
	HWThreads     []int    `json:"hwthreads,omitempty"` // List of OS processor ids
	Accelerators  []string `json:"accelerators,omitempty"` // List of accelerator device ids
	Configuration string   `json:"configuration,omitempty"` // The configuration options of the node
}
|
||||
|
||||
// JobState is the state of a job, serialized as one of the lower-case
// strings declared below.
type JobState string

// The job states accepted by JobState.Valid.
const (
	JobStateRunning     JobState = "running"
	JobStateCompleted   JobState = "completed"
	JobStateFailed      JobState = "failed"
	JobStateCancelled   JobState = "cancelled"
	JobStateStopped     JobState = "stopped"
	JobStateTimeout     JobState = "timeout"
	JobStatePreempted   JobState = "preempted"
	JobStateOutOfMemory JobState = "out_of_memory"
)

// UnmarshalGQL sets e from a GraphQL input value. The value must be a string
// naming a valid job state; otherwise an error is returned and e keeps its
// previous value.
func (e *JobState) UnmarshalGQL(v interface{}) error {
	str, ok := v.(string)
	if !ok {
		return fmt.Errorf("enums must be strings")
	}

	// Validate before assigning so a rejected input never leaves an invalid
	// state behind in the receiver.
	state := JobState(str)
	if !state.Valid() {
		return errors.New("invalid job state")
	}
	*e = state

	return nil
}

// MarshalGQL writes e to w as a double-quoted string.
func (e JobState) MarshalGQL(w io.Writer) {
	fmt.Fprintf(w, "\"%s\"", e)
}

// Valid reports whether e is one of the declared job states.
func (e JobState) Valid() bool {
	switch e {
	case JobStateRunning,
		JobStateCompleted,
		JobStateFailed,
		JobStateCancelled,
		JobStateStopped,
		JobStateTimeout,
		JobStatePreempted,
		JobStateOutOfMemory:
		return true
	}
	return false
}
|
@ -1,66 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
func DecodeJobData(r io.Reader) (*JobData, error) {
|
||||
var d JobData
|
||||
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
func DecodeJobMeta(r io.Reader) (*JobMeta, error) {
|
||||
var d JobMeta
|
||||
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
func DecodeCluster(r io.Reader) (*Cluster, error) {
|
||||
var c Cluster
|
||||
if err := json.NewDecoder(r).Decode(&c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func EncodeJobData(w io.Writer, d *schema.JobData) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func EncodeCluster(w io.Writer, c *schema.Cluster) error {
|
||||
// Sanitize parameters
|
||||
if err := json.NewEncoder(w).Encode(c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,371 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
ccunits "github.com/ClusterCockpit/cc-units"
|
||||
)
|
||||
|
||||
const Version = 1
|
||||
|
||||
var ar FsArchive
|
||||
var srcPath string
|
||||
var dstPath string
|
||||
|
||||
func loadJobData(filename string) (*JobData, error) {
|
||||
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return &JobData{}, fmt.Errorf("fsBackend loadJobData()- %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return DecodeJobData(bufio.NewReader(f))
|
||||
}
|
||||
|
||||
func ConvertUnitString(us string) schema.Unit {
|
||||
var nu schema.Unit
|
||||
|
||||
if us == "CPI" ||
|
||||
us == "IPC" ||
|
||||
us == "load" ||
|
||||
us == "" {
|
||||
nu.Base = us
|
||||
return nu
|
||||
}
|
||||
u := ccunits.NewUnit(us)
|
||||
p := u.GetPrefix()
|
||||
if p.Prefix() != "" {
|
||||
prefix := p.Prefix()
|
||||
nu.Prefix = prefix
|
||||
}
|
||||
m := u.GetMeasure()
|
||||
d := u.GetUnitDenominator()
|
||||
if d.Short() != "inval" {
|
||||
nu.Base = fmt.Sprintf("%s/%s", m.Short(), d.Short())
|
||||
} else {
|
||||
nu.Base = m.Short()
|
||||
}
|
||||
|
||||
return nu
|
||||
}
|
||||
|
||||
func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
|
||||
var jn schema.JobMeta
|
||||
|
||||
//required properties
|
||||
jn.JobID = j.JobID
|
||||
jn.User = j.User
|
||||
jn.Project = j.Project
|
||||
jn.Cluster = j.Cluster
|
||||
jn.SubCluster = j.SubCluster
|
||||
jn.NumNodes = j.NumNodes
|
||||
jn.Exclusive = j.Exclusive
|
||||
jn.StartTime = j.StartTime
|
||||
jn.State = schema.JobState(j.State)
|
||||
jn.Duration = j.Duration
|
||||
|
||||
for _, ro := range j.Resources {
|
||||
var rn schema.Resource
|
||||
rn.Hostname = ro.Hostname
|
||||
rn.Configuration = ro.Configuration
|
||||
hwt := make([]int, len(ro.HWThreads))
|
||||
if ro.HWThreads != nil {
|
||||
copy(hwt, ro.HWThreads)
|
||||
}
|
||||
rn.HWThreads = hwt
|
||||
acc := make([]string, len(ro.Accelerators))
|
||||
if ro.Accelerators != nil {
|
||||
copy(acc, ro.Accelerators)
|
||||
}
|
||||
rn.Accelerators = acc
|
||||
jn.Resources = append(jn.Resources, &rn)
|
||||
}
|
||||
jn.MetaData = make(map[string]string)
|
||||
|
||||
for k, v := range j.MetaData {
|
||||
jn.MetaData[k] = v
|
||||
}
|
||||
|
||||
jn.Statistics = make(map[string]schema.JobStatistics)
|
||||
for k, v := range j.Statistics {
|
||||
var sn schema.JobStatistics
|
||||
sn.Avg = v.Avg
|
||||
sn.Max = v.Max
|
||||
sn.Min = v.Min
|
||||
tmpUnit := ConvertUnitString(v.Unit)
|
||||
if tmpUnit.Base == "inval" {
|
||||
sn.Unit = schema.Unit{Base: ""}
|
||||
} else {
|
||||
sn.Unit = tmpUnit
|
||||
}
|
||||
jn.Statistics[k] = sn
|
||||
}
|
||||
|
||||
//optional properties
|
||||
jn.Partition = j.Partition
|
||||
jn.ArrayJobId = j.ArrayJobId
|
||||
jn.NumHWThreads = j.NumHWThreads
|
||||
jn.NumAcc = j.NumAcc
|
||||
jn.MonitoringStatus = j.MonitoringStatus
|
||||
jn.SMT = j.SMT
|
||||
jn.Walltime = j.Walltime
|
||||
|
||||
for _, t := range j.Tags {
|
||||
jn.Tags = append(jn.Tags, t)
|
||||
}
|
||||
|
||||
return jn
|
||||
}
|
||||
|
||||
func deepCopyJobData(d *JobData, cluster string, subCluster string) *schema.JobData {
|
||||
var dn = make(schema.JobData)
|
||||
|
||||
for k, v := range *d {
|
||||
// fmt.Printf("Metric %s\n", k)
|
||||
dn[k] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||
|
||||
for mk, mv := range v {
|
||||
// fmt.Printf("Scope %s\n", mk)
|
||||
var mn schema.JobMetric
|
||||
tmpUnit := ConvertUnitString(mv.Unit)
|
||||
if tmpUnit.Base == "inval" {
|
||||
mn.Unit = schema.Unit{Base: ""}
|
||||
} else {
|
||||
mn.Unit = tmpUnit
|
||||
}
|
||||
|
||||
mn.Timestep = mv.Timestep
|
||||
|
||||
for _, v := range mv.Series {
|
||||
var sn schema.Series
|
||||
sn.Hostname = v.Hostname
|
||||
if v.Id != nil {
|
||||
var id = new(string)
|
||||
|
||||
if mk == schema.MetricScopeAccelerator {
|
||||
s := GetSubCluster(cluster, subCluster)
|
||||
var err error
|
||||
|
||||
*id, err = s.Topology.GetAcceleratorID(*v.Id)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
} else {
|
||||
*id = fmt.Sprint(*v.Id)
|
||||
}
|
||||
sn.Id = id
|
||||
}
|
||||
if v.Statistics != nil {
|
||||
sn.Statistics = schema.MetricStatistics{
|
||||
Avg: v.Statistics.Avg,
|
||||
Min: v.Statistics.Min,
|
||||
Max: v.Statistics.Max}
|
||||
}
|
||||
|
||||
sn.Data = make([]schema.Float, len(v.Data))
|
||||
copy(sn.Data, v.Data)
|
||||
mn.Series = append(mn.Series, sn)
|
||||
}
|
||||
|
||||
dn[k][mk] = &mn
|
||||
}
|
||||
// fmt.Printf("FINISH %s\n", k)
|
||||
}
|
||||
|
||||
return &dn
|
||||
}
|
||||
|
||||
func deepCopyClusterConfig(co *Cluster) schema.Cluster {
|
||||
var cn schema.Cluster
|
||||
|
||||
cn.Name = co.Name
|
||||
for _, sco := range co.SubClusters {
|
||||
var scn schema.SubCluster
|
||||
scn.Name = sco.Name
|
||||
scn.Nodes = sco.Nodes
|
||||
scn.ProcessorType = sco.ProcessorType
|
||||
scn.SocketsPerNode = sco.SocketsPerNode
|
||||
scn.CoresPerSocket = sco.CoresPerSocket
|
||||
scn.ThreadsPerCore = sco.ThreadsPerCore
|
||||
scn.FlopRateScalar = schema.MetricValue{
|
||||
Unit: schema.Unit{Base: "F/s", Prefix: "G"},
|
||||
Value: float64(sco.FlopRateScalar)}
|
||||
scn.FlopRateSimd = schema.MetricValue{
|
||||
Unit: schema.Unit{Base: "F/s", Prefix: "G"},
|
||||
Value: float64(sco.FlopRateSimd)}
|
||||
scn.MemoryBandwidth = schema.MetricValue{
|
||||
Unit: schema.Unit{Base: "B/s", Prefix: "G"},
|
||||
Value: float64(sco.MemoryBandwidth)}
|
||||
scn.Topology = *sco.Topology
|
||||
cn.SubClusters = append(cn.SubClusters, &scn)
|
||||
}
|
||||
|
||||
for _, mco := range co.MetricConfig {
|
||||
var mcn schema.MetricConfig
|
||||
mcn.Name = mco.Name
|
||||
mcn.Scope = mco.Scope
|
||||
if mco.Aggregation == "" {
|
||||
fmt.Println("cluster.json - Property aggregation missing! Please review file!")
|
||||
mcn.Aggregation = "sum"
|
||||
} else {
|
||||
mcn.Aggregation = mco.Aggregation
|
||||
}
|
||||
mcn.Timestep = mco.Timestep
|
||||
tmpUnit := ConvertUnitString(mco.Unit)
|
||||
if tmpUnit.Base == "inval" {
|
||||
mcn.Unit = schema.Unit{Base: ""}
|
||||
} else {
|
||||
mcn.Unit = tmpUnit
|
||||
}
|
||||
mcn.Peak = mco.Peak
|
||||
mcn.Normal = mco.Normal
|
||||
mcn.Caution = mco.Caution
|
||||
mcn.Alert = mco.Alert
|
||||
mcn.SubClusters = mco.SubClusters
|
||||
|
||||
cn.MetricConfig = append(cn.MetricConfig, &mcn)
|
||||
}
|
||||
|
||||
return cn
|
||||
}
|
||||
|
||||
func convertJob(job *JobMeta) {
|
||||
// check if source data is available, otherwise skip job
|
||||
src_data_path := getPath(job, srcPath, "data.json")
|
||||
info, err := os.Stat(src_data_path)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if info.Size() == 0 {
|
||||
fmt.Printf("Skip path %s, filesize is 0 Bytes.", src_data_path)
|
||||
return
|
||||
}
|
||||
|
||||
path := getPath(job, dstPath, "meta.json")
|
||||
err = os.MkdirAll(filepath.Dir(path), 0750)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
jmn := deepCopyJobMeta(job)
|
||||
if err = EncodeJobMeta(f, &jmn); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err = f.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
f, err = os.Create(getPath(job, dstPath, "data.json"))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
var jd *JobData
|
||||
jd, err = loadJobData(src_data_path)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
jdn := deepCopyJobData(jd, job.Cluster, job.SubCluster)
|
||||
if err := EncodeJobData(f, jdn); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
var flagLogLevel, flagConfigFile string
|
||||
var flagLogDateTime, debug bool
|
||||
|
||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||
flag.BoolVar(&debug, "debug", false, "Set this flag to force sequential execution for debugging")
|
||||
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`")
|
||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||
flag.StringVar(&srcPath, "src", "./var/job-archive", "Specify the source job archive path")
|
||||
flag.StringVar(&dstPath, "dst", "./var/job-archive-new", "Specify the destination job archive path")
|
||||
flag.Parse()
|
||||
|
||||
if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) {
|
||||
log.Fatal("Archive version exists!")
|
||||
}
|
||||
|
||||
log.Init(flagLogLevel, flagLogDateTime)
|
||||
config.Init(flagConfigFile)
|
||||
srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath)
|
||||
err := ar.Init(json.RawMessage(srcConfig))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
err = initClusterConfig()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// setup new job archive
|
||||
err = os.Mkdir(dstPath, 0750)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, c := range Clusters {
|
||||
path := fmt.Sprintf("%s/%s", dstPath, c.Name)
|
||||
fmt.Println(path)
|
||||
err = os.Mkdir(path, 0750)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
cn := deepCopyClusterConfig(c)
|
||||
|
||||
f, err := os.Create(fmt.Sprintf("%s/%s/cluster.json", dstPath, c.Name))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := EncodeCluster(f, &cn); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for job := range ar.Iter() {
|
||||
if debug {
|
||||
fmt.Printf("Job %d\n", job.JobID)
|
||||
convertJob(job)
|
||||
} else {
|
||||
job := job
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
convertJob(job)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644)
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// JobData is the source-format job data decoded from a job's data.json.
// The outer key is a string (presumably the metric name; confirm against the
// archive format), the inner key the metric scope.
type JobData map[string]map[schema.MetricScope]*JobMetric
|
||||
|
||||
type JobMetric struct {
|
||||
Unit string `json:"unit"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
Timestep int `json:"timestep"`
|
||||
Series []Series `json:"series"`
|
||||
StatisticsSeries *StatsSeries `json:"statisticsSeries"`
|
||||
}
|
||||
|
||||
type Series struct {
|
||||
Hostname string `json:"hostname"`
|
||||
Id *int `json:"id,omitempty"`
|
||||
Statistics *MetricStatistics `json:"statistics"`
|
||||
Data []schema.Float `json:"data"`
|
||||
}
|
||||
|
||||
// MetricStatistics holds the average, minimum and maximum of a series.
type MetricStatistics struct {
	// Avg is the average value.
	Avg float64 `json:"avg"`
	// Min is the minimum value.
	Min float64 `json:"min"`
	// Max is the maximum value.
	Max float64 `json:"max"`
}
|
||||
|
||||
type StatsSeries struct {
|
||||
Mean []Float `json:"mean"`
|
||||
Min []Float `json:"min"`
|
||||
Max []Float `json:"max"`
|
||||
Percentiles map[int][]Float `json:"percentiles,omitempty"`
|
||||
}
|
||||
|
||||
// type MetricScope string
|
||||
|
||||
// const (
|
||||
// MetricScopeInvalid MetricScope = "invalid_scope"
|
||||
|
||||
// MetricScopeNode MetricScope = "node"
|
||||
// MetricScopeSocket MetricScope = "socket"
|
||||
// MetricScopeMemoryDomain MetricScope = "memoryDomain"
|
||||
// MetricScopeCore MetricScope = "core"
|
||||
// MetricScopeHWThread MetricScope = "hwthread"
|
||||
|
||||
// MetricScopeAccelerator MetricScope = "accelerator"
|
||||
// )
|
||||
|
||||
// var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
|
||||
// MetricScopeNode: 10,
|
||||
// MetricScopeSocket: 5,
|
||||
// MetricScopeMemoryDomain: 3,
|
||||
// MetricScopeCore: 2,
|
||||
// MetricScopeHWThread: 1,
|
||||
|
||||
// MetricScopeAccelerator: 5, // Special/Randomly chosen
|
||||
|
||||
// MetricScopeInvalid: -1,
|
||||
// }
|
Loading…
Reference in New Issue
Block a user