// Topology describes how the hardware threads (hwthreads) of one node are
// grouped. In Socket, MemoryDomain and Core the outer index is the ID of the
// group (socket 0, socket 1, ...) and the inner slice holds the hwthread IDs
// belonging to that group; the Get*FromHWThreads methods rely on this layout.
type Topology struct {
	Node         []int          `json:"node"`         // hwthread IDs of the node; its length is used as the node's hwthread count
	Socket       [][]int        `json:"socket"`       // hwthread IDs per socket
	MemoryDomain [][]int        `json:"memoryDomain"` // hwthread IDs per memory domain
	Die          [][]int        `json:"die"`          // presumably hwthread IDs per die; not read by any method visible here
	Core         [][]int        `json:"core"`         // hwthread IDs per core
	Accelerators []*Accelerator `json:"accelerators"` // accelerators of the node; looked up by their ID string
}
`json:"subClusters"` +} + +// Return a list of socket IDs given a list of hwthread IDs. Even if just one +// hwthread is in that socket, add it to the list. If no hwthreads other than +// those in the argument list are assigned to one of the sockets in the first +// return value, return true as the second value. TODO: Optimize this, there +// must be a more efficient way/algorithm. +func (topo *Topology) GetSocketsFromHWThreads( + hwthreads []int) (sockets []int, exclusive bool) { + + socketsMap := map[int]int{} + for _, hwthread := range hwthreads { + for socket, hwthreadsInSocket := range topo.Socket { + for _, hwthreadInSocket := range hwthreadsInSocket { + if hwthread == hwthreadInSocket { + socketsMap[socket] += 1 + } + } + } + } + + exclusive = true + hwthreadsPerSocket := len(topo.Node) / len(topo.Socket) + sockets = make([]int, 0, len(socketsMap)) + for socket, count := range socketsMap { + sockets = append(sockets, socket) + exclusive = exclusive && count == hwthreadsPerSocket + } + + return sockets, exclusive +} + +// Return a list of core IDs given a list of hwthread IDs. Even if just one +// hwthread is in that core, add it to the list. If no hwthreads other than +// those in the argument list are assigned to one of the cores in the first +// return value, return true as the second value. TODO: Optimize this, there +// must be a more efficient way/algorithm. 
+func (topo *Topology) GetCoresFromHWThreads( + hwthreads []int) (cores []int, exclusive bool) { + + coresMap := map[int]int{} + for _, hwthread := range hwthreads { + for core, hwthreadsInCore := range topo.Core { + for _, hwthreadInCore := range hwthreadsInCore { + if hwthread == hwthreadInCore { + coresMap[core] += 1 + } + } + } + } + + exclusive = true + hwthreadsPerCore := len(topo.Node) / len(topo.Core) + cores = make([]int, 0, len(coresMap)) + for core, count := range coresMap { + cores = append(cores, core) + exclusive = exclusive && count == hwthreadsPerCore + } + + return cores, exclusive +} + +// Return a list of memory domain IDs given a list of hwthread IDs. Even if +// just one hwthread is in that memory domain, add it to the list. If no +// hwthreads other than those in the argument list are assigned to one of the +// memory domains in the first return value, return true as the second value. +// TODO: Optimize this, there must be a more efficient way/algorithm. +func (topo *Topology) GetMemoryDomainsFromHWThreads( + hwthreads []int) (memDoms []int, exclusive bool) { + + memDomsMap := map[int]int{} + for _, hwthread := range hwthreads { + for memDom, hwthreadsInmemDom := range topo.MemoryDomain { + for _, hwthreadInmemDom := range hwthreadsInmemDom { + if hwthread == hwthreadInmemDom { + memDomsMap[memDom] += 1 + } + } + } + } + + exclusive = true + hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain) + memDoms = make([]int, 0, len(memDomsMap)) + for memDom, count := range memDomsMap { + memDoms = append(memDoms, memDom) + exclusive = exclusive && count == hwthreadsPermemDom + } + + return memDoms, exclusive +} + +func (topo *Topology) GetAcceleratorIDs() ([]int, error) { + accels := make([]int, 0) + for _, accel := range topo.Accelerators { + id, err := strconv.Atoi(accel.ID) + if err != nil { + return nil, err + } + accels = append(accels, id) + } + return accels, nil +} + +func (topo *Topology) GetAcceleratorIndex(id string) (int, bool) { 
+ for idx, accel := range topo.Accelerators { + if accel.ID == id { + return idx, true + } + } + return -1, false +} diff --git a/tools/sanitize-archive/clusterConfig.go b/tools/sanitize-archive/clusterConfig.go new file mode 100644 index 0000000..d9a8354 --- /dev/null +++ b/tools/sanitize-archive/clusterConfig.go @@ -0,0 +1,165 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "errors" + "fmt" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" +) + +var Clusters []*Cluster +var nodeLists map[string]map[string]archive.NodeList + +func initClusterConfig() error { + + Clusters = []*Cluster{} + nodeLists = map[string]map[string]archive.NodeList{} + + for _, c := range ar.GetClusters() { + + cluster, err := ar.LoadClusterCfg(c) + if err != nil { + return err + } + + if len(cluster.Name) == 0 || + len(cluster.MetricConfig) == 0 || + len(cluster.SubClusters) == 0 { + return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty") + } + + for _, mc := range cluster.MetricConfig { + if len(mc.Name) == 0 { + return errors.New("cluster.metricConfig.name should not be empty") + } + if mc.Timestep < 1 { + return errors.New("cluster.metricConfig.timestep should not be smaller than one") + } + + // For backwards compability... 
+ if mc.Scope == "" { + mc.Scope = MetricScopeNode + } + if !mc.Scope.Valid() { + return errors.New("cluster.metricConfig.scope must be a valid scope ('node', 'scocket', ...)") + } + } + + Clusters = append(Clusters, cluster) + + nodeLists[cluster.Name] = make(map[string]archive.NodeList) + for _, sc := range cluster.SubClusters { + if sc.Nodes == "" { + continue + } + + nl, err := archive.ParseNodeList(sc.Nodes) + if err != nil { + return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err) + } + nodeLists[cluster.Name][sc.Name] = nl + } + } + + return nil +} + +func GetCluster(cluster string) *Cluster { + + for _, c := range Clusters { + if c.Name == cluster { + return c + } + } + return nil +} + +func GetSubCluster(cluster, subcluster string) *SubCluster { + + for _, c := range Clusters { + if c.Name == cluster { + for _, p := range c.SubClusters { + if p.Name == subcluster { + return p + } + } + } + } + return nil +} + +func GetMetricConfig(cluster, metric string) *MetricConfig { + + for _, c := range Clusters { + if c.Name == cluster { + for _, m := range c.MetricConfig { + if m.Name == metric { + return m + } + } + } + } + return nil +} + +// AssignSubCluster sets the `job.subcluster` property of the job based +// on its cluster and resources. 
+func AssignSubCluster(job *BaseJob) error { + + cluster := GetCluster(job.Cluster) + if cluster == nil { + return fmt.Errorf("unkown cluster: %#v", job.Cluster) + } + + if job.SubCluster != "" { + for _, sc := range cluster.SubClusters { + if sc.Name == job.SubCluster { + return nil + } + } + return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster) + } + + if len(job.Resources) == 0 { + return fmt.Errorf("job without any resources/hosts") + } + + host0 := job.Resources[0].Hostname + for sc, nl := range nodeLists[job.Cluster] { + if nl != nil && nl.Contains(host0) { + job.SubCluster = sc + return nil + } + } + + if cluster.SubClusters[0].Nodes == "" { + job.SubCluster = cluster.SubClusters[0].Name + return nil + } + + return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0) +} + +func GetSubClusterByNode(cluster, hostname string) (string, error) { + + for sc, nl := range nodeLists[cluster] { + if nl != nil && nl.Contains(hostname) { + return sc, nil + } + } + + c := GetCluster(cluster) + if c == nil { + return "", fmt.Errorf("unkown cluster: %#v", cluster) + } + + if c.SubClusters[0].Nodes == "" { + return c.SubClusters[0].Name, nil + } + + return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname) +} diff --git a/tools/sanitize-archive/float.go b/tools/sanitize-archive/float.go new file mode 100644 index 0000000..af322db --- /dev/null +++ b/tools/sanitize-archive/float.go @@ -0,0 +1,109 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "errors" + "io" + "math" + "strconv" +) + +// A custom float type is used so that (Un)MarshalJSON and +// (Un)MarshalGQL can be overloaded and NaN/null can be used. 
// The default behaviour of putting every nullable value behind
// a pointer has a bigger overhead.
//
// Float is a float64 whose NaN value stands for "no value": NaN is written
// as JSON/GraphQL `null` and `null` is read back as NaN. Infinite values are
// written as `null` too, because JSON has no representation for them.
type Float float64

