From 2d90fd05d655da5c1739e25046195573d84a4793 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 20 Feb 2026 10:38:07 +0100 Subject: [PATCH 01/11] Manually migrate editMetaByRequest to current state of 2026 - originally PR #400 --- api/swagger.json | 65 +++++++++++++++++++++++++++++++++++++++++++- api/swagger.yaml | 65 +++++++++++++++++++++++++++++++++++++++++++- internal/api/docs.go | 65 +++++++++++++++++++++++++++++++++++++++++++- internal/api/job.go | 64 ++++++++++++++++++++++++++++++++++++++++--- internal/api/rest.go | 2 +- 5 files changed, 253 insertions(+), 8 deletions(-) diff --git a/api/swagger.json b/api/swagger.json index 42ed7bb6..c9c36de1 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -389,8 +389,71 @@ ] } }, + "/api/jobs/edit_meta/": { + "patch": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job add and modify" + ], + "summary": "Edit meta-data json by request", + "parameters": [ + { + "description": "Specifies job and payload to add or update", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.JobMetaRequest" + } + } + ], + "responses": { + "200": { + "description": "Updated job resource", + "schema": { + "$ref": "#/definitions/schema.Job" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "404": { + "description": "Job does not exist", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + } + } + } + }, "/api/jobs/edit_meta/{id}": { - "post": { + "patch": { "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten", "consumes": [ "application/json" diff --git a/api/swagger.yaml b/api/swagger.yaml index 0bf60082..def939dd 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -102,6 +102,27 @@ definitions: description: Page id returned type: integer type: object + api.JobMetaRequest: + properties: + cluster: + description: Cluster of job + example: fritz + type: string + jobId: + description: Cluster Job ID of job + example: 123000 + type: integer + payload: + allOf: + - $ref: '#/definitions/api.EditMetaRequest' + description: Content to Add to Job Meta_Data + startTime: + description: Start Time of job as epoch + example: 1649723812 + type: integer + required: + - jobId + type: object api.JobMetricWithName: properties: metric: @@ -1091,8 +1112,50 @@ paths: summary: Remove a job from the sql database tags: - Job remove + /api/jobs/edit_meta/: + patch: + consumes: + - application/json + description: |- + Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster + If a key already exists its content will be overwritten + parameters: + - description: Specifies job and payload to add or update + in: body + name: request + required: true + schema: + $ref: '#/definitions/api.JobMetaRequest' + produces: + - application/json + responses: + "200": + description: Updated job resource + schema: + $ref: '#/definitions/schema.Job' + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.ErrorResponse' + "401": + description: Unauthorized + schema: + $ref: '#/definitions/api.ErrorResponse' + "404": + description: Job does not exist + schema: + $ref: '#/definitions/api.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.ErrorResponse' + security: + - ApiKeyAuth: [] + summary: Edit meta-data json by request + tags: + - Job add and modify /api/jobs/edit_meta/{id}: - post: + patch: consumes: - application/json description: |- diff --git a/internal/api/docs.go b/internal/api/docs.go index 78eecfa3..de3cf506 100644 --- a/internal/api/docs.go +++ b/internal/api/docs.go @@ -396,8 +396,71 @@ const docTemplate = `{ ] } }, + "/api/jobs/edit_meta/": { + "patch": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job add and modify" + ], + "summary": "Edit meta-data json by request", + "parameters": [ + { + "description": "Specifies job and payload to add or update", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.JobMetaRequest" + } + } + ], + "responses": { + "200": { + "description": "Updated job resource", + "schema": { + "$ref": "#/definitions/schema.Job" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "404": { + "description": "Job does not exist", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + } + } + } + }, "/api/jobs/edit_meta/{id}": { - "post": { + "patch": { "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten", "consumes": [ "application/json" diff --git a/internal/api/job.go b/internal/api/job.go index 59136b02..ad905dd2 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -72,6 +72,14 @@ type EditMetaRequest struct { Value string `json:"value" example:"bash script"` } +// JobMetaRequest model +type JobMetaRequest struct { + JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job + Cluster *string `json:"cluster" example:"fritz"` // Cluster of job + StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch + Payload EditMetaRequest `json:"payload"` // Content to Add to Job Meta_Data +} + type TagJobAPIRequest []*APITag type GetJobAPIRequest []string @@ -423,21 +431,21 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) { } // editMeta godoc -// @summary Edit meta-data json +// @summary Edit meta-data json of job identified by database id // @tags Job add and modify -// @description Edit key value pairs in job metadata json +// @description Edit key value pairs in job metadata json of job specified by database id // @description If a key already exists its content will be overwritten // @accept json // @produce json // @param id path int true "Job Database ID" -// @param request body api.EditMetaRequest true "Kay value pair to add" +// @param request body api.EditMetaRequest true "Metadata Key value pair to add or update" // @success 200 {object} schema.Job "Updated job resource" // @failure 400 {object} api.ErrorResponse "Bad Request" // @failure 401 {object} api.ErrorResponse "Unauthorized" // @failure 404 {object} api.ErrorResponse "Job does not exist" // @failure 500 {object} api.ErrorResponse "Internal Server Error" // @security ApiKeyAuth -// @router /api/jobs/edit_meta/{id} [post] +// @router /api/jobs/edit_meta/{id} [patch] func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) { id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) if err != nil { @@ -469,6 +477,54 @@ func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) { } } +// editMetaByRequest godoc +// @summary Edit meta-data json of job identified by request +// @tags Job add and modify +// @description Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster +// @description If a key already exists its content will be overwritten +// @accept json +// @produce json +// @param request body api.JobMetaRequest true "Specifies job and payload to add or update" +// @success 200 {object} schema.Job "Updated job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/edit_meta/ [patch] +func (api *RestAPI) editMetaByRequest(rw http.ResponseWriter, r *http.Request) { + // Parse request body + req := JobMetaRequest{} + if err := decode(r.Body, &req); err != nil { + handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) + return + } + + // Fetch job (that will have its meta_data edited) from db + var job *schema.Job + var err error + if req.JobId == nil { + handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw) + return + } + + // log.Printf("loading db job for editMetaByRequest... : JobMetaRequest=%v", req) + job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + + if err := api.JobRepository.UpdateMetadata(job, req.Payload.Key, req.Payload.Value); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) +} + // tagJob godoc // @summary Adds one or more tags to a job // @tags Job add and modify diff --git a/internal/api/rest.go b/internal/api/rest.go index fe722511..6eadc6a8 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -95,8 +95,8 @@ func (api *RestAPI) MountAPIRoutes(r chi.Router) { r.Post("/jobs/tag_job/{id}", api.tagJob) r.Patch("/jobs/tag_job/{id}", api.tagJob) r.Delete("/jobs/tag_job/{id}", api.removeTagJob) - r.Post("/jobs/edit_meta/{id}", api.editMeta) r.Patch("/jobs/edit_meta/{id}", api.editMeta) + r.Patch("/jobs/edit_meta/", api.editMetaByRequest) r.Get("/jobs/metrics/{id}", api.getJobMetrics) r.Delete("/jobs/delete_job/", api.deleteJobByRequest) r.Delete("/jobs/delete_job/{id}", api.deleteJobByID) From 82e79b074a3d4b5576331f180855353ba36cefb3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 21 Feb 2026 13:51:31 +0100 Subject: [PATCH 02/11] Reverse Lookup order in stop job request --- internal/api/api_test.go | 160 +++++++++++++++++++++++++++++++++++++++ internal/api/job.go | 13 ++-- internal/api/nats.go | 12 +-- 3 files changed, 172 insertions(+), 13 deletions(-) diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 09fc4c7f..a8aef889 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) { } }) } + +// TestStopJobWithReusedJobId verifies that stopping a recently started job works +// even when an older job with the same jobId exists in the job table (e.g. with +// state "failed"). This is a regression test for the bug where Find() on the job +// table would match the old job instead of the new one still in job_cache. +func TestStopJobWithReusedJobId(t *testing.T) { + restapi := setup(t) + t.Cleanup(cleanup) + + testData := schema.JobData{ + "load_one": map[schema.MetricScope]*schema.JobMetric{ + schema.MetricScopeNode: { + Unit: schema.Unit{Base: "load"}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "host123", + Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, + Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, + }, + }, + }, + }, + } + + metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { + return testData, nil + } + + r := chi.NewRouter() + restapi.MountAPIRoutes(r) + + const contextUserKey repository.ContextKey = "user" + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"user"}, + AuthType: 0, + AuthSource: 2, + } + + // Step 1: Start the first job (jobId=999) + const startJobBody1 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 200000000 + }` + + if ok := t.Run("StartFirstJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 2: Sync to move job from cache to job table, then stop it as "failed" + time.Sleep(1 * time.Second) + restapi.JobRepository.SyncJobs() + + const stopJobBody1 string = `{ + "jobId": 999, + "startTime": 200000000, + "cluster": "testcluster", + "jobState": "failed", + "stopTime": 200001000 + }` + + if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + + jobid, cluster := int64(999), "testcluster" + job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) + if err != nil { + t.Fatal(err) + } + if job.State != schema.JobStateFailed { + t.Fatalf("expected first job to be failed, got: %s", job.State) + } + }); !ok { + return + } + + // Wait for archiving to complete + time.Sleep(1 * time.Second) + + // Step 3: Start a NEW job with the same jobId=999 but different startTime. + // This job will sit in job_cache (not yet synced). + const startJobBody2 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 300000000 + }` + + if ok := t.Run("StartSecondJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 4: Stop the second job WITHOUT syncing first. + // Before the fix, this would fail because Find() on the job table would + // match the old failed job (jobId=999) and reject with "already stopped". + const stopJobBody2 string = `{ + "jobId": 999, + "startTime": 300000000, + "cluster": "testcluster", + "jobState": "completed", + "stopTime": 300001000 + }` + + t.Run("StopSecondJobBeforeSync", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatalf("expected stop to succeed for cached job, got: %s %s", + recorder.Result().Status, recorder.Body.String()) + } + }) +} diff --git a/internal/api/job.go b/internal/api/job.go index 66258668..c4a81cf2 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -755,16 +755,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { } isCached := false - job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - // Try cached jobs if not found in main repository - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - // Combine both errors for better debugging - handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw) return } - job = cachedJob + } else { isCached = true } diff --git a/internal/api/nats.go b/internal/api/nats.go index 0e929426..f1684f20 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -252,15 +252,15 @@ func (api *NatsAPI) handleStopJob(payload string) { } isCached := false - job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)", - err, cachedErr) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + cclog.Errorf("NATS job stop: finding job failed: %v", err) return } - job = cachedJob + } else { isCached = true } From 8ee6c09e9ba723a4299930bd8b3cfc2b01493f9b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sun, 22 Feb 2026 09:42:53 +0100 Subject: [PATCH 03/11] Enable to run taggers from within admin web interface --- internal/api/rest.go | 39 +++++ internal/tagger/tagger.go | 92 +++++++++++- web/frontend/src/config/AdminSettings.svelte | 2 + .../src/config/admin/RunTaggers.svelte | 142 ++++++++++++++++++ 4 files changed, 273 insertions(+), 2 deletions(-) create mode 100644 web/frontend/src/config/admin/RunTaggers.svelte diff --git a/internal/api/rest.go b/internal/api/rest.go index fe722511..4c964b19 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -22,6 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/tagger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/util" @@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) { r.Delete("/config/users/", api.deleteUser) r.Post("/config/user/{id}", api.updateUser) r.Post("/config/notice/", api.editNotice) + r.Get("/config/taggers/", api.getTaggers) + r.Post("/config/taggers/run/", api.runTagger) } } @@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) { } } +func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw) + return + } + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil { + cclog.Errorf("Failed to encode tagger list: %v", err) + } +} + +func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw) + return + } + + name := r.FormValue("name") + if name == "" { + handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw) + return + } + + if err := tagger.RunTaggerByName(name); err != nil { + handleError(err, http.StatusConflict, rw) + return + } + + rw.Header().Set("Content-Type", "text/plain") + rw.WriteHeader(http.StatusOK) + if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil { + cclog.Errorf("Failed to write response: %v", err) + } +} + // getJWT godoc // @summary Generate JWT token // @tags Frontend diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index bde3817d..5ee27e08 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -10,6 +10,7 @@ package tagger import ( + "fmt" "sync" "github.com/ClusterCockpit/cc-backend/internal/repository" @@ -29,11 +30,31 @@ type Tagger interface { Match(job *schema.Job) } +// TaggerInfo holds metadata about a tagger for JSON serialization. +type TaggerInfo struct { + Name string `json:"name"` + Type string `json:"type"` + Running bool `json:"running"` +} + var ( - initOnce sync.Once - jobTagger *JobTagger + initOnce sync.Once + jobTagger *JobTagger + statusMu sync.Mutex + taggerStatus = map[string]bool{} ) +// Known tagger definitions: name -> (type, factory) +type taggerDef struct { + ttype string + factory func() Tagger +} + +var knownTaggers = map[string]taggerDef{ + "AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }}, + "JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }}, +} + // JobTagger coordinates multiple taggers that run at different job lifecycle events. // It maintains separate lists of taggers that run when jobs start and when they stop. type JobTagger struct { @@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) { } } +// ListTaggers returns information about all known taggers with their current running status. +func ListTaggers() []TaggerInfo { + statusMu.Lock() + defer statusMu.Unlock() + + result := make([]TaggerInfo, 0, len(knownTaggers)) + for name, def := range knownTaggers { + result = append(result, TaggerInfo{ + Name: name, + Type: def.ttype, + Running: taggerStatus[name], + }) + } + return result +} + +// RunTaggerByName starts a tagger by name asynchronously on all jobs. +// Returns an error if the name is unknown or the tagger is already running. +func RunTaggerByName(name string) error { + def, ok := knownTaggers[name] + if !ok { + return fmt.Errorf("unknown tagger: %s", name) + } + + statusMu.Lock() + if taggerStatus[name] { + statusMu.Unlock() + return fmt.Errorf("tagger %s is already running", name) + } + taggerStatus[name] = true + statusMu.Unlock() + + go func() { + defer func() { + statusMu.Lock() + taggerStatus[name] = false + statusMu.Unlock() + }() + + t := def.factory() + if err := t.Register(); err != nil { + cclog.Errorf("Failed to register tagger %s: %s", name, err) + return + } + + r := repository.GetJobRepository() + jl, err := r.GetJobList(0, 0) + if err != nil { + cclog.Errorf("Error getting job list for tagger %s: %s", name, err) + return + } + + cclog.Infof("Running tagger %s on %d jobs", name, len(jl)) + for _, id := range jl { + job, err := r.FindByIDDirect(id) + if err != nil { + cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err) + continue + } + t.Match(job) + } + cclog.Infof("Tagger %s completed", name) + }() + + return nil +} + // RunTaggers applies all configured taggers to all existing jobs in the repository. // This is useful for retroactively applying tags to jobs that were created before // the tagger system was initialized or when new tagging rules are added. diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte index 46958963..5d61e66c 100644 --- a/web/frontend/src/config/AdminSettings.svelte +++ b/web/frontend/src/config/AdminSettings.svelte @@ -15,6 +15,7 @@ import ShowUsers from "./admin/ShowUsers.svelte"; import Options from "./admin/Options.svelte"; import NoticeEdit from "./admin/NoticeEdit.svelte"; + import RunTaggers from "./admin/RunTaggers.svelte"; /* Svelte 5 Props */ let { @@ -70,4 +71,5 @@ + diff --git a/web/frontend/src/config/admin/RunTaggers.svelte b/web/frontend/src/config/admin/RunTaggers.svelte new file mode 100644 index 00000000..0df3e961 --- /dev/null +++ b/web/frontend/src/config/admin/RunTaggers.svelte @@ -0,0 +1,142 @@ + + + + + + + + Job Taggers +

