commit 1bb2827aeb
Author: Christoph Kluge
Date:   2023-03-29 10:39:42 +02:00

    Merge branch 'import-data-sanitation' of https://github.com/ClusterCockpit/cc-backend into import-data-sanitation

7 changed files with 51 additions and 39 deletions


@@ -283,7 +283,7 @@ func (ccms *CCMetricStore) buildQueries(
 scopesLoop:
 	for _, requestedScope := range scopes {
 		nativeScope := mc.Scope
-		if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 {
+		if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == nil {
			continue
 		}
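Note on the change above: with job.NumAcc now a pointer, the accelerator scope is skipped when the field is absent (nil) rather than when it is zero, so a job that explicitly reports zero accelerators is no longer conflated with one that reports nothing. A minimal, self-contained sketch of that distinction (the variable names are illustrative only):

package main

import "fmt"

func main() {
	var absent *int32 // field not set at all, e.g. a job imported without numAcc
	zero := int32(0)
	present := &zero // field set, but explicitly zero

	fmt.Println(absent == nil)                   // true  -> skip accelerator scope
	fmt.Println(present == nil)                  // false -> field was provided
	fmt.Println(present != nil && *present == 0) // true  -> provided and zero
}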


@@ -351,7 +351,7 @@ func SanityChecks(job *schema.BaseJob) error {
 	if len(job.Resources) == 0 || len(job.User) == 0 {
 		return fmt.Errorf("'resources' and 'user' should not be empty")
 	}
-	if job.NumAcc < 0 || job.NumHWThreads < 0 || job.NumNodes < 1 {
+	if *job.NumAcc < 0 || *job.NumHWThreads < 0 || job.NumNodes < 1 {
 		return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
 	}
 	if len(job.Resources) != int(job.NumNodes) {
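The dereferences introduced above assume NumAcc and NumHWThreads are always set; with the fields now optional, a nil pointer would make this check panic. A nil-guarded variant could look like the sketch below (validateCounts is a hypothetical helper, not the function used in cc-backend):

package importer // illustrative placement; any package that can import schema works

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// validateCounts checks the node/thread/accelerator counts without
// dereferencing nil pointers. Hypothetical sketch, not project code.
func validateCounts(job *schema.BaseJob) error {
	if job.NumNodes < 1 {
		return fmt.Errorf("'numNodes' invalid")
	}
	if job.NumHWThreads != nil && *job.NumHWThreads < 0 {
		return fmt.Errorf("'numHwthreads' invalid")
	}
	if job.NumAcc != nil && *job.NumAcc < 0 {
		return fmt.Errorf("'numAcc' invalid")
	}
	return nil
}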


@@ -21,18 +21,18 @@ type BaseJob struct {
 	Project          string      `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
 	Cluster          string      `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
 	SubCluster       string      `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
-	Partition        string      `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
-	ArrayJobId       int64       `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
+	Partition        *string     `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
+	ArrayJobId       *int64      `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
 	NumNodes         int32       `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
-	NumHWThreads     int32       `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
-	NumAcc           int32       `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
+	NumHWThreads     *int32      `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
+	NumAcc           *int32      `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
 	Exclusive        int32       `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
-	MonitoringStatus int32       `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
-	SMT              int32       `json:"smt" db:"smt" example:"4"` // SMT threads used by job
+	MonitoringStatus int32       `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
+	SMT              *int32      `json:"smt,omitempty" db:"smt" example:"4"` // SMT threads used by job
 	State            JobState    `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
 	Duration         int32       `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
-	Walltime         int64       `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
-	Tags             []*Tag      `json:"tags"` // List of tags
+	Walltime         *int64      `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
+	Tags             []*Tag      `json:"tags,omitempty"` // List of tags
 	RawResources     []byte      `json:"-" db:"resources"` // Resources used by job [As Bytes]
 	Resources        []*Resource `json:"resources"` // Resources used by job
 	RawMetaData      []byte      `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]
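Making the optional fields pointers with omitempty changes their JSON round-trip: a nil pointer is dropped from the output, an absent key unmarshals to nil, and an explicit zero stays distinguishable from a missing value. A minimal sketch with a trimmed-down struct (not the real BaseJob):

package main

import (
	"encoding/json"
	"fmt"
)

// trimmedJob mirrors just enough of BaseJob for the example.
type trimmedJob struct {
	NumNodes int32  `json:"numNodes"`
	NumAcc   *int32 `json:"numAcc,omitempty"`
	SMT      *int32 `json:"smt,omitempty"`
}

func main() {
	smt := int32(4)
	out, _ := json.Marshal(trimmedJob{NumNodes: 2, SMT: &smt})
	fmt.Println(string(out)) // {"numNodes":2,"smt":4} -- nil NumAcc is omitted

	var parsed trimmedJob
	_ = json.Unmarshal([]byte(`{"numNodes":1,"numAcc":0}`), &parsed)
	fmt.Println(parsed.NumAcc != nil, *parsed.NumAcc) // true 0 -- present, explicitly zero
	fmt.Println(parsed.SMT == nil)                    // true -- key absent
}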


@@ -338,6 +338,7 @@
         "user",
         "project",
         "cluster",
+        "subcluster",
         "numNodes",
         "exclusive",
         "startTime",


@@ -448,15 +448,15 @@ func TestRestApi(t *testing.T) {
 		job.Project != "testproj" ||
 		job.Cluster != "testcluster" ||
 		job.SubCluster != "sc1" ||
-		job.Partition != "default" ||
-		job.Walltime != 3600 ||
-		job.ArrayJobId != 0 ||
+		*job.Partition != "default" ||
+		*job.Walltime != 3600 ||
+		job.ArrayJobId != nil ||
 		job.NumNodes != 1 ||
-		job.NumHWThreads != 8 ||
-		job.NumAcc != 0 ||
+		*job.NumHWThreads != 8 ||
+		job.NumAcc != nil ||
 		job.Exclusive != 1 ||
 		job.MonitoringStatus != 1 ||
-		job.SMT != 1 ||
+		*job.SMT != 1 ||
 		!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
 		job.StartTime.Unix() != 123456789 {
 		t.Fatalf("unexpected job properties: %#v", job)
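Because the compared fields are now pointers, the test dereferences Partition, Walltime, NumHWThreads and SMT and checks ArrayJobId and NumAcc against nil; a nil pointer behind one of those dereferences would panic the test rather than fail it. When building expected values by hand, a small address-of helper keeps this readable (ptr below is illustrative, not part of the test file):

package main

import "fmt"

// ptr returns a pointer to its argument (Go 1.18+ generics); illustrative helper.
func ptr[T any](v T) *T { return &v }

func main() {
	partition := ptr("default")
	walltime := ptr(int64(3600))
	var numAcc *int32 // left nil: the job carries no accelerator count

	fmt.Println(*partition != "default") // false -> assertion would pass
	fmt.Println(*walltime != 3600)       // false
	fmt.Println(numAcc != nil)           // false -> matches the nil expectation
}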


@@ -9,6 +9,8 @@ import (
 	"fmt"
 	"io"
 	"time"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
 // Non-Swaggered Comment: BaseJob
@@ -21,18 +23,18 @@ type BaseJob struct {
 	Project          string      `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
 	Cluster          string      `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
 	SubCluster       string      `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
-	Partition        string      `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
-	ArrayJobId       int64       `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
+	Partition        *string     `json:"partition" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
+	ArrayJobId       *int64      `json:"arrayJobId" db:"array_job_id" example:"123000"` // The unique identifier of an array job
 	NumNodes         int32       `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
-	NumHWThreads     int32       `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
-	NumAcc           int32       `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
+	NumHWThreads     *int32      `json:"numHwthreads" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
+	NumAcc           *int32      `json:"numAcc" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
 	Exclusive        int32       `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
 	MonitoringStatus int32       `json:"monitoringStatus" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull
-	SMT              int32       `json:"smt" db:"smt" example:"4"` // SMT threads used by job
+	SMT              *int32      `json:"smt" db:"smt" example:"4"` // SMT threads used by job
 	State            JobState    `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job
 	Duration         int32       `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0)
-	Walltime         int64       `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
-	Tags             []*Tag      `json:"tags"` // List of tags
+	Walltime         *int64      `json:"walltime" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0)
+	Tags             []*schema.Tag `json:"tags"` // List of tags
 	RawResources     []byte      `json:"-" db:"resources"` // Resources used by job [As Bytes]
 	Resources        []*Resource `json:"resources"` // Resources used by job
 	RawMetaData      []byte      `json:"-" db:"meta_data"` // Additional information about the job [As Bytes]


@@ -36,34 +36,30 @@ func loadJobData(filename string) (*JobData, error) {
 func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 	var jn schema.JobMeta
 
-	jn.StartTime = j.StartTime
+	//required properties
+	jn.JobID = j.JobID
 	jn.User = j.User
 	jn.Project = j.Project
 	jn.Cluster = j.Cluster
 	jn.SubCluster = j.SubCluster
-	jn.Partition = j.Partition
-	jn.ArrayJobId = j.ArrayJobId
 	jn.NumNodes = j.NumNodes
-	jn.NumHWThreads = j.NumHWThreads
-	jn.NumAcc = j.NumAcc
 	jn.Exclusive = j.Exclusive
-	jn.MonitoringStatus = j.MonitoringStatus
-	jn.SMT = j.SMT
-	jn.Duration = j.Duration
-	jn.Walltime = j.Walltime
+	jn.StartTime = j.StartTime
 	jn.State = schema.JobState(j.State)
-	jn.Exclusive = j.Exclusive
-	jn.Exclusive = j.Exclusive
-	jn.Exclusive = j.Exclusive
+	jn.Duration = j.Duration
 
 	for _, ro := range j.Resources {
 		var rn schema.Resource
 		rn.Hostname = ro.Hostname
 		rn.Configuration = ro.Configuration
-		hwt := make([]int, len(ro.HWThreads))
-		copy(hwt, ro.HWThreads)
-		acc := make([]string, len(ro.Accelerators))
-		copy(acc, ro.Accelerators)
+		if ro.HWThreads != nil {
+			hwt := make([]int, len(ro.HWThreads))
+			copy(hwt, ro.HWThreads)
+		}
+		if ro.Accelerators != nil {
+			acc := make([]string, len(ro.Accelerators))
+			copy(acc, ro.Accelerators)
+		}
 		jn.Resources = append(jn.Resources, &rn)
 	}
@@ -71,6 +67,19 @@ func deepCopyJobMeta(j *JobMeta) schema.JobMeta {
 		jn.MetaData[k] = v
 	}
 
+	//optional properties
+	jn.Partition = j.Partition
+	jn.ArrayJobId = j.ArrayJobId
+	jn.NumHWThreads = j.NumHWThreads
+	jn.NumAcc = j.NumAcc
+	jn.MonitoringStatus = j.MonitoringStatus
+	jn.SMT = j.SMT
+	jn.Walltime = j.Walltime
+
+	for _, t := range j.Tags {
+		jn.Tags = append(jn.Tags, t)
+	}
+
 	return jn
 }
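One detail worth flagging in the resources loop above: both the old and the new version copy ro.HWThreads and ro.Accelerators into local slices (hwt, acc) that are never attached to rn, so the guarded copies are discarded. A sketch of a copy that keeps them, assuming schema.Resource exposes HWThreads []int and Accelerators []string as the surrounding code suggests (deepCopyResource is a hypothetical helper, not part of this commit):

// Assumes this file's Resource type and the pkg/schema import added above.
func deepCopyResource(ro *Resource) *schema.Resource {
	rn := &schema.Resource{
		Hostname:      ro.Hostname,
		Configuration: ro.Configuration,
	}
	if ro.HWThreads != nil {
		rn.HWThreads = make([]int, len(ro.HWThreads))
		copy(rn.HWThreads, ro.HWThreads)
	}
	if ro.Accelerators != nil {
		rn.Accelerators = make([]string, len(ro.Accelerators))
		copy(rn.Accelerators, ro.Accelerators)
	}
	return rn
}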