// NaN is the Float used for missing values.
var NaN Float = Float(math.NaN())
var nullAsBytes []byte = []byte("null")

// IsNaN reports whether f is NaN, i.e. a missing value.
func (f Float) IsNaN() bool {
	return math.IsNaN(float64(f))
}

// MarshalJSON writes f with two decimal places.
// NaN and +/-Inf will be serialized to `null`.
func (f Float) MarshalJSON() ([]byte, error) {
	// AppendFloat would emit "NaN", "+Inf" or "-Inf", none of which is JSON.
	if f.IsNaN() || math.IsInf(float64(f), 0) {
		return nullAsBytes, nil
	}

	return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 2, 64), nil
}

// UnmarshalJSON parses a JSON number.
// `null` will be unserialized to NaN.
func (f *Float) UnmarshalJSON(input []byte) error {
	s := string(input)
	if s == "null" {
		*f = NaN
		return nil
	}

	val, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return err
	}
	*f = Float(val)
	return nil
}

// UnmarshalGQL implements the graphql.Unmarshaler interface.
// Besides float64 it accepts integer values and number types exposing a
// Float64 method (such as encoding/json's Number). Any other input is
// rejected and leaves f untouched.
func (f *Float) UnmarshalGQL(v interface{}) error {
	switch x := v.(type) {
	case float64:
		*f = Float(x)
	case float32:
		*f = Float(x)
	case int:
		*f = Float(x)
	case int32:
		*f = Float(x)
	case int64:
		*f = Float(x)
	case interface{ Float64() (float64, error) }:
		f64, err := x.Float64()
		if err != nil {
			return errors.New("invalid Float scalar")
		}
		*f = Float(f64)
	default:
		return errors.New("invalid Float scalar")
	}

	return nil
}

