From d89574ce7338b52d31822017869dfe020d2519ed Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 12:42:49 +0100 Subject: [PATCH 01/16] Use repo.loadStats, move transaction init --- .../taskManager/updateFootprintService.go | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index efca6d1..546d5a7 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -10,7 +10,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -37,11 +37,6 @@ func RegisterFootprintWorker() { cl := 0 log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) - t, err := jobRepo.TransactionInit() - if err != nil { - log.Errorf("Failed TransactionInit %v", err) - } - for _, cluster := range archive.Clusters { jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { @@ -53,16 +48,21 @@ func RegisterFootprintWorker() { allMetrics = append(allMetrics, mc.Name) } - scopes := []schema.MetricScope{schema.MetricScopeNode} - scopes = append(scopes, schema.MetricScopeCore) - scopes = append(scopes, schema.MetricScopeAccelerator) + repo, err := metricdata.GetMetricDataRepo(cluster.Name) + if err != nil { + log.Warnf("no metric data repository configured for '%s'", cluster.Name) + continue + } + + pendingStatements := make([]sq.UpdateBuilder, len(jobs)) for _, job := range jobs { log.Debugf("Try job %d", job.JobID) cl++ - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background(), 0) // 0 Resolution-Value retrieves highest res + + jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { - log.Errorf("Error wile loading job data for footprint update: %v", err) + log.Errorf("Error wile loading job data stats for footprint update: %v", err) ce++ continue } @@ -73,19 +73,19 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric, data := range jobData { + for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // This should never happen ? - ce++ - continue - } + // nodeData, ok := data["node"] + // if !ok { + // // This should never happen ? + // ce++ + // continue + // } - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) + for _, hostStats := range data { + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) } // Add values rounded to 2 digits @@ -100,25 +100,34 @@ func RegisterFootprintWorker() { } } - // Init UpdateBuilder + // Build Statement per Job, Add to Pending Array stmt := sq.Update("job") - // Add SET queries stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + log.Errorf("Update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) ce++ continue } stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + log.Errorf("update job (dbid: %d) statement build failed at energy step: %s", job.ID, err.Error()) ce++ continue } - // Add WHERE Filter stmt = stmt.Where("job.id = ?", job.ID) - query, args, err := stmt.ToSql() + pendingStatements = append(pendingStatements, stmt) + log.Debugf("Finish Job Preparation %d", job.JobID) + } + + t, err := jobRepo.TransactionInit() + if err != nil { + log.Errorf("Failed TransactionInit %v", err) + } + + for _, ps := range pendingStatements { + + query, args, err := ps.ToSql() if err != nil { log.Errorf("Failed in ToSQL conversion: %v", err) ce++ @@ -127,17 +136,12 @@ func RegisterFootprintWorker() { // Args: JSON, JSON, ENERGY, JOBID jobRepo.TransactionAdd(t, query, args...) - // if err := jobRepo.Execute(stmt); err != nil { - // log.Errorf("Update job footprint (dbid: %d) failed at db execute: %s", job.ID, err.Error()) - // continue - // } c++ - log.Debugf("Finish Job %d", job.JobID) } - jobRepo.TransactionCommit(t) + + jobRepo.TransactionEnd(t) log.Debugf("Finish Cluster %s", cluster.Name) } - jobRepo.TransactionEnd(t) log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) } From 21b3a67988a3a7be48336b0dd28ca8059761f53d Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:13:43 +0100 Subject: [PATCH 02/16] add timers, add else case for transaction add --- .../taskManager/updateFootprintService.go | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 546d5a7..60ba988 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -38,6 +38,7 @@ func RegisterFootprintWorker() { log.Printf("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { + s_cluster := time.Now() jobs, err := jobRepo.FindRunningJobs(cluster.Name) if err != nil { continue @@ -60,6 +61,8 @@ func RegisterFootprintWorker() { log.Debugf("Try job %d", job.JobID) cl++ + s_job := time.Now() + jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { log.Errorf("Error wile loading job data stats for footprint update: %v", err) @@ -75,12 +78,6 @@ func RegisterFootprintWorker() { for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - // nodeData, ok := data["node"] - // if !ok { - // // This should never happen ? - // ce++ - // continue - // } for _, hostStats := range data { avg += hostStats.Avg @@ -117,7 +114,7 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - log.Debugf("Finish Job Preparation %d", job.JobID) + log.Debugf("Finish Job Preparation %d, took %s", job.JobID, time.Since(s_job)) } t, err := jobRepo.TransactionInit() @@ -129,18 +126,19 @@ func RegisterFootprintWorker() { query, args, err := ps.ToSql() if err != nil { + log.Debugf(">>> Query: %v", query) + log.Debugf(">>> Args: %v", args) log.Errorf("Failed in ToSQL conversion: %v", err) ce++ - continue + } else { + // Args: JSON, JSON, ENERGY, JOBID + jobRepo.TransactionAdd(t, query, args...) + c++ } - - // Args: JSON, JSON, ENERGY, JOBID - jobRepo.TransactionAdd(t, query, args...) - c++ } jobRepo.TransactionEnd(t) - log.Debugf("Finish Cluster %s", cluster.Name) + log.Debugf("Finish Cluster %s, took %s", cluster.Name, time.Since(s_cluster)) } log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) From 69f8a34aaccffda7675386e7846a047d8b52540d Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:36:26 +0100 Subject: [PATCH 03/16] more logging --- internal/taskManager/updateFootprintService.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 60ba988..fba0ec7 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -65,7 +65,7 @@ func RegisterFootprintWorker() { jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) if err != nil { - log.Errorf("Error wile loading job data stats for footprint update: %v", err) + log.Errorf("error wile loading job data stats for footprint update: %v", err) ce++ continue } @@ -101,7 +101,7 @@ func RegisterFootprintWorker() { stmt := sq.Update("job") stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) if err != nil { - log.Errorf("Update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) + log.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) ce++ continue } @@ -114,24 +114,24 @@ func RegisterFootprintWorker() { stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) - log.Debugf("Finish Job Preparation %d, took %s", job.JobID, time.Since(s_job)) + log.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) } + log.Debugf("Finish preparation for %d jobs: %d statements", len(jobs), len(pendingStatements)) t, err := jobRepo.TransactionInit() if err != nil { - log.Errorf("Failed TransactionInit %v", err) + log.Errorf("failed TransactionInit %v", err) } - for _, ps := range pendingStatements { + for idx, ps := range pendingStatements { query, args, err := ps.ToSql() if err != nil { - log.Debugf(">>> Query: %v", query) - log.Debugf(">>> Args: %v", args) - log.Errorf("Failed in ToSQL conversion: %v", err) + log.Errorf("failed in ToSQL conversion: %v", err) ce++ } else { // Args: JSON, JSON, ENERGY, JOBID + log.Infof("add transaction on index %d", idx) jobRepo.TransactionAdd(t, query, args...) c++ } From baa7367ebec344b19a76eaf5d831f50389109c73 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 13:39:59 +0100 Subject: [PATCH 04/16] change array init to empty array --- internal/taskManager/updateFootprintService.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index fba0ec7..24c8a3e 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -55,7 +55,7 @@ func RegisterFootprintWorker() { continue } - pendingStatements := make([]sq.UpdateBuilder, len(jobs)) + pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { log.Debugf("Try job %d", job.JobID) From a8eff6fbd196a0452e4cde6762af39d2d03e2edd Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 15:08:53 +0100 Subject: [PATCH 05/16] small logging changes --- internal/taskManager/updateFootprintService.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 24c8a3e..b884ed6 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -51,14 +51,14 @@ func RegisterFootprintWorker() { repo, err := metricdata.GetMetricDataRepo(cluster.Name) if err != nil { - log.Warnf("no metric data repository configured for '%s'", cluster.Name) + log.Errorf("no metric data repository configured for '%s'", cluster.Name) continue } pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { - log.Debugf("Try job %d", job.JobID) + log.Debugf("Prepare job %d", job.JobID) cl++ s_job := time.Now() @@ -116,7 +116,6 @@ func RegisterFootprintWorker() { pendingStatements = append(pendingStatements, stmt) log.Debugf("Job %d took %s", job.JobID, time.Since(s_job)) } - log.Debugf("Finish preparation for %d jobs: %d statements", len(jobs), len(pendingStatements)) t, err := jobRepo.TransactionInit() if err != nil { @@ -131,7 +130,7 @@ func RegisterFootprintWorker() { ce++ } else { // Args: JSON, JSON, ENERGY, JOBID - log.Infof("add transaction on index %d", idx) + log.Debugf("add transaction on index %d", idx) jobRepo.TransactionAdd(t, query, args...) c++ } From 5f4a74f8bad56576f292b1bd343ba31ac6194010 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 15:57:28 +0100 Subject: [PATCH 06/16] add check on returned stats --- internal/taskManager/updateFootprintService.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index b884ed6..d542371 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -79,10 +79,16 @@ func RegisterFootprintWorker() { for metric, data := range jobStats { // Metric, Hostname:Stats avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - for _, hostStats := range data { - avg += hostStats.Avg - min = math.Min(min, hostStats.Min) - max = math.Max(max, hostStats.Max) + for hostname := range data { + hostStats, ok := data[hostname] + if !ok { + log.Debugf("footprintWorker: NAN stats returned for job %d @ %s", job.JobID, hostname) + } else { + log.Debugf("stats returned for job %d : %#v", job.JobID, hostStats) + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) + } } // Add values rounded to 2 digits From 00ddc462d20a5731e5de256aeff3d0e63143c0b5 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 16:31:35 +0100 Subject: [PATCH 07/16] expand check, change to zero init --- .../taskManager/updateFootprintService.go | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index d542371..d131dc5 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -76,19 +76,21 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric, data := range jobStats { // Metric, Hostname:Stats - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + for metric := range jobStats { // Metric, Hostname:Stats + avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - for hostname := range data { - hostStats, ok := data[hostname] - if !ok { - log.Debugf("footprintWorker: NAN stats returned for job %d @ %s", job.JobID, hostname) - } else { - log.Debugf("stats returned for job %d : %#v", job.JobID, hostStats) - avg += hostStats.Avg - min = math.Min(min, hostStats.Min) - max = math.Max(max, hostStats.Max) + data, ok := jobStats[metric] + if ok { + for hostname := range data { + hostStats, ok := data[hostname] + if ok { + avg += hostStats.Avg + min = math.Min(min, hostStats.Min) + max = math.Max(max, hostStats.Max) + } } + } else { + log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) } // Add values rounded to 2 digits From 93d5a0e532c1a5399e132a7fd66c5ded6533c11c Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 16:59:18 +0100 Subject: [PATCH 08/16] correct input for check --- internal/taskManager/updateFootprintService.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index d131dc5..2a1e590 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -76,17 +76,18 @@ func RegisterFootprintWorker() { Statistics: make(map[string]schema.JobStatistics), } - for metric := range jobStats { // Metric, Hostname:Stats + for _, metric := range allMetrics { avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - - data, ok := jobStats[metric] + data, ok := jobStats[metric] // Metric:[Hostname:Stats] if ok { - for hostname := range data { - hostStats, ok := data[hostname] + for _, res := range job.Resources { + hostStats, ok := data[res.Hostname] if ok { avg += hostStats.Avg min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) + } else { + log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) } } } else { From d4f487d5546c3a0cac4d62fe10785cf91f134f36 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 22 Nov 2024 17:56:55 +0100 Subject: [PATCH 09/16] comment debug logging --- internal/taskManager/updateFootprintService.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 2a1e590..580e338 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -77,8 +77,8 @@ func RegisterFootprintWorker() { } for _, metric := range allMetrics { - avg, min, max := 0.0, 0.0, 0.0 // math.MaxFloat32, -math.MaxFloat32 - data, ok := jobStats[metric] // Metric:[Hostname:Stats] + avg, min, max := 0.0, 0.0, 0.0 + data, ok := jobStats[metric] // Metric:[Hostname:Stats] if ok { for _, res := range job.Resources { hostStats, ok := data[res.Hostname] @@ -86,13 +86,15 @@ func RegisterFootprintWorker() { avg += hostStats.Avg min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) - } else { - log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) } + // else { + // log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) + // } } - } else { - log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) } + // else { + // log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) + // } // Add values rounded to 2 digits jobMeta.Statistics[metric] = schema.JobStatistics{ From 81b8d578f2ead1df8d55fa1c4a19f4544a1883ef Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 25 Nov 2024 16:44:50 +0100 Subject: [PATCH 10/16] feat: Add buffered channel with worker thread for job start API Fixes #293 Refactoring on the way --- cmd/cc-backend/server.go | 4 ++ internal/api/rest.go | 72 +++------------------------ internal/graph/schema.resolvers.go | 22 ++++---- internal/repository/dbConnection.go | 2 + internal/repository/jobFind.go | 17 +++++++ internal/repository/jobQuery.go | 9 +++- internal/repository/jobStartWorker.go | 70 ++++++++++++++++++++++++++ internal/repository/job_test.go | 2 +- internal/repository/stats.go | 10 ++-- internal/repository/tags.go | 32 ++++++------ internal/routerConfig/routes.go | 2 +- pkg/schema/cluster.go | 6 +-- pkg/schema/config.go | 6 +-- pkg/schema/metrics.go | 14 +++--- pkg/schema/user.go | 4 +- 15 files changed, 156 insertions(+), 116 deletions(-) create mode 100644 internal/repository/jobStartWorker.go diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 3c6fa55..fc620c8 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -25,6 +25,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" + "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" @@ -316,6 +317,9 @@ func serverShutdown() { // First shut down the server gracefully (waiting for all ongoing requests) server.Shutdown(context.Background()) + // Then, wait for any async jobStarts still pending... + repository.WaitForJobStart() + // Then, wait for any async archivings still pending... archiver.WaitForArchiving() } diff --git a/internal/api/rest.go b/internal/api/rest.go index 369faf4..b60521b 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -72,7 +72,6 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) { r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) - r.HandleFunc("/jobs/stop_job/{id}", api.stopJobById).Methods(http.MethodPost, http.MethodPut) // r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) @@ -421,7 +420,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { StartTime: job.StartTime.Unix(), } - res.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + res.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -494,7 +493,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -587,7 +586,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -728,7 +727,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -741,7 +740,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { } for _, tag := range req { - tagId, err := api.JobRepository.AddTagOrCreate(r.Context(), job.ID, tag.Type, tag.Name, tag.Scope) + tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), job.ID, tag.Type, tag.Name, tag.Scope) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -791,11 +790,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - // aquire lock to avoid race condition between API calls - var unlockOnce sync.Once - api.RepositoryMutex.Lock() - defer unlockOnce.Do(api.RepositoryMutex.Unlock) - // Check if combination of (job_id, cluster_id, start_time) already exists: jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { @@ -810,16 +804,16 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } } + repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())}) + id, err := api.JobRepository.Start(&req) if err != nil { handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) return } - // unlock here, adding Tags can be async - unlockOnce.Do(api.RepositoryMutex.Unlock) for _, tag := range req.Tags { - if _, err := api.JobRepository.AddTagOrCreate(r.Context(), id, tag.Type, tag.Name, tag.Scope); err != nil { + if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) return @@ -834,56 +828,6 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { }) } -// stopJobById godoc -// @summary Marks job as completed and triggers archiving -// @tags Job add and modify -// @description Job to stop is specified by database ID. Only stopTime and final state are required in request body. -// @description Returns full job resource information according to 'JobMeta' scheme. -// @accept json -// @produce json -// @param id path int true "Database ID of Job" -// @param request body api.StopJobApiRequest true "stopTime and final state in request body" -// @success 200 {object} schema.JobMeta "Job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /jobs/stop_job/{id} [post] -func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) { - // Parse request body: Only StopTime and State - req := StopJobApiRequest{} - if err := decode(r.Body, &req); err != nil { - handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) - return - } - - // Fetch job (that will be stopped) from db - id, ok := mux.Vars(r)["id"] - var job *schema.Job - var err error - if ok { - id, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) - return - } - - job, err = api.JobRepository.FindById(r.Context(), id) - } else { - handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - - api.checkAndHandleStopJob(rw, job, req) -} - // stopJobByRequest godoc // @summary Marks job as completed and triggers archiving // @tags Job add and modify diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 58d664b..9fd7260 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -31,7 +31,7 @@ func (r *clusterResolver) Partitions(ctx context.Context, obj *schema.Cluster) ( // Tags is the resolver for the tags field. func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) { - return r.Repo.GetTags(ctx, &obj.ID) + return r.Repo.GetTags(repository.GetUserFromContext(ctx), &obj.ID) } // ConcurrentJobs is the resolver for the concurrentJobs field. @@ -159,7 +159,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds return nil, err } - if tags, err = r.Repo.AddTag(ctx, jid, tid); err != nil { + if tags, err = r.Repo.AddTag(repository.GetUserFromContext(ctx), jid, tid); err != nil { log.Warn("Error while adding tag") return nil, err } @@ -185,7 +185,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta return nil, err } - if tags, err = r.Repo.RemoveTag(ctx, jid, tid); err != nil { + if tags, err = r.Repo.RemoveTag(repository.GetUserFromContext(ctx), jid, tid); err != nil { log.Warn("Error while removing tag") return nil, err } @@ -211,7 +211,7 @@ func (r *queryResolver) Clusters(ctx context.Context) ([]*schema.Cluster, error) // Tags is the resolver for the tags field. func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) { - return r.Repo.GetTags(ctx, nil) + return r.Repo.GetTags(repository.GetUserFromContext(ctx), nil) } // GlobalMetrics is the resolver for the globalMetrics field. @@ -493,9 +493,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 418eef9..d062052 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -82,6 +82,8 @@ func Connect(driver string, db string) { if err != nil { log.Fatal(err) } + + startJobStartWorker() }) } diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index ff5a936..0354df0 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -99,6 +99,23 @@ func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +// FindByIdWithUser executes a SQL query to find a specific batch job. +// The job is queried using the database id. The user is passed directly, +// instead as part of the context. +// It returns a pointer to a schema.Job data structure and an error variable. +// To check if no job was found test err == sql.ErrNoRows +func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schema.Job, error) { + q := sq.Select(jobColumns...). + From("job").Where("job.id = ?", jobId) + + q, qerr := SecurityCheckWithUser(user, q) + if qerr != nil { + return nil, qerr + } + + return scanJob(q.RunWith(r.stmtCache).QueryRow()) +} + // FindByIdDirect executes a SQL query to find a specific batch job. // The job is queried using the database id. // It returns a pointer to a schema.Job data structure and an error variable. diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index c9812a3..0ab2ea2 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -107,8 +107,7 @@ func (r *JobRepository) CountJobs( return count, nil } -func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { - user := GetUserFromContext(ctx) +func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) { if user == nil { var qnil sq.SelectBuilder return qnil, fmt.Errorf("user context is nil") @@ -134,6 +133,12 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilde } } +func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { + user := GetUserFromContext(ctx) + + return SecurityCheckWithUser(user, query) +} + // Build a sq.SelectBuilder out of a schema.JobFilter. func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder { if filter.Tags != nil { diff --git a/internal/repository/jobStartWorker.go b/internal/repository/jobStartWorker.go new file mode 100644 index 0000000..dbd2247 --- /dev/null +++ b/internal/repository/jobStartWorker.go @@ -0,0 +1,70 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package repository + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +type JobWithUser struct { + Job *schema.JobMeta + User *schema.User +} + +var ( + jobStartPending sync.WaitGroup + jobStartChannel chan JobWithUser +) + +func startJobStartWorker() { + jobStartChannel = make(chan JobWithUser, 128) + + go jobStartWorker() +} + +// Archiving worker thread +func jobStartWorker() { + for { + select { + case req, ok := <-jobStartChannel: + if !ok { + break + } + jobRepo := GetJobRepository() + + id, err := jobRepo.Start(req.Job) + if err != nil { + log.Errorf("insert into database failed: %v", err) + } + + for _, tag := range req.Job.Tags { + if _, err := jobRepo.AddTagOrCreate(req.User, id, tag.Type, tag.Name, tag.Scope); err != nil { + log.Errorf("adding tag to new job %d failed: %v", id, err) + } + } + + jobStartPending.Done() + } + } +} + +// Trigger async archiving +func TriggerJobStart(req JobWithUser) { + if jobStartChannel == nil { + log.Fatal("Cannot start Job without jobStart channel. Did you Start the worker?") + } + + jobStartPending.Add(1) + jobStartChannel <- req +} + +// Wait for background thread to finish pending archiving operations +func WaitForJobStart() { + // close channel and wait for worker to process remaining jobs + jobStartPending.Wait() +} diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index f7b3783..363bb6c 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -59,7 +59,7 @@ func TestGetTags(t *testing.T) { ctx := context.WithValue(getContext(t), contextUserKey, contextUserValue) // Test Tag has Scope "global" - tags, counts, err := r.CountTags(ctx) + tags, counts, err := r.CountTags(GetUserFromContext(ctx)) if err != nil { t.Fatal(err) } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index ea195b8..484851d 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -560,9 +560,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( ) (*model.MetricHistoPoints, error) { // Get specific Peak or largest Peak var metricConfig *schema.MetricConfig - var peak float64 = 0.0 - var unit string = "" - var footprintStat string = "" + var peak float64 + var unit string + var footprintStat string for _, f := range filters { if f.Cluster != nil { @@ -712,8 +712,8 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( for idx, metric := range metrics { // Get specific Peak or largest Peak var metricConfig *schema.MetricConfig - var peak float64 = 0.0 - var unit string = "" + var peak float64 + var unit string for _, f := range filters { if f.Cluster != nil { diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 6239495..8120364 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -5,7 +5,6 @@ package repository import ( - "context" "fmt" "strings" @@ -16,8 +15,8 @@ import ( ) // Add the tag with id `tagId` to the job with the database id `jobId`. -func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*schema.Tag, error) { - j, err := r.FindById(ctx, job) +func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) { + j, err := r.FindByIdWithUser(user, job) if err != nil { log.Warn("Error while finding job by id") return nil, err @@ -31,7 +30,7 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc return nil, err } - tags, err := r.GetTags(ctx, &job) + tags, err := r.GetTags(user, &job) if err != nil { log.Warn("Error while getting tags for job") return nil, err @@ -47,8 +46,8 @@ func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*sc } // Removes a tag from a job -func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schema.Tag, error) { - j, err := r.FindById(ctx, job) +func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) { + j, err := r.FindByIdWithUser(user, job) if err != nil { log.Warn("Error while finding job by id") return nil, err @@ -62,7 +61,7 @@ func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schem return nil, err } - tags, err := r.GetTags(ctx, &job) + tags, err := r.GetTags(user, &job) if err != nil { log.Warn("Error while getting tags for job") return nil, err @@ -96,7 +95,7 @@ func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope strin return res.LastInsertId() } -func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, counts map[string]int, err error) { +func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error) { // Fetch all Tags in DB for Display in Frontend Tag-View tags = make([]schema.Tag, 0, 100) xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name, tag_scope FROM tag") @@ -111,7 +110,7 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count } // Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags - readable, err := r.checkScopeAuth(ctx, "read", t.Scope) + readable, err := r.checkScopeAuth(user, "read", t.Scope) if err != nil { return nil, nil, err } @@ -120,8 +119,6 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count } } - user := GetUserFromContext(ctx) - // Query and Count Jobs with attached Tags q := sq.Select("t.tag_name, t.id, count(jt.tag_id)"). From("tag t"). @@ -172,13 +169,13 @@ func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, count // AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`. // If such a tag does not yet exist, it is created. -func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) { +func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) { // Default to "Global" scope if none defined if tagScope == "" { tagScope = "global" } - writable, err := r.checkScopeAuth(ctx, "write", tagScope) + writable, err := r.checkScopeAuth(user, "write", tagScope) if err != nil { return 0, err } @@ -194,7 +191,7 @@ func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType } } - if _, err := r.AddTag(ctx, jobId, tagId); err != nil { + if _, err := r.AddTag(user, jobId, tagId); err != nil { return 0, err } @@ -213,7 +210,7 @@ func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) ( } // GetTags returns a list of all scoped tags if job is nil or of the tags that the job with that database ID has. -func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, error) { +func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, error) { q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag") if job != nil { q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job) @@ -234,7 +231,7 @@ func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, return nil, err } // Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags - readable, err := r.checkScopeAuth(ctx, "read", tag.Scope) + readable, err := r.checkScopeAuth(user, "read", tag.Scope) if err != nil { return nil, err } @@ -295,8 +292,7 @@ func (r *JobRepository) ImportTag(jobId int64, tagType string, tagName string, t return nil } -func (r *JobRepository) checkScopeAuth(ctx context.Context, operation string, scope string) (pass bool, err error) { - user := GetUserFromContext(ctx) +func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scope string) (pass bool, err error) { if user != nil { switch { case operation == "write" && scope == "admin": diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index abb7793..1e2fe73 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -132,7 +132,7 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { func setupTaglistRoute(i InfoType, r *http.Request) InfoType { jobRepo := repository.GetJobRepository() - tags, counts, err := jobRepo.CountTags(r.Context()) + tags, counts, err := jobRepo.CountTags(repository.GetUserFromContext(r.Context())) tagMap := make(map[string][]map[string]interface{}) if err != nil { log.Warnf("GetTags failed: %s", err.Error()) diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index b9bf306..0c88c61 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -48,29 +48,29 @@ type SubCluster struct { type SubClusterConfig struct { Name string `json:"name"` Footprint string `json:"footprint,omitempty"` + Energy string `json:"energy"` Peak float64 `json:"peak"` Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` Remove bool `json:"remove"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy string `json:"energy"` } type MetricConfig struct { Unit Unit `json:"unit"` + Energy string `json:"energy"` Name string `json:"name"` Scope MetricScope `json:"scope"` Aggregation string `json:"aggregation"` Footprint string `json:"footprint,omitempty"` SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` Peak float64 `json:"peak"` - Normal float64 `json:"normal"` Caution float64 `json:"caution"` Alert float64 `json:"alert"` Timestep int `json:"timestep"` + Normal float64 `json:"normal"` LowerIsBetter bool `json:"lowerIsBetter"` - Energy string `json:"energy"` } type Cluster struct { diff --git a/pkg/schema/config.go b/pkg/schema/config.go index b87841c..a7abefe 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -57,9 +57,9 @@ type IntRange struct { } type TimeRange struct { - Range string `json:"range,omitempty"` // Optional, e.g. 'last6h' From *time.Time `json:"from"` To *time.Time `json:"to"` + Range string `json:"range,omitempty"` } type FilterRanges struct { @@ -82,10 +82,10 @@ type Retention struct { } type ResampleConfig struct { - // Trigger next zoom level at less than this many visible datapoints - Trigger int `json:"trigger"` // Array of resampling target resolutions, in seconds; Example: [600,300,60] Resolutions []int `json:"resolutions"` + // Trigger next zoom level at less than this many visible datapoints + Trigger int `json:"trigger"` } type CronFrequency struct { diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go index 9db853d..bbc3c74 100644 --- a/pkg/schema/metrics.go +++ b/pkg/schema/metrics.go @@ -17,17 +17,17 @@ import ( type JobData map[string]map[MetricScope]*JobMetric type JobMetric struct { - Unit Unit `json:"unit"` - Timestep int `json:"timestep"` - Series []Series `json:"series"` StatisticsSeries *StatsSeries `json:"statisticsSeries,omitempty"` + Unit Unit `json:"unit"` + Series []Series `json:"series"` + Timestep int `json:"timestep"` } type Series struct { - Hostname string `json:"hostname"` Id *string `json:"id,omitempty"` - Statistics MetricStatistics `json:"statistics"` + Hostname string `json:"hostname"` Data []Float `json:"data"` + Statistics MetricStatistics `json:"statistics"` } type MetricStatistics struct { @@ -37,11 +37,11 @@ type MetricStatistics struct { } type StatsSeries struct { + Percentiles map[int][]Float `json:"percentiles,omitempty"` Mean []Float `json:"mean"` Median []Float `json:"median"` Min []Float `json:"min"` Max []Float `json:"max"` - Percentiles map[int][]Float `json:"percentiles,omitempty"` } type MetricScope string @@ -229,7 +229,7 @@ func (jd *JobData) AddNodeScope(metric string) bool { return false } - var maxScope MetricScope = MetricScopeInvalid + maxScope := MetricScopeInvalid for scope := range scopes { maxScope = maxScope.Max(scope) } diff --git a/pkg/schema/user.go b/pkg/schema/user.go index 7b1ca13..c004254 100644 --- a/pkg/schema/user.go +++ b/pkg/schema/user.go @@ -42,11 +42,11 @@ type User struct { Username string `json:"username"` Password string `json:"-"` Name string `json:"name"` + Email string `json:"email"` Roles []string `json:"roles"` + Projects []string `json:"projects"` AuthType AuthType `json:"authType"` AuthSource AuthSource `json:"authSource"` - Email string `json:"email"` - Projects []string `json:"projects"` } func (u *User) HasProject(project string) bool { From 8ea1454c066b5c69fb251f0a9bbe0afda71432dd Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 25 Nov 2024 17:03:59 +0100 Subject: [PATCH 11/16] improve transaction init error handling --- .../taskManager/updateFootprintService.go | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 580e338..59c1b12 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -43,6 +43,9 @@ func RegisterFootprintWorker() { if err != nil { continue } + // NOTE: Additional Subcluster Loop Could Allow For Limited List Of (Energy)Footprint-Metrics Only. + // - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs + // - Would Require Review of 'updateFootprint' And 'updateEnergy' Usage allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(cluster.Name).MetricConfig for _, mc := range metricConfigs { @@ -78,7 +81,7 @@ func RegisterFootprintWorker() { for _, metric := range allMetrics { avg, min, max := 0.0, 0.0, 0.0 - data, ok := jobStats[metric] // Metric:[Hostname:Stats] + data, ok := jobStats[metric] // JobStats[Metric1:[Hostname1:[Stats], Hostname2:[Stats], ...], Metric2[...] ...] if ok { for _, res := range job.Resources { hostStats, ok := data[res.Hostname] @@ -87,14 +90,9 @@ func RegisterFootprintWorker() { min = math.Min(min, hostStats.Min) max = math.Max(max, hostStats.Max) } - // else { - // log.Debugf("no stats data return for host %s in job %d, metric %s", res.Hostname, job.JobID, metric) - // } + } } - // else { - // log.Debugf("no stats data return for job %d, metric %s", job.JobID, metric) - // } // Add values rounded to 2 digits jobMeta.Statistics[metric] = schema.JobStatistics{ @@ -131,23 +129,22 @@ func RegisterFootprintWorker() { t, err := jobRepo.TransactionInit() if err != nil { log.Errorf("failed TransactionInit %v", err) - } - - for idx, ps := range pendingStatements { - - query, args, err := ps.ToSql() - if err != nil { - log.Errorf("failed in ToSQL conversion: %v", err) - ce++ - } else { - // Args: JSON, JSON, ENERGY, JOBID - log.Debugf("add transaction on index %d", idx) - jobRepo.TransactionAdd(t, query, args...) - c++ + log.Errorf("skipped %d transactions for cluster %s", len(pendingStatements), cluster.Name) + ce += len(pendingStatements) + } else { + for _, ps := range pendingStatements { + query, args, err := ps.ToSql() + if err != nil { + log.Errorf("failed in ToSQL conversion: %v", err) + ce++ + } else { + // args...: Footprint-JSON, Energyfootprint-JSON, TotalEnergy, JobID + jobRepo.TransactionAdd(t, query, args...) + c++ + } } + jobRepo.TransactionEnd(t) } - - jobRepo.TransactionEnd(t) log.Debugf("Finish Cluster %s, took %s", cluster.Name, time.Since(s_cluster)) } log.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) From adb11b3ed070a26b28065bb98bc6c825c68f7eae Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 25 Nov 2024 17:35:22 +0100 Subject: [PATCH 12/16] Re-enable Footprint worker --- internal/taskManager/taskManager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index 4dbe4ad..101fc4a 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -79,7 +79,7 @@ func Start() { RegisterLdapSyncService(lc.SyncInterval) } - // RegisterFootprintWorker() + RegisterFootprintWorker() RegisterUpdateDurationWorker() s.Start() From 28539e60b06d12aab988dad818382850b4e9cd30 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 26 Nov 2024 07:02:53 +0100 Subject: [PATCH 13/16] Regenerate Swagger, fix tests, cleanup --- api/swagger.json | 239 +++++++++----------------- api/swagger.yaml | 194 +++++++-------------- internal/api/api_test.go | 23 ++- internal/api/docs.go | 239 +++++++++----------------- internal/api/rest.go | 20 +-- internal/repository/jobStartWorker.go | 21 ++- 6 files changed, 254 insertions(+), 482 deletions(-) diff --git a/api/swagger.json b/api/swagger.json index 7f5eaf7..3b59b5e 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -595,88 +595,6 @@ } } }, - "/jobs/stop_job/{id}": { - "post": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Job to stop is specified by database ID. Only stopTime and final state are required in request body.\nReturns full job resource information according to 'JobMeta' scheme.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Job add and modify" - ], - "summary": "Marks job as completed and triggers archiving", - "parameters": [ - { - "type": "integer", - "description": "Database ID of Job", - "name": "id", - "in": "path", - "required": true - }, - { - "description": "stopTime and final state in request body", - "name": "request", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.StopJobApiRequest" - } - } - ], - "responses": { - "200": { - "description": "Job resource", - "schema": { - "$ref": "#/definitions/schema.JobMeta" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "401": { - "description": "Unauthorized", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "403": { - "description": "Forbidden", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "404": { - "description": "Resource not found", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "422": { - "description": "Unprocessable Entity: finding job failed: sql: no rows in result set", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "500": { - "description": "Internal Server Error", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - } - } - } - }, "/jobs/tag_job/{id}": { "post": { "security": [ @@ -684,7 +602,7 @@ "ApiKeyAuth": [] } ], - "description": "Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.\nIf tagged job is already finished: Tag will be written directly to respective archive files.", + "description": "Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.\nTag Scope for frontend visibility will default to \"global\" if none entered, other options: \"admin\" or specific username.\nIf tagged job is already finished: Tag will be written directly to respective archive files.", "consumes": [ "application/json" ], @@ -1277,6 +1195,11 @@ "type": "string", "example": "Testjob" }, + "scope": { + "description": "Tag Scope for Frontend Display", + "type": "string", + "example": "global" + }, "type": { "description": "Tag Type", "type": "string", @@ -1404,9 +1327,8 @@ "api.StartJobApiResponse": { "type": "object", "properties": { - "id": { - "description": "Database ID of new job", - "type": "integer" + "msg": { + "type": "string" } } }, @@ -1418,17 +1340,14 @@ ], "properties": { "cluster": { - "description": "Cluster of job", "type": "string", "example": "fritz" }, "jobId": { - "description": "Cluster Job ID of job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final job state", "allOf": [ { "$ref": "#/definitions/schema.JobState" @@ -1437,12 +1356,10 @@ "example": "completed" }, "startTime": { - "description": "Start Time of job as epoch", "type": "integer", "example": 1649723812 }, "stopTime": { - "description": "Stop Time of job as epoch", "type": "integer", "example": 1649763839 } @@ -1487,12 +1404,10 @@ "type": "object", "properties": { "arrayJobId": { - "description": "The unique identifier of an array job", "type": "integer", "example": 123000 }, "cluster": { - "description": "The unique identifier of a cluster", "type": "string", "example": "fritz" }, @@ -1500,33 +1415,39 @@ "$ref": "#/definitions/schema.JobLinkResultList" }, "duration": { - "description": "Duration of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 43200 }, + "energy": { + "type": "number" + }, + "energyFootprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "exclusive": { - "description": "Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user", "type": "integer", "maximum": 2, "minimum": 0, "example": 1 }, - "flopsAnyAvg": { - "description": "FlopsAnyAvg as Float64", - "type": "number" + "footprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } }, "id": { - "description": "The unique identifier of a job in the database", "type": "integer" }, "jobId": { - "description": "The unique identifier of a job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final state of job", "enum": [ "completed", "failed", @@ -1542,95 +1463,69 @@ ], "example": "completed" }, - "loadAvg": { - "description": "LoadAvg as Float64", - "type": "number" - }, - "memBwAvg": { - "description": "MemBwAvg as Float64", - "type": "number" - }, - "memUsedMax": { - "description": "MemUsedMax as Float64", - "type": "number" - }, "metaData": { - "description": "Additional information about the job", "type": "object", "additionalProperties": { "type": "string" } }, "monitoringStatus": { - "description": "State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull", "type": "integer", "maximum": 3, "minimum": 0, "example": 1 }, "numAcc": { - "description": "Number of accelerators used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "numHwthreads": { - "description": "NumCores int32 `json:\"numCores\" db:\"num_cores\" example:\"20\" minimum:\"1\"` // Number of HWThreads used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 20 }, "numNodes": { - "description": "Number of nodes used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "partition": { - "description": "The Slurm partition to which the job was submitted", "type": "string", "example": "main" }, "project": { - "description": "The unique identifier of a project", "type": "string", "example": "abcd200" }, "resources": { - "description": "Resources used by job", "type": "array", "items": { "$ref": "#/definitions/schema.Resource" } }, "smt": { - "description": "SMT threads used by job", "type": "integer", "example": 4 }, "startTime": { - "description": "Start time as 'time.Time' data type", "type": "string" }, "subCluster": { - "description": "The unique identifier of a sub cluster", "type": "string", "example": "main" }, "tags": { - "description": "List of tags", "type": "array", "items": { "$ref": "#/definitions/schema.Tag" } }, "user": { - "description": "The unique identifier of a user", "type": "string", "example": "abcd100h" }, "walltime": { - "description": "Requested walltime of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 86400 @@ -1667,12 +1562,10 @@ "type": "object", "properties": { "arrayJobId": { - "description": "The unique identifier of an array job", "type": "integer", "example": 123000 }, "cluster": { - "description": "The unique identifier of a cluster", "type": "string", "example": "fritz" }, @@ -1680,29 +1573,39 @@ "$ref": "#/definitions/schema.JobLinkResultList" }, "duration": { - "description": "Duration of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 43200 }, + "energy": { + "type": "number" + }, + "energyFootprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "exclusive": { - "description": "Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user", "type": "integer", "maximum": 2, "minimum": 0, "example": 1 }, + "footprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "id": { - "description": "The unique identifier of a job in the database", "type": "integer" }, "jobId": { - "description": "The unique identifier of a job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final state of job", "enum": [ "completed", "failed", @@ -1719,91 +1622,76 @@ "example": "completed" }, "metaData": { - "description": "Additional information about the job", "type": "object", "additionalProperties": { "type": "string" } }, "monitoringStatus": { - "description": "State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull", "type": "integer", "maximum": 3, "minimum": 0, "example": 1 }, "numAcc": { - "description": "Number of accelerators used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "numHwthreads": { - "description": "NumCores int32 `json:\"numCores\" db:\"num_cores\" example:\"20\" minimum:\"1\"` // Number of HWThreads used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 20 }, "numNodes": { - "description": "Number of nodes used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "partition": { - "description": "The Slurm partition to which the job was submitted", "type": "string", "example": "main" }, "project": { - "description": "The unique identifier of a project", "type": "string", "example": "abcd200" }, "resources": { - "description": "Resources used by job", "type": "array", "items": { "$ref": "#/definitions/schema.Resource" } }, "smt": { - "description": "SMT threads used by job", "type": "integer", "example": 4 }, "startTime": { - "description": "Start epoch time stamp in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 1649723812 }, "statistics": { - "description": "Metric statistics of job", "type": "object", "additionalProperties": { "$ref": "#/definitions/schema.JobStatistics" } }, "subCluster": { - "description": "The unique identifier of a sub cluster", "type": "string", "example": "main" }, "tags": { - "description": "List of tags", "type": "array", "items": { "$ref": "#/definitions/schema.Tag" } }, "user": { - "description": "The unique identifier of a user", "type": "string", "example": "abcd100h" }, "walltime": { - "description": "Requested walltime of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 86400 @@ -1892,6 +1780,15 @@ "caution": { "type": "number" }, + "energy": { + "type": "string" + }, + "footprint": { + "type": "string" + }, + "lowerIsBetter": { + "type": "boolean" + }, "name": { "type": "string" }, @@ -1969,22 +1866,18 @@ "type": "object", "properties": { "accelerators": { - "description": "List of of accelerator device ids", "type": "array", "items": { "type": "string" } }, "configuration": { - "description": "The configuration options of the node", "type": "string" }, "hostname": { - "description": "Name of the host (= node)", "type": "string" }, "hwthreads": { - "description": "List of OS processor ids", "type": "array", "items": { "type": "integer" @@ -2027,6 +1920,12 @@ "type": "number" } }, + "median": { + "type": "array", + "items": { + "type": "number" + } + }, "min": { "type": "array", "items": { @@ -2050,15 +1949,33 @@ "coresPerSocket": { "type": "integer" }, + "energyFootprint": { + "type": "array", + "items": { + "type": "string" + } + }, "flopRateScalar": { "$ref": "#/definitions/schema.MetricValue" }, "flopRateSimd": { "$ref": "#/definitions/schema.MetricValue" }, + "footprint": { + "type": "array", + "items": { + "type": "string" + } + }, "memoryBandwidth": { "$ref": "#/definitions/schema.MetricValue" }, + "metricConfig": { + "type": "array", + "items": { + "$ref": "#/definitions/schema.MetricConfig" + } + }, "name": { "type": "string" }, @@ -2088,6 +2005,15 @@ "caution": { "type": "number" }, + "energy": { + "type": "string" + }, + "footprint": { + "type": "string" + }, + "lowerIsBetter": { + "type": "boolean" + }, "name": { "type": "string" }, @@ -2107,16 +2033,17 @@ "type": "object", "properties": { "id": { - "description": "The unique DB identifier of a tag", "type": "integer" }, "name": { - "description": "Tag Name", "type": "string", "example": "Testjob" }, + "scope": { + "type": "string", + "example": "global" + }, "type": { - "description": "Tag Type", "type": "string", "example": "Debug" } diff --git a/api/swagger.yaml b/api/swagger.yaml index f47ac3f..4e3c47e 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -23,6 +23,10 @@ definitions: description: Tag Name example: Testjob type: string + scope: + description: Tag Scope for Frontend Display + example: global + type: string type: description: Tag Type example: Debug @@ -110,31 +114,25 @@ definitions: type: object api.StartJobApiResponse: properties: - id: - description: Database ID of new job - type: integer + msg: + type: string type: object api.StopJobApiRequest: properties: cluster: - description: Cluster of job example: fritz type: string jobId: - description: Cluster Job ID of job example: 123000 type: integer jobState: allOf: - $ref: '#/definitions/schema.JobState' - description: Final job state example: completed startTime: - description: Start Time of job as epoch example: 1649723812 type: integer stopTime: - description: Stop Time of job as epoch example: 1649763839 type: integer required: @@ -167,42 +165,40 @@ definitions: description: Information of a HPC job. properties: arrayJobId: - description: The unique identifier of an array job example: 123000 type: integer cluster: - description: The unique identifier of a cluster example: fritz type: string concurrentJobs: $ref: '#/definitions/schema.JobLinkResultList' duration: - description: Duration of job in seconds (Min > 0) example: 43200 minimum: 1 type: integer + energy: + type: number + energyFootprint: + additionalProperties: + type: number + type: object exclusive: - description: 'Specifies how nodes are shared: 0 - Shared among multiple jobs - of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple - jobs of same user' example: 1 maximum: 2 minimum: 0 type: integer - flopsAnyAvg: - description: FlopsAnyAvg as Float64 - type: number + footprint: + additionalProperties: + type: number + type: object id: - description: The unique identifier of a job in the database type: integer jobId: - description: The unique identifier of a job example: 123000 type: integer jobState: allOf: - $ref: '#/definitions/schema.JobState' - description: Final state of job enum: - completed - failed @@ -211,79 +207,53 @@ definitions: - timeout - out_of_memory example: completed - loadAvg: - description: LoadAvg as Float64 - type: number - memBwAvg: - description: MemBwAvg as Float64 - type: number - memUsedMax: - description: MemUsedMax as Float64 - type: number metaData: additionalProperties: type: string - description: Additional information about the job type: object monitoringStatus: - description: 'State of monitoring system during job run: 0 - Disabled, 1 - - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull' example: 1 maximum: 3 minimum: 0 type: integer numAcc: - description: Number of accelerators used (Min > 0) example: 2 minimum: 1 type: integer numHwthreads: - description: NumCores int32 `json:"numCores" db:"num_cores" - example:"20" minimum:"1"` // - Number of HWThreads used (Min > 0) example: 20 minimum: 1 type: integer numNodes: - description: Number of nodes used (Min > 0) example: 2 minimum: 1 type: integer partition: - description: The Slurm partition to which the job was submitted example: main type: string project: - description: The unique identifier of a project example: abcd200 type: string resources: - description: Resources used by job items: $ref: '#/definitions/schema.Resource' type: array smt: - description: SMT threads used by job example: 4 type: integer startTime: - description: Start time as 'time.Time' data type type: string subCluster: - description: The unique identifier of a sub cluster example: main type: string tags: - description: List of tags items: $ref: '#/definitions/schema.Tag' type: array user: - description: The unique identifier of a user example: abcd100h type: string walltime: - description: Requested walltime of job in seconds (Min > 0) example: 86400 minimum: 1 type: integer @@ -308,39 +278,40 @@ definitions: description: Meta data information of a HPC job. properties: arrayJobId: - description: The unique identifier of an array job example: 123000 type: integer cluster: - description: The unique identifier of a cluster example: fritz type: string concurrentJobs: $ref: '#/definitions/schema.JobLinkResultList' duration: - description: Duration of job in seconds (Min > 0) example: 43200 minimum: 1 type: integer + energy: + type: number + energyFootprint: + additionalProperties: + type: number + type: object exclusive: - description: 'Specifies how nodes are shared: 0 - Shared among multiple jobs - of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple - jobs of same user' example: 1 maximum: 2 minimum: 0 type: integer + footprint: + additionalProperties: + type: number + type: object id: - description: The unique identifier of a job in the database type: integer jobId: - description: The unique identifier of a job example: 123000 type: integer jobState: allOf: - $ref: '#/definitions/schema.JobState' - description: Final state of job enum: - completed - failed @@ -352,74 +323,56 @@ definitions: metaData: additionalProperties: type: string - description: Additional information about the job type: object monitoringStatus: - description: 'State of monitoring system during job run: 0 - Disabled, 1 - - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull' example: 1 maximum: 3 minimum: 0 type: integer numAcc: - description: Number of accelerators used (Min > 0) example: 2 minimum: 1 type: integer numHwthreads: - description: NumCores int32 `json:"numCores" db:"num_cores" - example:"20" minimum:"1"` // - Number of HWThreads used (Min > 0) example: 20 minimum: 1 type: integer numNodes: - description: Number of nodes used (Min > 0) example: 2 minimum: 1 type: integer partition: - description: The Slurm partition to which the job was submitted example: main type: string project: - description: The unique identifier of a project example: abcd200 type: string resources: - description: Resources used by job items: $ref: '#/definitions/schema.Resource' type: array smt: - description: SMT threads used by job example: 4 type: integer startTime: - description: Start epoch time stamp in seconds (Min > 0) example: 1649723812 minimum: 1 type: integer statistics: additionalProperties: $ref: '#/definitions/schema.JobStatistics' - description: Metric statistics of job type: object subCluster: - description: The unique identifier of a sub cluster example: main type: string tags: - description: List of tags items: $ref: '#/definitions/schema.Tag' type: array user: - description: The unique identifier of a user example: abcd100h type: string walltime: - description: Requested walltime of job in seconds (Min > 0) example: 86400 minimum: 1 type: integer @@ -486,6 +439,12 @@ definitions: type: number caution: type: number + energy: + type: string + footprint: + type: string + lowerIsBetter: + type: boolean name: type: string normal: @@ -541,18 +500,14 @@ definitions: description: A resource used by a job properties: accelerators: - description: List of of accelerator device ids items: type: string type: array configuration: - description: The configuration options of the node type: string hostname: - description: Name of the host (= node) type: string hwthreads: - description: List of OS processor ids items: type: integer type: array @@ -580,6 +535,10 @@ definitions: items: type: number type: array + median: + items: + type: number + type: array min: items: type: number @@ -595,12 +554,24 @@ definitions: properties: coresPerSocket: type: integer + energyFootprint: + items: + type: string + type: array flopRateScalar: $ref: '#/definitions/schema.MetricValue' flopRateSimd: $ref: '#/definitions/schema.MetricValue' + footprint: + items: + type: string + type: array memoryBandwidth: $ref: '#/definitions/schema.MetricValue' + metricConfig: + items: + $ref: '#/definitions/schema.MetricConfig' + type: array name: type: string nodes: @@ -620,6 +591,12 @@ definitions: type: number caution: type: number + energy: + type: string + footprint: + type: string + lowerIsBetter: + type: boolean name: type: string normal: @@ -633,14 +610,14 @@ definitions: description: Defines a tag using name and type. properties: id: - description: The unique DB identifier of a tag type: integer name: - description: Tag Name example: Testjob type: string + scope: + example: global + type: string type: - description: Tag Type example: Debug type: string type: object @@ -1197,68 +1174,13 @@ paths: summary: Marks job as completed and triggers archiving tags: - Job add and modify - /jobs/stop_job/{id}: - post: - consumes: - - application/json - description: |- - Job to stop is specified by database ID. Only stopTime and final state are required in request body. - Returns full job resource information according to 'JobMeta' scheme. - parameters: - - description: Database ID of Job - in: path - name: id - required: true - type: integer - - description: stopTime and final state in request body - in: body - name: request - required: true - schema: - $ref: '#/definitions/api.StopJobApiRequest' - produces: - - application/json - responses: - "200": - description: Job resource - schema: - $ref: '#/definitions/schema.JobMeta' - "400": - description: Bad Request - schema: - $ref: '#/definitions/api.ErrorResponse' - "401": - description: Unauthorized - schema: - $ref: '#/definitions/api.ErrorResponse' - "403": - description: Forbidden - schema: - $ref: '#/definitions/api.ErrorResponse' - "404": - description: Resource not found - schema: - $ref: '#/definitions/api.ErrorResponse' - "422": - description: 'Unprocessable Entity: finding job failed: sql: no rows in - result set' - schema: - $ref: '#/definitions/api.ErrorResponse' - "500": - description: Internal Server Error - schema: - $ref: '#/definitions/api.ErrorResponse' - security: - - ApiKeyAuth: [] - summary: Marks job as completed and triggers archiving - tags: - - Job add and modify /jobs/tag_job/{id}: post: consumes: - application/json description: |- Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely. + Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username. If tagged job is already finished: Tag will be written directly to respective archive files. parameters: - description: Job Database ID diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 3d1d7bb..bcabd5f 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -14,9 +14,9 @@ import ( "os" "path/filepath" "reflect" - "strconv" "strings" "testing" + "time" "github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/archiver" @@ -200,6 +200,10 @@ func TestRestApi(t *testing.T) { r.StrictSlash(true) restapi.MountApiRoutes(r) + var TestJobId int64 = 123 + var TestClusterName string = "testcluster" + var TestStartTime int64 = 123456789 + const startJobBody string = `{ "jobId": 123, "user": "testuser", @@ -225,7 +229,6 @@ func TestRestApi(t *testing.T) { "startTime": 123456789 }` - var dbid int64 const contextUserKey repository.ContextKey = "user" contextUserValue := &schema.User{ Username: "testuser", @@ -247,13 +250,10 @@ func TestRestApi(t *testing.T) { t.Fatal(response.Status, recorder.Body.String()) } - var res api.StartJobApiResponse - if err := json.Unmarshal(recorder.Body.Bytes(), &res); err != nil { - t.Fatal(err) - } + time.Sleep(1 * time.Second) resolver := graph.GetResolverInstance() - job, err := resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID))) + job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime) if err != nil { t.Fatal(err) } @@ -285,8 +285,6 @@ func TestRestApi(t *testing.T) { if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" { t.Fatalf("unexpected tags: %#v", job.Tags) } - - dbid = res.DBID }); !ok { return } @@ -314,8 +312,7 @@ func TestRestApi(t *testing.T) { } archiver.WaitForArchiving() - resolver := graph.GetResolverInstance() - job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid))) + job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime) if err != nil { t.Fatal(err) } @@ -404,8 +401,10 @@ func TestRestApi(t *testing.T) { t.Fatal("subtest failed") } + time.Sleep(1 * time.Second) + const stopJobBodyFailed string = `{ - "jobId": 12345, + "jobId": 12345, "cluster": "testcluster", "jobState": "failed", diff --git a/internal/api/docs.go b/internal/api/docs.go index e5ec50b..7c1daac 100644 --- a/internal/api/docs.go +++ b/internal/api/docs.go @@ -601,88 +601,6 @@ const docTemplate = `{ } } }, - "/jobs/stop_job/{id}": { - "post": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Job to stop is specified by database ID. Only stopTime and final state are required in request body.\nReturns full job resource information according to 'JobMeta' scheme.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Job add and modify" - ], - "summary": "Marks job as completed and triggers archiving", - "parameters": [ - { - "type": "integer", - "description": "Database ID of Job", - "name": "id", - "in": "path", - "required": true - }, - { - "description": "stopTime and final state in request body", - "name": "request", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.StopJobApiRequest" - } - } - ], - "responses": { - "200": { - "description": "Job resource", - "schema": { - "$ref": "#/definitions/schema.JobMeta" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "401": { - "description": "Unauthorized", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "403": { - "description": "Forbidden", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "404": { - "description": "Resource not found", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "422": { - "description": "Unprocessable Entity: finding job failed: sql: no rows in result set", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - }, - "500": { - "description": "Internal Server Error", - "schema": { - "$ref": "#/definitions/api.ErrorResponse" - } - } - } - } - }, "/jobs/tag_job/{id}": { "post": { "security": [ @@ -690,7 +608,7 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.\nIf tagged job is already finished: Tag will be written directly to respective archive files.", + "description": "Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.\nTag Scope for frontend visibility will default to \"global\" if none entered, other options: \"admin\" or specific username.\nIf tagged job is already finished: Tag will be written directly to respective archive files.", "consumes": [ "application/json" ], @@ -1283,6 +1201,11 @@ const docTemplate = `{ "type": "string", "example": "Testjob" }, + "scope": { + "description": "Tag Scope for Frontend Display", + "type": "string", + "example": "global" + }, "type": { "description": "Tag Type", "type": "string", @@ -1410,9 +1333,8 @@ const docTemplate = `{ "api.StartJobApiResponse": { "type": "object", "properties": { - "id": { - "description": "Database ID of new job", - "type": "integer" + "msg": { + "type": "string" } } }, @@ -1424,17 +1346,14 @@ const docTemplate = `{ ], "properties": { "cluster": { - "description": "Cluster of job", "type": "string", "example": "fritz" }, "jobId": { - "description": "Cluster Job ID of job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final job state", "allOf": [ { "$ref": "#/definitions/schema.JobState" @@ -1443,12 +1362,10 @@ const docTemplate = `{ "example": "completed" }, "startTime": { - "description": "Start Time of job as epoch", "type": "integer", "example": 1649723812 }, "stopTime": { - "description": "Stop Time of job as epoch", "type": "integer", "example": 1649763839 } @@ -1493,12 +1410,10 @@ const docTemplate = `{ "type": "object", "properties": { "arrayJobId": { - "description": "The unique identifier of an array job", "type": "integer", "example": 123000 }, "cluster": { - "description": "The unique identifier of a cluster", "type": "string", "example": "fritz" }, @@ -1506,33 +1421,39 @@ const docTemplate = `{ "$ref": "#/definitions/schema.JobLinkResultList" }, "duration": { - "description": "Duration of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 43200 }, + "energy": { + "type": "number" + }, + "energyFootprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "exclusive": { - "description": "Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user", "type": "integer", "maximum": 2, "minimum": 0, "example": 1 }, - "flopsAnyAvg": { - "description": "FlopsAnyAvg as Float64", - "type": "number" + "footprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } }, "id": { - "description": "The unique identifier of a job in the database", "type": "integer" }, "jobId": { - "description": "The unique identifier of a job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final state of job", "enum": [ "completed", "failed", @@ -1548,95 +1469,69 @@ const docTemplate = `{ ], "example": "completed" }, - "loadAvg": { - "description": "LoadAvg as Float64", - "type": "number" - }, - "memBwAvg": { - "description": "MemBwAvg as Float64", - "type": "number" - }, - "memUsedMax": { - "description": "MemUsedMax as Float64", - "type": "number" - }, "metaData": { - "description": "Additional information about the job", "type": "object", "additionalProperties": { "type": "string" } }, "monitoringStatus": { - "description": "State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull", "type": "integer", "maximum": 3, "minimum": 0, "example": 1 }, "numAcc": { - "description": "Number of accelerators used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "numHwthreads": { - "description": "NumCores int32 ` + "`" + `json:\"numCores\" db:\"num_cores\" example:\"20\" minimum:\"1\"` + "`" + ` // Number of HWThreads used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 20 }, "numNodes": { - "description": "Number of nodes used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "partition": { - "description": "The Slurm partition to which the job was submitted", "type": "string", "example": "main" }, "project": { - "description": "The unique identifier of a project", "type": "string", "example": "abcd200" }, "resources": { - "description": "Resources used by job", "type": "array", "items": { "$ref": "#/definitions/schema.Resource" } }, "smt": { - "description": "SMT threads used by job", "type": "integer", "example": 4 }, "startTime": { - "description": "Start time as 'time.Time' data type", "type": "string" }, "subCluster": { - "description": "The unique identifier of a sub cluster", "type": "string", "example": "main" }, "tags": { - "description": "List of tags", "type": "array", "items": { "$ref": "#/definitions/schema.Tag" } }, "user": { - "description": "The unique identifier of a user", "type": "string", "example": "abcd100h" }, "walltime": { - "description": "Requested walltime of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 86400 @@ -1673,12 +1568,10 @@ const docTemplate = `{ "type": "object", "properties": { "arrayJobId": { - "description": "The unique identifier of an array job", "type": "integer", "example": 123000 }, "cluster": { - "description": "The unique identifier of a cluster", "type": "string", "example": "fritz" }, @@ -1686,29 +1579,39 @@ const docTemplate = `{ "$ref": "#/definitions/schema.JobLinkResultList" }, "duration": { - "description": "Duration of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 43200 }, + "energy": { + "type": "number" + }, + "energyFootprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "exclusive": { - "description": "Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user", "type": "integer", "maximum": 2, "minimum": 0, "example": 1 }, + "footprint": { + "type": "object", + "additionalProperties": { + "type": "number" + } + }, "id": { - "description": "The unique identifier of a job in the database", "type": "integer" }, "jobId": { - "description": "The unique identifier of a job", "type": "integer", "example": 123000 }, "jobState": { - "description": "Final state of job", "enum": [ "completed", "failed", @@ -1725,91 +1628,76 @@ const docTemplate = `{ "example": "completed" }, "metaData": { - "description": "Additional information about the job", "type": "object", "additionalProperties": { "type": "string" } }, "monitoringStatus": { - "description": "State of monitoring system during job run: 0 - Disabled, 1 - Running or Archiving (Default), 2 - Archiving Failed, 3 - Archiving Successfull", "type": "integer", "maximum": 3, "minimum": 0, "example": 1 }, "numAcc": { - "description": "Number of accelerators used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "numHwthreads": { - "description": "NumCores int32 ` + "`" + `json:\"numCores\" db:\"num_cores\" example:\"20\" minimum:\"1\"` + "`" + ` // Number of HWThreads used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 20 }, "numNodes": { - "description": "Number of nodes used (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 2 }, "partition": { - "description": "The Slurm partition to which the job was submitted", "type": "string", "example": "main" }, "project": { - "description": "The unique identifier of a project", "type": "string", "example": "abcd200" }, "resources": { - "description": "Resources used by job", "type": "array", "items": { "$ref": "#/definitions/schema.Resource" } }, "smt": { - "description": "SMT threads used by job", "type": "integer", "example": 4 }, "startTime": { - "description": "Start epoch time stamp in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 1649723812 }, "statistics": { - "description": "Metric statistics of job", "type": "object", "additionalProperties": { "$ref": "#/definitions/schema.JobStatistics" } }, "subCluster": { - "description": "The unique identifier of a sub cluster", "type": "string", "example": "main" }, "tags": { - "description": "List of tags", "type": "array", "items": { "$ref": "#/definitions/schema.Tag" } }, "user": { - "description": "The unique identifier of a user", "type": "string", "example": "abcd100h" }, "walltime": { - "description": "Requested walltime of job in seconds (Min \u003e 0)", "type": "integer", "minimum": 1, "example": 86400 @@ -1898,6 +1786,15 @@ const docTemplate = `{ "caution": { "type": "number" }, + "energy": { + "type": "string" + }, + "footprint": { + "type": "string" + }, + "lowerIsBetter": { + "type": "boolean" + }, "name": { "type": "string" }, @@ -1975,22 +1872,18 @@ const docTemplate = `{ "type": "object", "properties": { "accelerators": { - "description": "List of of accelerator device ids", "type": "array", "items": { "type": "string" } }, "configuration": { - "description": "The configuration options of the node", "type": "string" }, "hostname": { - "description": "Name of the host (= node)", "type": "string" }, "hwthreads": { - "description": "List of OS processor ids", "type": "array", "items": { "type": "integer" @@ -2033,6 +1926,12 @@ const docTemplate = `{ "type": "number" } }, + "median": { + "type": "array", + "items": { + "type": "number" + } + }, "min": { "type": "array", "items": { @@ -2056,15 +1955,33 @@ const docTemplate = `{ "coresPerSocket": { "type": "integer" }, + "energyFootprint": { + "type": "array", + "items": { + "type": "string" + } + }, "flopRateScalar": { "$ref": "#/definitions/schema.MetricValue" }, "flopRateSimd": { "$ref": "#/definitions/schema.MetricValue" }, + "footprint": { + "type": "array", + "items": { + "type": "string" + } + }, "memoryBandwidth": { "$ref": "#/definitions/schema.MetricValue" }, + "metricConfig": { + "type": "array", + "items": { + "$ref": "#/definitions/schema.MetricConfig" + } + }, "name": { "type": "string" }, @@ -2094,6 +2011,15 @@ const docTemplate = `{ "caution": { "type": "number" }, + "energy": { + "type": "string" + }, + "footprint": { + "type": "string" + }, + "lowerIsBetter": { + "type": "boolean" + }, "name": { "type": "string" }, @@ -2113,16 +2039,17 @@ const docTemplate = `{ "type": "object", "properties": { "id": { - "description": "The unique DB identifier of a tag", "type": "integer" }, "name": { - "description": "Tag Name", "type": "string", "example": "Testjob" }, + "scope": { + "type": "string", + "example": "global" + }, "type": { - "description": "Tag Type", "type": "string", "example": "Debug" } diff --git a/internal/api/rest.go b/internal/api/rest.go index b60521b..3842596 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -124,8 +124,7 @@ func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) { // StartJobApiResponse model type StartJobApiResponse struct { - // Database ID of new job - DBID int64 `json:"id"` + Message string `json:"msg"` } // DeleteJobApiResponse model @@ -806,25 +805,10 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())}) - id, err := api.JobRepository.Start(&req) - if err != nil { - handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) - return - } - - for _, tag := range req.Tags { - if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) - return - } - } - - log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusCreated) json.NewEncoder(rw).Encode(StartJobApiResponse{ - DBID: id, + Message: fmt.Sprintf("Successfully triggered job start"), }) } diff --git a/internal/repository/jobStartWorker.go b/internal/repository/jobStartWorker.go index dbd2247..18d2be7 100644 --- a/internal/repository/jobStartWorker.go +++ b/internal/repository/jobStartWorker.go @@ -6,6 +6,7 @@ package repository import ( "sync" + "time" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -36,18 +37,30 @@ func jobStartWorker() { break } jobRepo := GetJobRepository() + var id int64 - id, err := jobRepo.Start(req.Job) - if err != nil { - log.Errorf("insert into database failed: %v", err) + for i := 0; i < 5; i++ { + var err error + + id, err = jobRepo.Start(req.Job) + if err != nil { + log.Errorf("Attempt %d: insert into database failed: %v", i, err) + } else { + break + } + time.Sleep(1 * time.Second) } for _, tag := range req.Job.Tags { - if _, err := jobRepo.AddTagOrCreate(req.User, id, tag.Type, tag.Name, tag.Scope); err != nil { + if _, err := jobRepo.AddTagOrCreate(req.User, id, + tag.Type, tag.Name, tag.Scope); err != nil { log.Errorf("adding tag to new job %d failed: %v", id, err) } } + log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", + id, req.Job.Cluster, req.Job.JobID, req.Job.User, req.Job.StartTime) + jobStartPending.Done() } } From e1be6c713886912aa596d065ad181b0031f63d3f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 26 Nov 2024 10:49:44 +0100 Subject: [PATCH 14/16] Remove UpdateEnergy from UpdateFootprint Task Conputing total energy for running jobs does not make any sense --- internal/taskManager/updateFootprintService.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 59c1b12..e9525d2 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -114,12 +114,6 @@ func RegisterFootprintWorker() { ce++ continue } - stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta) - if err != nil { - log.Errorf("update job (dbid: %d) statement build failed at energy step: %s", job.ID, err.Error()) - ce++ - continue - } stmt = stmt.Where("job.id = ?", job.ID) pendingStatements = append(pendingStatements, stmt) From 38ce40ae7df5148f3e56a461c1235e4a8371023c Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 26 Nov 2024 16:21:16 +0100 Subject: [PATCH 15/16] feat: redirect to requested page after login, solves #281 --- cmd/cc-backend/server.go | 19 ++++++++----------- internal/auth/auth.go | 8 ++++++-- .../taskManager/updateFootprintService.go | 4 ++-- web/templates/login.tmpl | 1 + web/web.go | 1 + 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index fc620c8..083b9e5 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -110,9 +110,7 @@ func serverInit() { if !config.Keys.DisableAuthentication { router.Handle("/login", authHandle.Login( - // On success: - http.RedirectHandler("/", http.StatusTemporaryRedirect), - + // On success: Handled within Login() // On failure: func(rw http.ResponseWriter, r *http.Request, err error) { rw.Header().Add("Content-Type", "text/html; charset=utf-8") @@ -127,9 +125,7 @@ func serverInit() { })).Methods(http.MethodPost) router.Handle("/jwt-login", authHandle.Login( - // On success: - http.RedirectHandler("/", http.StatusTemporaryRedirect), - + // On success: Handled within Login() // On failure: func(rw http.ResponseWriter, r *http.Request, err error) { rw.Header().Add("Content-Type", "text/html; charset=utf-8") @@ -165,11 +161,12 @@ func serverInit() { func(rw http.ResponseWriter, r *http.Request, err error) { rw.WriteHeader(http.StatusUnauthorized) web.RenderTemplate(rw, "login.tmpl", &web.Page{ - Title: "Authentication failed - ClusterCockpit", - MsgType: "alert-danger", - Message: err.Error(), - Build: buildInfo, - Infos: info, + Title: "Authentication failed - ClusterCockpit", + MsgType: "alert-danger", + Message: err.Error(), + Build: buildInfo, + Infos: info, + Redirect: r.RequestURI, }) }) }) diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 270989f..a186cf6 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -201,7 +201,6 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request, } func (auth *Authentication) Login( - onsuccess http.Handler, onfailure func(rw http.ResponseWriter, r *http.Request, loginErr error), ) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { @@ -238,7 +237,12 @@ func (auth *Authentication) Login( log.Infof("login successfull: user: %#v (roles: %v, projects: %v)", user.Username, user.Roles, user.Projects) ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) - onsuccess.ServeHTTP(rw, r.WithContext(ctx)) + + if r.FormValue("redirect") != "" { + http.RedirectHandler(r.FormValue("redirect"), http.StatusFound).ServeHTTP(rw, r.WithContext(ctx)) + } else { + http.RedirectHandler(r.FormValue("/"), http.StatusFound).ServeHTTP(rw, r.WithContext(ctx)) + } return } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index e9525d2..d30d766 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -43,9 +43,9 @@ func RegisterFootprintWorker() { if err != nil { continue } - // NOTE: Additional Subcluster Loop Could Allow For Limited List Of (Energy)Footprint-Metrics Only. + // NOTE: Additional Subcluster Loop Could Allow For Limited List Of Footprint-Metrics Only. // - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs - // - Would Require Review of 'updateFootprint' And 'updateEnergy' Usage + // - Would Require Review of 'updateFootprint' Usage (Logic Could Possibly Be Included Here Completely) allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(cluster.Name).MetricConfig for _, mc := range metricConfigs { diff --git a/web/templates/login.tmpl b/web/templates/login.tmpl index f10e064..cd13926 100644 --- a/web/templates/login.tmpl +++ b/web/templates/login.tmpl @@ -41,6 +41,7 @@ {{- if .Infos.hasOpenIDConnect}} OpenID Connect Login {{end}} + diff --git a/web/web.go b/web/web.go index 45ca9e3..1cfa176 100644 --- a/web/web.go +++ b/web/web.go @@ -99,6 +99,7 @@ type Page struct { Infos map[string]interface{} // For generic use (e.g. username for /monitoring/user/, job id for /monitoring/job/) Config map[string]interface{} // UI settings for the currently logged in user (e.g. line width, ...) Resampling *schema.ResampleConfig // If not nil, defines resampling trigger and resolutions + Redirect string // The originally requested URL, for intermediate login handling } func RenderTemplate(rw http.ResponseWriter, file string, page *Page) { From 00a578657c9f168b7d32ffd2b420ba686a232607 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Wed, 27 Nov 2024 10:50:11 +0100 Subject: [PATCH 16/16] feat: add edit of notice box content to admin settings --- internal/api/rest.go | 64 +++++++++++++++ internal/routerConfig/routes.go | 13 +++- web/frontend/src/Config.root.svelte | 3 +- web/frontend/src/config.entrypoint.js | 3 +- web/frontend/src/config/AdminSettings.svelte | 4 + .../src/config/admin/NoticeEdit.svelte | 78 +++++++++++++++++++ web/templates/config.tmpl | 1 + 7 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 web/frontend/src/config/admin/NoticeEdit.svelte diff --git a/internal/api/rest.go b/internal/api/rest.go index 3842596..db747ce 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -110,6 +110,7 @@ func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { r.HandleFunc("/users/", api.getUsers).Methods(http.MethodGet) r.HandleFunc("/users/", api.deleteUser).Methods(http.MethodDelete) r.HandleFunc("/user/{id}", api.updateUser).Methods(http.MethodPost) + r.HandleFunc("/notice/", api.editNotice).Methods(http.MethodPost) } } @@ -1285,6 +1286,69 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { } } +// editNotice godoc +// @summary Updates or empties the notice box content +// @tags User +// @description Modifies the content of notice.txt, shown as notice box on the homepage. +// @description If more than one formValue is set then only the highest priority field is used. +// @description Only accessible from IPs registered with apiAllowedIPs configuration option. +// @accept mpfd +// @produce plain +// @param new-content formData string false "Priority 1: New content to display" +// @success 200 {string} string "Success Response Message" +// @failure 400 {string} string "Bad Request" +// @failure 401 {string} string "Unauthorized" +// @failure 403 {string} string "Forbidden" +// @failure 422 {string} string "Unprocessable Entity: The user could not be updated" +// @failure 500 {string} string "Internal Server Error" +// @security ApiKeyAuth +// @router /notice/ [post] +func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) { + err := securedCheck(r) + if err != nil { + http.Error(rw, err.Error(), http.StatusForbidden) + return + } + + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + http.Error(rw, "Only admins are allowed to update the notice.txt file", http.StatusForbidden) + return + } + + // Get Value + newContent := r.FormValue("new-content") + + // Check FIle + noticeExists := util.CheckFileExists("./var/notice.txt") + if !noticeExists { + ntxt, err := os.Create("./var/notice.txt") + if err != nil { + log.Errorf("Creating ./var/notice.txt failed: %s", err.Error()) + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + ntxt.Close() + } + + if newContent != "" { + if err := os.WriteFile("./var/notice.txt", []byte(newContent), 0o666); err != nil { + log.Errorf("Writing to ./var/notice.txt failed: %s", err.Error()) + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } else { + rw.Write([]byte("Update Notice Content Success")) + } + } else { + if err := os.WriteFile("./var/notice.txt", []byte(""), 0o666); err != nil { + log.Errorf("Writing to ./var/notice.txt failed: %s", err.Error()) + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } else { + rw.Write([]byte("Empty Notice Content Success")) + } + } +} + func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) { err := securedCheck(r) if err != nil { diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 1e2fe73..8e943a0 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -35,7 +35,7 @@ type Route struct { var routes []Route = []Route{ {"/", "home.tmpl", "ClusterCockpit", false, setupHomeRoute}, - {"/config", "config.tmpl", "Settings", false, func(i InfoType, r *http.Request) InfoType { return i }}, + {"/config", "config.tmpl", "Settings", false, setupConfigRoute}, {"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }}, {"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job - ClusterCockpit", false, setupJobRoute}, {"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }}, @@ -80,6 +80,17 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType { return i } +func setupConfigRoute(i InfoType, r *http.Request) InfoType { + if util.CheckFileExists("./var/notice.txt") { + msg, err := os.ReadFile("./var/notice.txt") + if err == nil { + i["ncontent"] = string(msg) + } + } + + return i +} + func setupJobRoute(i InfoType, r *http.Request) InfoType { i["id"] = mux.Vars(r)["id"] if config.Keys.EmissionConstant != 0 { diff --git a/web/frontend/src/Config.root.svelte b/web/frontend/src/Config.root.svelte index 6dd68f1..dc45491 100644 --- a/web/frontend/src/Config.root.svelte +++ b/web/frontend/src/Config.root.svelte @@ -15,6 +15,7 @@ export let isAdmin; export let isApi; export let username; + export let ncontent; {#if isAdmin == true} @@ -22,7 +23,7 @@ Admin Options - + {/if} diff --git a/web/frontend/src/config.entrypoint.js b/web/frontend/src/config.entrypoint.js index 345056b..feb3916 100644 --- a/web/frontend/src/config.entrypoint.js +++ b/web/frontend/src/config.entrypoint.js @@ -6,7 +6,8 @@ new Config({ props: { isAdmin: isAdmin, isApi: isApi, - username: username + username: username, + ncontent: ncontent, }, context: new Map([ ['cc-config', clusterCockpitConfig], diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte index 9d3abf2..f512d40 100644 --- a/web/frontend/src/config/AdminSettings.svelte +++ b/web/frontend/src/config/AdminSettings.svelte @@ -10,6 +10,9 @@ import AddUser from "./admin/AddUser.svelte"; import ShowUsers from "./admin/ShowUsers.svelte"; import Options from "./admin/Options.svelte"; + import NoticeEdit from "./admin/NoticeEdit.svelte"; + + export let ncontent; let users = []; let roles = []; @@ -52,4 +55,5 @@ + diff --git a/web/frontend/src/config/admin/NoticeEdit.svelte b/web/frontend/src/config/admin/NoticeEdit.svelte new file mode 100644 index 0000000..325800b --- /dev/null +++ b/web/frontend/src/config/admin/NoticeEdit.svelte @@ -0,0 +1,78 @@ + + + + + + + + Edit Notice Shown On Homepage +

Empty content ("No Content.") hides notice card on homepage.

+
+ + + + + +
+

+ {#if displayMessage}Update: {message.msg}{/if} +

+
+
+ diff --git a/web/templates/config.tmpl b/web/templates/config.tmpl index 7993c3e..914dc88 100644 --- a/web/templates/config.tmpl +++ b/web/templates/config.tmpl @@ -13,6 +13,7 @@ const filterPresets = {{ .FilterPresets }}; const clusterCockpitConfig = {{ .Config }}; const resampleConfig = {{ .Resampling }}; + const ncontent = {{ .Infos.ncontent }}; {{end}} \ No newline at end of file