mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-25 21:09:05 +01:00
Add skeleton for job archive conversion tool
This commit is contained in:
parent
740d2a2928
commit
70b39730d2
174
tools/sanitize-archive/cluster.go
Normal file
174
tools/sanitize-archive/cluster.go
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import "strconv"
|
||||||
|
|
||||||
|
type Accelerator struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Model string `json:"model"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Topology struct {
|
||||||
|
Node []int `json:"node"`
|
||||||
|
Socket [][]int `json:"socket"`
|
||||||
|
MemoryDomain [][]int `json:"memoryDomain"`
|
||||||
|
Die [][]int `json:"die"`
|
||||||
|
Core [][]int `json:"core"`
|
||||||
|
Accelerators []*Accelerator `json:"accelerators"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubCluster struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Nodes string `json:"nodes"`
|
||||||
|
NumberOfNodes int `json:"numberOfNodes"`
|
||||||
|
ProcessorType string `json:"processorType"`
|
||||||
|
SocketsPerNode int `json:"socketsPerNode"`
|
||||||
|
CoresPerSocket int `json:"coresPerSocket"`
|
||||||
|
ThreadsPerCore int `json:"threadsPerCore"`
|
||||||
|
FlopRateScalar int `json:"flopRateScalar"`
|
||||||
|
FlopRateSimd int `json:"flopRateSimd"`
|
||||||
|
MemoryBandwidth int `json:"memoryBandwidth"`
|
||||||
|
Topology *Topology `json:"topology"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubClusterConfig struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Peak float64 `json:"peak"`
|
||||||
|
Normal float64 `json:"normal"`
|
||||||
|
Caution float64 `json:"caution"`
|
||||||
|
Alert float64 `json:"alert"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetricConfig struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Scope MetricScope `json:"scope"`
|
||||||
|
Aggregation *string `json:"aggregation"`
|
||||||
|
Timestep int `json:"timestep"`
|
||||||
|
Peak *float64 `json:"peak"`
|
||||||
|
Normal *float64 `json:"normal"`
|
||||||
|
Caution *float64 `json:"caution"`
|
||||||
|
Alert *float64 `json:"alert"`
|
||||||
|
SubClusters []*SubClusterConfig `json:"subClusters"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Cluster struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
MetricConfig []*MetricConfig `json:"metricConfig"`
|
||||||
|
SubClusters []*SubCluster `json:"subClusters"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a list of socket IDs given a list of hwthread IDs. Even if just one
|
||||||
|
// hwthread is in that socket, add it to the list. If no hwthreads other than
|
||||||
|
// those in the argument list are assigned to one of the sockets in the first
|
||||||
|
// return value, return true as the second value. TODO: Optimize this, there
|
||||||
|
// must be a more efficient way/algorithm.
|
||||||
|
func (topo *Topology) GetSocketsFromHWThreads(
|
||||||
|
hwthreads []int) (sockets []int, exclusive bool) {
|
||||||
|
|
||||||
|
socketsMap := map[int]int{}
|
||||||
|
for _, hwthread := range hwthreads {
|
||||||
|
for socket, hwthreadsInSocket := range topo.Socket {
|
||||||
|
for _, hwthreadInSocket := range hwthreadsInSocket {
|
||||||
|
if hwthread == hwthreadInSocket {
|
||||||
|
socketsMap[socket] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exclusive = true
|
||||||
|
hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
|
||||||
|
sockets = make([]int, 0, len(socketsMap))
|
||||||
|
for socket, count := range socketsMap {
|
||||||
|
sockets = append(sockets, socket)
|
||||||
|
exclusive = exclusive && count == hwthreadsPerSocket
|
||||||
|
}
|
||||||
|
|
||||||
|
return sockets, exclusive
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a list of core IDs given a list of hwthread IDs. Even if just one
|
||||||
|
// hwthread is in that core, add it to the list. If no hwthreads other than
|
||||||
|
// those in the argument list are assigned to one of the cores in the first
|
||||||
|
// return value, return true as the second value. TODO: Optimize this, there
|
||||||
|
// must be a more efficient way/algorithm.
|
||||||
|
func (topo *Topology) GetCoresFromHWThreads(
|
||||||
|
hwthreads []int) (cores []int, exclusive bool) {
|
||||||
|
|
||||||
|
coresMap := map[int]int{}
|
||||||
|
for _, hwthread := range hwthreads {
|
||||||
|
for core, hwthreadsInCore := range topo.Core {
|
||||||
|
for _, hwthreadInCore := range hwthreadsInCore {
|
||||||
|
if hwthread == hwthreadInCore {
|
||||||
|
coresMap[core] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exclusive = true
|
||||||
|
hwthreadsPerCore := len(topo.Node) / len(topo.Core)
|
||||||
|
cores = make([]int, 0, len(coresMap))
|
||||||
|
for core, count := range coresMap {
|
||||||
|
cores = append(cores, core)
|
||||||
|
exclusive = exclusive && count == hwthreadsPerCore
|
||||||
|
}
|
||||||
|
|
||||||
|
return cores, exclusive
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a list of memory domain IDs given a list of hwthread IDs. Even if
|
||||||
|
// just one hwthread is in that memory domain, add it to the list. If no
|
||||||
|
// hwthreads other than those in the argument list are assigned to one of the
|
||||||
|
// memory domains in the first return value, return true as the second value.
|
||||||
|
// TODO: Optimize this, there must be a more efficient way/algorithm.
|
||||||
|
func (topo *Topology) GetMemoryDomainsFromHWThreads(
|
||||||
|
hwthreads []int) (memDoms []int, exclusive bool) {
|
||||||
|
|
||||||
|
memDomsMap := map[int]int{}
|
||||||
|
for _, hwthread := range hwthreads {
|
||||||
|
for memDom, hwthreadsInmemDom := range topo.MemoryDomain {
|
||||||
|
for _, hwthreadInmemDom := range hwthreadsInmemDom {
|
||||||
|
if hwthread == hwthreadInmemDom {
|
||||||
|
memDomsMap[memDom] += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exclusive = true
|
||||||
|
hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain)
|
||||||
|
memDoms = make([]int, 0, len(memDomsMap))
|
||||||
|
for memDom, count := range memDomsMap {
|
||||||
|
memDoms = append(memDoms, memDom)
|
||||||
|
exclusive = exclusive && count == hwthreadsPermemDom
|
||||||
|
}
|
||||||
|
|
||||||
|
return memDoms, exclusive
|
||||||
|
}
|
||||||
|
|
||||||
|
func (topo *Topology) GetAcceleratorIDs() ([]int, error) {
|
||||||
|
accels := make([]int, 0)
|
||||||
|
for _, accel := range topo.Accelerators {
|
||||||
|
id, err := strconv.Atoi(accel.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
accels = append(accels, id)
|
||||||
|
}
|
||||||
|
return accels, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (topo *Topology) GetAcceleratorIndex(id string) (int, bool) {
|
||||||
|
for idx, accel := range topo.Accelerators {
|
||||||
|
if accel.ID == id {
|
||||||
|
return idx, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1, false
|
||||||
|
}
|
165
tools/sanitize-archive/clusterConfig.go
Normal file
165
tools/sanitize-archive/clusterConfig.go
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
)
|
||||||
|
|
||||||
|
var Clusters []*Cluster
|
||||||
|
var nodeLists map[string]map[string]archive.NodeList
|
||||||
|
|
||||||
|
func initClusterConfig() error {
|
||||||
|
|
||||||
|
Clusters = []*Cluster{}
|
||||||
|
nodeLists = map[string]map[string]archive.NodeList{}
|
||||||
|
|
||||||
|
for _, c := range ar.GetClusters() {
|
||||||
|
|
||||||
|
cluster, err := ar.LoadClusterCfg(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cluster.Name) == 0 ||
|
||||||
|
len(cluster.MetricConfig) == 0 ||
|
||||||
|
len(cluster.SubClusters) == 0 {
|
||||||
|
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, mc := range cluster.MetricConfig {
|
||||||
|
if len(mc.Name) == 0 {
|
||||||
|
return errors.New("cluster.metricConfig.name should not be empty")
|
||||||
|
}
|
||||||
|
if mc.Timestep < 1 {
|
||||||
|
return errors.New("cluster.metricConfig.timestep should not be smaller than one")
|
||||||
|
}
|
||||||
|
|
||||||
|
// For backwards compability...
|
||||||
|
if mc.Scope == "" {
|
||||||
|
mc.Scope = MetricScopeNode
|
||||||
|
}
|
||||||
|
if !mc.Scope.Valid() {
|
||||||
|
return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Clusters = append(Clusters, cluster)
|
||||||
|
|
||||||
|
nodeLists[cluster.Name] = make(map[string]archive.NodeList)
|
||||||
|
for _, sc := range cluster.SubClusters {
|
||||||
|
if sc.Nodes == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nl, err := archive.ParseNodeList(sc.Nodes)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
|
||||||
|
}
|
||||||
|
nodeLists[cluster.Name][sc.Name] = nl
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetCluster(cluster string) *Cluster {
|
||||||
|
|
||||||
|
for _, c := range Clusters {
|
||||||
|
if c.Name == cluster {
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSubCluster(cluster, subcluster string) *SubCluster {
|
||||||
|
|
||||||
|
for _, c := range Clusters {
|
||||||
|
if c.Name == cluster {
|
||||||
|
for _, p := range c.SubClusters {
|
||||||
|
if p.Name == subcluster {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetMetricConfig(cluster, metric string) *MetricConfig {
|
||||||
|
|
||||||
|
for _, c := range Clusters {
|
||||||
|
if c.Name == cluster {
|
||||||
|
for _, m := range c.MetricConfig {
|
||||||
|
if m.Name == metric {
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssignSubCluster sets the `job.subcluster` property of the job based
|
||||||
|
// on its cluster and resources.
|
||||||
|
func AssignSubCluster(job *BaseJob) error {
|
||||||
|
|
||||||
|
cluster := GetCluster(job.Cluster)
|
||||||
|
if cluster == nil {
|
||||||
|
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.SubCluster != "" {
|
||||||
|
for _, sc := range cluster.SubClusters {
|
||||||
|
if sc.Name == job.SubCluster {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(job.Resources) == 0 {
|
||||||
|
return fmt.Errorf("job without any resources/hosts")
|
||||||
|
}
|
||||||
|
|
||||||
|
host0 := job.Resources[0].Hostname
|
||||||
|
for sc, nl := range nodeLists[job.Cluster] {
|
||||||
|
if nl != nil && nl.Contains(host0) {
|
||||||
|
job.SubCluster = sc
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cluster.SubClusters[0].Nodes == "" {
|
||||||
|
job.SubCluster = cluster.SubClusters[0].Name
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSubClusterByNode(cluster, hostname string) (string, error) {
|
||||||
|
|
||||||
|
for sc, nl := range nodeLists[cluster] {
|
||||||
|
if nl != nil && nl.Contains(hostname) {
|
||||||
|
return sc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c := GetCluster(cluster)
|
||||||
|
if c == nil {
|
||||||
|
return "", fmt.Errorf("unkown cluster: %#v", cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.SubClusters[0].Nodes == "" {
|
||||||
|
return c.SubClusters[0].Name, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
|
||||||
|
}
|
109
tools/sanitize-archive/float.go
Normal file
109
tools/sanitize-archive/float.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A custom float type is used so that (Un)MarshalJSON and
|
||||||
|
// (Un)MarshalGQL can be overloaded and NaN/null can be used.
|
||||||
|
// The default behaviour of putting every nullable value behind
|
||||||
|
// a pointer has a bigger overhead.
|
||||||
|
type Float float64
|
||||||
|
|
||||||
|
var NaN Float = Float(math.NaN())
|
||||||
|
var nullAsBytes []byte = []byte("null")
|
||||||
|
|
||||||
|
func (f Float) IsNaN() bool {
|
||||||
|
return math.IsNaN(float64(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NaN will be serialized to `null`.
|
||||||
|
func (f Float) MarshalJSON() ([]byte, error) {
|
||||||
|
if f.IsNaN() {
|
||||||
|
return nullAsBytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// `null` will be unserialized to NaN.
|
||||||
|
func (f *Float) UnmarshalJSON(input []byte) error {
|
||||||
|
s := string(input)
|
||||||
|
if s == "null" {
|
||||||
|
*f = NaN
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := strconv.ParseFloat(s, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*f = Float(val)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalGQL implements the graphql.Unmarshaler interface.
|
||||||
|
func (f *Float) UnmarshalGQL(v interface{}) error {
|
||||||
|
f64, ok := v.(float64)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("invalid Float scalar")
|
||||||
|
}
|
||||||
|
|
||||||
|
*f = Float(f64)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalGQL implements the graphql.Marshaler interface.
|
||||||
|
// NaN will be serialized to `null`.
|
||||||
|
func (f Float) MarshalGQL(w io.Writer) {
|
||||||
|
if f.IsNaN() {
|
||||||
|
w.Write(nullAsBytes)
|
||||||
|
} else {
|
||||||
|
w.Write(strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only used via REST-API, not via GraphQL.
|
||||||
|
// This uses a lot less allocations per series,
|
||||||
|
// but it turns out that the performance increase
|
||||||
|
// from using this is not that big.
|
||||||
|
func (s *Series) MarshalJSON() ([]byte, error) {
|
||||||
|
buf := make([]byte, 0, 512+len(s.Data)*8)
|
||||||
|
buf = append(buf, `{"hostname":"`...)
|
||||||
|
buf = append(buf, s.Hostname...)
|
||||||
|
buf = append(buf, '"')
|
||||||
|
if s.Id != nil {
|
||||||
|
buf = append(buf, `,"id":`...)
|
||||||
|
buf = strconv.AppendInt(buf, int64(*s.Id), 10)
|
||||||
|
}
|
||||||
|
if s.Statistics != nil {
|
||||||
|
buf = append(buf, `,"statistics":{"min":`...)
|
||||||
|
buf = strconv.AppendFloat(buf, s.Statistics.Min, 'f', 2, 64)
|
||||||
|
buf = append(buf, `,"avg":`...)
|
||||||
|
buf = strconv.AppendFloat(buf, s.Statistics.Avg, 'f', 2, 64)
|
||||||
|
buf = append(buf, `,"max":`...)
|
||||||
|
buf = strconv.AppendFloat(buf, s.Statistics.Max, 'f', 2, 64)
|
||||||
|
buf = append(buf, '}')
|
||||||
|
}
|
||||||
|
buf = append(buf, `,"data":[`...)
|
||||||
|
for i := 0; i < len(s.Data); i++ {
|
||||||
|
if i != 0 {
|
||||||
|
buf = append(buf, ',')
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.Data[i].IsNaN() {
|
||||||
|
buf = append(buf, `null`...)
|
||||||
|
} else {
|
||||||
|
buf = strconv.AppendFloat(buf, float64(s.Data[i]), 'f', 2, 32)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buf = append(buf, ']', '}')
|
||||||
|
return buf, nil
|
||||||
|
}
|
221
tools/sanitize-archive/fsBackend.go
Normal file
221
tools/sanitize-archive/fsBackend.go
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FsArchiveConfig struct {
|
||||||
|
Path string `json:"path"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FsArchive struct {
|
||||||
|
path string
|
||||||
|
clusters []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPath(
|
||||||
|
job *Job,
|
||||||
|
rootPath string,
|
||||||
|
file string) string {
|
||||||
|
|
||||||
|
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
|
||||||
|
return filepath.Join(
|
||||||
|
rootPath,
|
||||||
|
job.Cluster,
|
||||||
|
lvl1, lvl2,
|
||||||
|
strconv.FormatInt(job.StartTime.Unix(), 10), file)
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadJobMeta(filename string) (*JobMeta, error) {
|
||||||
|
|
||||||
|
f, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend loadJobMeta()- %v", err)
|
||||||
|
return &JobMeta{}, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
return DecodeJobMeta(bufio.NewReader(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
|
||||||
|
|
||||||
|
var config FsArchiveConfig
|
||||||
|
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||||
|
log.Errorf("fsBackend Init()- %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if config.Path == "" {
|
||||||
|
err := fmt.Errorf("fsBackend Init()- empty path")
|
||||||
|
log.Errorf("fsBackend Init()- %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fsa.path = config.Path
|
||||||
|
|
||||||
|
entries, err := os.ReadDir(fsa.path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend Init()- %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, de := range entries {
|
||||||
|
fsa.clusters = append(fsa.clusters, de.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) LoadJobData(job *Job) (JobData, error) {
|
||||||
|
|
||||||
|
filename := getPath(job, fsa.path, "data.json")
|
||||||
|
f, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend LoadJobData()- %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
return DecodeJobData(bufio.NewReader(f), filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) LoadJobMeta(job *Job) (*JobMeta, error) {
|
||||||
|
|
||||||
|
filename := getPath(job, fsa.path, "meta.json")
|
||||||
|
return loadJobMeta(filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) {
|
||||||
|
|
||||||
|
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||||
|
return &Cluster{}, err
|
||||||
|
}
|
||||||
|
return DecodeCluster(bytes.NewReader(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) Iter() <-chan *JobMeta {
|
||||||
|
|
||||||
|
ch := make(chan *JobMeta)
|
||||||
|
go func() {
|
||||||
|
clustersDir, err := os.ReadDir(fsa.path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading clusters failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, clusterDir := range clustersDir {
|
||||||
|
lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl1Dir := range lvl1Dirs {
|
||||||
|
if !lvl1Dir.IsDir() {
|
||||||
|
// Could be the cluster.json file
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name()))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, lvl2Dir := range lvl2Dirs {
|
||||||
|
dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name())
|
||||||
|
startTimeDirs, err := os.ReadDir(dirpath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Reading jobs failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, startTimeDir := range startTimeDirs {
|
||||||
|
if startTimeDir.IsDir() {
|
||||||
|
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
|
||||||
|
} else {
|
||||||
|
ch <- job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) StoreJobMeta(jobMeta *JobMeta) error {
|
||||||
|
|
||||||
|
job := Job{
|
||||||
|
BaseJob: jobMeta.BaseJob,
|
||||||
|
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||||
|
StartTimeUnix: jobMeta.StartTime,
|
||||||
|
}
|
||||||
|
f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := EncodeJobMeta(f, jobMeta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) GetClusters() []string {
|
||||||
|
|
||||||
|
return fsa.clusters
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsa *FsArchive) ImportJob(
|
||||||
|
jobMeta *JobMeta,
|
||||||
|
jobData *JobData) error {
|
||||||
|
|
||||||
|
job := Job{
|
||||||
|
BaseJob: jobMeta.BaseJob,
|
||||||
|
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||||
|
StartTimeUnix: jobMeta.StartTime,
|
||||||
|
}
|
||||||
|
dir := getPath(&job, fsa.path, "")
|
||||||
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Create(path.Join(dir, "meta.json"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := EncodeJobMeta(f, jobMeta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err = os.Create(path.Join(dir, "data.json"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := EncodeJobData(f, jobData); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return f.Close()
|
||||||
|
}
|
160
tools/sanitize-archive/job.go
Normal file
160
tools/sanitize-archive/job.go
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Non-Swaggered Comment: BaseJob
|
||||||
|
// Non-Swaggered Comment: Common subset of Job and JobMeta. Use one of those, not this type directly.
|
||||||
|
|
||||||
|
type BaseJob struct {
|
||||||
|
// The unique identifier of a job
|
||||||
|
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
|
||||||
|
User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
|
||||||
|
Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
|
||||||
|
Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
|
||||||
|
SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
|
||||||
|
Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
|
||||||
|
ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
|
||||||
|
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
|
||||||
|
NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
|
||||||
|
NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
|
||||||
|
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
|
||||||
|
MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
|
||||||
|
SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job
|
||||||
|
State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
|
||||||
|
Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
|
||||||
|
Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
|
||||||
|
Tags []*Tag `json:"tags"` // List of tags
|
||||||
|
RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes]
|
||||||
|
Resources []*Resource `json:"resources"` // Resources used by job
|
||||||
|
RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
|
||||||
|
MetaData map[string]string `json:"metaData"` // Additional information about the job
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-Swaggered Comment: Job
|
||||||
|
// Non-Swaggered Comment: This type is used as the GraphQL interface and using sqlx as a table row.
|
||||||
|
|
||||||
|
// Job model
|
||||||
|
// @Description Information of a HPC job.
|
||||||
|
type Job struct {
|
||||||
|
// The unique identifier of a job in the database
|
||||||
|
ID int64 `json:"id" db:"id"`
|
||||||
|
BaseJob
|
||||||
|
StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds
|
||||||
|
StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type
|
||||||
|
MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64
|
||||||
|
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64
|
||||||
|
MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64
|
||||||
|
LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64
|
||||||
|
NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
|
||||||
|
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
|
||||||
|
FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
|
||||||
|
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-Swaggered Comment: JobMeta
|
||||||
|
// Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more
|
||||||
|
// Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
|
||||||
|
// Non-Swaggered Comment: This is why there is this struct, which contains all fields from the regular job struct, but "overwrites"
|
||||||
|
// Non-Swaggered Comment: the StartTime field with one of type int64.
|
||||||
|
// Non-Swaggered Comment: ID *int64 `json:"id,omitempty"` >> never used in the job-archive, only available via REST-API
|
||||||
|
|
||||||
|
// JobMeta model
|
||||||
|
// @Description Meta data information of a HPC job.
|
||||||
|
type JobMeta struct {
|
||||||
|
// The unique identifier of a job in the database
|
||||||
|
ID *int64 `json:"id,omitempty"`
|
||||||
|
BaseJob
|
||||||
|
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
|
||||||
|
Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
MonitoringStatusDisabled int32 = 0
|
||||||
|
MonitoringStatusRunningOrArchiving int32 = 1
|
||||||
|
MonitoringStatusArchivingFailed int32 = 2
|
||||||
|
MonitoringStatusArchivingSuccessful int32 = 3
|
||||||
|
)
|
||||||
|
|
||||||
|
var JobDefaults BaseJob = BaseJob{
|
||||||
|
Exclusive: 1,
|
||||||
|
MonitoringStatus: MonitoringStatusRunningOrArchiving,
|
||||||
|
}
|
||||||
|
|
||||||
|
// JobStatistics model
|
||||||
|
// @Description Specification for job metric statistics.
|
||||||
|
type JobStatistics struct {
|
||||||
|
// Metric unit (see schema/unit.schema.json)
|
||||||
|
Unit string `json:"unit" example:"GHz"`
|
||||||
|
Avg float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average
|
||||||
|
Min float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum
|
||||||
|
Max float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tag model
|
||||||
|
// @Description Defines a tag using name and type.
|
||||||
|
type Tag struct {
|
||||||
|
// The unique DB identifier of a tag
|
||||||
|
ID int64 `json:"id" db:"id"`
|
||||||
|
Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
|
||||||
|
Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resource model
|
||||||
|
// @Description A resource used by a job
|
||||||
|
type Resource struct {
|
||||||
|
Hostname string `json:"hostname"` // Name of the host (= node)
|
||||||
|
HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids
|
||||||
|
Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids
|
||||||
|
Configuration string `json:"configuration,omitempty"` // The configuration options of the node
|
||||||
|
}
|
||||||
|
|
||||||
|
type JobState string
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobStateRunning JobState = "running"
|
||||||
|
JobStateCompleted JobState = "completed"
|
||||||
|
JobStateFailed JobState = "failed"
|
||||||
|
JobStateCancelled JobState = "cancelled"
|
||||||
|
JobStateStopped JobState = "stopped"
|
||||||
|
JobStateTimeout JobState = "timeout"
|
||||||
|
JobStatePreempted JobState = "preempted"
|
||||||
|
JobStateOutOfMemory JobState = "out_of_memory"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (e *JobState) UnmarshalGQL(v interface{}) error {
|
||||||
|
str, ok := v.(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("enums must be strings")
|
||||||
|
}
|
||||||
|
|
||||||
|
*e = JobState(str)
|
||||||
|
if !e.Valid() {
|
||||||
|
return errors.New("invalid job state")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e JobState) MarshalGQL(w io.Writer) {
|
||||||
|
fmt.Fprintf(w, "\"%s\"", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e JobState) Valid() bool {
|
||||||
|
return e == JobStateRunning ||
|
||||||
|
e == JobStateCompleted ||
|
||||||
|
e == JobStateFailed ||
|
||||||
|
e == JobStateCancelled ||
|
||||||
|
e == JobStateStopped ||
|
||||||
|
e == JobStateTimeout ||
|
||||||
|
e == JobStatePreempted ||
|
||||||
|
e == JobStateOutOfMemory
|
||||||
|
}
|
59
tools/sanitize-archive/json.go
Normal file
59
tools/sanitize-archive/json.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
func DecodeJobData(r io.Reader, k string) (JobData, error) {
|
||||||
|
var d JobData
|
||||||
|
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecodeJobMeta(r io.Reader) (*JobMeta, error) {
|
||||||
|
var d JobMeta
|
||||||
|
if err := json.NewDecoder(r).Decode(&d); err != nil {
|
||||||
|
return &d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanitize parameters
|
||||||
|
|
||||||
|
return &d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecodeCluster(r io.Reader) (*Cluster, error) {
|
||||||
|
var c Cluster
|
||||||
|
if err := json.NewDecoder(r).Decode(&c); err != nil {
|
||||||
|
return &c, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanitize parameters
|
||||||
|
|
||||||
|
return &c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func EncodeJobData(w io.Writer, d *JobData) error {
|
||||||
|
// Sanitize parameters
|
||||||
|
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func EncodeJobMeta(w io.Writer, d *JobMeta) error {
|
||||||
|
// Sanitize parameters
|
||||||
|
if err := json.NewEncoder(w).Encode(d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
32
tools/sanitize-archive/main.go
Normal file
32
tools/sanitize-archive/main.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ar FsArchive
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var srcPath string
|
||||||
|
var dstPath string
|
||||||
|
|
||||||
|
flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive")
|
||||||
|
flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. Default is ./var/job-archive-new")
|
||||||
|
|
||||||
|
srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath)
|
||||||
|
err := ar.Init(json.RawMessage(srcConfig))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for job := range ar.Iter() {
|
||||||
|
fmt.Printf("Job %d\n", job.JobID)
|
||||||
|
}
|
||||||
|
}
|
323
tools/sanitize-archive/metrics.go
Normal file
323
tools/sanitize-archive/metrics.go
Normal file
@ -0,0 +1,323 @@
|
|||||||
|
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"sort"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobData map[string]map[MetricScope]*JobMetric
|
||||||
|
|
||||||
|
type JobMetric struct {
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Scope MetricScope `json:"scope"`
|
||||||
|
Timestep int `json:"timestep"`
|
||||||
|
Series []Series `json:"series"`
|
||||||
|
StatisticsSeries *StatsSeries `json:"statisticsSeries"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Series struct {
|
||||||
|
Hostname string `json:"hostname"`
|
||||||
|
Id *int `json:"id,omitempty"`
|
||||||
|
Statistics *MetricStatistics `json:"statistics"`
|
||||||
|
Data []Float `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetricStatistics struct {
|
||||||
|
Avg float64 `json:"avg"`
|
||||||
|
Min float64 `json:"min"`
|
||||||
|
Max float64 `json:"max"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type StatsSeries struct {
|
||||||
|
Mean []Float `json:"mean"`
|
||||||
|
Min []Float `json:"min"`
|
||||||
|
Max []Float `json:"max"`
|
||||||
|
Percentiles map[int][]Float `json:"percentiles,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetricScope string
|
||||||
|
|
||||||
|
const (
|
||||||
|
MetricScopeInvalid MetricScope = "invalid_scope"
|
||||||
|
|
||||||
|
MetricScopeNode MetricScope = "node"
|
||||||
|
MetricScopeSocket MetricScope = "socket"
|
||||||
|
MetricScopeMemoryDomain MetricScope = "memoryDomain"
|
||||||
|
MetricScopeCore MetricScope = "core"
|
||||||
|
MetricScopeHWThread MetricScope = "hwthread"
|
||||||
|
|
||||||
|
MetricScopeAccelerator MetricScope = "accelerator"
|
||||||
|
)
|
||||||
|
|
||||||
|
var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{
|
||||||
|
MetricScopeNode: 10,
|
||||||
|
MetricScopeSocket: 5,
|
||||||
|
MetricScopeMemoryDomain: 3,
|
||||||
|
MetricScopeCore: 2,
|
||||||
|
MetricScopeHWThread: 1,
|
||||||
|
|
||||||
|
MetricScopeAccelerator: 5, // Special/Randomly choosen
|
||||||
|
|
||||||
|
MetricScopeInvalid: -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *MetricScope) LT(other MetricScope) bool {
|
||||||
|
a := metricScopeGranularity[*e]
|
||||||
|
b := metricScopeGranularity[other]
|
||||||
|
return a < b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *MetricScope) LTE(other MetricScope) bool {
|
||||||
|
a := metricScopeGranularity[*e]
|
||||||
|
b := metricScopeGranularity[other]
|
||||||
|
return a <= b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *MetricScope) Max(other MetricScope) MetricScope {
|
||||||
|
a := metricScopeGranularity[*e]
|
||||||
|
b := metricScopeGranularity[other]
|
||||||
|
if a > b {
|
||||||
|
return *e
|
||||||
|
}
|
||||||
|
return other
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *MetricScope) UnmarshalGQL(v interface{}) error {
|
||||||
|
str, ok := v.(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("enums must be strings")
|
||||||
|
}
|
||||||
|
|
||||||
|
*e = MetricScope(str)
|
||||||
|
if !e.Valid() {
|
||||||
|
return fmt.Errorf("%s is not a valid MetricScope", str)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e MetricScope) MarshalGQL(w io.Writer) {
|
||||||
|
fmt.Fprintf(w, "\"%s\"", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e MetricScope) Valid() bool {
|
||||||
|
gran, ok := metricScopeGranularity[e]
|
||||||
|
return ok && gran > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jd *JobData) Size() int {
|
||||||
|
n := 128
|
||||||
|
for _, scopes := range *jd {
|
||||||
|
for _, metric := range scopes {
|
||||||
|
if metric.StatisticsSeries != nil {
|
||||||
|
n += len(metric.StatisticsSeries.Max)
|
||||||
|
n += len(metric.StatisticsSeries.Mean)
|
||||||
|
n += len(metric.StatisticsSeries.Min)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, series := range metric.Series {
|
||||||
|
n += len(series.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n * int(unsafe.Sizeof(Float(0)))
|
||||||
|
}
|
||||||
|
|
||||||
|
const smooth bool = false
|
||||||
|
|
||||||
|
func (jm *JobMetric) AddStatisticsSeries() {
|
||||||
|
if jm.StatisticsSeries != nil || len(jm.Series) < 4 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n, m := 0, len(jm.Series[0].Data)
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
if len(series.Data) > n {
|
||||||
|
n = len(series.Data)
|
||||||
|
}
|
||||||
|
if len(series.Data) < m {
|
||||||
|
m = len(series.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
min, mean, max := make([]Float, n), make([]Float, n), make([]Float, n)
|
||||||
|
i := 0
|
||||||
|
for ; i < m; i++ {
|
||||||
|
smin, ssum, smax := math.MaxFloat32, 0.0, -math.MaxFloat32
|
||||||
|
notnan := 0
|
||||||
|
for j := 0; j < len(jm.Series); j++ {
|
||||||
|
x := float64(jm.Series[j].Data[i])
|
||||||
|
if math.IsNaN(x) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
notnan += 1
|
||||||
|
ssum += x
|
||||||
|
smin = math.Min(smin, x)
|
||||||
|
smax = math.Max(smax, x)
|
||||||
|
}
|
||||||
|
|
||||||
|
if notnan < 3 {
|
||||||
|
min[i] = NaN
|
||||||
|
mean[i] = NaN
|
||||||
|
max[i] = NaN
|
||||||
|
} else {
|
||||||
|
min[i] = Float(smin)
|
||||||
|
mean[i] = Float(ssum / float64(notnan))
|
||||||
|
max[i] = Float(smax)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for ; i < n; i++ {
|
||||||
|
min[i] = NaN
|
||||||
|
mean[i] = NaN
|
||||||
|
max[i] = NaN
|
||||||
|
}
|
||||||
|
|
||||||
|
if smooth {
|
||||||
|
for i := 2; i < len(mean)-2; i++ {
|
||||||
|
if min[i].IsNaN() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
min[i] = (min[i-2] + min[i-1] + min[i] + min[i+1] + min[i+2]) / 5
|
||||||
|
max[i] = (max[i-2] + max[i-1] + max[i] + max[i+1] + max[i+2]) / 5
|
||||||
|
mean[i] = (mean[i-2] + mean[i-1] + mean[i] + mean[i+1] + mean[i+2]) / 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jm.StatisticsSeries = &StatsSeries{Mean: mean, Min: min, Max: max}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jd *JobData) AddNodeScope(metric string) bool {
|
||||||
|
scopes, ok := (*jd)[metric]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var maxScope MetricScope = MetricScopeInvalid
|
||||||
|
for scope := range scopes {
|
||||||
|
maxScope = maxScope.Max(scope)
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxScope == MetricScopeInvalid || maxScope == MetricScopeNode {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
jm := scopes[maxScope]
|
||||||
|
hosts := make(map[string][]Series, 32)
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
hosts[series.Hostname] = append(hosts[series.Hostname], series)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeJm := &JobMetric{
|
||||||
|
Unit: jm.Unit,
|
||||||
|
Scope: MetricScopeNode,
|
||||||
|
Timestep: jm.Timestep,
|
||||||
|
Series: make([]Series, 0, len(hosts)),
|
||||||
|
}
|
||||||
|
for hostname, series := range hosts {
|
||||||
|
min, sum, max := math.MaxFloat32, 0.0, -math.MaxFloat32
|
||||||
|
for _, series := range series {
|
||||||
|
if series.Statistics == nil {
|
||||||
|
min, sum, max = math.NaN(), math.NaN(), math.NaN()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
sum += series.Statistics.Avg
|
||||||
|
min = math.Min(min, series.Statistics.Min)
|
||||||
|
max = math.Max(max, series.Statistics.Max)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, m := 0, len(jm.Series[0].Data)
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
if len(series.Data) > n {
|
||||||
|
n = len(series.Data)
|
||||||
|
}
|
||||||
|
if len(series.Data) < m {
|
||||||
|
m = len(series.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
i, data := 0, make([]Float, len(series[0].Data))
|
||||||
|
for ; i < m; i++ {
|
||||||
|
x := Float(0.0)
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
x += series.Data[i]
|
||||||
|
}
|
||||||
|
data[i] = x
|
||||||
|
}
|
||||||
|
|
||||||
|
for ; i < n; i++ {
|
||||||
|
data[i] = NaN
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeJm.Series = append(nodeJm.Series, Series{
|
||||||
|
Hostname: hostname,
|
||||||
|
Statistics: &MetricStatistics{Min: min, Avg: sum / float64(len(series)), Max: max},
|
||||||
|
Data: data,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
scopes[MetricScopeNode] = nodeJm
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jm *JobMetric) AddPercentiles(ps []int) bool {
|
||||||
|
if jm.StatisticsSeries == nil {
|
||||||
|
jm.AddStatisticsSeries()
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(jm.Series) < 3 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if jm.StatisticsSeries.Percentiles == nil {
|
||||||
|
jm.StatisticsSeries.Percentiles = make(map[int][]Float, len(ps))
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
if len(series.Data) > n {
|
||||||
|
n = len(series.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make([][]float64, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
vals := make([]float64, 0, len(jm.Series))
|
||||||
|
for _, series := range jm.Series {
|
||||||
|
if i < len(series.Data) {
|
||||||
|
vals = append(vals, float64(series.Data[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Float64s(vals)
|
||||||
|
data[i] = vals
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range ps {
|
||||||
|
if p < 1 || p > 99 {
|
||||||
|
panic("invalid percentile")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := jm.StatisticsSeries.Percentiles[p]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
percentiles := make([]Float, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
sorted := data[i]
|
||||||
|
percentiles[i] = Float(sorted[(len(sorted)*p)/100])
|
||||||
|
}
|
||||||
|
|
||||||
|
jm.StatisticsSeries.Percentiles[p] = percentiles
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user