From 561fd41d5d3b9877dc7be3980bda50e4fcb1f46a Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 13 Aug 2024 17:49:28 +0200 Subject: [PATCH 01/31] fix: add accelerator scope to to-be archived scopes - if numAcc > 0 - fixes Add accelerator scope to archive requests #282 --- internal/metricdata/metricdata.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index c826113..eba9dee 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -307,6 +307,10 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { scopes = append(scopes, schema.MetricScopeCore) } + if job.NumAcc > 0 { + scopes = append(scopes, schema.MetricScopeAccelerator) + } + jobData, err := LoadData(job, allMetrics, scopes, ctx) if err != nil { log.Error("Error wile loading job data for archiving") From 9b6db4684adde310b29edb56d298c3faacbcd1b3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 08:53:49 +0200 Subject: [PATCH 02/31] Refactor: Remove redundant code --- cmd/cc-backend/server.go | 49 +++++++++++----------------------------- 1 file changed, 13 insertions(+), 36 deletions(-) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 5531415..d2b62e2 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -38,6 +38,15 @@ var ( apiHandle *api.RestApi ) +func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(rw).Encode(map[string]string{ + "status": http.StatusText(http.StatusUnauthorized), + "error": err.Error(), + }) +} + func serverInit() { // Setup the http.Handler/Router used by the server graph.Init() @@ -166,64 +175,32 @@ func serverInit() { return authHandle.AuthApi( // On success; next, - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) + onFailureResponse) }) userapi.Use(func(next http.Handler) http.Handler { return authHandle.AuthUserApi( // On success; next, - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) + onFailureResponse) }) configapi.Use(func(next http.Handler) http.Handler { return authHandle.AuthConfigApi( // On success; next, - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) + onFailureResponse) }) frontendapi.Use(func(next http.Handler) http.Handler { return authHandle.AuthFrontendApi( // On success; next, - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": 
http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) + onFailureResponse) }) } From ba2f406bc08b9ca75d9b03773269f8ada89c7e2b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 09:41:54 +0200 Subject: [PATCH 03/31] Extend sqlite db migration --- .../sqlite3/08_add-footprint.down.sql | 21 +++++++++++++++++++ .../sqlite3/08_add-footprint.up.sql | 10 +++++++++ pkg/schema/job.go | 2 +- 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.down.sql b/internal/repository/migrations/sqlite3/08_add-footprint.down.sql index e69de29..8c99eb5 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.down.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.down.sql @@ -0,0 +1,21 @@ +ALTER TABLE job DROP energy; +ALTER TABLE job DROP energy_footprint; +ALTER TABLE job ADD COLUMN flops_any_avg; +ALTER TABLE job ADD COLUMN mem_bw_avg; +ALTER TABLE job ADD COLUMN mem_used_max; +ALTER TABLE job ADD COLUMN load_avg; +ALTER TABLE job ADD COLUMN net_bw_avg; +ALTER TABLE job ADD COLUMN net_data_vol_total; +ALTER TABLE job ADD COLUMN file_bw_avg; +ALTER TABLE job ADD COLUMN file_data_vol_total; + +UPDATE job SET flops_any_avg = json_extract(footprint, '$.flops_any_avg'); +UPDATE job SET mem_bw_avg = json_extract(footprint, '$.mem_bw_avg'); +UPDATE job SET mem_used_max = json_extract(footprint, '$.mem_used_max'); +UPDATE job SET load_avg = json_extract(footprint, '$.cpu_load_avg'); +UPDATE job SET net_bw_avg = json_extract(footprint, '$.net_bw_avg'); +UPDATE job SET net_data_vol_total = json_extract(footprint, '$.net_data_vol_total'); +UPDATE job SET file_bw_avg = json_extract(footprint, '$.file_bw_avg'); +UPDATE job SET file_data_vol_total = json_extract(footprint, '$.file_data_vol_total'); + +ALTER TABLE job DROP footprint; diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index 643b87e..e5af149 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -1,4 +1,5 @@ ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0; +ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL; ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL; UPDATE job SET footprint = '{"flops_any_avg": 0.0}'; @@ -6,7 +7,16 @@ UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_ UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg); UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max); UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg); +UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg); +UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total); +UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg); +UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total); + ALTER TABLE job DROP flops_any_avg; ALTER TABLE job DROP mem_bw_avg; ALTER TABLE job DROP mem_used_max; ALTER TABLE job DROP load_avg; +ALTER TABLE job DROP net_bw_avg; +ALTER TABLE job DROP net_data_vol_total; +ALTER TABLE job DROP file_bw_avg; +ALTER TABLE job DROP file_data_vol_total; diff --git a/pkg/schema/job.go b/pkg/schema/job.go index 83064c7..2a2ea95 100644 --- a/pkg/schema/job.go +++ 
b/pkg/schema/job.go @@ -32,7 +32,7 @@ type BaseJob struct { Footprint map[string]float64 `json:"footprint"` MetaData map[string]string `json:"metaData"` ConcurrentJobs JobLinkResultList `json:"concurrentJobs"` - Energy float64 `json:"energy"` + Energy float64 `json:"energy" db:"energy"` ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"` JobID int64 `json:"jobId" db:"job_id" example:"123000"` From e1faba0ff2334d7eabaa2d4aa566c55ca4c55f78 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 10:39:32 +0200 Subject: [PATCH 04/31] Update cluster json schema --- pkg/schema/schemas/cluster.schema.json | 597 +++++++++++++------------ 1 file changed, 316 insertions(+), 281 deletions(-) diff --git a/pkg/schema/schemas/cluster.schema.json b/pkg/schema/schemas/cluster.schema.json index e745f99..81b138a 100644 --- a/pkg/schema/schemas/cluster.schema.json +++ b/pkg/schema/schemas/cluster.schema.json @@ -1,284 +1,319 @@ { - "$schema": "http://json-schema.org/draft/2020-12/schema", - "$id": "embedfs://cluster.schema.json", - "title": "HPC cluster description", - "description": "Meta data information of a HPC cluster", - "type": "object", - "properties": { - "name": { - "description": "The unique identifier of a cluster", - "type": "string" - }, - "metricConfig": { - "description": "Metric specifications", - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "description": "Metric name", - "type": "string" - }, - "unit": { - "description": "Metric unit", - "$ref": "embedfs://unit.schema.json" - }, - "scope": { - "description": "Native measurement resolution", - "type": "string" - }, - "timestep": { - "description": "Frequency of timeseries points", - "type": "integer" - }, - "aggregation": { - "description": "How the metric is aggregated", - "type": "string", - "enum": [ - "sum", - "avg" - ] - }, - "peak": { - "description": "Metric peak threshold (Upper metric limit)", - "type": "number" - }, - "normal": { - "description": "Metric normal threshold", - "type": "number" - }, - "caution": { - "description": "Metric caution threshold (Suspicious but does not require immediate action)", - "type": "number" - }, - "alert": { - "description": "Metric alert threshold (Requires immediate action)", - "type": "number" - }, - "subClusters": { - "description": "Array of cluster hardware partition metric thresholds", - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "description": "Hardware partition name", - "type": "string" - }, - "peak": { - "type": "number" - }, - "normal": { - "type": "number" - }, - "caution": { - "type": "number" - }, - "alert": { - "type": "number" - }, - "remove": { - "type": "boolean" - } - }, - "required": [ - "name" - ] - } - } - }, - "required": [ - "name", - "unit", - "scope", - "timestep", - "aggregation", - "peak", - "normal", - "caution", - "alert" - ] - }, - "minItems": 1 - }, - "subClusters": { - "description": "Array of cluster hardware partitions", - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "description": "Hardware partition name", - "type": "string" - }, - "processorType": { - "description": "Processor type", - "type": "string" - }, - "socketsPerNode": { - "description": "Number of sockets per node", - "type": "integer" - }, - "coresPerSocket": { - "description": "Number of cores per socket", - "type": "integer" - }, - "threadsPerCore": { - 
"description": "Number of SMT threads per core", - "type": "integer" - }, - "flopRateScalar": { - "description": "Theoretical node peak flop rate for scalar code in GFlops/s", - "type": "object", - "properties": { - "unit": { - "description": "Metric unit", - "$ref": "embedfs://unit.schema.json" - }, - "value": { - "type": "number" - } - } - }, - "flopRateSimd": { - "description": "Theoretical node peak flop rate for SIMD code in GFlops/s", - "type": "object", - "properties": { - "unit": { - "description": "Metric unit", - "$ref": "embedfs://unit.schema.json" - }, - "value": { - "type": "number" - } - } - }, - "memoryBandwidth": { - "description": "Theoretical node peak memory bandwidth in GB/s", - "type": "object", - "properties": { - "unit": { - "description": "Metric unit", - "$ref": "embedfs://unit.schema.json" - }, - "value": { - "type": "number" - } - } - }, - "nodes": { - "description": "Node list expression", - "type": "string" - }, - "topology": { - "description": "Node topology", - "type": "object", - "properties": { - "node": { - "description": "HwTread lists of node", - "type": "array", - "items": { - "type": "integer" - } - }, - "socket": { - "description": "HwTread lists of sockets", - "type": "array", - "items": { - "type": "array", - "items": { - "type": "integer" - } - } - }, - "memoryDomain": { - "description": "HwTread lists of memory domains", - "type": "array", - "items": { - "type": "array", - "items": { - "type": "integer" - } - } - }, - "die": { - "description": "HwTread lists of dies", - "type": "array", - "items": { - "type": "array", - "items": { - "type": "integer" - } - } - }, - "core": { - "description": "HwTread lists of cores", - "type": "array", - "items": { - "type": "array", - "items": { - "type": "integer" - } - } - }, - "accelerators": { - "type": "array", - "description": "List of of accelerator devices", - "items": { - "type": "object", - "properties": { - "id": { - "type": "string", - "description": "The unique device id" - }, - "type": { - "type": "string", - "description": "The accelerator type", - "enum": [ - "Nvidia GPU", - "AMD GPU", - "Intel GPU" - ] - }, - "model": { - "type": "string", - "description": "The accelerator model" - } - }, - "required": [ - "id", - "type", - "model" - ] - } - } - }, - "required": [ - "node", - "socket", - "memoryDomain" - ] - } - }, - "required": [ - "name", - "nodes", - "topology", - "processorType", - "socketsPerNode", - "coresPerSocket", - "threadsPerCore", - "flopRateScalar", - "flopRateSimd", - "memoryBandwidth" - ] - }, - "minItems": 1 - } + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "embedfs://cluster.schema.json", + "title": "HPC cluster description", + "description": "Meta data information of a HPC cluster", + "type": "object", + "properties": { + "name": { + "description": "The unique identifier of a cluster", + "type": "string" }, - "required": [ - "name", - "metricConfig", - "subClusters" - ] + "metricConfig": { + "description": "Metric specifications", + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "description": "Metric name", + "type": "string" + }, + "unit": { + "description": "Metric unit", + "$ref": "embedfs://unit.schema.json" + }, + "scope": { + "description": "Native measurement resolution", + "type": "string" + }, + "timestep": { + "description": "Frequency of timeseries points", + "type": "integer" + }, + "aggregation": { + "description": "How the metric is aggregated", + "type": "string", + "enum": [ + "sum", + "avg" + ] + }, + 
"footprint": { + "description": "Is it a footprint metric and what type", + "type": "string", + "enum": [ + "avg", + "max", + "min" + ] + }, + "energy": { + "description": "Is it used to calculate job energy", + "type": "boolean" + }, + "lowerIsBetter": { + "description": "Is lower better.", + "type": "boolean" + }, + "peak": { + "description": "Metric peak threshold (Upper metric limit)", + "type": "number" + }, + "normal": { + "description": "Metric normal threshold", + "type": "number" + }, + "caution": { + "description": "Metric caution threshold (Suspicious but does not require immediate action)", + "type": "number" + }, + "alert": { + "description": "Metric alert threshold (Requires immediate action)", + "type": "number" + }, + "subClusters": { + "description": "Array of cluster hardware partition metric thresholds", + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "description": "Hardware partition name", + "type": "string" + }, + "footprint": { + "description": "Is it a footprint metric and what type. Overwrite global setting", + "type": "string", + "enum": [ + "avg", + "max", + "min" + ] + }, + "energy": { + "description": "Is it used to calculate job energy. Overwrite global", + "type": "boolean" + }, + "lowerIsBetter": { + "description": "Is lower better. Overwrite global", + "type": "boolean" + }, + "peak": { + "type": "number" + }, + "normal": { + "type": "number" + }, + "caution": { + "type": "number" + }, + "alert": { + "type": "number" + }, + "remove": { + "description": "Remove this metric for this subcluster", + "type": "boolean" + } + }, + "required": [ + "name" + ] + } + } + }, + "required": [ + "name", + "unit", + "scope", + "timestep", + "aggregation", + "peak", + "normal", + "caution", + "alert" + ] + }, + "minItems": 1 + }, + "subClusters": { + "description": "Array of cluster hardware partitions", + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "description": "Hardware partition name", + "type": "string" + }, + "processorType": { + "description": "Processor type", + "type": "string" + }, + "socketsPerNode": { + "description": "Number of sockets per node", + "type": "integer" + }, + "coresPerSocket": { + "description": "Number of cores per socket", + "type": "integer" + }, + "threadsPerCore": { + "description": "Number of SMT threads per core", + "type": "integer" + }, + "flopRateScalar": { + "description": "Theoretical node peak flop rate for scalar code in GFlops/s", + "type": "object", + "properties": { + "unit": { + "description": "Metric unit", + "$ref": "embedfs://unit.schema.json" + }, + "value": { + "type": "number" + } + } + }, + "flopRateSimd": { + "description": "Theoretical node peak flop rate for SIMD code in GFlops/s", + "type": "object", + "properties": { + "unit": { + "description": "Metric unit", + "$ref": "embedfs://unit.schema.json" + }, + "value": { + "type": "number" + } + } + }, + "memoryBandwidth": { + "description": "Theoretical node peak memory bandwidth in GB/s", + "type": "object", + "properties": { + "unit": { + "description": "Metric unit", + "$ref": "embedfs://unit.schema.json" + }, + "value": { + "type": "number" + } + } + }, + "nodes": { + "description": "Node list expression", + "type": "string" + }, + "topology": { + "description": "Node topology", + "type": "object", + "properties": { + "node": { + "description": "HwTread lists of node", + "type": "array", + "items": { + "type": "integer" + } + }, + "socket": { + "description": "HwTread lists of sockets", + 
"type": "array", + "items": { + "type": "array", + "items": { + "type": "integer" + } + } + }, + "memoryDomain": { + "description": "HwTread lists of memory domains", + "type": "array", + "items": { + "type": "array", + "items": { + "type": "integer" + } + } + }, + "die": { + "description": "HwTread lists of dies", + "type": "array", + "items": { + "type": "array", + "items": { + "type": "integer" + } + } + }, + "core": { + "description": "HwTread lists of cores", + "type": "array", + "items": { + "type": "array", + "items": { + "type": "integer" + } + } + }, + "accelerators": { + "type": "array", + "description": "List of of accelerator devices", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "The unique device id" + }, + "type": { + "type": "string", + "description": "The accelerator type", + "enum": [ + "Nvidia GPU", + "AMD GPU", + "Intel GPU" + ] + }, + "model": { + "type": "string", + "description": "The accelerator model" + } + }, + "required": [ + "id", + "type", + "model" + ] + } + } + }, + "required": [ + "node", + "socket", + "memoryDomain" + ] + } + }, + "required": [ + "name", + "nodes", + "topology", + "processorType", + "socketsPerNode", + "coresPerSocket", + "threadsPerCore", + "flopRateScalar", + "flopRateSimd", + "memoryBandwidth" + ] + }, + "minItems": 1 + } + }, + "required": [ + "name", + "metricConfig", + "subClusters" + ] } From 5c99f5f8bbb44366dc13d0250bff1087dfa2e0cd Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 12:35:11 +0200 Subject: [PATCH 05/31] Only add footprint columns if not 0 --- .../repository/migrations/sqlite3/08_add-footprint.up.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index e5af149..0ffbf37 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -7,10 +7,10 @@ UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_ UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg); UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max); UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg); -UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg); -UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total); -UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg); -UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total); +UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) IF job.net_bw_avg != 0; +UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) IF job.net_data_vol_total != 0; +UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) IF job.file_bw_avg != 0; +UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) IF job.file_data_vol_total != 0; ALTER TABLE job DROP flops_any_avg; ALTER TABLE job DROP mem_bw_avg; From d6a88896d059023eeac8dbad415a0ce065f328fe Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 12:36:21 +0200 Subject: [PATCH 06/31] Refactor: Reduce struct memory size --- pkg/schema/cluster.go | 6 +++--- 1 file changed, 
3 insertions(+), 3 deletions(-) diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index a77bd32..e9aa178 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -47,11 +47,11 @@ type SubCluster struct { type SubClusterConfig struct { Name string `json:"name"` + Footprint string `json:"footprint,omitempty"` Peak float64 `json:"peak"` Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` - Footprint string `json:"footprint,omitempty"` Remove bool `json:"remove"` LowerIsBetter bool `json:"lowerIsBetter"` Energy bool `json:"energy"` @@ -62,14 +62,14 @@ type MetricConfig struct { Name string `json:"name"` Scope MetricScope `json:"scope"` Aggregation string `json:"aggregation"` + Footprint string `json:"footprint,omitempty"` SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` - Timestep int `json:"timestep"` Peak float64 `json:"peak"` Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` + Timestep int `json:"timestep"` LowerIsBetter bool `json:"lowerIsBetter"` - Footprint string `json:"footprint,omitempty"` Energy bool `json:"energy"` } From 5e074dad1029062a39241daaa8319f20f5f36736 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Aug 2024 12:39:14 +0200 Subject: [PATCH 07/31] Resolve error in migration --- .../repository/migrations/sqlite3/08_add-footprint.up.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index 0ffbf37..93f0659 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -7,10 +7,10 @@ UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_ UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg); UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max); UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg); -UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) IF job.net_bw_avg != 0; -UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) IF job.net_data_vol_total != 0; -UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) IF job.file_bw_avg != 0; -UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) IF job.file_data_vol_total != 0; +UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) WHERE job.net_bw_avg != 0; +UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) WHERE job.net_data_vol_total != 0; +UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) WHERE job.file_bw_avg != 0; +UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) WHERE job.file_data_vol_total != 0; ALTER TABLE job DROP flops_any_avg; ALTER TABLE job DROP mem_bw_avg; From 49e0a2c0550c208264747c8dce3c5812b9a0f921 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Thu, 15 Aug 2024 14:33:04 +0200 Subject: [PATCH 08/31] fix: add compatibility for footprint metrics without config --- .../src/generic/helper/JobFootprint.svelte | 222 ++++++++++-------- 1 file changed, 129 insertions(+), 93 deletions(-) diff --git 
a/web/frontend/src/generic/helper/JobFootprint.svelte b/web/frontend/src/generic/helper/JobFootprint.svelte index 82818e3..4e1abb0 100644 --- a/web/frontend/src/generic/helper/JobFootprint.svelte +++ b/web/frontend/src/generic/helper/JobFootprint.svelte @@ -67,62 +67,74 @@ export let height = "310px"; const footprintData = job?.footprint?.map((jf) => { - // Unit const fmc = getContext("getMetricConfig")(job.cluster, job.subCluster, jf.name); - const unit = (fmc?.unit?.prefix ? fmc.unit.prefix : "") + (fmc?.unit?.base ? fmc.unit.base : "") + if (fmc) { + // Unit + const unit = (fmc?.unit?.prefix ? fmc.unit.prefix : "") + (fmc?.unit?.base ? fmc.unit.base : "") - // Threshold / -Differences - const fmt = findJobThresholds(job, fmc); - if (jf.name === "flops_any") fmt.peak = round(fmt.peak * 0.85, 0); + // Threshold / -Differences + const fmt = findJobThresholds(job, fmc); + if (jf.name === "flops_any") fmt.peak = round(fmt.peak * 0.85, 0); - // Define basic data -> Value: Use as Provided - const fmBase = { - name: jf.name + ' (' + jf.stat + ')', - avg: jf.value, - unit: unit, - max: fmt.peak, - dir: fmc.lowerIsBetter - }; + // Define basic data -> Value: Use as Provided + const fmBase = { + name: jf.name + ' (' + jf.stat + ')', + avg: jf.value, + unit: unit, + max: fmt.peak, + dir: fmc.lowerIsBetter + }; - if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "alert")) { + if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "alert")) { + return { + ...fmBase, + color: "danger", + message: `Metric average way ${fmc.lowerIsBetter ? "above" : "below"} expected normal thresholds.`, + impact: 3 + }; + } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "caution")) { + return { + ...fmBase, + color: "warning", + message: `Metric average ${fmc.lowerIsBetter ? "above" : "below"} expected normal thresholds.`, + impact: 2, + }; + } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "normal")) { + return { + ...fmBase, + color: "success", + message: "Metric average within expected thresholds.", + impact: 1, + }; + } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "peak")) { + return { + ...fmBase, + color: "info", + message: + "Metric average above expected normal thresholds: Check for artifacts recommended.", + impact: 0, + }; + } else { + return { + ...fmBase, + color: "secondary", + message: + "Metric average above expected peak threshold: Check for artifacts!", + impact: -1, + }; + } + } else { // No matching metric config: display as single value return { - ...fmBase, - color: "danger", - message: `Metric average way ${fmc.lowerIsBetter ? "above" : "below"} expected normal thresholds.`, - impact: 3 - }; - } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "caution")) { - return { - ...fmBase, - color: "warning", - message: `Metric average ${fmc.lowerIsBetter ? 
"above" : "below"} expected normal thresholds.`, - impact: 2, - }; - } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "normal")) { - return { - ...fmBase, - color: "success", - message: "Metric average within expected thresholds.", - impact: 1, - }; - } else if (evalFootprint(jf.value, fmt, fmc.lowerIsBetter, "peak")) { - return { - ...fmBase, - color: "info", + name: jf.name + ' (' + jf.stat + ')', + avg: jf.value, message: - "Metric average above expected normal thresholds: Check for artifacts recommended.", - impact: 0, - }; - } else { - return { - ...fmBase, - color: "secondary", - message: - "Metric average above expected peak threshold: Check for artifacts!", - impact: -1, + `No config for metric ${jf.name} found.`, + impact: 4, }; } - }); + }).sort(function (a, b) { // Sort by impact value primarily, within impact sort name alphabetically + return a.impact - b.impact || ((a.name > b.name) ? 1 : ((b.name > a.name) ? -1 : 0)); + });; function evalFootprint(mean, thresholds, lowerIsBetter, level) { // Handle Metrics in which less value is better @@ -159,37 +171,76 @@ {/if} {#each footprintData as fpd, index} -
[The Svelte markup of this template hunk was stripped during extraction; only text nodes and template logic survive. Recoverable structure: the existing per-metric card (metric name, impact icon keyed on fpd.impact values 3/2/1/0/-1, the "{fpd.avg} / {fpd.max} {fpd.unit}" readout, the {fpd.message} tooltip, and the progress-bar variants gated on {#if fpd.dir} / {#if !fpd.dir}) is wrapped in a new {#if fpd.impact !== 4} guard, and an {:else} branch is added that renders only {fpd.name}, {fpd.avg}, and {fpd.message} for metrics without a matching config. The hunk ends at the {#if job?.metaData?.message} context line.]
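Since the component markup itself is unrecoverable here, the following is a minimal sketch of the control flow this hunk introduces, consistent with the surviving script logic (impact === 4 marks a metric without a config). The div containers and classes are illustrative assumptions, not the file's actual elements:

    {#each footprintData as fpd, index}
      {#if fpd.impact !== 4}
        <!-- Metric has a config: full card with impact icon, -->
        <!-- "avg / max unit" readout, tooltip message and progress bar -->
        <div class="d-flex justify-content-between">
          <div>&nbsp;<b>{fpd.name}</b></div>
          <div>{fpd.avg} / {fpd.max} {fpd.unit}</div>
        </div>
        <div>{fpd.message}</div>
      {:else}
        <!-- No matching metric config (impact === 4 from the script above): -->
        <!-- render only the name, the raw value and the notice message -->
        <div class="d-flex justify-content-between">
          <div>&nbsp;<b>{fpd.name}</b></div>
          <div>{fpd.avg}&nbsp;</div>
        </div>
        <div>{fpd.message}</div>
      {/if}
    {/each}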
From a8a27c9b51cb9002498634cf625423e0896c985c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 19 Aug 2024 12:11:53 +0200 Subject: [PATCH 09/31] Add project index to job table --- .../repository/migrations/sqlite3/08_add-footprint.up.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index 93f0659..bcd6494 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -1,8 +1,12 @@ +CREATE INDEX IF NOT EXISTS job_by_project ON job (project); +CREATE INDEX IF NOT EXISTS job_list_projects ON job (project, job_state); + ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0; ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL; ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL; UPDATE job SET footprint = '{"flops_any_avg": 0.0}'; + UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg); UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg); UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max); From 1758275f115cef35c17ca255ade6fbd4d3db4c11 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Thu, 22 Aug 2024 14:01:27 +0200 Subject: [PATCH 10/31] fix: fix getMetricConfigDeep util function - threw error for mismatching metric availability between clusters --- web/frontend/src/generic/utils.js | 43 +++++++++++++++---------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/web/frontend/src/generic/utils.js b/web/frontend/src/generic/utils.js index bb63a4f..3aaafa1 100644 --- a/web/frontend/src/generic/utils.js +++ b/web/frontend/src/generic/utils.js @@ -301,7 +301,7 @@ export function stickyHeader(datatableHeaderSelector, updatePading) { onDestroy(() => document.removeEventListener("scroll", onscroll)); } -export function checkMetricDisabled(m, c, s) { //[m]etric, [c]luster, [s]ubcluster +export function checkMetricDisabled(m, c, s) { // [m]etric, [c]luster, [s]ubcluster const metrics = getContext("globalMetrics"); const result = metrics?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)?.subClusters?.includes(s) return !result @@ -309,23 +309,22 @@ export function checkMetricDisabled(m, c, s) { //[m]etric, [c]luster, [s]ubclust export function getStatsItems() { // console.time('stats') - // console.log('getStatsItems ...') const globalMetrics = getContext("globalMetrics") const result = globalMetrics.map((gm) => { if (gm?.footprint) { - // Footprint contains suffix naming the used stat-type // console.time('deep') - // console.log('Deep Config for', gm.name) const mc = getMetricConfigDeep(gm.name, null, null) // console.timeEnd('deep') - return { - field: gm.name + '_' + gm.footprint, - text: gm.name + ' (' + gm.footprint + ')', - metric: gm.name, - from: 0, - to: mc.peak, - peak: mc.peak, - enabled: false + if (mc) { + return { + field: gm.name + '_' + gm.footprint, + text: gm.name + ' (' + gm.footprint + ')', + metric: gm.name, + from: 0, + to: mc.peak, + peak: mc.peak, + enabled: false + } } } return null @@ -336,11 +335,9 @@ export function getStatsItems() { export function getSortItems() { //console.time('sort') - //console.log('getSortItems ...') const globalMetrics = getContext("globalMetrics") const result = globalMetrics.map((gm) => { if (gm?.footprint) { - // Footprint contains suffix naming the used 
stat-type return { field: gm.name + '_' + gm.footprint, type: 'foot', @@ -357,21 +354,22 @@ export function getSortItems() { function getMetricConfigDeep(metric, cluster, subCluster) { const clusters = getContext("clusters"); if (cluster != null) { - let c = clusters.find((c) => c.name == cluster); + const c = clusters.find((c) => c.name == cluster); if (subCluster != null) { - let sc = c.subClusters.find((sc) => sc.name == subCluster); + const sc = c.subClusters.find((sc) => sc.name == subCluster); return sc.metricConfig.find((mc) => mc.name == metric) } else { let result; for (let sc of c.subClusters) { const mc = sc.metricConfig.find((mc) => mc.name == metric) - if (result) { // If lowerIsBetter: Peak is still maximum value, no special case required + if (result && mc) { // update result; If lowerIsBetter: Peak is still maximum value, no special case required result.alert = (mc.alert > result.alert) ? mc.alert : result.alert result.caution = (mc.caution > result.caution) ? mc.caution : result.caution result.normal = (mc.normal > result.normal) ? mc.normal : result.normal result.peak = (mc.peak > result.peak) ? mc.peak : result.peak - } else { - if (mc) result = {...mc}; + } else if (mc) { + // start new result + result = {...mc}; } } return result @@ -381,13 +379,14 @@ function getMetricConfigDeep(metric, cluster, subCluster) { for (let c of clusters) { for (let sc of c.subClusters) { const mc = sc.metricConfig.find((mc) => mc.name == metric) - if (result) { // If lowerIsBetter: Peak is still maximum value, no special case required + if (result && mc) { // update result; If lowerIsBetter: Peak is still maximum value, no special case required result.alert = (mc.alert > result.alert) ? mc.alert : result.alert result.caution = (mc.caution > result.caution) ? mc.caution : result.caution result.normal = (mc.normal > result.normal) ? mc.normal : result.normal result.peak = (mc.peak > result.peak) ? 
mc.peak : result.peak - } else { - if (mc) result = {...mc}; + } else if (mc) { + // Start new result + result = {...mc}; } } } From 084f89fa32b55467030f452b227ffd96adaefd35 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Thu, 22 Aug 2024 14:46:27 +0200 Subject: [PATCH 11/31] fix: fix svelte source paths in makefile --- Makefile | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 508c9fb..b673e79 100644 --- a/Makefile +++ b/Makefile @@ -22,11 +22,21 @@ SVELTE_COMPONENTS = status \ header SVELTE_TARGETS = $(addprefix $(FRONTEND)/public/build/,$(addsuffix .js, $(SVELTE_COMPONENTS))) -SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \ - $(wildcard $(FRONTEND)/src/*.js) \ - $(wildcard $(FRONTEND)/src/filters/*.svelte) \ - $(wildcard $(FRONTEND)/src/plots/*.svelte) \ - $(wildcard $(FRONTEND)/src/joblist/*.svelte) +SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \ + $(wildcard $(FRONTEND)/src/*.js) \ + $(wildcard $(FRONTEND)/src/analysis/*.svelte) \ + $(wildcard $(FRONTEND)/src/config/*.svelte) \ + $(wildcard $(FRONTEND)/src/config/admin/*.svelte) \ + $(wildcard $(FRONTEND)/src/config/user/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/*.js) \ + $(wildcard $(FRONTEND)/src/generic/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/filters/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/plots/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/joblist/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/helper/*.svelte) \ + $(wildcard $(FRONTEND)/src/generic/select/*.svelte) \ + $(wildcard $(FRONTEND)/src/header/*.svelte) \ + $(wildcard $(FRONTEND)/src/job/*.svelte) .PHONY: clean distclean test tags frontend swagger graphql $(TARGET) From 56ebb301ca5361e8930370a2f5543d4cddae58f3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Aug 2024 10:14:33 +0200 Subject: [PATCH 12/31] Start to restructure Does not compile --- internal/metricdata/metricdata.go | 66 ------------------------------ pkg/archive/archive.go | 68 +++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index eba9dee..3d93102 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "fmt" - "math" "time" "github.com/ClusterCockpit/cc-backend/internal/config" @@ -292,68 +291,3 @@ func prepareJobData( jobData.AddNodeScope("mem_bw") } } - -// Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { - allMetrics := make([]string, 0) - metricConfigs := archive.GetCluster(job.Cluster).MetricConfig - for _, mc := range metricConfigs { - allMetrics = append(allMetrics, mc.Name) - } - - // TODO: Talk about this! What resolutions to store data at... 
- scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { - scopes = append(scopes, schema.MetricScopeCore) - } - - if job.NumAcc > 0 { - scopes = append(scopes, schema.MetricScopeAccelerator) - } - - jobData, err := LoadData(job, allMetrics, scopes, ctx) - if err != nil { - log.Error("Error wile loading job data for archiving") - return nil, err - } - - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } - - for metric, data := range jobData { - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! - continue - } - - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) - } - - jobMeta.Statistics[metric] = schema.JobStatistics{ - Unit: schema.Unit{ - Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, - Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, - }, - Avg: avg / float64(job.NumNodes), - Min: min, - Max: max, - } - } - - // If the file based archive is disabled, - // only return the JobMeta structure as the - // statistics in there are needed. - if !useArchive { - return jobMeta, nil - } - - return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) -} diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 56c5d47..765a2ce 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -5,10 +5,13 @@ package archive import ( + "context" "encoding/json" "fmt" + "math" "sync" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -102,6 +105,71 @@ func GetHandle() ArchiveBackend { return ar } +// Writes a running job to the job-archive +func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { + allMetrics := make([]string, 0) + metricConfigs := GetCluster(job.Cluster).MetricConfig + for _, mc := range metricConfigs { + allMetrics = append(allMetrics, mc.Name) + } + + // TODO: Talk about this! What resolutions to store data at... + scopes := []schema.MetricScope{schema.MetricScopeNode} + if job.NumNodes <= 8 { + scopes = append(scopes, schema.MetricScopeCore) + } + + if job.NumAcc > 0 { + scopes = append(scopes, schema.MetricScopeAccelerator) + } + + jobData, err := metricdata.LoadData(job, allMetrics, scopes, ctx) + if err != nil { + log.Error("Error wile loading job data for archiving") + return nil, err + } + + jobMeta := &schema.JobMeta{ + BaseJob: job.BaseJob, + StartTime: job.StartTime.Unix(), + Statistics: make(map[string]schema.JobStatistics), + } + + for metric, data := range jobData { + avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + nodeData, ok := data["node"] + if !ok { + // TODO/FIXME: Calc average for non-node metrics as well! 
+ continue + } + + for _, series := range nodeData.Series { + avg += series.Statistics.Avg + min = math.Min(min, series.Statistics.Min) + max = math.Max(max, series.Statistics.Max) + } + + jobMeta.Statistics[metric] = schema.JobStatistics{ + Unit: schema.Unit{ + Prefix: GetMetricConfig(job.Cluster, metric).Unit.Prefix, + Base: GetMetricConfig(job.Cluster, metric).Unit.Base, + }, + Avg: avg / float64(job.NumNodes), + Min: min, + Max: max, + } + } + + // If the file based archive is disabled, + // only return the JobMeta structure as the + // statistics in there are needed. + if !useArchive { + return jobMeta, nil + } + + return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) +} + // Helper to metricdata.LoadAverages(). func LoadAveragesFromArchive( job *schema.Job, From f914a312f53362108d63e67d208cd0cec61c4601 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Aug 2024 16:44:16 +0200 Subject: [PATCH 13/31] Introduce metricDataDispatcher Does not compile yet --- cmd/cc-backend/main.go | 2 +- internal/metricDataDispatcher/dataLoader.go | 131 ++++++++++++++++++++ internal/metricdata/metricdata.go | 119 +----------------- 3 files changed, 138 insertions(+), 114 deletions(-) create mode 100644 internal/metricDataDispatcher/dataLoader.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 3a49923..9f7e673 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -181,7 +181,7 @@ func main() { log.Fatalf("failed to initialize archive: %s", err.Error()) } - if err := metricdata.Init(config.Keys.DisableArchive); err != nil { + if err := metricdata.Init(); err != nil { log.Fatalf("failed to initialize metricdata repository: %s", err.Error()) } diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go new file mode 100644 index 0000000..a463ada --- /dev/null +++ b/internal/metricDataDispatcher/dataLoader.go @@ -0,0 +1,131 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricDataDispatcher + +import ( + "context" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/lrucache" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + +func cacheKey( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, +) string { + // Duration and StartTime do not need to be in the cache key as StartTime is less unique than + // job.ID and the TTL of the cache entry makes sure it does not stay there forever. + return fmt.Sprintf("%d(%s):[%v],[%v]", + job.ID, job.State, metrics, scopes) +} + +// Fetches the metric data for a job. 
+func LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.JobData, error) { + data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { + var jd schema.JobData + var err error + + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + !config.Keys.DisableArchive { + + repo, ok := metricdata.GetMetricDataRepo(job.Cluster) + + if !ok { + return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 + } + + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + + jd, err = repo.LoadData(job, metrics, scopes, ctx) + if err != nil { + if len(jd) != 0 { + log.Warnf("partial error: %s", err.Error()) + // return err, 0, 0 // Reactivating will block archiving on one partial error + } else { + log.Error("Error while loading job data from metric repository") + return err, 0, 0 + } + } + size = jd.Size() + } else { + jd, err = archive.GetHandle().LoadJobData(job) + if err != nil { + log.Error("Error while loading job data from archive") + return err, 0, 0 + } + + // Avoid sending unrequested data to the client: + if metrics != nil || scopes != nil { + if metrics == nil { + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) + } + } + + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm + } + } + + if len(subset) > 0 { + perscope = subset + } + } + + res[metric] = perscope + } + } + jd = res + } + size = jd.Size() + } + + ttl = 5 * time.Hour + if job.State == schema.JobStateRunning { + ttl = 2 * time.Minute + } + + prepareJobData(jd, scopes) + + return jd, ttl, size + }) + + if err, ok := data.(error); ok { + log.Error("Error in returned dataset") + return nil, err + } + + return data.(schema.JobData), nil +} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 3d93102..feefb0a 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -13,7 +13,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -34,10 +33,7 @@ type MetricDataRepository interface { var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} -var useArchive bool - -func Init(disableArchive bool) error { - useArchive = !disableArchive +func Init() error { for _, cluster := range config.Keys.Clusters { if cluster.MetricDataRepository != nil { var kind struct { @@ -72,106 +68,14 @@ func Init(disableArchive bool) error { return nil } -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) +func GetMetricDataRepo(cluster string) MetricDataRepository { + repo, ok := metricDataRepos[cluster] -// Fetches the metric data for a job. 
-func LoadData(job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { - var jd schema.JobData - var err error - - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - !useArchive { - - repo, ok := metricDataRepos[job.Cluster] - - if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 - } - - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } - - if metrics == nil { - cluster := archive.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) - } - } - - jd, err = repo.LoadData(job, metrics, scopes, ctx) - if err != nil { - if len(jd) != 0 { - log.Warnf("partial error: %s", err.Error()) - // return err, 0, 0 // Reactivating will block archiving on one partial error - } else { - log.Error("Error while loading job data from metric repository") - return err, 0, 0 - } - } - size = jd.Size() - } else { - jd, err = archive.GetHandle().LoadJobData(job) - if err != nil { - log.Error("Error while loading job data from archive") - return err, 0, 0 - } - - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { - if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) - } - } - - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm - } - } - - if len(subset) > 0 { - perscope = subset - } - } - - res[metric] = perscope - } - } - jd = res - } - size = jd.Size() - } - - ttl = 5 * time.Hour - if job.State == schema.JobStateRunning { - ttl = 2 * time.Minute - } - - prepareJobData(jd, scopes) - - return jd, ttl, size - }) - - if err, ok := data.(error); ok { - log.Error("Error in returned dataset") - return nil, err + if !ok { + return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 } - return data.(schema.JobData), nil + return repo } // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. @@ -249,17 +153,6 @@ func LoadNodeData( return data, nil } -func cacheKey( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, -) string { - // Duration and StartTime do not need to be in the cache key as StartTime is less unique than - // job.ID and the TTL of the cache entry makes sure it does not stay there forever. - return fmt.Sprintf("%d(%s):[%v],[%v]", - job.ID, job.State, metrics, scopes) -} - // For /monitoring/job/ and some other places, flops_any and mem_bw need // to be available at the scope 'node'. 
If a job has a lot of nodes, // statisticsSeries should be available so that a min/median/max Graph can be From cff60eb51c910d3cab9f3a20b2c22e77c46a6f36 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 27 Aug 2024 17:43:48 +0200 Subject: [PATCH 14/31] increase server timeout limit, improve and add db indices - change energy footprint key to string --- cmd/cc-backend/server.go | 4 ++-- .../migrations/sqlite3/08_add-footprint.up.sql | 8 ++++++-- pkg/archive/clusterConfig.go | 4 ++-- pkg/schema/cluster.go | 4 ++-- pkg/schema/schemas/cluster.schema.json | 12 ++++++++++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index d2b62e2..7ca9ccb 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -260,8 +260,8 @@ func serverStart() { }) server = &http.Server{ - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, + ReadTimeout: 20 * time.Second, + WriteTimeout: 20 * time.Second, Handler: handler, Addr: config.Keys.Addr, } diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index bcd6494..4895258 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -1,5 +1,7 @@ -CREATE INDEX IF NOT EXISTS job_by_project ON job (project); -CREATE INDEX IF NOT EXISTS job_list_projects ON job (project, job_state); +CREATE INDEX IF NOT EXISTS jobs_cluster_orderby_starttime ON job (cluster, start_time DESC); +CREATE INDEX IF NOT EXISTS jobs_cluster_count ON job (cluster, job_state, start_time); +CREATE INDEX IF NOT EXISTS jobs_project_orderby_starttime ON job (project, start_time DESC); +CREATE INDEX IF NOT EXISTS jobs_project_count ON job (project, job_state, start_time); ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0; ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL; @@ -24,3 +26,5 @@ ALTER TABLE job DROP net_bw_avg; ALTER TABLE job DROP net_data_vol_total; ALTER TABLE job DROP file_bw_avg; ALTER TABLE job DROP file_data_vol_total; + +PRAGMA optimize; \ No newline at end of file diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 6f0178c..3dd8c64 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -88,7 +88,7 @@ func initClusterConfig() error { sc.Footprint = append(sc.Footprint, newMetric.Name) ml.Footprint = newMetric.Footprint } - if newMetric.Energy { + if newMetric.Energy != "" { sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name) } } @@ -99,7 +99,7 @@ func initClusterConfig() error { if newMetric.Footprint != "" { sc.Footprint = append(sc.Footprint, newMetric.Name) } - if newMetric.Energy { + if newMetric.Energy != "" { sc.EnergyFootprint = append(sc.EnergyFootprint, newMetric.Name) } } diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index e9aa178..b9bf306 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -54,7 +54,7 @@ type SubClusterConfig struct { Alert float64 `json:"alert"` Remove bool `json:"remove"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy bool `json:"energy"` + Energy string `json:"energy"` } type MetricConfig struct { @@ -70,7 +70,7 @@ type MetricConfig struct { Alert float64 `json:"alert"` Timestep int `json:"timestep"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy bool `json:"energy"` + Energy string `json:"energy"` } type Cluster struct { diff --git 
a/pkg/schema/schemas/cluster.schema.json b/pkg/schema/schemas/cluster.schema.json index 81b138a..66b7ba1 100644 --- a/pkg/schema/schemas/cluster.schema.json +++ b/pkg/schema/schemas/cluster.schema.json @@ -50,7 +50,11 @@ }, "energy": { "description": "Is it used to calculate job energy", - "type": "boolean" + "type": "string", + "enum": [ + "power", + "energy" + ] }, "lowerIsBetter": { "description": "Is lower better.", @@ -93,7 +97,11 @@ }, "energy": { "description": "Is it used to calculate job energy. Overwrite global", - "type": "boolean" + "type": "string", + "enum": [ + "power", + "energy" + ] }, "lowerIsBetter": { "description": "Is lower better. Overwrite global", From e7231b0e13d039636dbd7741aa8edaa5eb855f5c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Aug 2024 10:03:04 +0200 Subject: [PATCH 15/31] Finish refactoring Add new packages: - metricDataDispatcher - archiver --- internal/api/api_test.go | 5 +- internal/api/rest.go | 6 +- internal/archiver/archiver.go | 81 ++++++++++++++ internal/graph/schema.resolvers.go | 20 ++-- internal/graph/util.go | 10 +- internal/metricDataDispatcher/dataLoader.go | 110 ++++++++++++++++++- internal/metricdata/metricdata.go | 115 +------------------- internal/repository/archiveWorker.go | 4 +- internal/repository/stats.go | 4 +- pkg/archive/archive.go | 68 ------------ 10 files changed, 216 insertions(+), 207 deletions(-) create mode 100644 internal/archiver/archiver.go diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 80a7e64..a6d183e 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -22,6 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -150,7 +151,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - if err := metricdata.Init(config.Keys.DisableArchive); err != nil { + if err := metricdata.Init(); err != nil { t.Fatal(err) } @@ -341,7 +342,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index c8f4e7a..da0f4be 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -24,7 +24,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -515,7 +515,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) var data schema.JobData if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricdata.LoadData(job, nil, scopes, r.Context()) + data, err = 
metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
 		if err != nil {
 			log.Warn("Error while loading job data")
 			return
@@ -604,7 +604,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
 		scopes = []schema.MetricScope{"node"}
 	}
 
-	data, err := metricdata.LoadData(job, metrics, scopes, r.Context())
+	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
 	if err != nil {
 		log.Warn("Error while loading job data")
 		return
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
new file mode 100644
index 0000000..e10a994
--- /dev/null
+++ b/internal/archiver/archiver.go
@@ -0,0 +1,81 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archiver
+
+import (
+	"context"
+	"math"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+// Writes a running job to the job-archive
+func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
+	allMetrics := make([]string, 0)
+	metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+	for _, mc := range metricConfigs {
+		allMetrics = append(allMetrics, mc.Name)
+	}
+
+	// TODO: Talk about this! What resolutions to store data at...
+	scopes := []schema.MetricScope{schema.MetricScopeNode}
+	if job.NumNodes <= 8 {
+		scopes = append(scopes, schema.MetricScopeCore)
+	}
+
+	if job.NumAcc > 0 {
+		scopes = append(scopes, schema.MetricScopeAccelerator)
+	}
+
+	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
+	if err != nil {
+		log.Error("Error while loading job data for archiving")
+		return nil, err
+	}
+
+	jobMeta := &schema.JobMeta{
+		BaseJob:    job.BaseJob,
+		StartTime:  job.StartTime.Unix(),
+		Statistics: make(map[string]schema.JobStatistics),
+	}
+
+	for metric, data := range jobData {
+		avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
+		nodeData, ok := data["node"]
+		if !ok {
+			// TODO/FIXME: Calc average for non-node metrics as well!
+			continue
+		}
+
+		for _, series := range nodeData.Series {
+			avg += series.Statistics.Avg
+			min = math.Min(min, series.Statistics.Min)
+			max = math.Max(max, series.Statistics.Max)
+		}
+
+		jobMeta.Statistics[metric] = schema.JobStatistics{
+			Unit: schema.Unit{
+				Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
+				Base:   archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
+			},
+			Avg: avg / float64(job.NumNodes),
+			Min: min,
+			Max: max,
+		}
+	}
+
+	// If the file based archive is disabled,
+	// only return the JobMeta structure as the
+	// statistics in there are needed.
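+	// (The caller, the archiving worker, afterwards persists these
+	// statistics to the job table via MarkArchived.)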
+ if config.Keys.DisableArchive { + return jobMeta, nil + } + + return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) +} diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index f36e25a..6177bce 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -15,7 +15,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -231,7 +231,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str return nil, err } - data, err := metricdata.LoadData(job, metrics, scopes, ctx) + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx) if err != nil { log.Warn("Error while loading job data") return nil, err @@ -383,7 +383,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ } } - data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { log.Warn("Error while loading node data") return nil, err @@ -440,9 +440,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/graph/util.go b/internal/graph/util.go index 3e65b6c..8296a02 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -11,7 +11,7 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" // "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -24,8 +24,8 @@ func (r *queryResolver) rooflineHeatmap( ctx context.Context, filter []*model.JobFilter, rows int, cols int, - minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) { - + minX float64, minY float64, maxX float64, maxY float64, +) ([][]float64, error) { jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil) if err != nil { log.Error("Error while querying jobs for roofline") @@ -47,7 +47,7 @@ func (r *queryResolver) rooflineHeatmap( continue } - jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) + jobdata, err := 
metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
 		if err != nil {
 			log.Errorf("Error while loading roofline metrics for job %d", job.ID)
 			return nil, err
@@ -120,7 +120,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
 			continue
 		}
 
-		if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
+		if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
 			log.Error("Error while loading averages for footprint")
 			return nil, err
 		}
diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go
index a463ada..2c7cfa6 100644
--- a/internal/metricDataDispatcher/dataLoader.go
+++ b/internal/metricDataDispatcher/dataLoader.go
@@ -42,11 +42,10 @@ func LoadData(job *schema.Job,
 
 	if job.State == schema.JobStateRunning ||
 		job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
-		!config.Keys.DisableArchive {
+		config.Keys.DisableArchive {
 
-		repo, ok := metricdata.GetMetricDataRepo(job.Cluster)
-
-		if !ok {
+		repo, err := metricdata.GetMetricDataRepo(job.Cluster)
+		if err != nil {
 			return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
 		}
@@ -117,7 +116,33 @@
 			ttl = 2 * time.Minute
 		}
 
-		prepareJobData(jd, scopes)
+		// FIXME: Review: Is this really necessary or correct.
+		// For /monitoring/job/ and some other places, flops_any and mem_bw need
+		// to be available at the scope 'node'. If a job has a lot of nodes,
+		// statisticsSeries should be available so that a min/median/max Graph can be
+		// used instead of a lot of single lines.
+		const maxSeriesSize int = 15
+		for _, scopes := range jd {
+			for _, jm := range scopes {
+				if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
+					continue
+				}
+
+				jm.AddStatisticsSeries()
+			}
+		}
+
+		nodeScopeRequested := false
+		for _, scope := range scopes {
+			if scope == schema.MetricScopeNode {
+				nodeScopeRequested = true
+			}
+		}
+
+		if nodeScopeRequested {
+			jd.AddNodeScope("flops_any")
+			jd.AddNodeScope("mem_bw")
+		}
 
 		return jd, ttl, size
 	})
@@ -129,3 +154,78 @@ func LoadData(job *schema.Job,
 
 	return data.(schema.JobData), nil
 }
+
+// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
+func LoadAverages(
+	job *schema.Job,
+	metrics []string,
+	data [][]schema.Float,
+	ctx context.Context,
+) error {
+	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
+		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
+	}
+
+	repo, err := metricdata.GetMetricDataRepo(job.Cluster)
+	if err != nil {
+		return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
+	}
+
+	stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalization?
+	if err != nil {
+		log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
+		return err
+	}
+
+	for i, m := range metrics {
+		nodes, ok := stats[m]
+		if !ok {
+			data[i] = append(data[i], schema.NaN)
+			continue
+		}
+
+		sum := 0.0
+		for _, node := range nodes {
+			sum += node.Avg
+		}
+		data[i] = append(data[i], schema.Float(sum))
+	}
+
+	return nil
+}
+
+// Used for the node/system view. Returns a map of nodes to a map of metrics.
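+// A nil metrics slice selects every metric configured for the cluster; a
+// partial error from the repository is logged but tolerated.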
+func LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + repo, err := metricdata.GetMetricDataRepo(cluster) + if err != nil { + return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) + } + + if metrics == nil { + for _, m := range archive.GetCluster(cluster).MetricConfig { + metrics = append(metrics, m.Name) + } + } + + data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + if err != nil { + if len(data) != 0 { + log.Warnf("partial error: %s", err.Error()) + } else { + log.Error("Error while loading node data from metric repository") + return nil, err + } + } + + if data == nil { + return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) + } + + return data, nil +} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index feefb0a..68d8d32 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -68,119 +67,13 @@ func Init() error { return nil } -func GetMetricDataRepo(cluster string) MetricDataRepository { +func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { + var err error repo, ok := metricDataRepos[cluster] if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 + err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) } - return repo -} - -// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. -func LoadAverages( - job *schema.Job, - metrics []string, - data [][]schema.Float, - ctx context.Context, -) error { - if job.State != schema.JobStateRunning && useArchive { - return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? - } - - repo, ok := metricDataRepos[job.Cluster] - if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? - if err != nil { - log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) - return err - } - - for i, m := range metrics { - nodes, ok := stats[m] - if !ok { - data[i] = append(data[i], schema.NaN) - continue - } - - sum := 0.0 - for _, node := range nodes { - sum += node.Avg - } - data[i] = append(data[i], schema.Float(sum)) - } - - return nil -} - -// Used for the node/system view. Returns a map of nodes to a map of metrics. 
-func LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - repo, ok := metricDataRepos[cluster] - if !ok { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) - if err != nil { - if len(data) != 0 { - log.Warnf("partial error: %s", err.Error()) - } else { - log.Error("Error while loading node data from metric repository") - return nil, err - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} - -// For /monitoring/job/ and some other places, flops_any and mem_bw need -// to be available at the scope 'node'. If a job has a lot of nodes, -// statisticsSeries should be available so that a min/median/max Graph can be -// used instead of a lot of single lines. -func prepareJobData( - jobData schema.JobData, - scopes []schema.MetricScope, -) { - const maxSeriesSize int = 15 - for _, scopes := range jobData { - for _, jm := range scopes { - if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { - continue - } - - jm.AddStatisticsSeries() - } - } - - nodeScopeRequested := false - for _, scope := range scopes { - if scope == schema.MetricScopeNode { - nodeScopeRequested = true - } - } - - if nodeScopeRequested { - jobData.AddNodeScope("flops_any") - jobData.AddNodeScope("mem_bw") - } + return repo, err } diff --git a/internal/repository/archiveWorker.go b/internal/repository/archiveWorker.go index 42febb5..7094b7c 100644 --- a/internal/repository/archiveWorker.go +++ b/internal/repository/archiveWorker.go @@ -9,7 +9,7 @@ import ( "encoding/json" "time" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -35,7 +35,7 @@ func (r *JobRepository) archivingWorker() { // metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend // TODO: Maybe use context with cancel/timeout here - jobMeta, err := metricdata.ArchiveJob(job, context.Background()) + jobMeta, err := archiver.ArchiveJob(job, context.Background()) if err != nil { log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 81ca8d1..5682144 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -13,7 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -691,7 +691,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( continue } - if err := 
metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { log.Errorf("Error while loading averages for histogram: %s", err) return nil } diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 765a2ce..56c5d47 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -5,13 +5,10 @@ package archive import ( - "context" "encoding/json" "fmt" - "math" "sync" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -105,71 +102,6 @@ func GetHandle() ArchiveBackend { return ar } -// Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { - allMetrics := make([]string, 0) - metricConfigs := GetCluster(job.Cluster).MetricConfig - for _, mc := range metricConfigs { - allMetrics = append(allMetrics, mc.Name) - } - - // TODO: Talk about this! What resolutions to store data at... - scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { - scopes = append(scopes, schema.MetricScopeCore) - } - - if job.NumAcc > 0 { - scopes = append(scopes, schema.MetricScopeAccelerator) - } - - jobData, err := metricdata.LoadData(job, allMetrics, scopes, ctx) - if err != nil { - log.Error("Error wile loading job data for archiving") - return nil, err - } - - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } - - for metric, data := range jobData { - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! - continue - } - - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) - } - - jobMeta.Statistics[metric] = schema.JobStatistics{ - Unit: schema.Unit{ - Prefix: GetMetricConfig(job.Cluster, metric).Unit.Prefix, - Base: GetMetricConfig(job.Cluster, metric).Unit.Base, - }, - Avg: avg / float64(job.NumNodes), - Min: min, - Max: max, - } - } - - // If the file based archive is disabled, - // only return the JobMeta structure as the - // statistics in there are needed. - if !useArchive { - return jobMeta, nil - } - - return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) -} - // Helper to metricdata.LoadAverages(). 
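+// (Since this refactoring the caller is metricDataDispatcher.LoadAverages.)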
 func LoadAveragesFromArchive(
 	job *schema.Job,
From db5809d522028fe36c51b187269168f803830cb6 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Wed, 28 Aug 2024 11:13:54 +0200
Subject: [PATCH 16/31] Move rest of archiving code into new archive package

---
 cmd/cc-backend/server.go             |   3 +-
 internal/api/api_test.go             |   5 +-
 internal/api/rest.go                 |   3 +-
 internal/archiver/archiveWorker.go   |  81 +++++++++++++++++++
 internal/repository/archiveWorker.go | 112 ---------------------------
 internal/repository/job.go           |  70 ++++++++++++++---
 6 files changed, 147 insertions(+), 127 deletions(-)
 create mode 100644 internal/archiver/archiveWorker.go
 delete mode 100644 internal/repository/archiveWorker.go

diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go
index 7ca9ccb..bc20fcf 100644
--- a/cmd/cc-backend/server.go
+++ b/cmd/cc-backend/server.go
@@ -20,6 +20,7 @@ import (
 	"github.com/99designs/gqlgen/graphql/handler"
 	"github.com/99designs/gqlgen/graphql/playground"
 	"github.com/ClusterCockpit/cc-backend/internal/api"
+	"github.com/ClusterCockpit/cc-backend/internal/archiver"
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
@@ -308,5 +309,5 @@ func serverShutdown() {
 	server.Shutdown(context.Background())
 
 	// Then, wait for any async archivings still pending...
-	apiHandle.JobRepository.WaitForArchiving()
+	archiver.WaitForArchiving()
 }
diff --git a/internal/api/api_test.go b/internal/api/api_test.go
index a6d183e..bb1ff6f 100644
--- a/internal/api/api_test.go
+++ b/internal/api/api_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 
 	"github.com/ClusterCockpit/cc-backend/internal/api"
+	"github.com/ClusterCockpit/cc-backend/internal/archiver"
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
@@ -312,7 +313,7 @@ func TestRestApi(t *testing.T) {
 		t.Fatal(response.Status, recorder.Body.String())
 	}
 
-	restapi.JobRepository.WaitForArchiving()
+	archiver.WaitForArchiving()
 	resolver := graph.GetResolverInstance()
 	job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
 	if err != nil {
@@ -423,7 +424,7 @@ func TestRestApi(t *testing.T) {
 		t.Fatal(response.Status, recorder.Body.String())
 	}
 
-	restapi.JobRepository.WaitForArchiving()
+	archiver.WaitForArchiving()
 	jobid, cluster := int64(12345), "testcluster"
 	job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
 	if err != nil {
diff --git a/internal/api/rest.go b/internal/api/rest.go
index da0f4be..b737090 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -19,6 +19,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ClusterCockpit/cc-backend/internal/archiver"
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
@@ -1081,7 +1082,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
 	}
 
 	// Trigger async archiving
-	api.JobRepository.TriggerArchiving(job)
+	archiver.TriggerArchiving(job)
 }
 
 func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go
new file mode 100644
index 0000000..6757a0b
--- /dev/null
+++ b/internal/archiver/archiveWorker.go
@@ -0,0 +1,81 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archiver + +import ( + "context" + "sync" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +var ( + archivePending sync.WaitGroup + archiveChannel chan *schema.Job + r *repository.JobRepository +) + +func Start(jobRepo *repository.JobRepository) { + archiveChannel = make(chan *schema.Job, 128) + + go archivingWorker() +} + +// Archiving worker thread +func archivingWorker() { + for { + select { + case job, ok := <-archiveChannel: + if !ok { + break + } + start := time.Now() + // not using meta data, called to load JobMeta into Cache? + // will fail if job meta not in repository + if _, err := r.FetchMetadata(job); err != nil { + log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) + r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + continue + } + + // ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend + // TODO: Maybe use context with cancel/timeout here + jobMeta, err := ArchiveJob(job, context.Background()) + if err != nil { + log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) + r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + continue + } + + if err := r.UpdateFootprint(jobMeta); err != nil { + log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + continue + } + // Update the jobs database entry one last time: + if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { + log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) + continue + } + log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) + log.Printf("archiving job (dbid: %d) successful", job.ID) + archivePending.Done() + } + } +} + +// Trigger async archiving +func TriggerArchiving(job *schema.Job) { + archivePending.Add(1) + archiveChannel <- job +} + +// Wait for background thread to finish pending archiving operations +func WaitForArchiving() { + // close channel and wait for worker to process remaining jobs + archivePending.Wait() +} diff --git a/internal/repository/archiveWorker.go b/internal/repository/archiveWorker.go deleted file mode 100644 index 7094b7c..0000000 --- a/internal/repository/archiveWorker.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package repository - -import ( - "context" - "encoding/json" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/archiver" - "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" - sq "github.com/Masterminds/squirrel" -) - -// Archiving worker thread -func (r *JobRepository) archivingWorker() { - for { - select { - case job, ok := <-r.archiveChannel: - if !ok { - break - } - start := time.Now() - // not using meta data, called to load JobMeta into Cache? 
- // will fail if job meta not in repository - if _, err := r.FetchMetadata(job); err != nil { - log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) - continue - } - - // metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend - // TODO: Maybe use context with cancel/timeout here - jobMeta, err := archiver.ArchiveJob(job, context.Background()) - if err != nil { - log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) - continue - } - - // Update the jobs database entry one last time: - if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { - log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) - continue - } - log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) - log.Printf("archiving job (dbid: %d) successful", job.ID) - r.archivePending.Done() - } - } -} - -// Stop updates the job with the database id jobId using the provided arguments. -func (r *JobRepository) MarkArchived( - jobMeta *schema.JobMeta, - monitoringStatus int32, -) error { - stmt := sq.Update("job"). - Set("monitoring_status", monitoringStatus). - Where("job.id = ?", jobMeta.JobID) - - sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) - if err != nil { - log.Errorf("cannot get subcluster: %s", err.Error()) - return err - } - footprint := make(map[string]float64) - - for _, fp := range sc.Footprint { - footprint[fp] = LoadJobStat(jobMeta, fp) - } - - var rawFootprint []byte - - if rawFootprint, err = json.Marshal(footprint); err != nil { - log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) - return err - } - - stmt = stmt.Set("footprint", rawFootprint) - - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while marking job as archived") - return err - } - return nil -} - -func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) { - stmt := sq.Update("job"). - Set("monitoring_status", monitoringStatus). 
- Where("job.id = ?", job) - - _, err = stmt.RunWith(r.stmtCache).Exec() - return -} - -// Trigger async archiving -func (r *JobRepository) TriggerArchiving(job *schema.Job) { - r.archivePending.Add(1) - r.archiveChannel <- job -} - -// Wait for background thread to finish pending archiving operations -func (r *JobRepository) WaitForArchiving() { - // close channel and wait for worker to process remaining jobs - r.archivePending.Wait() -} diff --git a/internal/repository/job.go b/internal/repository/job.go index ca8350f..2c5eef8 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -28,12 +28,10 @@ var ( ) type JobRepository struct { - DB *sqlx.DB - stmtCache *sq.StmtCache - cache *lrucache.Cache - archiveChannel chan *schema.Job - driver string - archivePending sync.WaitGroup + DB *sqlx.DB + stmtCache *sq.StmtCache + cache *lrucache.Cache + driver string } func GetJobRepository() *JobRepository { @@ -44,12 +42,9 @@ func GetJobRepository() *JobRepository { DB: db.DB, driver: db.Driver, - stmtCache: sq.NewStmtCache(db.DB), - cache: lrucache.New(1024 * 1024), - archiveChannel: make(chan *schema.Job, 128), + stmtCache: sq.NewStmtCache(db.DB), + cache: lrucache.New(1024 * 1024), } - // start archiving worker - go jobRepoInstance.archivingWorker() }) return jobRepoInstance } @@ -495,3 +490,56 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 log.Infof("Return job count %d", len(jobs)) return jobs, nil } + +func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) { + stmt := sq.Update("job"). + Set("monitoring_status", monitoringStatus). + Where("job.id = ?", job) + + _, err = stmt.RunWith(r.stmtCache).Exec() + return +} + +// Stop updates the job with the database id jobId using the provided arguments. +func (r *JobRepository) MarkArchived( + jobMeta *schema.JobMeta, + monitoringStatus int32, +) error { + stmt := sq.Update("job"). + Set("monitoring_status", monitoringStatus). 
+ Where("job.id = ?", jobMeta.JobID) + + if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { + log.Warn("Error while marking job as archived") + return err + } + return nil +} + +func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + footprint := make(map[string]float64) + + for _, fp := range sc.Footprint { + footprint[fp] = LoadJobStat(jobMeta, fp) + } + + var rawFootprint []byte + + if rawFootprint, err = json.Marshal(footprint); err != nil { + log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) + return err + } + + stmt := sq.Update("job").Set("footprint", rawFootprint) + + if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { + log.Warn("Error while updating job footprint") + return err + } + return nil +} From f30586361667e3c6cc432f089104b215cee35771 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Aug 2024 12:26:35 +0200 Subject: [PATCH 17/31] Bugs fixed in unit tests and archiver init --- internal/api/api_test.go | 1 + internal/archiver/archiveWorker.go | 19 ++++++++++++------- .../testdata/archive/alex/cluster.json | 6 +++--- .../testdata/archive/fritz/cluster.json | 4 ++-- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/internal/api/api_test.go b/internal/api/api_test.go index bb1ff6f..aa0a1f4 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -156,6 +156,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } + archiver.Start(repository.GetJobRepository()) auth.Init() graph.Init() diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 6757a0b..7b272e6 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -17,11 +17,12 @@ import ( var ( archivePending sync.WaitGroup archiveChannel chan *schema.Job - r *repository.JobRepository + jobRepo *repository.JobRepository ) -func Start(jobRepo *repository.JobRepository) { +func Start(r *repository.JobRepository) { archiveChannel = make(chan *schema.Job, 128) + jobRepo = r go archivingWorker() } @@ -37,9 +38,9 @@ func archivingWorker() { start := time.Now() // not using meta data, called to load JobMeta into Cache? 
// will fail if job meta not in repository - if _, err := r.FetchMetadata(job); err != nil { + if _, err := jobRepo.FetchMetadata(job); err != nil { log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) continue } @@ -48,16 +49,16 @@ func archivingWorker() { jobMeta, err := ArchiveJob(job, context.Background()) if err != nil { log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) continue } - if err := r.UpdateFootprint(jobMeta); err != nil { + if err := jobRepo.UpdateFootprint(jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } // Update the jobs database entry one last time: - if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { + if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) continue } @@ -70,6 +71,10 @@ func archivingWorker() { // Trigger async archiving func TriggerArchiving(job *schema.Job) { + if archiveChannel == nil { + log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?") + } + archivePending.Add(1) archiveChannel <- job } diff --git a/pkg/archive/testdata/archive/alex/cluster.json b/pkg/archive/testdata/archive/alex/cluster.json index cc2888d..f1cf085 100644 --- a/pkg/archive/testdata/archive/alex/cluster.json +++ b/pkg/archive/testdata/archive/alex/cluster.json @@ -94,7 +94,7 @@ }, "scope": "hwthread", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 500, "normal": 250, @@ -136,7 +136,7 @@ }, "scope": "accelerator", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 400, "normal": 200, @@ -190,7 +190,7 @@ }, "scope": "socket", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 500, "normal": 250, diff --git a/pkg/archive/testdata/archive/fritz/cluster.json b/pkg/archive/testdata/archive/fritz/cluster.json index 58ec3af..3df3a95 100644 --- a/pkg/archive/testdata/archive/fritz/cluster.json +++ b/pkg/archive/testdata/archive/fritz/cluster.json @@ -256,7 +256,7 @@ "normal": 250, "caution": 100, "alert": 50, - "energy": true + "energy": "power" }, { "name": "mem_power", @@ -270,7 +270,7 @@ "normal": 50, "caution": 20, "alert": 10, - "energy": true + "energy": "power" }, { "name": "ipc", From 5b03cf826bd844e7bebaad81be88458f1adc79bc Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 29 Aug 2024 07:26:49 +0200 Subject: [PATCH 18/31] feat: Add total energy and energy footprint --- internal/archiver/archiveWorker.go | 4 +++ internal/repository/job.go | 56 ++++++++++++++++++++++++++++-- internal/repository/stats.go | 14 +++++--- pkg/archive/clusterConfig.go | 10 ++++++ 4 files changed, 76 insertions(+), 8 deletions(-) diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 7b272e6..4de5032 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -57,6 +57,10 @@ func archivingWorker() { log.Errorf("archiving job (dbid: %d) failed at update 
Footprint step: %s", job.ID, err.Error()) continue } + if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + continue + } // Update the jobs database entry one last time: if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) diff --git a/internal/repository/job.go b/internal/repository/job.go index 2c5eef8..1e552e1 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -500,7 +500,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 return } -// Stop updates the job with the database id jobId using the provided arguments. +// FIXME: Combine the next three queries into one providing the db statement as function argument! func (r *JobRepository) MarkArchived( jobMeta *schema.JobMeta, monitoringStatus int32, @@ -516,6 +516,49 @@ func (r *JobRepository) MarkArchived( return nil } +func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { + sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) + if err != nil { + log.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + energyFootprint := make(map[string]float64) + var totalEnergy float64 + var energy float64 + + for _, fp := range sc.EnergyFootprint { + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { + // FIXME: Check for unit conversions + if sc.MetricConfig[i].Energy == "power" { + energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration) + } else if sc.MetricConfig[i].Energy == "energy" { + // FIXME: Compute sum of energy metric + } + } + + energyFootprint[fp] = energy + totalEnergy += energy + } + + var rawFootprint []byte + + if rawFootprint, err = json.Marshal(energyFootprint); err != nil { + log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID) + return err + } + + stmt := sq.Update("job"). + Set("energy_footprint", rawFootprint). + Set("energy", totalEnergy). + Where("job.id = ?", jobMeta.JobID) + + if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { + log.Warn("Error while updating job energy footprint") + return err + } + return nil +} + func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { @@ -525,7 +568,13 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { footprint := make(map[string]float64) for _, fp := range sc.Footprint { - footprint[fp] = LoadJobStat(jobMeta, fp) + statType := "avg" + + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { + statType = sc.MetricConfig[i].Footprint + } + + footprint[fp] = LoadJobStat(jobMeta, fp, statType) } var rawFootprint []byte @@ -535,7 +584,8 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { return err } - stmt := sq.Update("job").Set("footprint", rawFootprint) + stmt := sq.Update("job").Set("footprint", rawFootprint). 
+ Where("job.id = ?", jobMeta.JobID) if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { log.Warn("Error while updating job footprint") diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 5682144..ca05ca3 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -286,13 +286,17 @@ func (r *JobRepository) JobsStats( return stats, nil } -// FIXME: Make generic -func LoadJobStat(job *schema.JobMeta, metric string) float64 { +func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 { if stats, ok := job.Statistics[metric]; ok { - if metric == "mem_used" { - return stats.Max - } else { + switch statType { + case "avg": return stats.Avg + case "max": + return stats.Max + case "min": + return stats.Min + default: + log.Errorf("Unknown stat type %s", statType) } } diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 3dd8c64..5710d06 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -221,3 +221,13 @@ func GetSubClusterByNode(cluster, hostname string) (string, error) { return "", fmt.Errorf("ARCHIVE/CLUSTERCONFIG > no subcluster found for cluster %v and host %v", cluster, hostname) } + +func MetricIndex(mc []schema.MetricConfig, name string) (int, error) { + for i, m := range mc { + if m.Name == name { + return i, nil + } + } + + return 0, fmt.Errorf("Unknown metric name %s", name) +} From 7c51d88501adafa592bbe3188806c7621d58a250 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 29 Aug 2024 08:45:04 +0200 Subject: [PATCH 19/31] Add stub for Footprint update service --- internal/taskManager/footprintService.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/taskManager/footprintService.go b/internal/taskManager/footprintService.go index 28a5a72..ba1fb09 100644 --- a/internal/taskManager/footprintService.go +++ b/internal/taskManager/footprintService.go @@ -4,5 +4,22 @@ // license that can be found in the LICENSE file. package taskManager +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/go-co-op/gocron/v2" +) + func registerFootprintWorker() { + log.Info("Register Footprint Update service") + d, _ := time.ParseDuration("10m") + s.NewJob(gocron.DurationJob(d), + gocron.NewTask( + func() { + t := time.Now() + log.Printf("Update Footprints started at %s", t.Format(time.RFC3339)) + + log.Print("Update Footprints done") + })) } From b0c9d1164d7928fc0100c70698622fdb71d3ab96 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 30 Aug 2024 07:22:40 +0200 Subject: [PATCH 20/31] Add initial version of footprint update service Not tested yet --- internal/archiver/archiver.go | 6 +-- internal/repository/job.go | 32 ++++++++++- internal/taskManager/footprintService.go | 67 ++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index e10a994..abaecd6 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -23,9 +23,9 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { allMetrics = append(allMetrics, mc.Name) } - // TODO: Talk about this! What resolutions to store data at... 
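+	// Node scope is always archived; core and accelerator scopes are added
+	// below for small jobs and jobs with accelerators.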
scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { + if job.NumNodes <= 8 { // FIXME: Add a config option for this + // This will add the native scope if core scope is not available scopes = append(scopes, schema.MetricScopeCore) } @@ -49,7 +49,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 nodeData, ok := data["node"] if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! + // This should never happen ? continue } diff --git a/internal/repository/job.go b/internal/repository/job.go index 1e552e1..7cfe4fd 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -453,6 +453,33 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { + query := sq.Select(jobColumns...).From("job"). + Where(fmt.Sprintf("job.cluster = '%s'", cluster)). + Where("job.job_state = 'running'"). + Where("job.duration>600") + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + log.Infof("Return job count %d", len(jobs)) + return jobs, nil +} + func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { var query sq.SelectBuilder @@ -532,7 +559,7 @@ func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { if sc.MetricConfig[i].Energy == "power" { energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration) } else if sc.MetricConfig[i].Energy == "energy" { - // FIXME: Compute sum of energy metric + // This assumes the metric is of aggregation type sum } } @@ -574,7 +601,8 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { statType = sc.MetricConfig[i].Footprint } - footprint[fp] = LoadJobStat(jobMeta, fp, statType) + name := fmt.Sprintf("%s_%s", fp, statType) + footprint[fp] = LoadJobStat(jobMeta, name, statType) } var rawFootprint []byte diff --git a/internal/taskManager/footprintService.go b/internal/taskManager/footprintService.go index ba1fb09..d14026d 100644 --- a/internal/taskManager/footprintService.go +++ b/internal/taskManager/footprintService.go @@ -5,9 +5,14 @@ package taskManager import ( + "context" + "math" "time" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/go-co-op/gocron/v2" ) @@ -19,7 +24,69 @@ func registerFootprintWorker() { func() { t := time.Now() log.Printf("Update Footprints started at %s", t.Format(time.RFC3339)) + for _, cluster := range archive.Clusters { + jobs, err := jobRepo.FindRunningJobs(cluster.Name) + if err != nil { + continue + } + allMetrics := make([]string, 0) + metricConfigs := archive.GetCluster(cluster.Name).MetricConfig + for _, mc := range metricConfigs { + allMetrics = append(allMetrics, mc.Name) + } + scopes := []schema.MetricScope{schema.MetricScopeNode} + scopes = append(scopes, schema.MetricScopeCore) + scopes = append(scopes, schema.MetricScopeAccelerator) + + for _, job := range jobs { + jobData, err := 
metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background())
+					if err != nil {
+						log.Error("Error while loading job data for footprint update")
+						continue
+					}
+
+					jobMeta := &schema.JobMeta{
+						BaseJob:    job.BaseJob,
+						StartTime:  job.StartTime.Unix(),
+						Statistics: make(map[string]schema.JobStatistics),
+					}
+
+					for metric, data := range jobData {
+						avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
+						nodeData, ok := data["node"]
+						if !ok {
+							// This should never happen ?
+							continue
+						}
+
+						for _, series := range nodeData.Series {
+							avg += series.Statistics.Avg
+							min = math.Min(min, series.Statistics.Min)
+							max = math.Max(max, series.Statistics.Max)
+						}
+
+						jobMeta.Statistics[metric] = schema.JobStatistics{
+							Unit: schema.Unit{
+								Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
+								Base:   archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
+							},
+							Avg: avg / float64(job.NumNodes),
+							Min: min,
+							Max: max,
+						}
+					}
+
+					if err := jobRepo.UpdateFootprint(jobMeta); err != nil {
+						log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
+						continue
+					}
+					if err := jobRepo.UpdateEnergy(jobMeta); err != nil {
+						log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
+						continue
+					}
+				}
+			}
 				log.Print("Update Footprints done")
 			}))
 }
From c459724114abbc1b03c6d7ab23c1f1b373759127 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Fri, 30 Aug 2024 13:50:49 +0200
Subject: [PATCH 21/31] Resolve build errors

---
 internal/importer/handleImport.go | 10 +++++++++-
 internal/importer/initDB.go       | 10 +++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go
index c4d55ab..35403f6 100644
--- a/internal/importer/handleImport.go
+++ b/internal/importer/handleImport.go
@@ -77,8 +77,16 @@ func HandleImportFlag(flag string) error {
 		job.Footprint = make(map[string]float64)
 
 		for _, fp := range sc.Footprint {
-			job.Footprint[fp] = repository.LoadJobStat(&job, fp)
+			statType := "avg"
+
+			if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
+				statType = sc.MetricConfig[i].Footprint
+			}
+
+			name := fmt.Sprintf("%s_%s", fp, statType)
+			job.Footprint[fp] = repository.LoadJobStat(&job, name, statType)
 		}
+
 		job.RawFootprint, err = json.Marshal(job.Footprint)
 		if err != nil {
 			log.Warn("Error while marshaling job footprint")
diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go
index 4b9abab..fe49e94 100644
--- a/internal/importer/initDB.go
+++ b/internal/importer/initDB.go
@@ -68,7 +68,15 @@ func InitDB() error {
 			job.Footprint = make(map[string]float64)
 
 			for _, fp := range sc.Footprint {
-				job.Footprint[fp] = repository.LoadJobStat(jobMeta, fp)
+				statType := "avg"
+
+				if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
+					statType = sc.MetricConfig[i].Footprint
+				}
+
+				name := fmt.Sprintf("%s_%s", fp, statType)
+
+				job.Footprint[fp] = repository.LoadJobStat(jobMeta, name, statType)
 			}
 
 			job.RawFootprint, err = json.Marshal(job.Footprint)
From 6568b6d72355974eacec1d43e638c9bd6f53722a Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Mon, 2 Sep 2024 12:07:44 +0200
Subject: [PATCH 22/31] Start footprint service

---
 internal/taskManager/footprintService.go | 2 +-
 internal/taskManager/taskManager.go      | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/internal/taskManager/footprintService.go b/internal/taskManager/footprintService.go
index d14026d..ff76e25 100644
--- 
a/internal/taskManager/footprintService.go +++ b/internal/taskManager/footprintService.go @@ -16,7 +16,7 @@ import ( "github.com/go-co-op/gocron/v2" ) -func registerFootprintWorker() { +func RegisterFootprintWorker() { log.Info("Register Footprint Update service") d, _ := time.ParseDuration("10m") s.NewJob(gocron.DurationJob(d), diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index 006469c..b31a1a1 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -79,6 +79,8 @@ func Start() { RegisterLdapSyncService(lc.SyncInterval) } + RegisterFootprintWorker() + s.Start() } From 39c09f8565fe0f63f808416ad018fea3e6453d13 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 10:03:38 +0200 Subject: [PATCH 23/31] Introduce job duration update task --- internal/repository/job.go | 17 +++++++++++- internal/taskManager/taskManager.go | 1 + internal/taskManager/updateDurationService.go | 26 +++++++++++++++++++ ...ntService.go => updateFootprintService.go} | 0 4 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 internal/taskManager/updateDurationService.go rename internal/taskManager/{footprintService.go => updateFootprintService.go} (100%) diff --git a/internal/repository/job.go b/internal/repository/job.go index 7cfe4fd..01dc0af 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -205,7 +205,10 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er return err } - if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil { + if _, err = sq.Update("job"). + Set("meta_data", job.RawMetaData). + Where("job.id = ?", job.ID). + RunWith(r.stmtCache).Exec(); err != nil { log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) return err } @@ -480,6 +483,18 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { return jobs, nil } +func (r *JobRepository) UpdateDuration() error { + if _, err := sq.Update("job"). + Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). + Where("job_state = running"). + RunWith(r.stmtCache).Exec(); err != nil { + log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) + return err + } + + return nil +} + func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { var query sq.SelectBuilder diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index b31a1a1..101fc4a 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -80,6 +80,7 @@ func Start() { } RegisterFootprintWorker() + RegisterUpdateDurationWorker() s.Start() } diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go new file mode 100644 index 0000000..afc1045 --- /dev/null +++ b/internal/taskManager/updateDurationService.go @@ -0,0 +1,26 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package taskManager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/go-co-op/gocron/v2" +) + +func RegisterUpdateDurationWorker() { + log.Info("Register duration update service") + + d, _ := time.ParseDuration("5m") + s.NewJob(gocron.DurationJob(d), + gocron.NewTask( + func() { + start := time.Now() + log.Printf("Update duration started at %s", start.Format(time.RFC3339)) + jobRepo.UpdateDuration() + log.Print("Update duration is done and took %s", time.Since(start)) + })) +} diff --git a/internal/taskManager/footprintService.go b/internal/taskManager/updateFootprintService.go similarity index 100% rename from internal/taskManager/footprintService.go rename to internal/taskManager/updateFootprintService.go From 6568b6d72355974eacec1d43e638c9bd6f53722a Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 13:40:11 +0200 Subject: [PATCH 24/31] Prepare transaction API for general usage --- internal/importer/initDB.go | 2 +- internal/repository/transaction.go | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index fe49e94..7e9fed5 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -27,7 +27,7 @@ func InitDB() error { starttime := time.Now() log.Print("Building job table...") - t, err := r.TransactionInit() + t, err := r.TransactionInit(repository.NamedJobInsert) if err != nil { log.Warn("Error while initializing SQL transactions") return err diff --git a/internal/repository/transaction.go b/internal/repository/transaction.go index 9398354..992b423 100644 --- a/internal/repository/transaction.go +++ b/internal/repository/transaction.go @@ -15,20 +15,19 @@ type Transaction struct { stmt *sqlx.NamedStmt } -func (r *JobRepository) TransactionInit() (*Transaction, error) { +func (r *JobRepository) TransactionInit(sqlStmt string) (*Transaction, error) { var err error t := new(Transaction) - // Inserts are bundled into transactions because in sqlite, - // that speeds up inserts A LOT. 
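+	// Inserts are still bundled into a single transaction; in sqlite this
+	// speeds up bulk inserts considerably.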
+ t.tx, err = r.DB.Beginx() if err != nil { log.Warn("Error while bundling transactions") return nil, err } - t.stmt, err = t.tx.PrepareNamed(NamedJobInsert) + t.stmt, err = t.tx.PrepareNamed(sqlStmt) if err != nil { - log.Warn("Error while preparing namedJobInsert") + log.Warn("Error while preparing SQL statement in transaction") return nil, err } @@ -63,8 +62,8 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error { return nil } -func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) { - res, err := t.stmt.Exec(job) +func (r *JobRepository) TransactionAdd(t *Transaction, obj interface{}) (int64, error) { + res, err := t.stmt.Exec(obj) if err != nil { log.Errorf("repository initDB(): %v", err) return 0, err From f58efa28711a9df1460c110bb78bb3d90ea9cfb9 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 13:41:00 +0200 Subject: [PATCH 25/31] Allow to combine job update queries --- internal/archiver/archiveWorker.go | 12 ++-- internal/repository/job.go | 60 ++++++++----------- internal/taskManager/updateDurationService.go | 2 +- .../taskManager/updateFootprintService.go | 16 +++-- 4 files changed, 46 insertions(+), 44 deletions(-) diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 4de5032..628e36e 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -12,6 +12,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + sq "github.com/Masterminds/squirrel" ) var ( @@ -53,17 +54,20 @@ func archivingWorker() { continue } - if err := jobRepo.UpdateFootprint(jobMeta); err != nil { + stmt := sq.Update("job").Where("job.id = ?", job.ID) + + if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } - if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } // Update the jobs database entry one last time: - if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { - log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) + stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) + if err := jobRepo.Execute(stmt); err != nil { + log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) continue } log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) diff --git a/internal/repository/job.go b/internal/repository/job.go index 01dc0af..e5e2569 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -488,7 +488,6 @@ func (r *JobRepository) UpdateDuration() error { Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). Where("job_state = running"). RunWith(r.stmtCache).Exec(); err != nil { - log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) return err } @@ -542,27 +541,29 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 return } -// FIXME: Combine the next three queries into one providing the db statement as function argument! -func (r *JobRepository) MarkArchived( - jobMeta *schema.JobMeta, - monitoringStatus int32, -) error { - stmt := sq.Update("job"). 
- Set("monitoring_status", monitoringStatus). - Where("job.id = ?", jobMeta.JobID) - +func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error { if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while marking job as archived") return err } + return nil } -func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { +func (r *JobRepository) MarkArchived( + stmt sq.UpdateBuilder, + monitoringStatus int32, +) sq.UpdateBuilder { + return stmt.Set("monitoring_status", monitoringStatus) +} + +func (r *JobRepository) UpdateEnergy( + stmt sq.UpdateBuilder, + jobMeta *schema.JobMeta, +) (sq.UpdateBuilder, error) { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) - return err + return stmt, err } energyFootprint := make(map[string]float64) var totalEnergy float64 @@ -586,26 +587,23 @@ func (r *JobRepository) UpdateEnergy(jobMeta *schema.JobMeta) error { if rawFootprint, err = json.Marshal(energyFootprint); err != nil { log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID) - return err + return stmt, err } - stmt := sq.Update("job"). - Set("energy_footprint", rawFootprint). - Set("energy", totalEnergy). - Where("job.id = ?", jobMeta.JobID) + stmt.Set("energy_footprint", rawFootprint). + Set("energy", totalEnergy) - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while updating job energy footprint") - return err - } - return nil + return stmt, nil } -func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { +func (r *JobRepository) UpdateFootprint( + stmt sq.UpdateBuilder, + jobMeta *schema.JobMeta, +) (sq.UpdateBuilder, error) { sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) if err != nil { log.Errorf("cannot get subcluster: %s", err.Error()) - return err + return stmt, err } footprint := make(map[string]float64) @@ -624,15 +622,9 @@ func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error { if rawFootprint, err = json.Marshal(footprint); err != nil { log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) - return err + return stmt, err } - stmt := sq.Update("job").Set("footprint", rawFootprint). 
- Where("job.id = ?", jobMeta.JobID) - - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while updating job footprint") - return err - } - return nil + stmt.Set("footprint", rawFootprint) + return stmt, nil } diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go index afc1045..6023547 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskManager/updateDurationService.go @@ -21,6 +21,6 @@ func RegisterUpdateDurationWorker() { start := time.Now() log.Printf("Update duration started at %s", start.Format(time.RFC3339)) jobRepo.UpdateDuration() - log.Print("Update duration is done and took %s", time.Since(start)) + log.Printf("Update duration is done and took %s", time.Since(start)) })) } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index ff76e25..510c73e 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -13,6 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" + sq "github.com/Masterminds/squirrel" "github.com/go-co-op/gocron/v2" ) @@ -22,8 +23,8 @@ func RegisterFootprintWorker() { s.NewJob(gocron.DurationJob(d), gocron.NewTask( func() { - t := time.Now() - log.Printf("Update Footprints started at %s", t.Format(time.RFC3339)) + s := time.Now() + log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { @@ -77,16 +78,21 @@ func RegisterFootprintWorker() { } } - if err := jobRepo.UpdateFootprint(jobMeta); err != nil { + stmt := sq.Update("job").Where("job.id = ?", job.ID) + if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } - if err := jobRepo.UpdateEnergy(jobMeta); err != nil { + if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } + if err := jobRepo.Execute(stmt); err != nil { + log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + continue + } } } - log.Print("Update Footprints done") + log.Printf("Update Footprints is done and took %s", time.Since(s)) })) } From e267481f7191a4974c99403fb5e80c7520fe319b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 15:40:02 +0200 Subject: [PATCH 26/31] Cleanup transaction api --- internal/importer/initDB.go | 18 +++++++++--- internal/repository/transaction.go | 45 ++++++++++-------------------- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 7e9fed5..5f06f36 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -16,6 +16,11 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/schema" ) +const ( + addTagQuery = "INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)" + setTagQuery = "INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)" +) + // Delete the tables "job", "tag" and "jobtag" from the database and // repopulate them using the jobs found in `archive`. 
func InitDB() error { @@ -27,7 +32,7 @@ func InitDB() error { starttime := time.Now() log.Print("Building job table...") - t, err := r.TransactionInit(repository.NamedJobInsert) + t, err := r.TransactionInit() if err != nil { log.Warn("Error while initializing SQL transactions") return err @@ -105,7 +110,8 @@ func InitDB() error { continue } - id, err := r.TransactionAdd(t, job) + id, err := r.TransactionAddNamed(t, + repository.NamedJobInsert, job) if err != nil { log.Errorf("repository initDB(): %v", err) errorOccured++ @@ -116,7 +122,9 @@ func InitDB() error { tagstr := tag.Name + ":" + tag.Type tagId, ok := tags[tagstr] if !ok { - tagId, err = r.TransactionAddTag(t, tag) + tagId, err = r.TransactionAdd(t, + addTagQuery, + tag.Name, tag.Type) if err != nil { log.Errorf("Error adding tag: %v", err) errorOccured++ @@ -125,7 +133,9 @@ func InitDB() error { tags[tagstr] = tagId } - r.TransactionSetTag(t, id, tagId) + r.TransactionAdd(t, + setTagQuery, + id, tagId) } if err == nil { diff --git a/internal/repository/transaction.go b/internal/repository/transaction.go index 992b423..8c5d357 100644 --- a/internal/repository/transaction.go +++ b/internal/repository/transaction.go @@ -6,7 +6,6 @@ package repository import ( "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/jmoiron/sqlx" ) @@ -15,7 +14,7 @@ type Transaction struct { stmt *sqlx.NamedStmt } -func (r *JobRepository) TransactionInit(sqlStmt string) (*Transaction, error) { +func (r *JobRepository) TransactionInit() (*Transaction, error) { var err error t := new(Transaction) @@ -24,13 +23,6 @@ func (r *JobRepository) TransactionInit(sqlStmt string) (*Transaction, error) { log.Warn("Error while bundling transactions") return nil, err } - - t.stmt, err = t.tx.PrepareNamed(sqlStmt) - if err != nil { - log.Warn("Error while preparing SQL statement in transaction") - return nil, err - } - return t, nil } @@ -49,7 +41,6 @@ func (r *JobRepository) TransactionCommit(t *Transaction) error { return err } - t.stmt = t.tx.NamedStmt(t.stmt) return nil } @@ -62,10 +53,14 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error { return nil } -func (r *JobRepository) TransactionAdd(t *Transaction, obj interface{}) (int64, error) { - res, err := t.stmt.Exec(obj) +func (r *JobRepository) TransactionAddNamed( + t *Transaction, + query string, + args ...interface{}, +) (int64, error) { + res, err := t.tx.NamedExec(query, args) if err != nil { - log.Errorf("repository initDB(): %v", err) + log.Errorf("Named Exec failed: %v", err) return 0, err } @@ -78,26 +73,14 @@ func (r *JobRepository) TransactionAdd(t *Transaction, obj interface{}) (int64, return id, nil } -func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) { - res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type) +func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...interface{}) (int64, error) { + res := t.tx.MustExec(query, args) + + id, err := res.LastInsertId() if err != nil { - log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type) - return 0, err - } - tagId, err := res.LastInsertId() - if err != nil { - log.Warn("Error while getting last insert ID") + log.Errorf("repository initDB(): %v", err) return 0, err } - return tagId, nil -} - -func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error { - if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES 
(?, ?)`, jobId, tagId); err != nil { - log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId) - return err - } - - return nil + return id, nil } From 508978d586a9e9f98e6c1080160b520fc2bad441 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Sep 2024 15:59:01 +0200 Subject: [PATCH 27/31] Initial attempt to update footprints in transaction --- internal/archiver/archiver.go | 3 ++- .../taskManager/updateFootprintService.go | 20 +++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index abaecd6..de84cf0 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -24,7 +24,8 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { } scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { // FIXME: Add a config option for this + // FIXME: Add a config option for this + if job.NumNodes <= 8 { // This will add the native scope if core scope is not available scopes = append(scopes, schema.MetricScopeCore) } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 510c73e..2fdd6b9 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -25,6 +25,12 @@ func RegisterFootprintWorker() { func() { s := time.Now() log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) + + t, err := jobRepo.TransactionInit() + if err != nil { + log.Errorf("Failed TransactionInit %v", err) + } + for _, cluster := range archive.Clusters { jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { @@ -87,12 +93,22 @@ func RegisterFootprintWorker() { log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } - if err := jobRepo.Execute(stmt); err != nil { - log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + + query, args, err := stmt.ToSql() + if err != nil { + log.Errorf("Failed in ToSQL conversion %v", err) continue } + jobRepo.TransactionAdd(t, query, args) + // if err := jobRepo.Execute(stmt); err != nil { + // log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + // continue + // } } + + jobRepo.TransactionCommit(t) } + jobRepo.TransactionEnd(t) log.Printf("Update Footprints is done and took %s", time.Since(s)) })) } From 53ca38ce530861e501965d011828843cf4b0c1db Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 5 Sep 2024 11:18:00 +0200 Subject: [PATCH 28/31] Add debug output to duration query --- internal/repository/job.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index e5e2569..1cb4c62 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -484,10 +484,14 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { } func (r *JobRepository) UpdateDuration() error { - if _, err := sq.Update("job"). + stmnt := sq.Update("job"). Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). - Where("job_state = running"). 
- RunWith(r.stmtCache).Exec(); err != nil { + Where("job_state = running") + sql, _, err := stmnt.ToSql() + log.Infof("Duration Update query %s", sql) + + _, err = stmnt.RunWith(r.stmtCache).Exec() + if err != nil { return err } From 5e65e21f0bb9387523f46887de607319e0df831a Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 5 Sep 2024 12:38:39 +0200 Subject: [PATCH 29/31] Add quotes in duration query --- internal/repository/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index 1cb4c62..c411ab2 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -486,7 +486,7 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { func (r *JobRepository) UpdateDuration() error { stmnt := sq.Update("job"). Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). - Where("job_state = running") + Where("job_state = 'running'") sql, _, err := stmnt.ToSql() log.Infof("Duration Update query %s", sql) From 7c33dcf630f13c376f56ec5f72f83db9887c5274 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 5 Sep 2024 14:58:08 +0200 Subject: [PATCH 30/31] Bugfix in footprint update --- internal/repository/job.go | 4 +--- internal/taskManager/updateFootprintService.go | 7 +++++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index c411ab2..9bad866 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -487,10 +487,8 @@ func (r *JobRepository) UpdateDuration() error { stmnt := sq.Update("job"). Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). Where("job_state = 'running'") - sql, _, err := stmnt.ToSql() - log.Infof("Duration Update query %s", sql) - _, err = stmnt.RunWith(r.stmtCache).Exec() + _, err := stmnt.RunWith(r.stmtCache).Exec() if err != nil { return err } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 2fdd6b9..2434fd1 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -85,11 +85,14 @@ func RegisterFootprintWorker() { } stmt := sq.Update("job").Where("job.id = ?", job.ID) - if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { + stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) + if err != nil { log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } - if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { + + stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta) + if err != nil { log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) continue } From 7ea4086807cc13b08422e303e9d962fdc5fec2ff Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Thu, 5 Sep 2024 15:06:38 +0200 Subject: [PATCH 31/31] Rework sqlite indices in v8 migration --- .../sqlite3/08_add-footprint.up.sql | 54 +++++++++++++++++-- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql index 4895258..7f0d578 100644 --- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql +++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql @@ -1,7 +1,11 @@ -CREATE INDEX IF NOT EXISTS jobs_cluster_orderby_starttime ON job (cluster, start_time DESC); -CREATE INDEX IF NOT EXISTS jobs_cluster_count ON job (cluster, 
job_state, start_time); -CREATE INDEX IF NOT EXISTS jobs_project_orderby_starttime ON job (project, start_time DESC); -CREATE INDEX IF NOT EXISTS jobs_project_count ON job (project, job_state, start_time); +DROP INDEX job_stats; +DROP INDEX job_by_user; +DROP INDEX job_by_starttime; +DROP INDEX job_by_job_id; +DROP INDEX job_list; +DROP INDEX job_list_user; +DROP INDEX job_list_users; +DROP INDEX job_list_users_start; ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0; ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL; @@ -27,4 +31,44 @@ ALTER TABLE job DROP net_data_vol_total; ALTER TABLE job DROP file_bw_avg; ALTER TABLE job DROP file_data_vol_total; -PRAGMA optimize; \ No newline at end of file +CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster); +CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, user); +CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project); +CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster); + +CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, partition); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, partition, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, partition, job_state); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, partition, job_state, user); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, partition, job_state, project); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, partition, job_state, start_time); + +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state); +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, user); +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project); + +CREATE INDEX IF NOT EXISTS jobs_user ON job (user); +CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (user, start_time); + +CREATE INDEX IF NOT EXISTS jobs_project ON job (project); +CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time); +CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, user); + +CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state); +CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, user); +CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project); +CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster); +CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time); + +CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time); + +CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time); +CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration); +CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes); +CREATE INDEX IF NOT EXISTS jobs_numhwthreads ON job (num_hwthreads); +CREATE INDEX IF NOT EXISTS jobs_numacc ON job (num_acc); + +PRAGMA optimize;
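
------------------------------------------------------------------------
Editor's notes (review commentary; not part of the patch series)

1. Periodic workers (PATCH 23). RegisterUpdateDurationWorker registers its
task through the package-level gocron/v2 scheduler `s` of the taskManager
package. For readers unfamiliar with gocron v2, a self-contained sketch of
the same registration pattern; the 5-second interval, the prints, and the
final sleep are only there to make the demo short and observable:

    package main

    import (
        "fmt"
        "time"

        "github.com/go-co-op/gocron/v2"
    )

    func main() {
        // Create a scheduler; in cc-backend this is the package-level
        // `s` of the taskManager package.
        sched, err := gocron.NewScheduler()
        if err != nil {
            panic(err)
        }

        // Register a task that fires on a fixed interval, like the
        // duration-update worker (which uses 5 minutes).
        _, err = sched.NewJob(
            gocron.DurationJob(5*time.Second),
            gocron.NewTask(func() {
                start := time.Now()
                fmt.Printf("worker started at %s\n", start.Format(time.RFC3339))
                // ... the actual work would run here ...
                fmt.Printf("worker done, took %s\n", time.Since(start))
            }),
        )
        if err != nil {
            panic(err)
        }

        sched.Start()                // non-blocking; jobs run in goroutines
        time.Sleep(12 * time.Second) // let the job fire twice
        _ = sched.Shutdown()
    }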
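2. Combined update queries (PATCH 25). The patch threads one squirrel
UpdateBuilder through UpdateFootprint, UpdateEnergy, and MarkArchived so a
single UPDATE is issued per job instead of three. One caveat worth knowing:
squirrel builders are immutable values, so Set returns a new builder that
must be kept; the bare `stmt.Set(...)` calls in the UpdateEnergy and
UpdateFootprint hunks appear to discard that result, which would silently
drop those SET clauses. A sketch of the chaining pattern and the pitfall
(column values are placeholders, not the real footprint payload):

    package main

    import (
        "fmt"

        sq "github.com/Masterminds/squirrel"
    )

    func main() {
        // One shared builder collects SET clauses from several stages
        // (mirroring UpdateFootprint, UpdateEnergy, MarkArchived) and
        // is executed once at the end.
        stmt := sq.Update("job").Where("job.id = ?", 42)
        stmt = stmt.Set("footprint", `{"flops_any_avg": 0.0}`)
        stmt = stmt.Set("energy", 0.0)
        stmt = stmt.Set("monitoring_status", 3)

        // Pitfall: builders are immutable, so a discarded result adds
        // nothing to stmt.
        stmt.Set("duration", 0) // no effect on stmt

        sql, args, err := stmt.ToSql()
        if err != nil {
            panic(err)
        }
        fmt.Println(sql)
        // UPDATE job SET footprint = ?, energy = ?, monitoring_status = ? WHERE job.id = ?
        fmt.Println(args)
    }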
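3. Variadic forwarding (PATCH 26). The reworked helpers take a query plus
variadic arguments. When forwarding such a parameter to sqlx, the spread
operator matters: as rendered above, `t.tx.MustExec(query, args)` hands the
driver one []interface{} value rather than the expanded list, and
`t.tx.NamedExec(query, args)` likewise passes the slice where NamedExec
expects a single struct or map. A minimal sketch of the forwarding rule,
assuming an illustrative jobtag table:

    package main

    import (
        "database/sql"

        "github.com/jmoiron/sqlx"
        _ "github.com/mattn/go-sqlite3"
    )

    // add forwards a variadic argument list to the driver. The spread
    // operator is essential: tx.Exec(query, args) would bind a single
    // []interface{} value, which the SQLite driver rejects at runtime.
    func add(tx *sqlx.Tx, query string, args ...interface{}) (sql.Result, error) {
        return tx.Exec(query, args...)
    }

    func main() {
        // Illustrative stand-in for the jobtag table.
        db := sqlx.MustOpen("sqlite3", ":memory:")
        defer db.Close()
        db.MustExec(`CREATE TABLE jobtag (job_id INTEGER, tag_id INTEGER)`)

        tx := db.MustBegin()
        if _, err := add(tx, `INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, 1, 2); err != nil {
            panic(err)
        }
        if err := tx.Commit(); err != nil {
            panic(err)
        }
    }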
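4. The job_state quoting bug (PATCHES 28 and 29). In
Where("job_state = running"), SQLite parses the bare word `running` as a
column reference, so UpdateDuration fails with an error along the lines of
"no such column: running" instead of matching running jobs; quoting the
literal fixes it. squirrel passes predicate strings through verbatim, and a
bound placeholder avoids this class of bug entirely. A sketch (the epoch
value is an arbitrary example):

    package main

    import (
        "fmt"

        sq "github.com/Masterminds/squirrel"
    )

    func main() {
        // Bind the state as a parameter instead of embedding it in the
        // predicate string; squirrel does no quoting of its own.
        sql, args, err := sq.Update("job").
            Set("duration", sq.Expr("? - job.start_time", 1725500000)).
            Where("job_state = ?", "running").
            ToSql()
        if err != nil {
            panic(err)
        }
        fmt.Println(sql, args)
        // UPDATE job SET duration = ? - job.start_time WHERE job_state = ? [1725500000 running]
    }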
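5. Verifying the reworked indices (PATCH 31). The migration drops the old
v8 indices and recreates a broader set over cluster, partition, job_state,
user, project, start_time, and friends. A quick way to confirm a given
query shape actually hits one of them is EXPLAIN QUERY PLAN; a sketch
against an in-memory stand-in (the schema subset and the cluster name are
illustrative — on a real installation you would open the actual job
database file instead):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/mattn/go-sqlite3"
    )

    func main() {
        db, err := sql.Open("sqlite3", ":memory:")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        mustExec := func(q string) {
            if _, err := db.Exec(q); err != nil {
                log.Fatal(err)
            }
        }
        mustExec(`CREATE TABLE job (cluster TEXT, job_state TEXT, start_time INTEGER)`)
        mustExec(`CREATE INDEX jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time)`)

        rows, err := db.Query(`EXPLAIN QUERY PLAN
            SELECT * FROM job
            WHERE cluster = ? AND job_state = ?
            ORDER BY start_time`, "fritz", "running")
        if err != nil {
            log.Fatal(err)
        }
        defer rows.Close()

        for rows.Next() {
            var id, parent, notUsed int
            var detail string
            if err := rows.Scan(&id, &parent, &notUsed, &detail); err != nil {
                log.Fatal(err)
            }
            // Expect something like:
            // SEARCH job USING INDEX jobs_cluster_jobstate_starttime (cluster=? AND job_state=?)
            fmt.Println(detail)
        }
        if err := rows.Err(); err != nil {
            log.Fatal(err)
        }
    }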