From d89574ce7338b52d31822017869dfe020d2519ed Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 12:42:49 +0100 Subject: [PATCH 01/10] Use repo.loadStats, move transaction init --- .../taskManager/updateFootprintService.go | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index efca6d1..546d5a7 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -10,7 +10,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -37,11 +37,6 @@ func RegisterFootprintWorker() { cl := 0 log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) - t, err := jobRepo.TransactionInit() - if err != nil { - log.Errorf("Failed TransactionInit %v", err) - } - for _, cluster := range archive.Clusters { jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { @@ -53,16 +48,21 @@ func RegisterFootprintWorker() { allMetrics = append(allMetrics, mc.Name) } - scopes := []schema.MetricScope{schema.MetricScopeNode} - scopes = append(scopes, schema.MetricScopeCore) - scopes = append(scopes, schema.MetricScopeAccelerator) + repo, err := metricdata.GetMetricDataRepo(cluster.Name) + if err != nil { + log.Warnf("no metric data repository configured for '%s'", cluster.Name) + continue + } + + pendingStatements := make([]sq.UpdateBuilder, len(jobs)) for _, job := range jobs { log.Debugf("Try job %d", job.JobID) cl++ - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background(), 0) // 0 Resolution-Value retrieves highest res + + jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { - log.Errorf("Error wile loading job data for footprint update: %v", err) + log.Errorf("Error wile loading job data stats for footprint update: %v", err) ce++ continue } @@ -73,19 +73,19 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric, data := range jobData { + for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // This should never happen ? - ce++ - continue - } + // nodeData, ok := data["node"] + // if !ok { + // // This should never happen ? + // ce++ + // continue + // } - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) + for _, hostStats := range data { + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) } // Add values rounded to 2 digits @@ -100,25 +100,34 @@ func RegisterFootprintWorker() { } } - // Init UpdateBuilder + // Build Statement per Job, Add to Pending Array stmt := sq.Update("job") - // Add SET queries stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + log.Errorf("Update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) ce++ continue } stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + log.Errorf("update job (dbid: %d) statement build failed at energy step: %s", job.ID, err.Error()) ce++ continue } - // Add WHERE Filter stmt = stmt.Where("job.id = ?", job.ID) - query, args, err := stmt.ToSql() + pendingStatements = append(pendingStatements, stmt) + log.Debugf("Finish Job Preparation %d", job.JobID) + } + + t, err := jobRepo.TransactionInit() + if err != nil { + log.Errorf("Failed TransactionInit %v", err) + } + + for _, ps := range pendingStatements { + + query, args, err := ps.ToSql() if err != nil { log.Errorf("Failed in ToSQL conversion: %v", err) ce++ @@ -127,17 +136,12 @@ func RegisterFootprintWorker() { // Args: JSON, JSON, ENERGY, JOBID jobRepo.TransactionAdd(t, query, args...) - // if err := jobRepo.Execute(stmt); err != nil { - // log.Errorf("Update job footprint (dbid: %d) failed at db execute: %s", job.ID, err.Error()) - // continue - // } c++ - log.Debugf("Finish Job %d", job.JobID) } - jobRepo.TransactionCommit(t) + + jobRepo.TransactionEnd(t) log.Debugf("Finish Cluster %s", cluster.Name) } - jobRepo.TransactionEnd(t) log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) } From 21b3a67988a3a7be48336b0dd28ca8059761f53d Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:13:43 +0100 Subject: [PATCH 02/10] add timers, add else case for transaction add --- .../taskManager/updateFootprintService.go | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 546d5a7..60ba988 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -38,6 +38,7 @@ func RegisterFootprintWorker() { log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { + s_cluster := time.Now() jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { continue @@ -60,6 +61,8 @@ func RegisterFootprintWorker() { log.Debugf("Try job %d", job.JobID) cl++ + s_job := time.Now() + jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { log.Errorf("Error wile loading job data stats for footprint update: %v", err) @@ -75,12 +78,6 @@ func RegisterFootprintWorker() { for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - // nodeData, ok := data["node"] - // if !ok { - // // This should never happen ? - // ce++ - // continue - // } for _, hostStats := range data { avg += hostStats.Avg @@ -117,7 +114,7 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - log.Debugf("Finish Job Preparation %d", job.JobID) + log.Debugf("Finish Job Preparation %d, took %s", job.JobID, time.Since(s_job)) } t, err := jobRepo.TransactionInit() @@ -129,18 +126,19 @@ func RegisterFootprintWorker() { query, args, err := ps.ToSql() if err != nil { + log.Debugf(">>> Query: %v", query) + log.Debugf(">>> Args: %v", args) log.Errorf("Failed in ToSQL conversion: %v", err) ce++ - continue + } else { + // Args: JSON, JSON, ENERGY, JOBID + jobRepo.TransactionAdd(t, query, args...) + c++ } - - // Args: JSON, JSON, ENERGY, JOBID - jobRepo.TransactionAdd(t, query, args...) - c++ } jobRepo.TransactionEnd(t) - log.Debugf("Finish Cluster %s", cluster.Name) + log.Debugf("Finish Cluster %s, took %s", cluster.Name, time.Since(s_cluster)) } log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) From 69f8a34aaccffda7675386e7846a047d8b52540d Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:36:26 +0100 Subject: [PATCH 03/10] more logging --- internal/taskManager/updateFootprintService.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 60ba988..fba0ec7 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -65,7 +65,7 @@ func RegisterFootprintWorker() { jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { - log.Errorf("Error wile loading job data stats for footprint update: %v", err) + log.Errorf("error wile loading job data stats for footprint update: %v", err) ce++ continue } @@ -101,7 +101,7 @@ func RegisterFootprintWorker() { stmt := sq.Update("job") stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) + log.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) ce++ continue } @@ -114,24 +114,24 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - log.Debugf("Finish Job Preparation %d, took %s", job.JobID, time.Since(s_job)) + log.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) } + log.Debugf("Finish preparation for %d jobs: %d statements", len(jobs), len(pendingStatements)) t, err := jobRepo.TransactionInit() if err != nil { - log.Errorf("Failed TransactionInit %v", err) + log.Errorf("failed TransactionInit %v", err) } - for _, ps := range pendingStatements { + for idx, ps := range pendingStatements { query, args, err := ps.ToSql() if err != nil { - log.Debugf(">>> Query: %v", query) - log.Debugf(">>> Args: %v", args) - log.Errorf("Failed in ToSQL conversion: %v", err) + log.Errorf("failed in ToSQL conversion: %v", err) ce++ } else { // Args: JSON, JSON, ENERGY, JOBID + log.Infof("add transaction on index %d", idx) jobRepo.TransactionAdd(t, query, args...) c++ } From baa7367ebec344b19a76eaf5d831f50389109c73 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:39:59 +0100 Subject: [PATCH 04/10] change array init to empty array --- internal/taskManager/updateFootprintService.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index fba0ec7..24c8a3e 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -55,7 +55,7 @@ func RegisterFootprintWorker() { continue } - pendingStatements := make([]sq.UpdateBuilder, len(jobs)) + pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { log.Debugf("Try job %d", job.JobID) From a8eff6fbd196a0452e4cde6762af39d2d03e2edd Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 15:08:53 +0100 Subject: [PATCH 05/10] small logging changes --- internal/taskManager/updateFootprintService.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 24c8a3e..b884ed6 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -51,14 +51,14 @@ func RegisterFootprintWorker() { repo, err := metricdata.GetMetricDataRepo(cluster.Name) if err != nil { - log.Warnf("no metric data repository configured for '%s'", cluster.Name) + log.Errorf("no metric data repository configured for '%s'", cluster.Name) continue } pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { - log.Debugf("Try job %d", job.JobID) + log.Debugf("Prepare job %d", job.JobID) cl++ s_job := time.Now() @@ -116,7 +116,6 @@ func RegisterFootprintWorker() { pendingStatements = append(pendingStatements, stmt) log.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) } - log.Debugf("Finish preparation for %d jobs: %d statements", len(jobs), len(pendingStatements)) t, err := jobRepo.TransactionInit() if err != nil { @@ -131,7 +130,7 @@ func RegisterFootprintWorker() { ce++ } else { // Args: JSON, JSON, ENERGY, JOBID - log.Infof("add transaction on index %d", idx) + log.Debugf("add transaction on index %d", idx) jobRepo.TransactionAdd(t, query, args...) c++ } From 5f4a74f8bad56576f292b1bd343ba31ac6194010 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 15:57:28 +0100 Subject: [PATCH 06/10] add check on returned stats --- internal/taskManager/updateFootprintService.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index b884ed6..d542371 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -79,10 +79,16 @@ func RegisterFootprintWorker() { for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - for _, hostStats := range data { - avg += hostStats.Avg - min = math.Min(min, hostStats.Min) - max = math.Max(max, hostStats.Max) + for hostname := range data { + hostStats, ok := data[hostname] + if !ok { + log.Debugf("footprintWorker: NAN stats returned for job %d @ %s", job.JobID, hostname) + } else { + log.Debugf("stats returned for job %d : %#v", job.JobID, hostStats) + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) + } } // Add values rounded to 2 digits From 00ddc462d20a5731e5de256aeff3d0e63143c0b5 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 16:31:35 +0100 Subject: [PATCH 07/10] expand check, change to zero init --- .../taskManager/updateFootprintService.go | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index d542371..d131dc5 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -76,19 +76,21 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric, data := range jobStats { // Metric, Hostname:Stats - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + for metric := range jobStats { // Metric, Hostname:Stats + avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - for hostname := range data { - hostStats, ok := data[hostname] - if !ok { - log.Debugf("footprintWorker: NAN stats returned for job %d @ %s", job.JobID, hostname) - } else { - log.Debugf("stats returned for job %d : %#v", job.JobID, hostStats) - avg += hostStats.Avg - min = math.Min(min, hostStats.Min) - max = math.Max(max, hostStats.Max) + data, ok := jobStats[metric] + if ok { + for hostname := range data { + hostStats, ok := data[hostname] + if ok { + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) + } } + } else { + log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) } // Add values rounded to 2 digits From 93d5a0e532c1a5399e132a7fd66c5ded6533c11c Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 16:59:18 +0100 Subject: [PATCH 08/10] correct input for check --- internal/taskManager/updateFootprintService.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index d131dc5..2a1e590 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -76,17 +76,18 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric := range jobStats { // Metric, Hostname:Stats + for _, metric := range allMetrics { avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - - data, ok := jobStats[metric] + data, ok := jobStats[metric] // Metric:[Hostname:Stats] if ok { - for hostname := range data { - hostStats, ok := data[hostname] + for _, res := range job.Resources { + hostStats, ok := data[res.Hostname] if ok { avg += hostStats.Avg min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) + } else { + log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) } } } else { From d4f487d5546c3a0cac4d62fe10785cf91f134f36 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 17:56:55 +0100 Subject: [PATCH 09/10] comment debug logging --- internal/taskManager/updateFootprintService.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 2a1e590..580e338 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -77,8 +77,8 @@ func RegisterFootprintWorker() { } for _, metric := range allMetrics { - avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - data, ok := jobStats[metric] // Metric:[Hostname:Stats] + avg, min, max := 0.0, 0.0, 0.0 + data, ok := jobStats[metric] // Metric:[Hostname:Stats] if ok { for _, res := range job.Resources { hostStats, ok := data[res.Hostname] @@ -86,13 +86,15 @@ func RegisterFootprintWorker() { avg += hostStats.Avg min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) - } else { - log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) } + // else { + // log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) + // } } - } else { - log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) } + // else { + // log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) + // } // Add values rounded to 2 digits jobMeta.Statistics[metric] = schema.JobStatistics{ From 8ea1454c066b5c69fb251f0a9bbe0afda71432dd Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 25 Nov 2024 17:03:59 +0100 Subject: [PATCH 10/10] improve transaction init error handling --- .../taskManager/updateFootprintService.go | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 580e338..59c1b12 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -43,6 +43,9 @@ func RegisterFootprintWorker() { if err != nil { continue } + // NOTE: Additional Subcluster Loop Could Allow For Limited List Of (Energy)Footprint-Metrics Only. + // - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs + // - Would Require Review of 'updateFootprint' And 'updateEnergy' Usage allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(cluster.Name).MetricConfig for _, mc := range metricConfigs { @@ -78,7 +81,7 @@ func RegisterFootprintWorker() { for _, metric := range allMetrics { avg, min, max := 0.0, 0.0, 0.0 - data, ok := jobStats[metric] // Metric:[Hostname:Stats] + data, ok := jobStats[metric] // JobStats[Metric1:[Hostname1:[Stats], Hostname2:[Stats], ...], Metric2[...] ...] if ok { for _, res := range job.Resources { hostStats, ok := data[res.Hostname] @@ -87,14 +90,9 @@ func RegisterFootprintWorker() { min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) } - // else { - // log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) - // } + } } - // else { - // log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) - // } // Add values rounded to 2 digits jobMeta.Statistics[metric] = schema.JobStatistics{ @@ -131,23 +129,22 @@ func RegisterFootprintWorker() { t, err := jobRepo.TransactionInit() if err != nil { log.Errorf("failed TransactionInit %v", err) - } - - for idx, ps := range pendingStatements { - - query, args, err := ps.ToSql() - if err != nil { - log.Errorf("failed in ToSQL conversion: %v", err) - ce++ - } else { - // Args: JSON, JSON, ENERGY, JOBID - log.Debugf("add transaction on index %d", idx) - jobRepo.TransactionAdd(t, query, args...) - c++ + log.Errorf("skipped %d transactions for cluster %s", len(pendingStatements), cluster.Name) + ce += len(pendingStatements) + } else { + for _, ps := range pendingStatements { + query, args, err := ps.ToSql() + if err != nil { + log.Errorf("failed in ToSQL conversion: %v", err) + ce++ + } else { + // args...: Footprint-JSON, Energyfootprint-JSON, TotalEnergy, JobID + jobRepo.TransactionAdd(t, query, args...) + c++ + } } + jobRepo.TransactionEnd(t) } - - jobRepo.TransactionEnd(t) log.Debugf("Finish Cluster %s, took %s", cluster.Name, time.Since(s_cluster)) } log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s))