From 7251344d4aade00f68399a69a6afea7b713f8363 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Wed, 28 Sep 2022 16:25:26 +0200
Subject: [PATCH 1/2] Add convenience routines to unit package

---
 internal/repository/init.go |  13 +++++
 pkg/units/README.md         |   3 +-
 pkg/units/unitPrefix.go     |  18 ++++++
 pkg/units/units.go          |  62 ++++++++++++++++++++-
 pkg/units/units_test.go     | 106 ++++++++++++++++++++++++++++++++++++
 5 files changed, 200 insertions(+), 2 deletions(-)

diff --git a/internal/repository/init.go b/internal/repository/init.go
index 76973eb..1ea8d17 100644
--- a/internal/repository/init.go
+++ b/internal/repository/init.go
@@ -366,3 +366,16 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 {

 	return 0.0
 }
+
+func checkJobData(d *schema.JobData) error {
+	// for name, scopes := range *d {
+
+	// 	for scope, metric := range scopes {
+	// 		// 1. Unit normalisation
+
+	// 	}
+	// 	// 2. Add node scope if missing
+
+	// }
+	return nil
+}
diff --git a/pkg/units/README.md b/pkg/units/README.md
index b5ac5ac..862239c 100644
--- a/pkg/units/README.md
+++ b/pkg/units/README.md
@@ -1,6 +1,7 @@
 # cc-units - A unit system for ClusterCockpit

-When working with metrics, the problem comes up that they may use different unit name but have the same unit in fact. There are a lot of real world examples like 'kB' and 'Kbyte'. In [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector), the collectors read data from different sources which may use different units or the programmer specifies a unit for a metric by hand. The cc-units system is not comparable with the SI unit system. If you are looking for a package for the SI units, see [here](https://pkg.go.dev/github.com/gurre/si).
+When working with metrics, the problem comes up that they may use different unit names but in fact denote the same unit.
+There are a lot of real-world examples like 'kB' and 'Kbyte'. In [cc-metric-collector](https://github.com/ClusterCockpit/cc-metric-collector), the collectors read data from different sources which may use different units, or the programmer specifies a unit for a metric by hand. The cc-units system is not comparable with the SI unit system. If you are looking for a package for the SI units, see [here](https://pkg.go.dev/github.com/gurre/si).

 In order to enable unit comparison and conversion, the ccUnits package provides some helpers:
 ```go
diff --git a/pkg/units/unitPrefix.go b/pkg/units/unitPrefix.go
index 347578d..014fcc7 100644
--- a/pkg/units/unitPrefix.go
+++ b/pkg/units/unitPrefix.go
@@ -1,6 +1,7 @@
 package units

 import (
+	"math"
 	"regexp"
 )
@@ -172,3 +173,20 @@ func NewPrefix(prefix string) Prefix {
 	}
 	return InvalidPrefix
 }
+
+// getExponent returns the decimal exponent (a multiple of 3) of the
+// prefix factor p, e.g. 1e6 -> 6. Only factors >= 1.0 are handled.
+func getExponent(p float64) int {
+	count := 0
+
+	for p > 1.0 {
+		p = p / 1000.0
+		count++
+	}
+
+	return count * 3
+}
+
+// NewPrefixFromFactor shifts the prefix op down by the decimal exponent
+// e and returns the resulting prefix, e.g. (Giga, 3) -> Mega.
+func NewPrefixFromFactor(op Prefix, e int) Prefix {
+	f := float64(op)
+	exp := math.Pow10(getExponent(f) - e)
+	return Prefix(exp)
+}
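Review note: a minimal sketch of how the two new prefix helpers compose, assuming `NewPrefix` and the float64-based `Prefix` type behave as defined above (the printed value follows from the factor arithmetic):

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/units"
)

func main() {
	// getExponent(1e9) = 9 for a Giga prefix; shifting it down by
	// e = 3 decimal orders yields 10^(9-3) = 1e6, i.e. Mega.
	p := units.NewPrefixFromFactor(units.NewPrefix("G"), 3)
	fmt.Printf("%.0e\n", float64(p)) // 1e+06
}
```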
diff --git a/pkg/units/units.go b/pkg/units/units.go
index 9cca4df..2cb6524 100644
--- a/pkg/units/units.go
+++ b/pkg/units/units.go
@@ -3,6 +3,7 @@ package units

 import (
 	"fmt"
+	"math"
 	"strings"
 )
@@ -25,7 +26,9 @@ type Unit interface {

 var INVALID_UNIT = NewUnit("foobar")

-// Valid checks whether a unit is a valid unit. A unit is valid if it has at least a prefix and a measure. The unit denominator is optional.
+// Valid checks whether a unit is a valid unit.
+// A unit is valid if it has at least a prefix and a measure.
+// The unit denominator is optional.
 func (u *unit) Valid() bool {
 	return u.prefix != InvalidPrefix && u.measure != InvalidMeasure
 }
@@ -71,6 +74,63 @@ func (u *unit) getUnitDenominator() Measure {
 	return u.divMeasure
 }

+// ConvertValue converts the value v in place from unit from to unit to.
+// The result is rounded up to the next integer.
+func ConvertValue(v *float64, from string, to string) {
+	uf := NewUnit(from)
+	ut := NewUnit(to)
+	factor := float64(uf.getPrefix()) / float64(ut.getPrefix())
+	*v = math.Ceil(*v * factor)
+}
+
+// ConvertSeries converts all values in s in place from unit from to unit to.
+func ConvertSeries(s []float64, from string, to string) {
+	uf := NewUnit(from)
+	ut := NewUnit(to)
+	factor := float64(uf.getPrefix()) / float64(ut.getPrefix())
+
+	for i := 0; i < len(s); i++ {
+		s[i] = math.Ceil(s[i] * factor)
+	}
+}
+
+// getNormalizationFactor returns the factor and decimal exponent that
+// scale v into the range [1, 1000).
+func getNormalizationFactor(v float64) (float64, int) {
+	count := 0
+	scale := -3
+
+	if v > 1000.0 {
+		for v > 1000.0 {
+			v *= 1e-3
+			count++
+		}
+	} else {
+		for v < 1.0 {
+			v *= 1e3
+			count++
+		}
+		scale = 3
+	}
+	return math.Pow10(count * scale), count * scale
+}
+
+// NormalizeValue scales v in place to the prefix best matching its
+// magnitude and stores the resulting unit string in nu.
+func NormalizeValue(v *float64, us string, nu *string) {
+	u := NewUnit(us)
+	f, e := getNormalizationFactor((*v))
+	*v = math.Ceil(*v * f)
+	u.setPrefix(NewPrefixFromFactor(u.getPrefix(), e))
+	*nu = u.Short()
+}
+
+// NormalizeSeries scales all values in s in place based on the series
+// average avg and stores the resulting unit string in nu.
+func NormalizeSeries(s []float64, avg float64, us string, nu *string) {
+	u := NewUnit(us)
+	f, e := getNormalizationFactor(avg)
+
+	for i := 0; i < len(s); i++ {
+		s[i] *= f
+		s[i] = math.Ceil(s[i])
+	}
+	u.setPrefix(NewPrefixFromFactor(u.getPrefix(), e))
+	fmt.Printf("Prefix: %e \n", u.getPrefix())
+	*nu = u.Short()
+}
+
 // GetPrefixPrefixFactor creates the default conversion function between two prefixes.
 // It returns a conversion function for the value.
 func GetPrefixPrefixFactor(in Prefix, out Prefix) func(value interface{}) interface{} {
diff --git a/pkg/units/units_test.go b/pkg/units/units_test.go
index 5045ab1..cf0bce4 100644
--- a/pkg/units/units_test.go
+++ b/pkg/units/units_test.go
@@ -2,6 +2,7 @@ package units

 import (
 	"fmt"
+	"reflect"
 	"regexp"
 	"testing"
 )
@@ -199,3 +200,108 @@ func TestPrefixRegex(t *testing.T) {
 		t.Logf("successfully compiled regex '%s' for prefix %s", data.Regex, data.Long)
 	}
 }
+
+func TestConvertValue(t *testing.T) {
+	v := float64(103456)
+	ConvertValue(&v, "MB/s", "GB/s")
+
+	if v != 104.00 {
+		t.Errorf("Failed ConvertValue: Want 104.00, Got %f", v)
+	}
+}
+
+func TestConvertValueUp(t *testing.T) {
+	v := float64(10.3456)
+	ConvertValue(&v, "GB/s", "MB/s")
+
+	if v != 10346.00 {
+		t.Errorf("Failed ConvertValue: Want 10346.00, Got %f", v)
+	}
+}
+
+func TestConvertSeries(t *testing.T) {
+	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
+	r := []float64{3, 24, 390, 391}
+	ConvertSeries(s, "F/s", "GF/s")
+
+	if !reflect.DeepEqual(s, r) {
+		t.Errorf("Failed ConvertSeries: Want 3, 24, 390, 391, Got %v", s)
+	}
+}
+
+func TestNormalizeValue(t *testing.T) {
+	var s string
+	v := float64(103456)
+
+	NormalizeValue(&v, "MB/s", &s)
+
+	if v != 104.00 {
+		t.Errorf("Failed NormalizeValue: Want 104.00, Got %f", v)
+	}
+	if s != "GB/s" {
+		t.Errorf("Failed Prefix or unit: Want GB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueNoPrefix(t *testing.T) {
+	var s string
+	v := float64(103458596)
+
+	NormalizeValue(&v, "F/s", &s)
+
+	if v != 104.00 {
+		t.Errorf("Failed NormalizeValue: Want 104.00, Got %f", v)
+	}
+	if s != "MFlops/s" {
+		t.Errorf("Failed Prefix or unit: Want MFlops/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueKeep(t *testing.T) {
+	var s string
+	v := float64(345)
+
+	NormalizeValue(&v, "MB/s", &s)
+
+	if v != 345.00 {
+		t.Errorf("Failed NormalizeValue: Want 345.00, Got %f", v)
+	}
+	if s != "MB/s" {
+		t.Errorf("Failed Prefix or unit: Want MB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeValueDown(t *testing.T) {
+	var s string
+	v := float64(0.0004578)
+
+	NormalizeValue(&v, "GB/s", &s)
+
+	if v != 458.00 {
+		t.Errorf("Failed NormalizeValue: Want 458.00, Got %f", v)
+	}
+	if s != "KB/s" {
+		t.Errorf("Failed Prefix or unit: Want KB/s, Got %s", s)
+	}
+}
+
+func TestNormalizeSeries(t *testing.T) {
+	var us string
+	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
+	r := []float64{3, 24, 390, 391}
+
+	total := 0.0
+	for _, number := range s {
+		total += number
+	}
+	avg := total / float64(len(s))
+
+	fmt.Printf("AVG: %e\n", avg)
+	NormalizeSeries(s, avg, "KB/s", &us)
+
+	if !reflect.DeepEqual(s, r) {
+		t.Errorf("Failed NormalizeSeries: Want 3, 24, 390, 391, Got %v", s)
+	}
+	if us != "TB/s" {
+		t.Errorf("Failed Prefix or unit: Want TB/s, Got %s", us)
+	}
+}
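Review note: a usage sketch of the new conversion and normalization helpers; the expected values mirror the tests above (results are rounded up via math.Ceil):

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/units"
)

func main() {
	// Explicit conversion: the caller picks the target unit.
	v := 103456.0
	units.ConvertValue(&v, "MB/s", "GB/s")
	fmt.Println(v) // 104, i.e. math.Ceil(103.456)

	// Normalization: the best-fitting prefix is chosen automatically.
	var nu string
	w := 0.0004578
	units.NormalizeValue(&w, "GB/s", &nu)
	fmt.Println(w, nu) // 458 KB/s
}
```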
s) + } +} + +func TestNormalizeValueDown(t *testing.T) { + var s string + v := float64(0.0004578) + + NormalizeValue(&v, "GB/s", &s) + + if v != 458.00 { + t.Errorf("Failed ConvertValue: Want 458.00, Got %f", v) + } + if s != "KB/s" { + t.Errorf("Failed Prefix or unit: Want KB/s, Got %s", s) + } +} + +func TestNormalizeSeries(t *testing.T) { + var us string + s := []float64{2890031237, 23998994567, 389734042344, 390349424345} + r := []float64{3, 24, 390, 391} + + total := 0.0 + for _, number := range s { + total += number + } + avg := total / float64(len(s)) + + fmt.Printf("AVG: %e\n", avg) + NormalizeSeries(s, avg, "KB/s", &us) + + if !reflect.DeepEqual(s, r) { + t.Errorf("Failed ConvertValue: Want 3, 24, 390, 391, Got %v", s) + } + if us != "TB/s" { + t.Errorf("Failed Prefix or unit: Want TB/s, Got %s", us) + } +} From 29d215fcea9a626e7bd49d70f7470586726a2aff Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 4 Oct 2022 10:12:35 +0200 Subject: [PATCH 2/2] Intermediate Save commit --- internal/repository/init.go | 39 +++++++++---- internal/repository/job.go | 22 +++++-- pkg/schema/cluster.go | 2 +- pkg/schema/float.go | 20 +++++++ pkg/schema/job.go | 78 +++++++++++++------------ pkg/schema/metrics.go | 2 +- pkg/schema/schemas/cluster.schema.json | 3 +- pkg/schema/schemas/job-meta.schema.json | 1 - pkg/schema/schemas/unit.schema.json | 4 +- test/integration_test.go | 13 +++++ 10 files changed, 125 insertions(+), 59 deletions(-) diff --git a/internal/repository/init.go b/internal/repository/init.go index 1ea8d17..554d88b 100644 --- a/internal/repository/init.go +++ b/internal/repository/init.go @@ -17,6 +17,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/ClusterCockpit/cc-backend/pkg/units" ) // `AUTO_INCREMENT` is in a comment because of this hack: @@ -103,11 +104,11 @@ func HandleImportFlag(flag string) error { return err } - if config.Keys.Validate { - if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil { - return fmt.Errorf("validate job meta: %v", err) - } + // if config.Keys.Validate { + if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil { + return fmt.Errorf("validate job meta: %v", err) } + // } dec := json.NewDecoder(bytes.NewReader(raw)) dec.DisallowUnknownFields() jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults} @@ -132,6 +133,7 @@ func HandleImportFlag(flag string) error { return err } + checkJobData(&jobData) SanityChecks(&jobMeta.BaseJob) jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows { @@ -368,14 +370,31 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 { } func checkJobData(d *schema.JobData) error { - // for name, scopes := range *d { + for _, scopes := range *d { + var newUnit string + // Add node scope if missing + for _, metric := range scopes { + if strings.Contains(metric.Unit.Base, "B/s") || + strings.Contains(metric.Unit.Base, "F/s") || + strings.Contains(metric.Unit.Base, "B") { - // for scope, metric := range scopes { - // // 1. Unit normalisation + // First get overall avg + sum := 0.0 + for _, s := range metric.Series { + sum += s.Statistics.Avg + } - // } - // // 2. 
diff --git a/internal/repository/job.go b/internal/repository/job.go
index 0496698..90b7dbf 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -211,7 +211,13 @@ func (r *JobRepository) Stop(
 }

 // TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
-func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
+func (r *JobRepository) CountGroupedJobs(
+	ctx context.Context,
+	aggreg model.Aggregate,
+	filters []*model.JobFilter,
+	weight *model.Weights,
+	limit *int) (map[string]int, error) {
+
 	if !aggreg.IsValid() {
 		return nil, errors.New("invalid aggregate")
 	}
@@ -301,10 +307,14 @@ func (r *JobRepository) Archive(

 var ErrNotFound = errors.New("no such job or user")

-// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
-// As 0 is a valid job id, check if username is "" instead in order to check what machted.
-// If nothing matches the search, `ErrNotFound` is returned.
-func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (job int64, username string, err error) {
+// FindJobOrUser returns a job database ID or a username if a job or user
+// matches the search term. As 0 is a valid job id, check if username is ""
+// instead in order to check what matched. If nothing matches the search,
+// `ErrNotFound` is returned.
+func (r *JobRepository) FindJobOrUser(
+	ctx context.Context,
+	searchterm string) (job int64, username string, err error) {
+
 	user := auth.GetUser(ctx)
 	if id, err := strconv.Atoi(searchterm); err == nil {
 		qb := sq.Select("job.id").From("job").Where("job.job_id = ?", id)
@@ -353,6 +363,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
 // Hosts with zero jobs running on them will not show up!
 func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
+
 	subclusters := make(map[string]map[string]int)
 	rows, err := sq.Select("resources", "subcluster").From("job").
 		Where("job.job_state = 'running'").
@@ -390,6 +401,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
 }

 func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
+
 	res, err := sq.Update("job").
 		Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
 		Set("duration", 0).
diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go
index 729f72a..28d9450 100644
--- a/pkg/schema/cluster.go
+++ b/pkg/schema/cluster.go
@@ -45,7 +45,7 @@ type SubClusterConfig struct {

 type MetricConfig struct {
 	Name        string      `json:"name"`
-	Unit        string      `json:"unit"`
+	Unit        Unit        `json:"unit"`
 	Scope       MetricScope `json:"scope"`
 	Aggregation *string     `json:"aggregation"`
 	Timestep    int         `json:"timestep"`
diff --git a/pkg/schema/float.go b/pkg/schema/float.go
index df084fa..0e2165f 100644
--- a/pkg/schema/float.go
+++ b/pkg/schema/float.go
@@ -107,3 +107,23 @@ func (s *Series) MarshalJSON() ([]byte, error) {
 	buf = append(buf, ']', '}')
 	return buf, nil
 }
+
+// ConvertFloatToFloat64 converts a slice of the JSON-aware Float type
+// into a plain float64 slice.
+func ConvertFloatToFloat64(s []Float) []float64 {
+	fp := make([]float64, len(s))
+
+	for i, val := range s {
+		fp[i] = float64(val)
+	}
+
+	return fp
+}
+
+// GetFloat64ToFloat converts a plain float64 slice back into a slice of
+// the JSON-aware Float type.
+func GetFloat64ToFloat(s []float64) []Float {
+	fp := make([]Float, len(s))
+
+	for i, val := range s {
+		fp[i] = Float(val)
+	}
+
+	return fp
+}
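Review note: a round-trip sketch of the two new helpers; checkJobData uses exactly this pattern to hand series data to the units package:

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
	// Series data is stored as []schema.Float, but the units package
	// operates on []float64, hence the conversion in both directions.
	data := []schema.Float{2.5, 3.5}

	fp := schema.ConvertFloatToFloat64(data)
	fp[0] *= 1000.0

	data = schema.GetFloat64ToFloat(fp)
	fmt.Println(data) // [2500 3.5]
}
```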
diff --git a/pkg/schema/job.go b/pkg/schema/job.go
index d2db324..d0d4c52 100644
--- a/pkg/schema/job.go
+++ b/pkg/schema/job.go
@@ -17,26 +17,26 @@ import (
 type BaseJob struct {
 	// The unique identifier of a job
 	JobID int64 `json:"jobId" db:"job_id" example:"123000"`
-	User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
-	Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
-	Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
-	SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
-	Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
-	ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
-	NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
-	NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
-	NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
-	Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
-	MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
-	SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job
+	User             string `json:"user" db:"user" example:"abcd100h"`                                          // The unique identifier of a user
+	Project          string `json:"project" db:"project" example:"abcd200"`                                     // The unique identifier of a project
+	Cluster          string `json:"cluster" db:"cluster" example:"fritz"`                                       // The unique identifier of a cluster
+	SubCluster       string `json:"subCluster" db:"subcluster" example:"main"`                                  // The unique identifier of a sub cluster
+	Partition        string `json:"partition" db:"partition" example:"main"`                                    // The Slurm partition to which the job was submitted
+	ArrayJobId       int64  `json:"arrayJobId" db:"array_job_id" example:"123000"`                              // The unique identifier of an array job
+	NumNodes         int32  `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`                            // Number of nodes used (Min > 0)
+	NumHWThreads     int32  `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"`                   // Number of HWThreads used (Min > 0)
+	NumAcc           int32  `json:"numAcc" db:"num_acc" example:"2" minimum:"1"`                                // Number of accelerators used (Min > 0)
+	Exclusive        int32  `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`               // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
+	MonitoringStatus int32  `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successful
+	SMT              int32  `json:"smt" db:"smt" example:"4"`                                                   // SMT threads used by job
 	State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
-	Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
-	Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
-	Tags []*Tag `json:"tags"` // List of tags
-	RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes]
-	Resources []*Resource `json:"resources"` // Resources used by job
-	RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
-	MetaData map[string]string `json:"metaData"` // Additional information about the job
+	Duration     int32             `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
+	Walltime     int64             `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
+	Tags         []*Tag            `json:"tags"`                                               // List of tags
+	RawResources []byte            `json:"-" db:"resources"`                                   // Resources used by job [As Bytes]
+	Resources    []*Resource       `json:"resources"`                                          // Resources used by job
+	RawMetaData  []byte            `json:"-" db:"meta_data"`                                   // Additional information about the job [As Bytes]
+	MetaData     map[string]string `json:"metaData"`                                           // Additional information about the job
 }

 // Non-Swaggered Comment: Job
@@ -49,15 +49,15 @@ type Job struct {
 	ID int64 `json:"id" db:"id"`
 	BaseJob
 	StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds
-	StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type
-	MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64
-	FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64
-	MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64
-	LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64
-	NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
-	NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
-	FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
-	FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
+	StartTime        time.Time `json:"startTime"`                  // Start time as 'time.Time' data type
+	MemUsedMax       float64   `json:"-" db:"mem_used_max"`        // MemUsedMax as Float64
+	FlopsAnyAvg      float64   `json:"-" db:"flops_any_avg"`       // FlopsAnyAvg as Float64
+	MemBwAvg         float64   `json:"-" db:"mem_bw_avg"`          // MemBwAvg as Float64
+	LoadAvg          float64   `json:"-" db:"load_avg"`            // LoadAvg as Float64
+	NetBwAvg         float64   `json:"-" db:"net_bw_avg"`          // NetBwAvg as Float64
+	NetDataVolTotal  float64   `json:"-" db:"net_data_vol_total"`  // NetDataVolTotal as Float64
+	FileBwAvg        float64   `json:"-" db:"file_bw_avg"`         // FileBwAvg as Float64
+	FileDataVolTotal float64   `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
 }

 // Non-Swaggered Comment: JobMeta
@@ -70,11 +70,11 @@ type Job struct {
 // JobMeta model
 // @Description Meta data information of a HPC job.
 type JobMeta struct {
-	// The unique identifier of a job in the database
+	// The unique identifier of a job in the database
 	ID *int64 `json:"id,omitempty"`
 	BaseJob
 	StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
-	Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
+	Statistics map[string]JobStatistics `json:"statistics,omitempty"`                   // Metric statistics of job
 }

 const (
@@ -89,11 +89,15 @@ var JobDefaults BaseJob = BaseJob{
 	MonitoringStatus: MonitoringStatusRunningOrArchiving,
 }

+type Unit struct {
+	Base   string `json:"base"`
+	Prefix string `json:"prefix"`
+}
+
 // JobStatistics model
 // @Description Specification for job metric statistics.
 type JobStatistics struct {
-	// Metric unit (see schema/unit.schema.json)
-	Unit string `json:"unit" example:"GHz"`
+	Unit Unit    `json:"unit" example:"GHz"`
 	Avg  float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average
 	Min  float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum
 	Max  float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum
@@ -102,18 +106,18 @@ type JobStatistics struct {

 // Tag model
 // @Description Defines a tag using name and type.
 type Tag struct {
-	// The unique DB identifier of a tag
 	ID int64 `json:"id" db:"id"`
-	Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
+	// The unique DB identifier of a tag
+	Type string `json:"type" db:"tag_type" example:"Debug"`   // Tag Type
 	Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
 }

 // Resource model
 // @Description A resource used by a job
 type Resource struct {
-	Hostname string `json:"hostname"` // Name of the host (= node)
-	HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids
-	Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids
+	Hostname      string   `json:"hostname"`                // Name of the host (= node)
+	HWThreads     []int    `json:"hwthreads,omitempty"`     // List of OS processor ids
+	Accelerators  []string `json:"accelerators,omitempty"`  // List of accelerator device ids
 	Configuration string   `json:"configuration,omitempty"` // The configuration options of the node
 }
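Review note: with `Unit` promoted to a struct, metric units serialize as objects instead of flat strings like "GB/s". A minimal sketch of the new JSON shape (field values are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
	// A metric unit is now a base plus an optional prefix.
	u := schema.Unit{Base: "B/s", Prefix: "G"}

	b, err := json.Marshal(u)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"base":"B/s","prefix":"G"}
}
```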
diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go
index 171901c..c0f1fa3 100644
--- a/pkg/schema/metrics.go
+++ b/pkg/schema/metrics.go
@@ -15,7 +15,7 @@ import (
 type JobData map[string]map[MetricScope]*JobMetric

 type JobMetric struct {
-	Unit     string      `json:"unit"`
+	Unit     Unit        `json:"unit"`
 	Scope    MetricScope `json:"scope"`
 	Timestep int         `json:"timestep"`
 	Series   []Series    `json:"series"`
diff --git a/pkg/schema/schemas/cluster.schema.json b/pkg/schema/schemas/cluster.schema.json
index 5dfb92e..7cbf639 100644
--- a/pkg/schema/schemas/cluster.schema.json
+++ b/pkg/schema/schemas/cluster.schema.json
@@ -21,7 +21,7 @@
         },
         "unit": {
           "description": "Metric unit",
-          "type": "string"
+          "$ref": "embedfs://unit.schema.json"
         },
         "scope": {
           "description": "Native measurement resolution",
@@ -38,7 +38,6 @@
             "sum",
             "avg"
           ]
-
         },
         "subClusters": {
           "description": "Array of cluster hardware partition metric thresholds",
thresholds", diff --git a/pkg/schema/schemas/job-meta.schema.json b/pkg/schema/schemas/job-meta.schema.json index 6959ec0..061c920 100644 --- a/pkg/schema/schemas/job-meta.schema.json +++ b/pkg/schema/schemas/job-meta.schema.json @@ -349,7 +349,6 @@ "jobState", "duration", "resources", - "tags", "statistics" ] } diff --git a/pkg/schema/schemas/unit.schema.json b/pkg/schema/schemas/unit.schema.json index 85b1458..c5bad7e 100644 --- a/pkg/schema/schemas/unit.schema.json +++ b/pkg/schema/schemas/unit.schema.json @@ -5,7 +5,7 @@ "description": "Format specification for job metric units", "type": "object", "properties": { - "base_unit": { + "base": { "description": "Metric base unit", "type": "string", "enum": [ @@ -36,6 +36,6 @@ } }, "required": [ - "base_unit" + "base" ] } diff --git a/test/integration_test.go b/test/integration_test.go index abafe08..a8de0ce 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -602,4 +602,17 @@ func testImportFlag(t *testing.T) { if len(data) != 8 { t.Errorf("Job data length: Got %d, want 8", len(data)) } + + r := map[string]string{"mem_used": "GB", "net_bw": "KB/s", + "cpu_power": "W", "cpu_used": "cpu_used", + "file_bw": "KB/s", "flops_any": "Flops/s", + "mem_bw": "GB/s", "ipc": "IPC"} + + for name, scopes := range data { + for _, metric := range scopes { + if metric.Unit != r[name] { + t.Errorf("Metric %s unit: Got %s, want %s", name, metric.Unit, r[name]) + } + } + } }