Run individual taggers on all existing jobs.

+ {#if taggers.length === 0} +

No taggers available.

+ {:else} + + + + + + + + + + + {#each taggers as tagger} + + + + + + + {/each} + +
NameTypeStatus
{tagger.name}{tagger.type} + {#if tagger.running} + Running + {:else} + Idle + {/if} + + +
+ {/if} +

+ {#if displayMessage}{message.msg}{/if} +

+
+
+ From 86fbecc6797632223918af468480e772839f3948 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sun, 22 Feb 2026 09:45:48 +0100 Subject: [PATCH 04/11] Update frontend deps --- web/frontend/package-lock.json | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json index 6962dc1b..e293d650 100644 --- a/web/frontend/package-lock.json +++ b/web/frontend/package-lock.json @@ -609,6 +609,12 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/trusted-types": { + "version": "2.0.7", + "resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz", + "integrity": "sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==", + "license": "MIT" + }, "node_modules/@urql/core": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/@urql/core/-/core-5.2.0.tgz", @@ -745,9 +751,9 @@ } }, "node_modules/devalue": { - "version": "5.6.2", - "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz", - "integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==", + "version": "5.6.3", + "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz", + "integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==", "license": "MIT" }, "node_modules/escape-latex": { @@ -763,9 +769,9 @@ "license": "MIT" }, "node_modules/esrap": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.1.tgz", - "integrity": "sha512-GiYWG34AN/4CUyaWAgunGt0Rxvr1PTMlGC0vvEov/uOQYWne2bpN03Um+k8jT+q3op33mKouP2zeJ6OlM+qeUg==", + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.3.tgz", + "integrity": "sha512-8fOS+GIGCQZl/ZIlhl59htOlms6U8NvX6ZYgYHpRU/b6tVSh3uHkOHZikl3D4cMbYM0JlpBe+p/BkZEi8J9XIQ==", "license": "MIT", "dependencies": { "@jridgewell/sourcemap-codec": "^1.4.15" @@ -1176,22 +1182,23 @@ } }, "node_modules/svelte": { - "version": "5.46.4", - "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.4.tgz", - "integrity": "sha512-VJwdXrmv9L8L7ZasJeWcCjoIuMRVbhuxbss0fpVnR8yorMmjNDwcjIH08vS6wmSzzzgAG5CADQ1JuXPS2nwt9w==", + "version": "5.53.2", + "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.2.tgz", + "integrity": "sha512-yGONuIrcl/BMmqbm6/52Q/NYzfkta7uVlos5NSzGTfNJTTFtPPzra6rAQoQIwAqupeM3s9uuTf5PvioeiCdg9g==", "license": "MIT", "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", + "@types/trusted-types": "^2.0.7", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", - "devalue": "^5.6.2", + "devalue": "^5.6.3", "esm-env": "^1.2.1", - "esrap": "^2.2.1", + "esrap": "^2.2.2", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", From c9d8de0d56b8a3800a658e380af0035408c40480 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sun, 22 Feb 2026 13:27:51 +0100 Subject: [PATCH 05/11] Fix and extend jobclass rules --- .../tagger/jobclasses/highMemoryUsage.json | 25 +++++++++++++++++++ configs/tagger/jobclasses/highload.json | 9 ++----- configs/tagger/jobclasses/lowUtilization.json | 12 ++++----- configs/tagger/jobclasses/lowload.json | 11 +++----- configs/tagger/jobclasses/memoryBound.json | 22 ++++++++++++++++ configs/tagger/jobclasses/parameters.json | 5 ++-- 6 files changed, 61 insertions(+), 23 deletions(-) create mode 100644 configs/tagger/jobclasses/highMemoryUsage.json create mode 100644 configs/tagger/jobclasses/memoryBound.json diff --git a/configs/tagger/jobclasses/highMemoryUsage.json b/configs/tagger/jobclasses/highMemoryUsage.json new file mode 100644 index 00000000..3c10b06f --- /dev/null +++ b/configs/tagger/jobclasses/highMemoryUsage.json @@ -0,0 +1,25 @@ +{ + "name": "High memory usage", + "tag": "highmemory", + "parameters": [ + "highmemoryusage_threshold_factor", + "job_min_duration_seconds" + ], + "metrics": ["mem_used"], + "requirements": [ + "job.shared == \"none\"", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "memory_threshold", + "expr": "mem_used.limits.peak * highmemoryusage_threshold_factor" + }, + { + "name": "memory_usage_pct", + "expr": "mem_used.max / mem_used.limits.peak * 100.0" + } + ], + "rule": "mem_used.max > memory_threshold", + "hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions." +} diff --git a/configs/tagger/jobclasses/highload.json b/configs/tagger/jobclasses/highload.json index 9667011b..a442a3ac 100644 --- a/configs/tagger/jobclasses/highload.json +++ b/configs/tagger/jobclasses/highload.json @@ -3,8 +3,7 @@ "tag": "excessiveload", "parameters": [ "excessivecpuload_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" + "job_min_duration_seconds" ], "metrics": ["cpu_load"], "requirements": [ @@ -15,12 +14,8 @@ { "name": "load_threshold", "expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor" - }, - { - "name": "load_perc", - "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)" } ], "rule": "cpu_load.avg > load_threshold", - "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}." + "hint": "This job was detected as having excessive CPU load: average cpu load {{.cpu_load.avg}} exceeds the oversubscription threshold {{.load_threshold}} ({{.excessivecpuload_threshold_factor}} \u00d7 {{.cpu_load.limits.peak}} peak cores), indicating CPU contention." } diff --git a/configs/tagger/jobclasses/lowUtilization.json b/configs/tagger/jobclasses/lowUtilization.json index e84b81da..1d365150 100644 --- a/configs/tagger/jobclasses/lowUtilization.json +++ b/configs/tagger/jobclasses/lowUtilization.json @@ -1,5 +1,5 @@ { - "name": "Low ressource utilization", + "name": "Low resource utilization", "tag": "lowutilization", "parameters": ["job_min_duration_seconds"], "metrics": ["flops_any", "mem_bw"], @@ -9,14 +9,14 @@ ], "variables": [ { - "name": "mem_bw_perc", - "expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)" + "name": "mem_bw_pct", + "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0" }, { - "name": "flops_any_perc", - "expr": "1.0 - (flops_any.avg / flops_any.limits.peak)" + "name": "flops_any_pct", + "expr": "flops_any.avg / flops_any.limits.peak * 100.0" } ], "rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert", - "hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}." + "hint": "This job shows low resource utilization: FLOP rate {{.flops_any.avg}} GF/s ({{.flops_any_pct}}% of peak) and memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) are both below their alert thresholds." } diff --git a/configs/tagger/jobclasses/lowload.json b/configs/tagger/jobclasses/lowload.json index f952da59..7fa3ca3b 100644 --- a/configs/tagger/jobclasses/lowload.json +++ b/configs/tagger/jobclasses/lowload.json @@ -3,8 +3,7 @@ "tag": "lowload", "parameters": [ "lowcpuload_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" + "job_min_duration_seconds" ], "metrics": ["cpu_load"], "requirements": [ @@ -15,12 +14,8 @@ { "name": "load_threshold", "expr": "job.numCores * lowcpuload_threshold_factor" - }, - { - "name": "load_perc", - "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)" } ], - "rule": "cpu_load.avg < cpu_load.limits.caution", - "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}." + "rule": "cpu_load.avg < load_threshold", + "hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)." } diff --git a/configs/tagger/jobclasses/memoryBound.json b/configs/tagger/jobclasses/memoryBound.json new file mode 100644 index 00000000..01368c08 --- /dev/null +++ b/configs/tagger/jobclasses/memoryBound.json @@ -0,0 +1,22 @@ +{ + "name": "Memory bandwidth bound", + "tag": "memorybound", + "parameters": ["membound_bw_threshold_factor", "job_min_duration_seconds"], + "metrics": ["mem_bw"], + "requirements": [ + "job.shared == \"none\"", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "mem_bw_threshold", + "expr": "mem_bw.limits.peak * membound_bw_threshold_factor" + }, + { + "name": "mem_bw_pct", + "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0" + } + ], + "rule": "mem_bw.avg > mem_bw_threshold", + "hint": "This job is memory bandwidth bound: memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) is within {{.membound_bw_threshold_factor}} of peak bandwidth. Consider improving data reuse or compute intensity." +} diff --git a/configs/tagger/jobclasses/parameters.json b/configs/tagger/jobclasses/parameters.json index 39e94c1c..c3fb5cdc 100644 --- a/configs/tagger/jobclasses/parameters.json +++ b/configs/tagger/jobclasses/parameters.json @@ -1,11 +1,12 @@ { - "lowcpuload_threshold_factor": 0.9, - "excessivecpuload_threshold_factor": 1.1, + "lowcpuload_threshold_factor": 0.85, + "excessivecpuload_threshold_factor": 1.2, "highmemoryusage_threshold_factor": 0.9, "node_load_imbalance_threshold_factor": 0.1, "core_load_imbalance_threshold_factor": 0.1, "high_memory_load_threshold_factor": 0.9, "lowgpuload_threshold_factor": 0.7, + "membound_bw_threshold_factor": 0.8, "memory_leak_slope_threshold": 0.1, "job_min_duration_seconds": 600.0, "sampling_interval_seconds": 30.0, From defa8fa994869592467fdf9e6e8b62b12220ea7c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 08:45:49 +0100 Subject: [PATCH 06/11] Update ReleaseNotes for v1.5.0 --- ReleaseNotes.md | 105 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 97 insertions(+), 8 deletions(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 7ea43620..1178ca8e 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -16,19 +16,25 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Removed `disable-archive` option**: This obsolete configuration option has been removed. - **Removed `clusters` config section**: The separate clusters configuration section has been removed. Cluster information is now derived from the job archive. -- **`apiAllowedIPs` is now optional**: If not specified, defaults to secure settings. +- **`apiAllowedIPs` is now optional**: If not specified, defaults to not + restricted. ### Architecture changes +- **Web framework replaced**: Migrated from `gorilla/mux` to `chi` as the HTTP + router. This should be transparent to users but affects how middleware and + routes are composed. A proper 404 handler is now in place. - **MetricStore moved**: The `metricstore` package has been moved from `internal/` to `pkg/` as it is now part of the public API. - **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend. - **Archive to Cleanup renaming**: Archive-related functions have been refactored and renamed to "Cleanup" for clarity. +- **`minRunningFor` filter removed**: This undocumented filter has been removed + from the API and frontend. ### Dependency changes -- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs +- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1) - **cclib NATS client**: Now using the cclib NATS client implementation - Removed obsolete `util.Float` usage from cclib @@ -51,13 +57,30 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Node state tracking**: New node table in database with timestamp tracking - **Node state filtering**: Filter jobs by node state in systems view -- **Node metrics improvements**: Better handling of node-level metrics and data - **Node list enhancements**: Improved paging, filtering, and continuous scroll support +- **Nodestate retention and archiving**: Node state data is now subject to configurable + retention policies and can be archived to Parquet format for long-term storage +- **Faulty node metric tracking**: Faulty node state metric lists are persisted to the database + +### Health Monitoring + +- **Health status dashboard**: New dedicated "Health" tab in the status details view + showing per-node metric health across the cluster +- **CCMS health check**: Support for querying health status of external + cc-metric-store (CCMS) instances via the API +- **GraphQL health endpoints**: New GraphQL queries and resolvers for health data +- **Cluster/subcluster filter**: Filter health status view by cluster or subcluster + +### Log Viewer + +- **Web-based log viewer**: New log viewer page in the admin interface for inspecting + backend log output directly from the browser without shell access +- **Accessible from header**: Quick access link from the navigation header ### MetricStore Improvements - **Memory tracking worker**: New worker for CCMS memory usage tracking -- **Dynamic retention**: Support for cluster/subcluster-specific retention times +- **Dynamic retention**: Support for job specific dynamic retention times - **Improved compression**: Transparent compression for job archive imports - **Parallel processing**: Parallelized Iter function in all archive backends @@ -65,15 +88,32 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Job tagger option**: Enable automatic job tagging via configuration flag - **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.) -- **Job classifaction**: Automatic detection of pathological jobs +- **Job classification**: Automatic detection of pathological jobs - **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations +- **Admin UI trigger**: Taggers can be run on-demand from the admin web interface + without restarting the backend ### Archive Backends +- **Parquet archive format**: New Parquet file format for job archiving, providing + columnar storage with efficient compression for analytical workloads - **S3 backend**: Full support for S3-compatible object storage - **SQLite backend**: Full support for SQLite backend using blobs - **Performance improvements**: Fixed performance bugs in archive backends - **Better error handling**: Improved error messages and fallback handling +- **Zstd compression**: Parquet writers use zstd compression for better + compression ratios compared to the previous snappy default +- **Optimized sort order**: Job and nodestate Parquet files are sorted by + cluster, subcluster, and start time for efficient range queries + +### Unified Archive Retention and Format Conversion + +- **Uniform retention policy**: Job archive retention now supports both JSON and + Parquet as target formats under a single, consistent policy configuration +- **Archive manager tool**: The `tools/archive-manager` utility now supports + format conversion between JSON and Parquet job archives +- **Parquet reader**: Full Parquet archive reader implementation for reading back + archived job data ## New features and improvements @@ -85,6 +125,14 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Filter presets**: Move list filter preset to URL for easy sharing - **Job comparison**: Improved job comparison views and plots - **Subcluster reactivity**: Job list now reacts to subcluster filter changes +- **Short jobs quick selection**: New "Short jobs" quick-filter button in job lists + replaces the removed undocumented `minRunningFor` filter +- **Row plot cursor sync**: Cursor position is now synchronized across all metric + plots in a job list row for easier cross-metric comparison +- **Disabled metrics handling**: Improved handling and display of disabled metrics + across job view, node view, and list rows +- **"Not configured" info cards**: Informational cards shown when optional features + are not yet configured - **Frontend dependencies**: Bumped frontend dependencies to latest versions - **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues @@ -95,6 +143,15 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues - **Configuration defaults**: Sensible defaults for most configuration options - **Documentation**: Extensive documentation improvements across packages +- **Server flag in systemd unit**: Example systemd unit now includes the `-server` flag + +### Security + +- **LDAP security hardening**: Improved input validation, connection handling, and + error reporting in the LDAP authenticator +- **OIDC security hardening**: Stricter token validation and improved error handling + in the OIDC authenticator +- **Auth schema extensions**: Additional schema fields for improved auth configuration ### API improvements @@ -102,6 +159,8 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Job exclusivity filter**: New filter for exclusive vs. shared jobs - **Improved error messages**: Better error messages and documentation in REST API - **GraphQL enhancements**: Improved GraphQL queries and resolvers +- **Stop job lookup order**: Reversed lookup order in stop job requests for + more reliable job matching (cluster+jobId first, then jobId alone) ### Performance @@ -109,13 +168,17 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Job cache**: Introduced caching table for faster job inserts - **Parallel imports**: Archive imports now run in parallel where possible - **External tool integration**: Optimized use of external tools (fd) for better performance +- **Node repository queries**: Reviewed and optimized node repository SQL queries +- **Buffer pool**: Resized and pooled internal buffers for better memory reuse ### Developer experience - **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md) - **Example API payloads**: Added example JSON API payloads for testing -- **Unit tests**: Added more unit tests for NATS API and other components -- **Test improvements**: Better test coverage and test data +- **Unit tests**: Added more unit tests for NATS API, node repository, and other components +- **Test improvements**: Better test coverage; test DB is now copied before unit tests + to avoid state pollution between test runs +- **Parquet writer tests**: Comprehensive tests for Parquet archive writing and conversion ## Bug fixes @@ -132,6 +195,16 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - Fixed polar plot data query decoupling - Fixed missing resolution parameter handling - Fixed node table initialization fallback +- Fixed reactivity key placement in nodeList +- Fixed nodeList resolver data handling and increased nodestate filter cutoff +- Fixed job always being transferred to main job table before archiving +- Fixed AppTagger error handling and logging +- Fixed log endpoint formatting and correctness +- Fixed automatic refresh in metric status tab +- Fixed NULL value handling in `health_state` and `health_metrics` columns +- Fixed bugs related to `job_cache` IDs being used in the main job table +- Fixed SyncJobs bug causing start job hooks to be called with wrong (cache) IDs +- Fixed 404 handler route for sub-routers ## Configuration changes @@ -167,6 +240,20 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus "interval": "48h", "directory": "./var/archive" } + }, + "archive": { + "retention": { + "policy": "delete", + "age": "6months", + "target-format": "parquet" + } + }, + "nodestate": { + "retention": { + "policy": "archive", + "age": "30d", + "archive-path": "./var/nodestate-archive" + } } } ``` @@ -178,11 +265,13 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - If using S3 archive backend, configure the new `archive` section options - Test the new public dashboard at `/public` route - Review cron worker configuration if you need different frequencies +- If using the archive retention feature, configure the `target-format` option + to choose between `json` (default) and `parquet` output formats +- Consider enabling nodestate retention if you track node states over time ## Known issues - Currently energy footprint metrics of type energy are ignored for calculating total energy. -- Resampling for running jobs only works with cc-metric-store - With energy footprint metrics of type power the unit is ignored and it is assumed the metric has the unit Watt. From 03c65e06f6baf1b92690739caf377fea0001a521 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 08:46:47 +0100 Subject: [PATCH 07/11] Allow finer control for omit tagged jobs in retention policies --- cmd/cc-backend/main.go | 6 +- internal/api/job.go | 10 ++-- internal/repository/job.go | 30 +++++++--- internal/repository/job_test.go | 64 ++++++++++++++++++---- internal/taskmanager/compressionService.go | 4 +- internal/taskmanager/retentionService.go | 2 +- internal/taskmanager/taskManager.go | 4 +- pkg/archive/ConfigSchema.go | 5 ++ 8 files changed, 92 insertions(+), 33 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index fde95fd3..81d397d2 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -369,13 +369,11 @@ func runServer(ctx context.Context) error { errChan := make(chan error, 1) // Start HTTP server - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { if err := srv.Start(ctx); err != nil { errChan <- err } - }() + }) // Handle shutdown signals wg.Add(1) diff --git a/internal/api/job.go b/internal/api/job.go index 1322225b..62410001 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -904,11 +904,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { } // Check for omit-tagged query parameter - omitTagged := false + omitTagged := "none" if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" { - omitTagged, e = strconv.ParseBool(omitTaggedStr) - if e != nil { - handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw) + switch omitTaggedStr { + case "none", "all", "user": + omitTagged = omitTaggedStr + default: + handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw) return } } diff --git a/internal/repository/job.go b/internal/repository/job.go index a1cd9719..8055ca37 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 // // Parameters: // - startTime: Unix timestamp, jobs with start_time < this value will be deleted -// - omitTagged: If true, skip jobs that have associated tags (jobtag entries) +// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs, +// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns the count of deleted jobs or an error if the operation fails. -func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) { +func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) { var cnt int q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { @@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, var jobIds []int64 selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } rows, err := selectQuery.RunWith(r.DB).Query() @@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, qd := sq.Delete("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } _, err := qd.RunWith(r.DB).Exec() @@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error { // Parameters: // - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start) // - startTimeEnd: Unix timestamp for range end -// - omitTagged: If true, exclude jobs with associated tags +// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs, +// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns a slice of jobs or an error if the time range is invalid or query fails. -func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) { var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { @@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd) } - if omitTagged { + switch omitTagged { + case "all": query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC") diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index 9f4871fd..992251af 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) { // 1. Find a job to use (Find all jobs) // We use a large time range to ensure we get something if it exists - jobs, err := r.FindJobsBetween(0, 9999999999, false) + jobs, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) { targetJob := jobs[0] - // 2. Create a tag - tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano()) - tagID, err := r.CreateTag("testtype", tagName, "global") + // 2. Create an auto-tagger tag (type "app") + appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano()) + appTagID, err := r.CreateTag("app", appTagName, "global") if err != nil { t.Fatal(err) } - // 3. Link Tag (Manually to avoid archive dependency side-effects in unit test) - _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID) + // 3. Link auto-tagger tag to job + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID) if err != nil { t.Fatal(err) } - // 4. Search with omitTagged = false (Should find the job) - jobsFound, err := r.FindJobsBetween(0, 9999999999, false) + // 4. Search with omitTagged = "none" (Should find the job) + jobsFound, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) { } } if !found { - t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID) + t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID) } - // 5. Search with omitTagged = true (Should NOT find the job) - jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true) + // 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag) + jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all") if err != nil { t.Fatal(err) } for _, j := range jobsFiltered { if *j.ID == *targetJob.ID { - t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID) + t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID) + } + } + + // 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job + jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + found = false + for _, j := range jobsUserFilter { + if *j.ID == *targetJob.ID { + found = true + break + } + } + if !found { + t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID) + } + + // 7. Add a user-created tag (type "testtype") to the same job + userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano()) + userTagID, err := r.CreateTag("testtype", userTagName, "global") + if err != nil { + t.Fatal(err) + } + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID) + if err != nil { + t.Fatal(err) + } + + // 8. Now omitTagged = "user" should exclude the job (has a user-created tag) + jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + for _, j := range jobsUserFilter2 { + if *j.ID == *targetJob.ID { + t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID) } } } diff --git a/internal/taskmanager/compressionService.go b/internal/taskmanager/compressionService.go index ab01ce8f..353fcb65 100644 --- a/internal/taskmanager/compressionService.go +++ b/internal/taskmanager/compressionService.go @@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) { lastTime := ar.CompressLast(startTime) if startTime == lastTime { cclog.Info("Compression Service - Complete archive run") - jobs, err = jobRepo.FindJobsBetween(0, startTime, false) + jobs, err = jobRepo.FindJobsBetween(0, startTime, "none") } else { - jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false) + jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none") } if err != nil { diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index eda452e6..48e5c042 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target } // cleanupAfterTransfer removes jobs from archive and optionally from DB. -func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { +func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) { archive.GetHandle().CleanUp(jobs) if includeDB { diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index b25b2a93..d758ee52 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -26,8 +26,8 @@ type Retention struct { Policy string `json:"policy"` Format string `json:"format"` Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` + IncludeDB bool `json:"include-db"` + OmitTagged string `json:"omit-tagged"` TargetKind string `json:"target-kind"` TargetPath string `json:"target-path"` TargetEndpoint string `json:"target-endpoint"` diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index cb9b16bc..1c2b7fe1 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -68,6 +68,11 @@ var configSchema = ` "description": "Also remove jobs from database", "type": "boolean" }, + "omit-tagged": { + "description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)", + "type": "string", + "enum": ["none", "all", "user"] + }, "age": { "description": "Act on jobs with startTime older than age (in days)", "type": "integer" From ab55ce91a11c753305c4c147968cd5fc097b1c93 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 09:02:36 +0100 Subject: [PATCH 08/11] Change omit tagged key to kebab case --- ReleaseNotes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 1178ca8e..3d352f20 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -89,7 +89,7 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Job tagger option**: Enable automatic job tagging via configuration flag - **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.) - **Job classification**: Automatic detection of pathological jobs -- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations +- **omit-tagged**: Option to exclude tagged jobs from retention/cleanup operations (`none`, `all`, or `user`) - **Admin UI trigger**: Taggers can be run on-demand from the admin web interface without restarting the backend From 31f3c2829403923615f7df98ffd2126ebfba9e4f Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 23 Feb 2026 13:26:06 +0100 Subject: [PATCH 09/11] Remove schedState and metaData from metricHealth table --- .../src/status/dashdetails/HealthDash.svelte | 70 ++----------------- 1 file changed, 6 insertions(+), 64 deletions(-) diff --git a/web/frontend/src/status/dashdetails/HealthDash.svelte b/web/frontend/src/status/dashdetails/HealthDash.svelte index 11f1ef31..b4063309 100644 --- a/web/frontend/src/status/dashdetails/HealthDash.svelte +++ b/web/frontend/src/status/dashdetails/HealthDash.svelte @@ -32,15 +32,6 @@ /* Const Init */ const client = getContextClient(); - const stateOptions = [ - "all", - "allocated", - "idle", - "down", - "mixed", - "reserved", - "unknown", - ]; const healthOptions = [ "all", "full", @@ -52,12 +43,10 @@ let pieWidth = $state(0); let querySorting = $state({ field: "startTime", type: "col", order: "DESC" }) let tableHostFilter = $state(""); - let tableStateFilter = $state(stateOptions[0]); let tableHealthFilter = $state(healthOptions[0]); let healthTableSorting = $state( { - schedulerState: { dir: "down", active: true }, - healthState: { dir: "down", active: false }, + healthState: { dir: "up", active: true }, hostname: { dir: "down", active: false }, } ); @@ -79,9 +68,7 @@ hostname cluster subCluster - schedulerState healthState - metaData healthData } } @@ -102,7 +89,7 @@ let healthTableData = $derived.by(() => { if ($statusQuery?.data) { return [...$statusQuery.data.nodes.items].sort((n1, n2) => { - return n1['schedulerState'].localeCompare(n2['schedulerState']) + return n1['healthState'].localeCompare(n2['healthState']) }); } else { return []; @@ -114,21 +101,12 @@ if (tableHostFilter != "") { pendingTableData = pendingTableData.filter((e) => e.hostname.includes(tableHostFilter)) } - if (tableStateFilter != "all") { - pendingTableData = pendingTableData.filter((e) => e.schedulerState.includes(tableStateFilter)) - } if (tableHealthFilter != "all") { pendingTableData = pendingTableData.filter((e) => e.healthState.includes(tableHealthFilter)) } return pendingTableData }); - const refinedStateData = $derived.by(() => { - return $statusQuery?.data?.nodeStates. - filter((e) => ['allocated', 'reserved', 'idle', 'mixed','down', 'unknown'].includes(e.state)). - sort((a, b) => b.count - a.count) - }); - const refinedHealthData = $derived.by(() => { return $statusQuery?.data?.nodeStates. filter((e) => ['full', 'partial', 'failed'].includes(e.state)). @@ -296,7 +274,7 @@ - sortBy('hostname')}> + sortBy('hostname')}> Hosts ({filteredTableData.length}) - - sortBy('schedulerState')}> - Scheduler State - - sortBy('healthState')}> + sortBy('healthState')}> Health State - - {#each stateOptions as so} - - {/each} - - - - - - - - - + {#each healthOptions as ho} {/each} - - - - - {#each filteredTableData as host (host.hostname)} {host.hostname} - {host.schedulerState} {host.healthState} - + {#each Object.keys(host.healthData) as hkey}

{hkey}: {host.healthData[hkey]}

{/each} - - {#each Object.keys(host.metaData) as mkey} -

- {mkey}: {host.metaData[mkey]} -

- {/each} - {/each} From bcfe9022de7b5b0f28d45af16e04022d06c76a8a Mon Sep 17 00:00:00 2001 From: Michael Panzlaff Date: Mon, 23 Feb 2026 14:51:19 +0100 Subject: [PATCH 10/11] Makefile: use npm ci --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3246538a..5829beae 100644 --- a/Makefile +++ b/Makefile @@ -84,4 +84,4 @@ $(VAR): $(SVELTE_TARGETS): $(SVELTE_SRC) $(info ===> BUILD frontend) - cd web/frontend && npm install && npm run build + cd web/frontend && npm ci && npm run build From 8989b7a410e8a46c5aed837eb03bf77490fb94f2 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 18:45:41 +0100 Subject: [PATCH 11/11] Update cclib --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b790c0c8..e244062c 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.86 - github.com/ClusterCockpit/cc-lib/v2 v2.5.1 + github.com/ClusterCockpit/cc-lib/v2 v2.6.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.32.8 diff --git a/go.sum b/go.sum index c319c6ba..f2929454 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+ github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk= github.com/ClusterCockpit/cc-lib/v2 v2.5.1 h1:s6M9tyPDty+4zTdQGJYKpGJM9Nz7N6ITMdjPvNSLX5g= github.com/ClusterCockpit/cc-lib/v2 v2.5.1/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0= +github.com/ClusterCockpit/cc-lib/v2 v2.6.0 h1:Q7zvRAVhfYA9PDB18pfY9A/6Ws4oWpnv8+P9MBRUDzg= +github.com/ClusterCockpit/cc-lib/v2 v2.6.0/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=