From 786770f56abac5863ccbe62795c7c029f24ab5e8 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 28 Jun 2024 16:48:10 +0200 Subject: [PATCH] Start to convert to new footprint layout --- internal/repository/job.go | 14 ++++-- internal/repository/migration.go | 2 +- internal/repository/query.go | 18 +++---- pkg/schema/cluster.go | 38 +++++++------- pkg/schema/job.go | 85 ++++++++++++++------------------ 5 files changed, 78 insertions(+), 79 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index b42598d..da71566 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -59,7 +59,7 @@ func GetJobRepository() *JobRepository { var jobColumns []string = []string{ "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id", "job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", - "job.duration", "job.walltime", "job.resources", "job.mem_used_max", "job.flops_any_avg", "job.mem_bw_avg", "job.load_avg", // "job.meta_data", + "job.duration", "job.walltime", "job.resources", "job.footprint", // "job.meta_data", } func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { @@ -67,15 +67,22 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { if err := row.Scan( &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, - &job.Duration, &job.Walltime, &job.RawResources, &job.MemUsedMax, &job.FlopsAnyAvg, &job.MemBwAvg, &job.LoadAvg /*&job.RawMetaData*/); err != nil { + &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint /*&job.RawMetaData*/); err != nil { log.Warnf("Error while scanning rows (Job): %v", err) return nil, err } if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil { - log.Warn("Error while unmarhsaling raw resources json") + log.Warn("Error while unmarshaling raw resources json") return nil, err } + job.RawResources = nil + + if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { + log.Warn("Error while unmarshaling raw footprint json") + return nil, err + } + job.RawFootprint = nil // if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil { // return nil, err @@ -86,7 +93,6 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { job.Duration = int32(time.Since(job.StartTime).Seconds()) } - job.RawResources = nil return job, nil } diff --git a/internal/repository/migration.go b/internal/repository/migration.go index 0259c61..970fbc2 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -16,7 +16,7 @@ import ( "github.com/golang-migrate/migrate/v4/source/iofs" ) -const Version uint = 7 +const Version uint = 8 //go:embed migrations/* var migrationFiles embed.FS diff --git a/internal/repository/query.go b/internal/repository/query.go index 5ca98fb..394856d 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -22,8 +22,8 @@ func (r *JobRepository) QueryJobs( ctx context.Context, filters []*model.JobFilter, page *model.PageRequest, - order *model.OrderByInput) ([]*schema.Job, error) { - + order *model.OrderByInput, +) ([]*schema.Job, error) { query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job")) if qerr != nil { return nil, qerr @@ -73,8 +73,8 @@ func (r *JobRepository) QueryJobs( func (r *JobRepository) CountJobs( ctx context.Context, - filters []*model.JobFilter) (int, error) { - + filters []*model.JobFilter, +) (int, error) { query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job")) if qerr != nil { return 0, qerr @@ -227,9 +227,7 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select } if cond.In != nil { queryElements := make([]string, len(cond.In)) - for i, val := range cond.In { - queryElements[i] = val - } + copy(queryElements, cond.In) return query.Where(sq.Or{sq.Eq{field: queryElements}}) } return query @@ -257,8 +255,10 @@ func buildMetaJsonCondition(jsonField string, cond *model.StringInput, query sq. return query } -var matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)") -var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])") +var ( + matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)") + matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])") +) func toSnakeCase(str string) string { for _, c := range str { diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index e4ca658..6edd830 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -33,35 +33,37 @@ type SubCluster struct { Name string `json:"name"` Nodes string `json:"nodes"` ProcessorType string `json:"processorType"` - SocketsPerNode int `json:"socketsPerNode"` - CoresPerSocket int `json:"coresPerSocket"` - ThreadsPerCore int `json:"threadsPerCore"` + Topology Topology `json:"topology"` FlopRateScalar MetricValue `json:"flopRateScalar"` FlopRateSimd MetricValue `json:"flopRateSimd"` MemoryBandwidth MetricValue `json:"memoryBandwidth"` - Topology Topology `json:"topology"` + SocketsPerNode int `json:"socketsPerNode"` + CoresPerSocket int `json:"coresPerSocket"` + ThreadsPerCore int `json:"threadsPerCore"` } type SubClusterConfig struct { - Name string `json:"name"` - Peak float64 `json:"peak"` - Normal float64 `json:"normal"` - Caution float64 `json:"caution"` - Alert float64 `json:"alert"` - Remove bool `json:"remove"` + Name string `json:"name"` + Peak float64 `json:"peak"` + Normal float64 `json:"normal"` + Caution float64 `json:"caution"` + Alert float64 `json:"alert"` + Footprint bool `json:"footprint"` + Remove bool `json:"remove"` } type MetricConfig struct { - Name string `json:"name"` Unit Unit `json:"unit"` + Name string `json:"name"` Scope MetricScope `json:"scope"` Aggregation string `json:"aggregation"` + SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` Timestep int `json:"timestep"` Peak float64 `json:"peak"` Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` - SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` + Footprint bool `json:"footprint"` } type Cluster struct { @@ -76,8 +78,8 @@ type Cluster struct { // return value, return true as the second value. TODO: Optimize this, there // must be a more efficient way/algorithm. func (topo *Topology) GetSocketsFromHWThreads( - hwthreads []int) (sockets []int, exclusive bool) { - + hwthreads []int, +) (sockets []int, exclusive bool) { socketsMap := map[int]int{} for _, hwthread := range hwthreads { for socket, hwthreadsInSocket := range topo.Socket { @@ -106,8 +108,8 @@ func (topo *Topology) GetSocketsFromHWThreads( // return value, return true as the second value. TODO: Optimize this, there // must be a more efficient way/algorithm. func (topo *Topology) GetCoresFromHWThreads( - hwthreads []int) (cores []int, exclusive bool) { - + hwthreads []int, +) (cores []int, exclusive bool) { coresMap := map[int]int{} for _, hwthread := range hwthreads { for core, hwthreadsInCore := range topo.Core { @@ -136,8 +138,8 @@ func (topo *Topology) GetCoresFromHWThreads( // memory domains in the first return value, return true as the second value. // TODO: Optimize this, there must be a more efficient way/algorithm. func (topo *Topology) GetMemoryDomainsFromHWThreads( - hwthreads []int) (memDoms []int, exclusive bool) { - + hwthreads []int, +) (memDoms []int, exclusive bool) { memDomsMap := map[int]int{} for _, hwthread := range hwthreads { for memDom, hwthreadsInmemDom := range topo.MemoryDomain { diff --git a/pkg/schema/job.go b/pkg/schema/job.go index ad3e6dc..f5363b3 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -16,30 +16,31 @@ import ( // Common subset of Job and JobMeta. Use one of those, not this type directly. type BaseJob struct { - // The unique identifier of a job - JobID int64 `json:"jobId" db:"job_id" example:"123000"` - User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user - Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project - Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster - SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster - Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted - ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job - NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) - // NumCores int32 `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) - NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) - NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) - Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user - MonitoringStatus int32 `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` // State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull - SMT int32 `json:"smt,omitempty" db:"smt" example:"4"` // SMT threads used by job - State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` // Final state of job - Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` // Duration of job in seconds (Min > 0) - Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"` // Requested walltime of job in seconds (Min > 0) - Tags []*Tag `json:"tags,omitempty"` // List of tags - RawResources []byte `json:"-" db:"resources"` // Resources used by job [As Bytes] - Resources []*Resource `json:"resources"` // Resources used by job - RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes] - MetaData map[string]string `json:"metaData"` // Additional information about the job - ConcurrentJobs JobLinkResultList `json:"concurrentJobs"` + Footprint map[string]float64 `json:"footPrint"` + MetaData map[string]string `json:"metaData"` + Cluster string `json:"cluster" db:"cluster" example:"fritz"` + SubCluster string `json:"subCluster" db:"subcluster" example:"main"` + Partition string `json:"partition,omitempty" db:"partition" example:"main"` + Project string `json:"project" db:"project" example:"abcd200"` + User string `json:"user" db:"user" example:"abcd100h"` + State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"` + Tags []*Tag `json:"tags,omitempty"` + RawFootprint []byte `json:"-" db:"footprint"` + RawMetaData []byte `json:"-" db:"meta_data"` + Resources []*Resource `json:"resources"` + RawResources []byte `json:"-" db:"resources"` + ConcurrentJobs JobLinkResultList `json:"concurrentJobs"` + Energy float64 `json:"energy"` + ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` + Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"` + JobID int64 `json:"jobId" db:"job_id" example:"123000"` + Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"` + SMT int32 `json:"smt,omitempty" db:"smt" example:"4"` + MonitoringStatus int32 `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"` + Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` + NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` + NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` + NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` } // Job struct type @@ -49,19 +50,10 @@ type BaseJob struct { // Job model // @Description Information of a HPC job. type Job struct { - // The unique identifier of a job in the database - ID int64 `json:"id" db:"id"` + StartTime time.Time `json:"startTime"` BaseJob - StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` // Start epoch time stamp in seconds - StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type - MemUsedMax float64 `json:"memUsedMax" db:"mem_used_max"` // MemUsedMax as Float64 - FlopsAnyAvg float64 `json:"flopsAnyAvg" db:"flops_any_avg"` // FlopsAnyAvg as Float64 - MemBwAvg float64 `json:"memBwAvg" db:"mem_bw_avg"` // MemBwAvg as Float64 - LoadAvg float64 `json:"loadAvg" db:"load_avg"` // LoadAvg as Float64 - NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64 - NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 - FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 - FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 + ID int64 `json:"id" db:"id"` + StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` } // JobMeta struct type @@ -88,11 +80,10 @@ type JobLinkResultList struct { // JobMeta model // @Description Meta data information of a HPC job. type JobMeta struct { - // The unique identifier of a job in the database - ID *int64 `json:"id,omitempty"` + ID *int64 `json:"id,omitempty"` + Statistics map[string]JobStatistics `json:"statistics"` BaseJob - StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0) - Statistics map[string]JobStatistics `json:"statistics"` // Metric statistics of job + StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` } const ( @@ -124,18 +115,18 @@ type JobStatistics struct { // Tag model // @Description Defines a tag using name and type. type Tag struct { - ID int64 `json:"id" db:"id"` // The unique DB identifier of a tag - Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type - Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name + Type string `json:"type" db:"tag_type" example:"Debug"` + Name string `json:"name" db:"tag_name" example:"Testjob"` + ID int64 `json:"id" db:"id"` } // Resource model // @Description A resource used by a job type Resource struct { - Hostname string `json:"hostname"` // Name of the host (= node) - HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids - Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids - Configuration string `json:"configuration,omitempty"` // The configuration options of the node + Hostname string `json:"hostname"` + Configuration string `json:"configuration,omitempty"` + HWThreads []int `json:"hwthreads,omitempty"` + Accelerators []string `json:"accelerators,omitempty"` } type JobState string