// MarshalGQL implements the graphql.Marshaler interface.
// The output is the same as that of MarshalJSON.
func (f Float) MarshalGQL(w io.Writer) {
	// MarshalJSON never fails, and the interface offers no way to report a
	// failed write, so both results are dropped on purpose.
	b, _ := f.MarshalJSON()
	_, _ = w.Write(b)
}
+ buf = strconv.AppendFloat(buf, s.Statistics.Avg, 'f', 2, 64) + buf = append(buf, `,"max":`...) + buf = strconv.AppendFloat(buf, s.Statistics.Max, 'f', 2, 64) + buf = append(buf, '}') + } + buf = append(buf, `,"data":[`...) + for i := 0; i < len(s.Data); i++ { + if i != 0 { + buf = append(buf, ',') + } + + if s.Data[i].IsNaN() { + buf = append(buf, `null`...) + } else { + buf = strconv.AppendFloat(buf, float64(s.Data[i]), 'f', 2, 32) + } + } + buf = append(buf, ']', '}') + return buf, nil +} diff --git a/tools/sanitize-archive/fsBackend.go b/tools/sanitize-archive/fsBackend.go new file mode 100644 index 0000000..95b75df --- /dev/null +++ b/tools/sanitize-archive/fsBackend.go @@ -0,0 +1,221 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "strconv" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +type FsArchiveConfig struct { + Path string `json:"path"` +} + +type FsArchive struct { + path string + clusters []string +} + +func getPath( + job *Job, + rootPath string, + file string) string { + + lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000) + return filepath.Join( + rootPath, + job.Cluster, + lvl1, lvl2, + strconv.FormatInt(job.StartTime.Unix(), 10), file) +} + +func loadJobMeta(filename string) (*JobMeta, error) { + + f, err := os.Open(filename) + if err != nil { + log.Errorf("fsBackend loadJobMeta()- %v", err) + return &JobMeta{}, err + } + defer f.Close() + + return DecodeJobMeta(bufio.NewReader(f)) +} + +func (fsa *FsArchive) Init(rawConfig json.RawMessage) error { + + var config FsArchiveConfig + if err := json.Unmarshal(rawConfig, &config); err != nil { + log.Errorf("fsBackend Init()- %v", err) + return err + } + if config.Path == "" { + err := 
fmt.Errorf("fsBackend Init()- empty path") + log.Errorf("fsBackend Init()- %v", err) + return err + } + fsa.path = config.Path + + entries, err := os.ReadDir(fsa.path) + if err != nil { + log.Errorf("fsBackend Init()- %v", err) + return err + } + + for _, de := range entries { + fsa.clusters = append(fsa.clusters, de.Name()) + } + + return nil +} + +func (fsa *FsArchive) LoadJobData(job *Job) (JobData, error) { + + filename := getPath(job, fsa.path, "data.json") + f, err := os.Open(filename) + if err != nil { + log.Errorf("fsBackend LoadJobData()- %v", err) + return nil, err + } + defer f.Close() + + return DecodeJobData(bufio.NewReader(f), filename) +} + +func (fsa *FsArchive) LoadJobMeta(job *Job) (*JobMeta, error) { + + filename := getPath(job, fsa.path, "meta.json") + return loadJobMeta(filename) +} + +func (fsa *FsArchive) LoadClusterCfg(name string) (*Cluster, error) { + + b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) + if err != nil { + log.Errorf("fsBackend LoadClusterCfg()- %v", err) + return &Cluster{}, err + } + return DecodeCluster(bytes.NewReader(b)) +} + +func (fsa *FsArchive) Iter() <-chan *JobMeta { + + ch := make(chan *JobMeta) + go func() { + clustersDir, err := os.ReadDir(fsa.path) + if err != nil { + log.Fatalf("Reading clusters failed: %s", err.Error()) + } + + for _, clusterDir := range clustersDir { + lvl1Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed: %s", err.Error()) + } + + for _, lvl1Dir := range lvl1Dirs { + if !lvl1Dir.IsDir() { + // Could be the cluster.json file + continue + } + + lvl2Dirs, err := os.ReadDir(filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name())) + if err != nil { + log.Fatalf("Reading jobs failed: %s", err.Error()) + } + + for _, lvl2Dir := range lvl2Dirs { + dirpath := filepath.Join(fsa.path, clusterDir.Name(), lvl1Dir.Name(), lvl2Dir.Name()) + startTimeDirs, err := os.ReadDir(dirpath) + if err != nil { + 
log.Fatalf("Reading jobs failed: %s", err.Error()) + } + + for _, startTimeDir := range startTimeDirs { + if startTimeDir.IsDir() { + job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json")) + if err != nil { + log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) + } else { + ch <- job + } + } + } + } + } + } + close(ch) + }() + return ch +} + +func (fsa *FsArchive) StoreJobMeta(jobMeta *JobMeta) error { + + job := Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + f, err := os.Create(getPath(&job, fsa.path, "meta.json")) + if err != nil { + return err + } + if err := EncodeJobMeta(f, jobMeta); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + return nil +} + +func (fsa *FsArchive) GetClusters() []string { + + return fsa.clusters +} + +func (fsa *FsArchive) ImportJob( + jobMeta *JobMeta, + jobData *JobData) error { + + job := Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + dir := getPath(&job, fsa.path, "") + if err := os.MkdirAll(dir, 0777); err != nil { + return err + } + + f, err := os.Create(path.Join(dir, "meta.json")) + if err != nil { + return err + } + if err := EncodeJobMeta(f, jobMeta); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + f, err = os.Create(path.Join(dir, "data.json")) + if err != nil { + return err + } + if err := EncodeJobData(f, jobData); err != nil { + return err + } + return f.Close() +} diff --git a/tools/sanitize-archive/job.go b/tools/sanitize-archive/job.go new file mode 100644 index 0000000..b0e9558 --- /dev/null +++ b/tools/sanitize-archive/job.go @@ -0,0 +1,160 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package main + +import ( + "errors" + "fmt" + "io" + "time" +) + +// Non-Swaggered Comment: BaseJob +// Non-Swaggered Comment: Common subset of Job and JobMeta. Use one of those, not this type directly. + +type BaseJob struct { + // The unique identifier of a job + JobID int64 `json:"jobId" db:"job_id" example:"123000"` + User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user + Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project + Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster + SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster + Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted + ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job + NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) + NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) + NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) + Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user + MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull + SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job + State JobState `json:"jobState" db:"job_state" example:"completed" 
// Non-Swaggered Comment: Job
// Non-Swaggered Comment: This type serves as the GraphQL interface type and, via sqlx, as a table row.

// Job model
// @Description Information of a HPC job.
type Job struct {
	// The unique identifier of a job in the database
	ID int64 `json:"id" db:"id"`
	BaseJob
	StartTimeUnix    int64     `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds; DB column start_time, not part of the JSON
	StartTime        time.Time `json:"startTime"`                              // Start time as 'time.Time'; JSON only, has no db tag
	MemUsedMax       float64   `json:"-" db:"mem_used_max"`                    // DB column mem_used_max, not part of the JSON
	FlopsAnyAvg      float64   `json:"-" db:"flops_any_avg"`                   // DB column flops_any_avg, not part of the JSON
	MemBwAvg         float64   `json:"-" db:"mem_bw_avg"`                      // DB column mem_bw_avg, not part of the JSON
	LoadAvg          float64   `json:"-" db:"load_avg"`                        // DB column load_avg, not part of the JSON
	NetBwAvg         float64   `json:"-" db:"net_bw_avg"`                      // DB column net_bw_avg, not part of the JSON
	NetDataVolTotal  float64   `json:"-" db:"net_data_vol_total"`              // DB column net_data_vol_total, not part of the JSON
	FileBwAvg        float64   `json:"-" db:"file_bw_avg"`                     // DB column file_bw_avg, not part of the JSON
	FileDataVolTotal float64   `json:"-" db:"file_data_vol_total"`             // DB column file_data_vol_total, not part of the JSON
}
// Non-Swaggered Comment: JobMeta
// Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more
// Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
// Non-Swaggered Comment: This is why this struct exists: it contains all fields of the regular job struct, but "overwrites"
// Non-Swaggered Comment: the StartTime field with one of type int64.
// Non-Swaggered Comment: ID *int64 `json:"id,omitempty"` >> never used in the job-archive, only available via REST-API

// JobMeta model
// @Description Meta data information of a HPC job.
type JobMeta struct {
	// The unique identifier of a job in the database
	ID *int64 `json:"id,omitempty"`
	BaseJob
	StartTime  int64                    `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
	Statistics map[string]JobStatistics `json:"statistics,omitempty"`                                        // Metric statistics of job; presumably keyed by metric name, omitted from JSON when empty
}
// JobState is the state of a job, stored as its lower-case name.
type JobState string

// The job states accepted by JobState.Valid.
const (
	JobStateRunning     JobState = "running"
	JobStateCompleted   JobState = "completed"
	JobStateFailed      JobState = "failed"
	JobStateCancelled   JobState = "cancelled"
	JobStateStopped     JobState = "stopped"
	JobStateTimeout     JobState = "timeout"
	JobStatePreempted   JobState = "preempted"
	JobStateOutOfMemory JobState = "out_of_memory"
)

// UnmarshalGQL implements the graphql.Unmarshaler interface. It accepts a
// string naming one of the known job states. On error the receiver keeps
// its previous value, so a rejected input never leaves an invalid state
// behind.
func (e *JobState) UnmarshalGQL(v interface{}) error {
	str, ok := v.(string)
	if !ok {
		return fmt.Errorf("enums must be strings")
	}

	// Validate first, assign only afterwards.
	state := JobState(str)
	if !state.Valid() {
		return errors.New("invalid job state")
	}

	*e = state
	return nil
}

// MarshalGQL implements the graphql.Marshaler interface and writes the
// state as a double-quoted string. %q escapes quotes and control characters,
// so an unexpected value cannot break out of the string literal.
func (e JobState) MarshalGQL(w io.Writer) {
	fmt.Fprintf(w, "%q", string(e))
}

// Valid reports whether e is one of the declared JobState constants.
func (e JobState) Valid() bool {
	switch e {
	case JobStateRunning,
		JobStateCompleted,
		JobStateFailed,
		JobStateCancelled,
		JobStateStopped,
		JobStateTimeout,
		JobStatePreempted,
		JobStateOutOfMemory:
		return true
	}
	return false
}
+package main + +import ( + "encoding/json" + "io" +) + +func DecodeJobData(r io.Reader, k string) (JobData, error) { + var d JobData + if err := json.NewDecoder(r).Decode(&d); err != nil { + return nil, err + } + + return d, nil +} + +func DecodeJobMeta(r io.Reader) (*JobMeta, error) { + var d JobMeta + if err := json.NewDecoder(r).Decode(&d); err != nil { + return &d, err + } + + // Sanitize parameters + + return &d, nil +} + +func DecodeCluster(r io.Reader) (*Cluster, error) { + var c Cluster + if err := json.NewDecoder(r).Decode(&c); err != nil { + return &c, err + } + + // Sanitize parameters + + return &c, nil +} + +func EncodeJobData(w io.Writer, d *JobData) error { + // Sanitize parameters + if err := json.NewEncoder(w).Encode(d); err != nil { + return err + } + + return nil +} + +func EncodeJobMeta(w io.Writer, d *JobMeta) error { + // Sanitize parameters + if err := json.NewEncoder(w).Encode(d); err != nil { + return err + } + + return nil +} diff --git a/tools/sanitize-archive/main.go b/tools/sanitize-archive/main.go new file mode 100644 index 0000000..b69b348 --- /dev/null +++ b/tools/sanitize-archive/main.go @@ -0,0 +1,32 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" +) + +var ar FsArchive + +func main() { + var srcPath string + var dstPath string + + flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") + flag.StringVar(&dstPath, "d", "./var/job-archive-new", "Specify the destination job archive path. 
Default is ./var/job-archive-new") + + srcConfig := fmt.Sprintf("{\"path\": \"%s\"}", srcPath) + err := ar.Init(json.RawMessage(srcConfig)) + if err != nil { + log.Fatal(err) + } + + for job := range ar.Iter() { + fmt.Printf("Job %d\n", job.JobID) + } +} diff --git a/tools/sanitize-archive/metrics.go b/tools/sanitize-archive/metrics.go new file mode 100644 index 0000000..96174c0 --- /dev/null +++ b/tools/sanitize-archive/metrics.go @@ -0,0 +1,323 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "fmt" + "io" + "math" + "sort" + "unsafe" +) + +type JobData map[string]map[MetricScope]*JobMetric + +type JobMetric struct { + Unit string `json:"unit"` + Scope MetricScope `json:"scope"` + Timestep int `json:"timestep"` + Series []Series `json:"series"` + StatisticsSeries *StatsSeries `json:"statisticsSeries"` +} + +type Series struct { + Hostname string `json:"hostname"` + Id *int `json:"id,omitempty"` + Statistics *MetricStatistics `json:"statistics"` + Data []Float `json:"data"` +} + +type MetricStatistics struct { + Avg float64 `json:"avg"` + Min float64 `json:"min"` + Max float64 `json:"max"` +} + +type StatsSeries struct { + Mean []Float `json:"mean"` + Min []Float `json:"min"` + Max []Float `json:"max"` + Percentiles map[int][]Float `json:"percentiles,omitempty"` +} + +type MetricScope string + +const ( + MetricScopeInvalid MetricScope = "invalid_scope" + + MetricScopeNode MetricScope = "node" + MetricScopeSocket MetricScope = "socket" + MetricScopeMemoryDomain MetricScope = "memoryDomain" + MetricScopeCore MetricScope = "core" + MetricScopeHWThread MetricScope = "hwthread" + + MetricScopeAccelerator MetricScope = "accelerator" +) + +var metricScopeGranularity map[MetricScope]int = map[MetricScope]int{ + MetricScopeNode: 10, + MetricScopeSocket: 5, + MetricScopeMemoryDomain: 3, + 
MetricScopeCore: 2, + MetricScopeHWThread: 1, + + MetricScopeAccelerator: 5, // Special/Randomly choosen + + MetricScopeInvalid: -1, +} + +func (e *MetricScope) LT(other MetricScope) bool { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + return a < b +} + +func (e *MetricScope) LTE(other MetricScope) bool { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + return a <= b +} + +func (e *MetricScope) Max(other MetricScope) MetricScope { + a := metricScopeGranularity[*e] + b := metricScopeGranularity[other] + if a > b { + return *e + } + return other +} + +func (e *MetricScope) UnmarshalGQL(v interface{}) error { + str, ok := v.(string) + if !ok { + return fmt.Errorf("enums must be strings") + } + + *e = MetricScope(str) + if !e.Valid() { + return fmt.Errorf("%s is not a valid MetricScope", str) + } + return nil +} + +func (e MetricScope) MarshalGQL(w io.Writer) { + fmt.Fprintf(w, "\"%s\"", e) +} + +func (e MetricScope) Valid() bool { + gran, ok := metricScopeGranularity[e] + return ok && gran > 0 +} + +func (jd *JobData) Size() int { + n := 128 + for _, scopes := range *jd { + for _, metric := range scopes { + if metric.StatisticsSeries != nil { + n += len(metric.StatisticsSeries.Max) + n += len(metric.StatisticsSeries.Mean) + n += len(metric.StatisticsSeries.Min) + } + + for _, series := range metric.Series { + n += len(series.Data) + } + } + } + return n * int(unsafe.Sizeof(Float(0))) +} + +const smooth bool = false + +func (jm *JobMetric) AddStatisticsSeries() { + if jm.StatisticsSeries != nil || len(jm.Series) < 4 { + return + } + + n, m := 0, len(jm.Series[0].Data) + for _, series := range jm.Series { + if len(series.Data) > n { + n = len(series.Data) + } + if len(series.Data) < m { + m = len(series.Data) + } + } + + min, mean, max := make([]Float, n), make([]Float, n), make([]Float, n) + i := 0 + for ; i < m; i++ { + smin, ssum, smax := math.MaxFloat32, 0.0, -math.MaxFloat32 + notnan := 0 + for j := 0; j < 
len(jm.Series); j++ { + x := float64(jm.Series[j].Data[i]) + if math.IsNaN(x) { + continue + } + + notnan += 1 + ssum += x + smin = math.Min(smin, x) + smax = math.Max(smax, x) + } + + if notnan < 3 { + min[i] = NaN + mean[i] = NaN + max[i] = NaN + } else { + min[i] = Float(smin) + mean[i] = Float(ssum / float64(notnan)) + max[i] = Float(smax) + } + } + + for ; i < n; i++ { + min[i] = NaN + mean[i] = NaN + max[i] = NaN + } + + if smooth { + for i := 2; i < len(mean)-2; i++ { + if min[i].IsNaN() { + continue + } + + min[i] = (min[i-2] + min[i-1] + min[i] + min[i+1] + min[i+2]) / 5 + max[i] = (max[i-2] + max[i-1] + max[i] + max[i+1] + max[i+2]) / 5 + mean[i] = (mean[i-2] + mean[i-1] + mean[i] + mean[i+1] + mean[i+2]) / 5 + } + } + + jm.StatisticsSeries = &StatsSeries{Mean: mean, Min: min, Max: max} +} + +func (jd *JobData) AddNodeScope(metric string) bool { + scopes, ok := (*jd)[metric] + if !ok { + return false + } + + var maxScope MetricScope = MetricScopeInvalid + for scope := range scopes { + maxScope = maxScope.Max(scope) + } + + if maxScope == MetricScopeInvalid || maxScope == MetricScopeNode { + return false + } + + jm := scopes[maxScope] + hosts := make(map[string][]Series, 32) + for _, series := range jm.Series { + hosts[series.Hostname] = append(hosts[series.Hostname], series) + } + + nodeJm := &JobMetric{ + Unit: jm.Unit, + Scope: MetricScopeNode, + Timestep: jm.Timestep, + Series: make([]Series, 0, len(hosts)), + } + for hostname, series := range hosts { + min, sum, max := math.MaxFloat32, 0.0, -math.MaxFloat32 + for _, series := range series { + if series.Statistics == nil { + min, sum, max = math.NaN(), math.NaN(), math.NaN() + break + } + sum += series.Statistics.Avg + min = math.Min(min, series.Statistics.Min) + max = math.Max(max, series.Statistics.Max) + } + + n, m := 0, len(jm.Series[0].Data) + for _, series := range jm.Series { + if len(series.Data) > n { + n = len(series.Data) + } + if len(series.Data) < m { + m = len(series.Data) + } + } + + i, 
data := 0, make([]Float, len(series[0].Data)) + for ; i < m; i++ { + x := Float(0.0) + for _, series := range jm.Series { + x += series.Data[i] + } + data[i] = x + } + + for ; i < n; i++ { + data[i] = NaN + } + + nodeJm.Series = append(nodeJm.Series, Series{ + Hostname: hostname, + Statistics: &MetricStatistics{Min: min, Avg: sum / float64(len(series)), Max: max}, + Data: data, + }) + } + + scopes[MetricScopeNode] = nodeJm + return true +} + +func (jm *JobMetric) AddPercentiles(ps []int) bool { + if jm.StatisticsSeries == nil { + jm.AddStatisticsSeries() + } + + if len(jm.Series) < 3 { + return false + } + + if jm.StatisticsSeries.Percentiles == nil { + jm.StatisticsSeries.Percentiles = make(map[int][]Float, len(ps)) + } + + n := 0 + for _, series := range jm.Series { + if len(series.Data) > n { + n = len(series.Data) + } + } + + data := make([][]float64, n) + for i := 0; i < n; i++ { + vals := make([]float64, 0, len(jm.Series)) + for _, series := range jm.Series { + if i < len(series.Data) { + vals = append(vals, float64(series.Data[i])) + } + } + + sort.Float64s(vals) + data[i] = vals + } + + for _, p := range ps { + if p < 1 || p > 99 { + panic("invalid percentile") + } + + if _, ok := jm.StatisticsSeries.Percentiles[p]; ok { + continue + } + + percentiles := make([]Float, n) + for i := 0; i < n; i++ { + sorted := data[i] + percentiles[i] = Float(sorted[(len(sorted)*p)/100]) + } + + jm.StatisticsSeries.Percentiles[p] = percentiles + } + + return true +}