diff --git a/configs/config-demo.json b/configs/config-demo.json index 8423758..e8d4570 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -1,56 +1,67 @@ { - "addr": "127.0.0.1:8080", - "archive": { - "kind": "file", - "path": "./var/job-archive" - }, - "jwts": { - "max-age": "2000h" - }, - "clusters": [ - { - "name": "fritz", - "metricDataRepository": { - "kind": "cc-metric-store", - "url": "http://localhost:8082", - "token": "" - }, - "filterRanges": { - "numNodes": { - "from": 1, - "to": 64 - }, - "duration": { - "from": 0, - "to": 86400 - }, - "startTime": { - "from": "2022-01-01T00:00:00Z", - "to": null - } - } - }, - { - "name": "alex", - "metricDataRepository": { - "kind": "cc-metric-store", - "url": "http://localhost:8082", - "token": "" - }, - "filterRanges": { - "numNodes": { - "from": 1, - "to": 64 - }, - "duration": { - "from": 0, - "to": 86400 - }, - "startTime": { - "from": "2022-01-01T00:00:00Z", - "to": null - } - } - } + "addr": "127.0.0.1:8080", + "short-running-jobs-duration": 300, + "archive": { + "kind": "file", + "path": "./var/job-archive" + }, + "jwts": { + "max-age": "2000h" + }, + "enable-resampling": { + "trigger": 30, + "resolutions": [ + 600, + 300, + 120, + 60 ] + }, + "emission-constant": 317, + "clusters": [ + { + "name": "fritz", + "metricDataRepository": { + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "" + }, + "filterRanges": { + "numNodes": { + "from": 1, + "to": 64 + }, + "duration": { + "from": 0, + "to": 86400 + }, + "startTime": { + "from": "2022-01-01T00:00:00Z", + "to": null + } + } + }, + { + "name": "alex", + "metricDataRepository": { + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "" + }, + "filterRanges": { + "numNodes": { + "from": 1, + "to": 64 + }, + "duration": { + "from": 0, + "to": 86400 + }, + "startTime": { + "from": "2022-01-01T00:00:00Z", + "to": null + } + } + } + ] } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 0312e43..3d1d7bb 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -120,7 +120,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil { + if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil { t.Fatal(err) } diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index 153402a..01773a5 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/json" "fmt" + "math" "os" "strings" @@ -84,7 +85,8 @@ func HandleImportFlag(flag string) error { } name := fmt.Sprintf("%s_%s", fp, statType) - job.Footprint[fp] = repository.LoadJobStat(&job, name, statType) + + job.Footprint[name] = repository.LoadJobStat(&job, fp, statType) } job.RawFootprint, err = json.Marshal(job.Footprint) @@ -92,6 +94,34 @@ func HandleImportFlag(flag string) error { log.Warn("Error while marshaling job footprint") return err } + + job.EnergyFootprint = make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + // Note: For DB data, calculate and save as kWh + // Energy: Power (in Watts) * Time (in Seconds) + if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) + } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) + // Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits + energy = math.Round(((repository.LoadJobStat(&job, fp, "avg")*float64(job.Duration))/3600/1000)*100) / 100 + } + } else { + log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) + } + + job.EnergyFootprint[fp] = energy + totalEnergy += energy + } + + job.Energy = (math.Round(totalEnergy*100) / 100) + if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) + return err + } + job.RawResources, err = json.Marshal(job.Resources) if err != nil { log.Warn("Error while marshaling job resources") diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index ce0d2e1..4e839cf 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -82,7 +82,7 @@ func setup(t *testing.T) *repository.JobRepository { if err := os.Mkdir(jobarchive, 0777); err != nil { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 1)), 0666); err != nil { + if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil { t.Fatal(err) } fritzArchive := filepath.Join(tmpdir, "job-archive", "fritz") diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 5f06f36..fa2ee6e 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -7,6 +7,7 @@ package importer import ( "encoding/json" "fmt" + "math" "strings" "time" @@ -70,6 +71,7 @@ func InitDB() error { log.Errorf("cannot get subcluster: %s", err.Error()) return err } + job.Footprint = make(map[string]float64) for _, fp := range sc.Footprint { @@ -81,7 +83,7 @@ func InitDB() error { name := fmt.Sprintf("%s_%s", fp, statType) - job.Footprint[fp] = repository.LoadJobStat(jobMeta, name, statType) + job.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType) } job.RawFootprint, err = json.Marshal(job.Footprint) @@ -90,6 +92,33 @@ func InitDB() error { return err } + job.EnergyFootprint = make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + // Note: For DB data, calculate and save as kWh + // Energy: Power (in Watts) * Time (in Seconds) + if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) + } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) + // Unit: ( W * s ) / 3600 / 1000 = kWh ; Rounded to 2 nearest digits + energy = math.Round(((repository.LoadJobStat(jobMeta, fp, "avg")*float64(jobMeta.Duration))/3600/1000)*100) / 100 + } + } else { + log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) + } + + job.EnergyFootprint[fp] = energy + totalEnergy += energy + } + + job.Energy = (math.Round(totalEnergy*100) / 100) + if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID) + return err + } + job.RawResources, err = json.Marshal(job.Resources) if err != nil { log.Errorf("repository initDB(): %v", err) diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 43c26c1..1b05b52 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -15,10 +15,10 @@ import ( const NamedJobInsert string = `INSERT INTO job ( job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data ) VALUES ( :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data );` func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) { diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 52a760f..c6c04e4 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -14,7 +14,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -const Version uint64 = 1 +const Version uint64 = 2 type ArchiveBackend interface { Init(rawConfig json.RawMessage) (uint64, error) diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index d60e478..9db68ed 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -48,7 +48,7 @@ func TestInit(t *testing.T) { if fsa.path != "testdata/archive" { t.Fail() } - if version != 1 { + if version != 2 { t.Fail() } if len(fsa.clusters) != 3 || fsa.clusters[1] != "emmy" { diff --git a/pkg/archive/testdata/archive/version.txt b/pkg/archive/testdata/archive/version.txt index d00491f..0cfbf08 100644 --- a/pkg/archive/testdata/archive/version.txt +++ b/pkg/archive/testdata/archive/version.txt @@ -1 +1 @@ -1 +2 diff --git a/tools/archive-migration/cluster.go b/tools/archive-migration/cluster.go deleted file mode 100644 index f9a45ad..0000000 --- a/tools/archive-migration/cluster.go +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -// type Accelerator struct { -// ID string `json:"id"` -// Type string `json:"type"` -// Model string `json:"model"` -// } - -// type Topology struct { -// Node []int `json:"node"` -// Socket [][]int `json:"socket"` -// MemoryDomain [][]int `json:"memoryDomain"` -// Die [][]int `json:"die"` -// Core [][]int `json:"core"` -// Accelerators []*Accelerator `json:"accelerators"` -// } - -type SubCluster struct { - Name string `json:"name"` - Nodes string `json:"nodes"` - NumberOfNodes int `json:"numberOfNodes"` - ProcessorType string `json:"processorType"` - SocketsPerNode int `json:"socketsPerNode"` - CoresPerSocket int `json:"coresPerSocket"` - ThreadsPerCore int `json:"threadsPerCore"` - FlopRateScalar int `json:"flopRateScalar"` - FlopRateSimd int `json:"flopRateSimd"` - MemoryBandwidth int `json:"memoryBandwidth"` - Topology *schema.Topology `json:"topology"` -} - -// type SubClusterConfig struct { -// Name string `json:"name"` -// Peak float64 `json:"peak"` -// Normal float64 `json:"normal"` -// Caution float64 `json:"caution"` -// Alert float64 `json:"alert"` -// } - -type MetricConfig struct { - Name string `json:"name"` - Unit string `json:"unit"` - Scope schema.MetricScope `json:"scope"` - Aggregation string `json:"aggregation"` - Timestep int `json:"timestep"` - Peak float64 `json:"peak"` - Normal float64 `json:"normal"` - Caution float64 `json:"caution"` - Alert float64 `json:"alert"` - SubClusters []*schema.SubClusterConfig `json:"subClusters"` -} - -type Cluster struct { - Name string `json:"name"` - MetricConfig []*MetricConfig `json:"metricConfig"` - SubClusters []*SubCluster `json:"subClusters"` -} diff --git a/tools/archive-migration/clusterConfig.go b/tools/archive-migration/clusterConfig.go deleted file mode 100644 index 0f9f426..0000000 --- a/tools/archive-migration/clusterConfig.go +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "errors" - "fmt" - - "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -var Clusters []*Cluster -var nodeLists map[string]map[string]archive.NodeList - -func initClusterConfig() error { - - Clusters = []*Cluster{} - nodeLists = map[string]map[string]archive.NodeList{} - - for _, c := range ar.GetClusters() { - - cluster, err := ar.LoadClusterCfg(c) - if err != nil { - return err - } - - if len(cluster.Name) == 0 || - len(cluster.MetricConfig) == 0 || - len(cluster.SubClusters) == 0 { - return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty") - } - - for _, mc := range cluster.MetricConfig { - if len(mc.Name) == 0 { - return errors.New("cluster.metricConfig.name should not be empty") - } - if mc.Timestep < 1 { - return errors.New("cluster.metricConfig.timestep should not be smaller than one") - } - - // For backwards compability... - if mc.Scope == "" { - mc.Scope = schema.MetricScopeNode - } - if !mc.Scope.Valid() { - return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)") - } - } - - Clusters = append(Clusters, cluster) - - nodeLists[cluster.Name] = make(map[string]archive.NodeList) - for _, sc := range cluster.SubClusters { - if sc.Nodes == "" { - continue - } - - nl, err := archive.ParseNodeList(sc.Nodes) - if err != nil { - return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err) - } - nodeLists[cluster.Name][sc.Name] = nl - } - } - - return nil -} - -func GetCluster(cluster string) *Cluster { - - for _, c := range Clusters { - if c.Name == cluster { - return c - } - } - return nil -} - -func GetSubCluster(cluster, subcluster string) *SubCluster { - - for _, c := range Clusters { - if c.Name == cluster { - for _, p := range c.SubClusters { - if p.Name == subcluster { - return p - } - } - } - } - return nil -} - -func GetMetricConfig(cluster, metric string) *MetricConfig { - - for _, c := range Clusters { - if c.Name == cluster { - for _, m := range c.MetricConfig { - if m.Name == metric { - return m - } - } - } - } - return nil -} - -// AssignSubCluster sets the `job.subcluster` property of the job based -// on its cluster and resources. -func AssignSubCluster(job *BaseJob) error { - - cluster := GetCluster(job.Cluster) - if cluster == nil { - return fmt.Errorf("unkown cluster: %#v", job.Cluster) - } - - if job.SubCluster != "" { - for _, sc := range cluster.SubClusters { - if sc.Name == job.SubCluster { - return nil - } - } - return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster) - } - - if len(job.Resources) == 0 { - return fmt.Errorf("job without any resources/hosts") - } - - host0 := job.Resources[0].Hostname - for sc, nl := range nodeLists[job.Cluster] { - if nl != nil && nl.Contains(host0) { - job.SubCluster = sc - return nil - } - } - - if cluster.SubClusters[0].Nodes == "" { - job.SubCluster = cluster.SubClusters[0].Name - return nil - } - - return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0) -} - -func GetSubClusterByNode(cluster, hostname string) (string, error) { - - for sc, nl := range nodeLists[cluster] { - if nl != nil && nl.Contains(hostname) { - return sc, nil - } - } - - c := GetCluster(cluster) - if c == nil { - return "", fmt.Errorf("unkown cluster: %#v", cluster) - } - - if c.SubClusters[0].Nodes == "" { - return c.SubClusters[0].Name, nil - } - - return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname) -} diff --git a/tools/archive-migration/float.go b/tools/archive-migration/float.go deleted file mode 100644 index 3fbccf8..0000000 --- a/tools/archive-migration/float.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "errors" - "io" - "math" - "strconv" -) - -// A custom float type is used so that (Un)MarshalJSON and -// (Un)MarshalGQL can be overloaded and NaN/null can be used. -// The default behaviour of putting every nullable value behind -// a pointer has a bigger overhead. -type Float float64 - -var NaN Float = Float(math.NaN()) -var nullAsBytes []byte = []byte("null") - -func (f Float) IsNaN() bool { - return math.IsNaN(float64(f)) -} - -// NaN will be serialized to `null`. -func (f Float) MarshalJSON() ([]byte, error) { - if f.IsNaN() { - return nullAsBytes, nil - } - - return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil -} - -// `null` will be unserialized to NaN. -func (f *Float) UnmarshalJSON(input []byte) error { - s := string(input) - if s == "null" { - *f = NaN - return nil - } - - val, err := strconv.ParseFloat(s, 64) - if err != nil { - return err - } - *f = Float(val) - return nil -} - -// UnmarshalGQL implements the graphql.Unmarshaler interface. -func (f *Float) UnmarshalGQL(v interface{}) error { - f64, ok := v.(float64) - if !ok { - return errors.New("invalid Float scalar") - } - - *f = Float(f64) - return nil -} - -// MarshalGQL implements the graphql.Marshaler interface. -// NaN will be serialized to `null`. -func (f Float) MarshalGQL(w io.Writer) { - if f.IsNaN() { - w.Write(nullAsBytes) - } else { - w.Write(strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64)) - } -} - -// Only used via REST-API, not via GraphQL. -// This uses a lot less allocations per series, -// but it turns out that the performance increase -// from using this is not that big. -func (s *Series) MarshalJSON() ([]byte, error) { - buf := make([]byte, 0, 512+len(s.Data)*8) - buf = append(buf, `{"hostname":"`...) - buf = append(buf, s.Hostname...) - buf = append(buf, '"') - if s.Id != nil { - buf = append(buf, `,"id":`...) - buf = strconv.AppendInt(buf, int64(*s.Id), 10) - } - if s.Statistics != nil { - buf = append(buf, `,"statistics":{"min":`...) - buf = strconv.AppendFloat(buf, s.Statistics.Min, 'f', 2, 64) - buf = append(buf, `,"avg":`...) - buf = strconv.AppendFloat(buf, s.Statistics.Avg, 'f', 2, 64) - buf = append(buf, `,"max":`...) - buf = strconv.AppendFloat(buf, s.Statistics.Max, 'f', 2, 64) - buf = append(buf, '}') - } - buf = append(buf, `,"data":[`...) - for i := 0; i < len(s.Data); i++ { - if i != 0 { - buf = append(buf, ',') - } - - if s.Data[i].IsNaN() { - buf = append(buf, `null`...) - } else { - buf = strconv.AppendFloat(buf, float64(s.Data[i]), 'f', 2, 32) - } - } - buf = append(buf, ']', '}') - return buf, nil -} diff --git a/tools/archive-migration/fsBackend.go b/tools/archive-migration/fsBackend.go deleted file mode 100644 index 81cf57e..0000000 --- a/tools/archive-migration/fsBackend.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "bufio" - "bytes" - "encoding/json" - "fmt" - "os" - "path/filepath" - "strconv" - - "github.com/ClusterCockpit/cc-backend/pkg/log" -) - -type FsArchiveConfig struct { - Path string `json:"path"` -} - -type FsArchive struct { - path string - clusters []string -} - -func getPath( - job *JobMeta, - rootPath string, - file string) string { - - lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) - return filepath.Join( - rootPath, - job.Cluster, - lvl1, lvl2, - strconv.FormatInt(job.StartTime, 10), file) -} - -func loadJobMeta(filename string) (*JobMeta, error) { - - f, err := os.Open(filename) - if err != nil { - log.Errorf("fsBackend loadJobMeta()- %v", err) - return &JobMeta{}, err - } - defer f.Close() - - return DecodeJobMeta(bufio.NewReader(f)) -} - -func (fsa *FsArchive) Init(rawConfig json.RawMessage) error { - - var config FsArchiveConfig - if err := json.Unmarshal(rawConfig, &config); err != nil { - log.Errorf("fsBackend Init()- %v", err) - return err - } - if config.Path == "" { - err := fmt.Errorf("fsBackend Init()- empty path") - log.Errorf("fsBackend Init()- %v", err) - return err - } - fsa.path = config.Path - - entries, err := os.ReadDir(fsa.path) - if err != nil { - log.Errorf("fsBackend Init()- %v", err) - return err - } - - for _, de := range entries { - fsa.clusters = append(fsa.clusters, de.Name()) - } - - return nil -} - -func (fsa *FsArchive) Iter() <-chan *JobMeta { - - ch := make(chan *JobMeta) - go func() { - clustersDir, err := os.ReadDir(fsa.path) - if err != nil { - log.Fatalf("Reading clusters failed: %s", err.Error()) - } - - for _, clusterDir := range clustersDir { - lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name())) - if err != nil { - log.Fatalf("Reading jobs failed: %s", err.Error()) - } - - for _, lvl1Dir := range lvl1Dirs { - if !lvl1Dir.IsDir() { - // Could be the cluster.json file - continue - } - - lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name())) - if err != nil { - log.Fatalf("Reading jobs failed: %s", err.Error()) - } - - for _, lvl2Dir := range lvl2Dirs { - dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name()) - startTimeDirs, err := os.ReadDir(dirpath) - if err != nil { - log.Fatalf("Reading jobs failed: %s", err.Error()) - } - - for _, startTimeDir := range startTimeDirs { - if startTimeDir.IsDir() { - job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json")) - if err != nil { - log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) - } else { - ch <- job - } - } - } - } - } - } - close(ch) - }() - return ch -} - -func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) { - b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) - if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - return &Cluster{}, err - } - return DecodeCluster(bytes.NewReader(b)) -} - -func (fsa *FsArchive) GetClusters() []string { - return fsa.clusters -} diff --git a/tools/archive-migration/job.go b/tools/archive-migration/job.go deleted file mode 100644 index 8705ce9..0000000 --- a/tools/archive-migration/job.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "errors" - "fmt" - "io" - "time" - - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -// Non-Swaggered Comment: BaseJob -// Non-Swaggered Comment: Common subset of Job and JobMeta. Use one of those, not this type directly. - -type BaseJob struct { - // The unique identifier of a job - JobID int64 `json:"jobId" db:"job_id" example:"123000"` - User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user - Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project - Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster - SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster - Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted - ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job - NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) - NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) - NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) - Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user - MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull - SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job - State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job - Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0) - Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0) - Tags []*schema.Tag `json:"tags"` // List of tags - RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes] - Resources []*Resource `json:"resources"` // Resources used by job - RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes] - MetaData map[string]string `json:"metaData"` // Additional information about the job -} - -// Non-Swaggered Comment: Job -// Non-Swaggered Comment: This type is used as the GraphQL interface and using sqlx as a table row. - -// Job model -// @Description Information of a HPC job. -type Job struct { - // The unique identifier of a job in the database - ID int64 `json:"id" db:"id"` - BaseJob - StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds - StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type - MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64 - FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64 - MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64 - LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64 - NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64 - NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 - FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 - FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 -} - -// Non-Swaggered Comment: JobMeta -// Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more -// Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp. -// Non-Swaggered Comment: This is why there is this struct, which contains all fields from the regular job struct, but "overwrites" -// Non-Swaggered Comment: the StartTime field with one of type int64. -// Non-Swaggered Comment: ID *int64 `json:"id,omitempty"` >> never used in the job-archive, only available via REST-API - -// JobMeta model -// @Description Meta data information of a HPC job. -type JobMeta struct { - // The unique identifier of a job in the database - ID *int64 `json:"id,omitempty"` - BaseJob - StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0) - Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job -} - -const ( - MonitoringStatusDisabled int32 = 0 - MonitoringStatusRunningOrArchiving int32 = 1 - MonitoringStatusArchivingFailed int32 = 2 - MonitoringStatusArchivingSuccessful int32 = 3 -) - -var JobDefaults BaseJob = BaseJob{ - Exclusive: 1, - MonitoringStatus: MonitoringStatusRunningOrArchiving, -} - -// JobStatistics model -// @Description Specification for job metric statistics. -type JobStatistics struct { - // Metric unit (see schema/unit.schema.json) - Unit string `json:"unit" example:"GHz"` - Avg float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average - Min float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum - Max float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum -} - -// Tag model -// @Description Defines a tag using name and type. -type Tag struct { - // The unique DB identifier of a tag - ID int64 `json:"id" db:"id"` - Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type - Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name -} - -// Resource model -// @Description A resource used by a job -type Resource struct { - Hostname string `json:"hostname"` // Name of the host (= node) - HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids - Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids - Configuration string `json:"configuration,omitempty"` // The configuration options of the node -} - -type JobState string - -const ( - JobStateRunning JobState = "running" - JobStateCompleted JobState = "completed" - JobStateFailed JobState = "failed" - JobStateCancelled JobState = "cancelled" - JobStateStopped JobState = "stopped" - JobStateTimeout JobState = "timeout" - JobStatePreempted JobState = "preempted" - JobStateOutOfMemory JobState = "out_of_memory" -) - -func (e *JobState) UnmarshalGQL(v interface{}) error { - str, ok := v.(string) - if !ok { - return fmt.Errorf("enums must be strings") - } - - *e = JobState(str) - if !e.Valid() { - return errors.New("invalid job state") - } - - return nil -} - -func (e JobState) MarshalGQL(w io.Writer) { - fmt.Fprintf(w, "\"%s\"", e) -} - -func (e JobState) Valid() bool { - return e == JobStateRunning || - e == JobStateCompleted || - e == JobStateFailed || - e == JobStateCancelled || - e == JobStateStopped || - e == JobStateTimeout || - e == JobStatePreempted || - e == JobStateOutOfMemory -} diff --git a/tools/archive-migration/json.go b/tools/archive-migration/json.go deleted file mode 100644 index b2c281c..0000000 --- a/tools/archive-migration/json.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "encoding/json" - "io" - - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -func DecodeJobData(r io.Reader) (*JobData, error) { - var d JobData - if err := json.NewDecoder(r).Decode(&d); err != nil { - return nil, err - } - - return &d, nil -} - -func DecodeJobMeta(r io.Reader) (*JobMeta, error) { - var d JobMeta - if err := json.NewDecoder(r).Decode(&d); err != nil { - return nil, err - } - - return &d, nil -} - -func DecodeCluster(r io.Reader) (*Cluster, error) { - var c Cluster - if err := json.NewDecoder(r).Decode(&c); err != nil { - return nil, err - } - - return &c, nil -} - -func EncodeJobData(w io.Writer, d *schema.JobData) error { - // Sanitize parameters - if err := json.NewEncoder(w).Encode(d); err != nil { - return err - } - - return nil -} - -func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error { - // Sanitize parameters - if err := json.NewEncoder(w).Encode(d); err != nil { - return err - } - - return nil -} - -func EncodeCluster(w io.Writer, c *schema.Cluster) error { - // Sanitize parameters - if err := json.NewEncoder(w).Encode(c); err != nil { - return err - } - - return nil -} diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go deleted file mode 100644 index b78e94e..0000000 --- a/tools/archive-migration/main.go +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "bufio" - "encoding/json" - "errors" - "flag" - "fmt" - "os" - "path/filepath" - "sync" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" - ccunits "github.com/ClusterCockpit/cc-units" -) - -const Version = 1 - -var ar FsArchive -var srcPath string -var dstPath string - -func loadJobData(filename string) (*JobData, error) { - - f, err := os.Open(filename) - if err != nil { - return &JobData{}, fmt.Errorf("fsBackend loadJobData()- %v", err) - } - defer f.Close() - - return DecodeJobData(bufio.NewReader(f)) -} - -func ConvertUnitString(us string) schema.Unit { - var nu schema.Unit - - if us == "CPI" || - us == "IPC" || - us == "load" || - us == "" { - nu.Base = us - return nu - } - u := ccunits.NewUnit(us) - p := u.GetPrefix() - if p.Prefix() != "" { - prefix := p.Prefix() - nu.Prefix = prefix - } - m := u.GetMeasure() - d := u.GetUnitDenominator() - if d.Short() != "inval" { - nu.Base = fmt.Sprintf("%s/%s", m.Short(), d.Short()) - } else { - nu.Base = m.Short() - } - - return nu -} - -func deepCopyJobMeta(j *JobMeta) schema.JobMeta { - var jn schema.JobMeta - - //required properties - jn.JobID = j.JobID - jn.User = j.User - jn.Project = j.Project - jn.Cluster = j.Cluster - jn.SubCluster = j.SubCluster - jn.NumNodes = j.NumNodes - jn.Exclusive = j.Exclusive - jn.StartTime = j.StartTime - jn.State = schema.JobState(j.State) - jn.Duration = j.Duration - - for _, ro := range j.Resources { - var rn schema.Resource - rn.Hostname = ro.Hostname - rn.Configuration = ro.Configuration - hwt := make([]int, len(ro.HWThreads)) - if ro.HWThreads != nil { - copy(hwt, ro.HWThreads) - } - rn.HWThreads = hwt - acc := make([]string, len(ro.Accelerators)) - if ro.Accelerators != nil { - copy(acc, ro.Accelerators) - } - rn.Accelerators = acc - jn.Resources = append(jn.Resources, &rn) - } - jn.MetaData = make(map[string]string) - - for k, v := range j.MetaData { - jn.MetaData[k] = v - } - - jn.Statistics = make(map[string]schema.JobStatistics) - for k, v := range j.Statistics { - var sn schema.JobStatistics - sn.Avg = v.Avg - sn.Max = v.Max - sn.Min = v.Min - tmpUnit := ConvertUnitString(v.Unit) - if tmpUnit.Base == "inval" { - sn.Unit = schema.Unit{Base: ""} - } else { - sn.Unit = tmpUnit - } - jn.Statistics[k] = sn - } - - //optional properties - jn.Partition = j.Partition - jn.ArrayJobId = j.ArrayJobId - jn.NumHWThreads = j.NumHWThreads - jn.NumAcc = j.NumAcc - jn.MonitoringStatus = j.MonitoringStatus - jn.SMT = j.SMT - jn.Walltime = j.Walltime - - for _, t := range j.Tags { - jn.Tags = append(jn.Tags, t) - } - - return jn -} - -func deepCopyJobData(d *JobData, cluster string, subCluster string) *schema.JobData { - var dn = make(schema.JobData) - - for k, v := range *d { - // fmt.Printf("Metric %s\n", k) - dn[k] = make(map[schema.MetricScope]*schema.JobMetric) - - for mk, mv := range v { - // fmt.Printf("Scope %s\n", mk) - var mn schema.JobMetric - tmpUnit := ConvertUnitString(mv.Unit) - if tmpUnit.Base == "inval" { - mn.Unit = schema.Unit{Base: ""} - } else { - mn.Unit = tmpUnit - } - - mn.Timestep = mv.Timestep - - for _, v := range mv.Series { - var sn schema.Series - sn.Hostname = v.Hostname - if v.Id != nil { - var id = new(string) - - if mk == schema.MetricScopeAccelerator { - s := GetSubCluster(cluster, subCluster) - var err error - - *id, err = s.Topology.GetAcceleratorID(*v.Id) - if err != nil { - log.Fatal(err) - } - - } else { - *id = fmt.Sprint(*v.Id) - } - sn.Id = id - } - if v.Statistics != nil { - sn.Statistics = schema.MetricStatistics{ - Avg: v.Statistics.Avg, - Min: v.Statistics.Min, - Max: v.Statistics.Max} - } - - sn.Data = make([]schema.Float, len(v.Data)) - copy(sn.Data, v.Data) - mn.Series = append(mn.Series, sn) - } - - dn[k][mk] = &mn - } - // fmt.Printf("FINISH %s\n", k) - } - - return &dn -} - -func deepCopyClusterConfig(co *Cluster) schema.Cluster { - var cn schema.Cluster - - cn.Name = co.Name - for _, sco := range co.SubClusters { - var scn schema.SubCluster - scn.Name = sco.Name - scn.Nodes = sco.Nodes - scn.ProcessorType = sco.ProcessorType - scn.SocketsPerNode = sco.SocketsPerNode - scn.CoresPerSocket = sco.CoresPerSocket - scn.ThreadsPerCore = sco.ThreadsPerCore - scn.FlopRateScalar = schema.MetricValue{ - Unit: schema.Unit{Base: "F/s", Prefix: "G"}, - Value: float64(sco.FlopRateScalar)} - scn.FlopRateSimd = schema.MetricValue{ - Unit: schema.Unit{Base: "F/s", Prefix: "G"}, - Value: float64(sco.FlopRateSimd)} - scn.MemoryBandwidth = schema.MetricValue{ - Unit: schema.Unit{Base: "B/s", Prefix: "G"}, - Value: float64(sco.MemoryBandwidth)} - scn.Topology = *sco.Topology - cn.SubClusters = append(cn.SubClusters, &scn) - } - - for _, mco := range co.MetricConfig { - var mcn schema.MetricConfig - mcn.Name = mco.Name - mcn.Scope = mco.Scope - if mco.Aggregation == "" { - fmt.Println("cluster.json - Property aggregation missing! Please review file!") - mcn.Aggregation = "sum" - } else { - mcn.Aggregation = mco.Aggregation - } - mcn.Timestep = mco.Timestep - tmpUnit := ConvertUnitString(mco.Unit) - if tmpUnit.Base == "inval" { - mcn.Unit = schema.Unit{Base: ""} - } else { - mcn.Unit = tmpUnit - } - mcn.Peak = mco.Peak - mcn.Normal = mco.Normal - mcn.Caution = mco.Caution - mcn.Alert = mco.Alert - mcn.SubClusters = mco.SubClusters - - cn.MetricConfig = append(cn.MetricConfig, &mcn) - } - - return cn -} - -func convertJob(job *JobMeta) { - // check if source data is available, otherwise skip job - src_data_path := getPath(job, srcPath, "data.json") - info, err := os.Stat(src_data_path) - if err != nil { - log.Fatal(err) - } - if info.Size() == 0 { - fmt.Printf("Skip path %s, filesize is 0 Bytes.", src_data_path) - return - } - - path := getPath(job, dstPath, "meta.json") - err = os.MkdirAll(filepath.Dir(path), 0750) - if err != nil { - log.Fatal(err) - } - f, err := os.Create(path) - if err != nil { - log.Fatal(err) - } - - jmn := deepCopyJobMeta(job) - if err = EncodeJobMeta(f, &jmn); err != nil { - log.Fatal(err) - } - if err = f.Close(); err != nil { - log.Fatal(err) - } - - f, err = os.Create(getPath(job, dstPath, "data.json")) - if err != nil { - log.Fatal(err) - } - - var jd *JobData - jd, err = loadJobData(src_data_path) - if err != nil { - log.Fatal(err) - } - jdn := deepCopyJobData(jd, job.Cluster, job.SubCluster) - if err := EncodeJobData(f, jdn); err != nil { - log.Fatal(err) - } - if err := f.Close(); err != nil { - log.Fatal(err) - } -} - -func main() { - var flagLogLevel, flagConfigFile string - var flagLogDateTime, debug bool - - flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") - flag.BoolVar(&debug, "debug", false, "Set this flag to force sequential execution for debugging") - flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") - flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") - flag.StringVar(&srcPath, "src", "./var/job-archive", "Specify the source job archive path") - flag.StringVar(&dstPath, "dst", "./var/job-archive-new", "Specify the destination job archive path") - flag.Parse() - - if _, err := os.Stat(filepath.Join(srcPath, "version.txt")); !errors.Is(err, os.ErrNotExist) { - log.Fatal("Archive version exists!") - } - - log.Init(flagLogLevel, flagLogDateTime) - config.Init(flagConfigFile) - srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath) - err := ar.Init(json.RawMessage(srcConfig)) - if err != nil { - log.Fatal(err) - } - - err = initClusterConfig() - if err != nil { - log.Fatal(err) - } - // setup new job archive - err = os.Mkdir(dstPath, 0750) - if err != nil { - log.Fatal(err) - } - - for _, c := range Clusters { - path := fmt.Sprintf("%s/%s", dstPath, c.Name) - fmt.Println(path) - err = os.Mkdir(path, 0750) - if err != nil { - log.Fatal(err) - } - cn := deepCopyClusterConfig(c) - - f, err := os.Create(fmt.Sprintf("%s/%s/cluster.json", dstPath, c.Name)) - if err != nil { - log.Fatal(err) - } - if err := EncodeCluster(f, &cn); err != nil { - log.Fatal(err) - } - if err := f.Close(); err != nil { - log.Fatal(err) - } - } - - var wg sync.WaitGroup - - for job := range ar.Iter() { - if debug { - fmt.Printf("Job %d\n", job.JobID) - convertJob(job) - } else { - job := job - wg.Add(1) - - go func() { - defer wg.Done() - convertJob(job) - }() - } - } - - wg.Wait() - os.WriteFile(filepath.Join(dstPath, "version.txt"), []byte(fmt.Sprintf("%d", Version)), 0644) -} diff --git a/tools/archive-migration/metrics.go b/tools/archive-migration/metrics.go deleted file mode 100644 index ec5de6f..0000000 --- a/tools/archive-migration/metrics.go +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package main - -import ( - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -type JobData map[string]map[schema.MetricScope]*JobMetric - -type JobMetric struct { - Unit string `json:"unit"` - Scope schema.MetricScope `json:"scope"` - Timestep int `json:"timestep"` - Series []Series `json:"series"` - StatisticsSeries *StatsSeries `json:"statisticsSeries"` -} - -type Series struct { - Hostname string `json:"hostname"` - Id *int `json:"id,omitempty"` - Statistics *MetricStatistics `json:"statistics"` - Data []schema.Float `json:"data"` -} - -type MetricStatistics struct { - Avg float64 `json:"avg"` - Min float64 `json:"min"` - Max float64 `json:"max"` -} - -type StatsSeries struct { - Mean []Float `json:"mean"` - Min []Float `json:"min"` - Max []Float `json:"max"` - Percentiles map[int][]Float `json:"percentiles,omitempty"` -} - -// type MetricScope string - -// const ( -// MetricScopeInvalid MetricScope = "invalid_scope" - -// MetricScopeNode MetricScope = "node" -// MetricScopeSocket MetricScope = "socket" -// MetricScopeMemoryDomain MetricScope = "memoryDomain" -// MetricScopeCore MetricScope = "core" -// MetricScopeHWThread MetricScope = "hwthread" - -// MetricScopeAccelerator MetricScope = "accelerator" -// ) - -// var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{ -// MetricScopeNode: 10, -// MetricScopeSocket: 5, -// MetricScopeMemoryDomain: 3, -// MetricScopeCore: 2, -// MetricScopeHWThread: 1, - -// MetricScopeAccelerator: 5, // Special/Randomly choosen - -// MetricScopeInvalid: -1, -// }