diff --git a/internal/repository/init.go b/internal/repository/init.go index 1ea8d17..554d88b 100644 --- a/internal/repository/init.go +++ b/internal/repository/init.go @@ -17,6 +17,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/ClusterCockpit/cc-backend/pkg/units" ) // `AUTO_INCREMENT` is in a comment because of this hack: @@ -103,11 +104,11 @@ func HandleImportFlag(flag string) error { return err } - if config.Keys.Validate { - if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil { - return fmt.Errorf("validate job meta: %v", err) - } + // if config.Keys.Validate { + if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil { + return fmt.Errorf("validate job meta: %v", err) } + // } dec := json.NewDecoder(bytes.NewReader(raw)) dec.DisallowUnknownFields() jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults} @@ -132,6 +133,7 @@ func HandleImportFlag(flag string) error { return err } + checkJobData(&jobData) SanityChecks(&jobMeta.BaseJob) jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows { @@ -368,14 +370,31 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 { } func checkJobData(d *schema.JobData) error { - // for name, scopes := range *d { + for _, scopes := range *d { + var newUnit string + // Add node scope if missing + for _, metric := range scopes { + if strings.Contains(metric.Unit.Base, "B/s") || + strings.Contains(metric.Unit.Base, "F/s") || + strings.Contains(metric.Unit.Base, "B") { - // for scope, metric := range scopes { - // // 1. Unit normalisation + // First get overall avg + sum := 0.0 + for _, s := range metric.Series { + sum += s.Statistics.Avg + } - // } - // // 2. 
Add node scope if missing - // } + avg := sum / float64(len(metric.Series)) - // } + for _, s := range metric.Series { + fp := schema.ConvertFloatToFloat64(s.Data) + // Normalize values with new unit prefix + units.NormalizeSeries(fp, avg, metric.Unit, &newUnit) + s.Data = schema.GetFloat64ToFloat(fp) + } + metric.Unit = newUnit + } + } + } return nil } diff --git a/internal/repository/job.go b/internal/repository/job.go index 0496698..90b7dbf 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -211,7 +211,13 @@ func (r *JobRepository) Stop( } // TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC; -func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) { +func (r *JobRepository) CountGroupedJobs( + ctx context.Context, + aggreg model.Aggregate, + filters []*model.JobFilter, + weight *model.Weights, + limit *int) (map[string]int, error) { + if !aggreg.IsValid() { return nil, errors.New("invalid aggregate") } @@ -301,10 +307,14 @@ func (r *JobRepository) Archive( var ErrNotFound = errors.New("no such job or user") -// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term. -// As 0 is a valid job id, check if username is "" instead in order to check what machted. -// If nothing matches the search, `ErrNotFound` is returned. +// FindJobOrUser returns a job database ID or a username if a job or user +// matches the search term. As 0 is a valid job id, check if username is "" +// instead in order to check what matched. If nothing matches the search, +// `ErrNotFound` is returned. 
+func (r *JobRepository) FindJobOrUser( + ctx context.Context, + searchterm string) (job int64, username string, err error) { + user := auth.GetUser(ctx) if id, err := strconv.Atoi(searchterm); err == nil { qb := sq.Select("job.id").From("job").Where("job.job_id = ?", id) @@ -353,6 +363,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // Hosts with zero jobs running on them will not show up! func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { + subclusters := make(map[string]map[string]int) rows, err := sq.Select("resources", "subcluster").From("job"). Where("job.job_state = 'running'"). @@ -390,6 +401,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in } func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { + res, err := sq.Update("job"). Set("monitoring_status", schema.MonitoringStatusArchivingFailed). Set("duration", 0). 
diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index 729f72a..28d9450 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -45,7 +45,7 @@ type SubClusterConfig struct { type MetricConfig struct { Name string `json:"name"` - Unit string `json:"unit"` + Unit Unit `json:"unit"` Scope MetricScope `json:"scope"` Aggregation *string `json:"aggregation"` Timestep int `json:"timestep"` diff --git a/pkg/schema/float.go b/pkg/schema/float.go index df084fa..0e2165f 100644 --- a/pkg/schema/float.go +++ b/pkg/schema/float.go @@ -107,3 +107,23 @@ func (s *Series) MarshalJSON() ([]byte, error) { buf = append(buf, ']', '}') return buf, nil } + +func ConvertFloatToFloat64(s []Float) []float64 { + fp := make([]float64, len(s)) + + for i, val := range s { + fp[i] = float64(val) + } + + return fp +} + +func GetFloat64ToFloat(s []float64) []Float { + fp := make([]Float, len(s)) + + for i, val := range s { + fp[i] = Float(val) + } + + return fp +} diff --git a/pkg/schema/job.go b/pkg/schema/job.go index d2db324..d0d4c52 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -17,26 +17,26 @@ import ( type BaseJob struct { // The unique identifier of a job JobID int64 `json:"jobId" db:"job_id" example:"123000"` - User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user - Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project - Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster - SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster - Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted - ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job - NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) - 
NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) - NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) - Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user - MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull - SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job + User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user + Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project + Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster + SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster + Partition string `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted + ArrayJobId int64 `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job + NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) + NumHWThreads int32 `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) + NumAcc int32 `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) + Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - 
Shared among multiple jobs of same user + MonitoringStatus int32 `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull + SMT int32 `json:"smt" db:"smt" example:"4"` // SMT threads used by job State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job - Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0) - Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0) - Tags []*Tag `json:"tags"` // List of tags - RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes] - Resources []*Resource `json:"resources"` // Resources used by job - RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes] - MetaData map[string]string `json:"metaData"` // Additional information about the job + Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0) + Walltime int64 `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0) + Tags []*Tag `json:"tags"` // List of tags + RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes] + Resources []*Resource `json:"resources"` // Resources used by job + RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes] + MetaData map[string]string `json:"metaData"` // Additional information about the job } // Non-Swaggered Comment: Job @@ -49,15 +49,15 @@ type Job struct { ID int64 `json:"id" db:"id"` BaseJob StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds - StartTime 
time.Time `json:"startTime"` // Start time as 'time.Time' data type - MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64 - FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64 - MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64 - LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64 - NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64 - NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 - FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 - FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 + StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type + MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64 + FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64 + MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64 + LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64 + NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64 + NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 + FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 + FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 } // Non-Swaggered Comment: JobMeta @@ -70,11 +70,11 @@ type Job struct { // JobMeta model // @Description Meta data information of a HPC job. 
type JobMeta struct { - // The unique identifier of a job in the database + // The unique identifier of a job in the database ID *int64 `json:"id,omitempty"` BaseJob StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0) - Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job + Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job } const ( @@ -89,11 +89,15 @@ var JobDefaults BaseJob = BaseJob{ MonitoringStatus: MonitoringStatusRunningOrArchiving, } +type Unit struct { + Base string `json:"base"` + Prefix string `json:"prefix"` +} + // JobStatistics model // @Description Specification for job metric statistics. type JobStatistics struct { - // Metric unit (see schema/unit.schema.json) - Unit string `json:"unit" example:"GHz"` + Unit Unit `json:"unit" example:"GHz"` Avg float64 `json:"avg" example:"2500" minimum:"0"` // Job metric average Min float64 `json:"min" example:"2000" minimum:"0"` // Job metric minimum Max float64 `json:"max" example:"3000" minimum:"0"` // Job metric maximum @@ -102,18 +106,18 @@ type JobStatistics struct { // Tag model // @Description Defines a tag using name and type. 
type Tag struct { - // The unique DB identifier of a tag + // The unique DB identifier of a tag ID int64 `json:"id" db:"id"` - Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type + Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name } // Resource model // @Description A resource used by a job type Resource struct { - Hostname string `json:"hostname"` // Name of the host (= node) - HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids - Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids + Hostname string `json:"hostname"` // Name of the host (= node) + HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids + Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids Configuration string `json:"configuration,omitempty"` // The configuration options of the node } diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go index 171901c..c0f1fa3 100644 --- a/pkg/schema/metrics.go +++ b/pkg/schema/metrics.go @@ -15,7 +15,7 @@ import ( type JobData map[string]map[MetricScope]*JobMetric type JobMetric struct { - Unit string `json:"unit"` + Unit Unit `json:"unit"` Scope MetricScope `json:"scope"` Timestep int `json:"timestep"` Series []Series `json:"series"` diff --git a/pkg/schema/schemas/cluster.schema.json b/pkg/schema/schemas/cluster.schema.json index 5dfb92e..7cbf639 100644 --- a/pkg/schema/schemas/cluster.schema.json +++ b/pkg/schema/schemas/cluster.schema.json @@ -21,7 +21,7 @@ }, "unit": { "description": "Metric unit", - "type": "string" + "$ref": "embedfs://unit.schema.json" }, "scope": { "description": "Native measurement resolution", @@ -38,7 +38,6 @@ "sum", "avg" ] - }, "subClusters": { "description": "Array of cluster hardware partition metric thresholds", diff --git a/pkg/schema/schemas/job-meta.schema.json b/pkg/schema/schemas/job-meta.schema.json 
index 6959ec0..061c920 100644 --- a/pkg/schema/schemas/job-meta.schema.json +++ b/pkg/schema/schemas/job-meta.schema.json @@ -349,7 +349,6 @@ "jobState", "duration", "resources", - "tags", "statistics" ] } diff --git a/pkg/schema/schemas/unit.schema.json b/pkg/schema/schemas/unit.schema.json index 85b1458..c5bad7e 100644 --- a/pkg/schema/schemas/unit.schema.json +++ b/pkg/schema/schemas/unit.schema.json @@ -5,7 +5,7 @@ "description": "Format specification for job metric units", "type": "object", "properties": { - "base_unit": { + "base": { "description": "Metric base unit", "type": "string", "enum": [ @@ -36,6 +36,6 @@ } }, "required": [ - "base_unit" + "base" ] } diff --git a/test/integration_test.go b/test/integration_test.go index abafe08..a8de0ce 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -602,4 +602,17 @@ func testImportFlag(t *testing.T) { if len(data) != 8 { t.Errorf("Job data length: Got %d, want 8", len(data)) } + + r := map[string]string{"mem_used": "GB", "net_bw": "KB/s", + "cpu_power": "W", "cpu_used": "cpu_used", + "file_bw": "KB/s", "flops_any": "Flops/s", + "mem_bw": "GB/s", "ipc": "IPC"} + + for name, scopes := range data { + for _, metric := range scopes { + if metric.Unit != r[name] { + t.Errorf("Metric %s unit: Got %s, want %s", name, metric.Unit, r[name]) + } + } + } }