diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 7ea43620..1178ca8e 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -16,19 +16,25 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Removed `disable-archive` option**: This obsolete configuration option has been removed. - **Removed `clusters` config section**: The separate clusters configuration section has been removed. Cluster information is now derived from the job archive. -- **`apiAllowedIPs` is now optional**: If not specified, defaults to secure settings. +- **`apiAllowedIPs` is now optional**: If not specified, defaults to not + restricted. ### Architecture changes +- **Web framework replaced**: Migrated from `gorilla/mux` to `chi` as the HTTP + router. This should be transparent to users but affects how middleware and + routes are composed. A proper 404 handler is now in place. - **MetricStore moved**: The `metricstore` package has been moved from `internal/` to `pkg/` as it is now part of the public API. - **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend. - **Archive to Cleanup renaming**: Archive-related functions have been refactored and renamed to "Cleanup" for clarity. +- **`minRunningFor` filter removed**: This undocumented filter has been removed + from the API and frontend. ### Dependency changes -- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs +- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1) - **cclib NATS client**: Now using the cclib NATS client implementation - Removed obsolete `util.Float` usage from cclib @@ -51,13 +57,30 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Node state tracking**: New node table in database with timestamp tracking - **Node state filtering**: Filter jobs by node state in systems view -- **Node metrics improvements**: Better handling of node-level metrics and data - **Node list enhancements**: Improved paging, filtering, and continuous scroll support +- **Nodestate retention and archiving**: Node state data is now subject to configurable + retention policies and can be archived to Parquet format for long-term storage +- **Faulty node metric tracking**: Faulty node state metric lists are persisted to the database + +### Health Monitoring + +- **Health status dashboard**: New dedicated "Health" tab in the status details view + showing per-node metric health across the cluster +- **CCMS health check**: Support for querying health status of external + cc-metric-store (CCMS) instances via the API +- **GraphQL health endpoints**: New GraphQL queries and resolvers for health data +- **Cluster/subcluster filter**: Filter health status view by cluster or subcluster + +### Log Viewer + +- **Web-based log viewer**: New log viewer page in the admin interface for inspecting + backend log output directly from the browser without shell access +- **Accessible from header**: Quick access link from the navigation header ### MetricStore Improvements - **Memory tracking worker**: New worker for CCMS memory usage tracking -- **Dynamic retention**: Support for cluster/subcluster-specific retention times +- **Dynamic retention**: Support for job specific dynamic retention times - **Improved compression**: Transparent compression for job archive imports - **Parallel processing**: Parallelized Iter function in all archive backends @@ -65,15 +88,32 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - 
**Job tagger option**: Enable automatic job tagging via configuration flag - **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.) -- **Job classifaction**: Automatic detection of pathological jobs +- **Job classification**: Automatic detection of pathological jobs - **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations +- **Admin UI trigger**: Taggers can be run on-demand from the admin web interface + without restarting the backend ### Archive Backends +- **Parquet archive format**: New Parquet file format for job archiving, providing + columnar storage with efficient compression for analytical workloads - **S3 backend**: Full support for S3-compatible object storage - **SQLite backend**: Full support for SQLite backend using blobs - **Performance improvements**: Fixed performance bugs in archive backends - **Better error handling**: Improved error messages and fallback handling +- **Zstd compression**: Parquet writers use zstd compression for better + compression ratios compared to the previous snappy default +- **Optimized sort order**: Job and nodestate Parquet files are sorted by + cluster, subcluster, and start time for efficient range queries + +### Unified Archive Retention and Format Conversion + +- **Uniform retention policy**: Job archive retention now supports both JSON and + Parquet as target formats under a single, consistent policy configuration +- **Archive manager tool**: The `tools/archive-manager` utility now supports + format conversion between JSON and Parquet job archives +- **Parquet reader**: Full Parquet archive reader implementation for reading back + archived job data ## New features and improvements @@ -85,6 +125,14 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Filter presets**: Move list filter preset to URL for easy sharing - **Job comparison**: Improved job comparison views and plots - **Subcluster reactivity**: Job list now reacts to subcluster filter changes +- **Short jobs quick selection**: New "Short jobs" quick-filter button in job lists + replaces the removed undocumented `minRunningFor` filter +- **Row plot cursor sync**: Cursor position is now synchronized across all metric + plots in a job list row for easier cross-metric comparison +- **Disabled metrics handling**: Improved handling and display of disabled metrics + across job view, node view, and list rows +- **"Not configured" info cards**: Informational cards shown when optional features + are not yet configured - **Frontend dependencies**: Bumped frontend dependencies to latest versions - **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues @@ -95,6 +143,15 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues - **Configuration defaults**: Sensible defaults for most configuration options - **Documentation**: Extensive documentation improvements across packages +- **Server flag in systemd unit**: Example systemd unit now includes the `-server` flag + +### Security + +- **LDAP security hardening**: Improved input validation, connection handling, and + error reporting in the LDAP authenticator +- **OIDC security hardening**: Stricter token validation and improved error handling + in the OIDC authenticator +- **Auth schema extensions**: Additional schema fields for improved auth configuration ### API improvements @@ -102,6 +159,8 @@ For release specific notes visit 
the [ClusterCockpit Documentation](https://clus - **Job exclusivity filter**: New filter for exclusive vs. shared jobs - **Improved error messages**: Better error messages and documentation in REST API - **GraphQL enhancements**: Improved GraphQL queries and resolvers +- **Stop job lookup order**: Reversed lookup order in stop job requests for + more reliable job matching (cluster+jobId first, then jobId alone) ### Performance @@ -109,13 +168,17 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - **Job cache**: Introduced caching table for faster job inserts - **Parallel imports**: Archive imports now run in parallel where possible - **External tool integration**: Optimized use of external tools (fd) for better performance +- **Node repository queries**: Reviewed and optimized node repository SQL queries +- **Buffer pool**: Resized and pooled internal buffers for better memory reuse ### Developer experience - **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md) - **Example API payloads**: Added example JSON API payloads for testing -- **Unit tests**: Added more unit tests for NATS API and other components -- **Test improvements**: Better test coverage and test data +- **Unit tests**: Added more unit tests for NATS API, node repository, and other components +- **Test improvements**: Better test coverage; test DB is now copied before unit tests + to avoid state pollution between test runs +- **Parquet writer tests**: Comprehensive tests for Parquet archive writing and conversion ## Bug fixes @@ -132,6 +195,16 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - Fixed polar plot data query decoupling - Fixed missing resolution parameter handling - Fixed node table initialization fallback +- Fixed reactivity key placement in nodeList +- Fixed nodeList resolver data handling and increased nodestate filter cutoff +- Fixed job always being transferred to main job table before archiving +- Fixed AppTagger error handling and logging +- Fixed log endpoint formatting and correctness +- Fixed automatic refresh in metric status tab +- Fixed NULL value handling in `health_state` and `health_metrics` columns +- Fixed bugs related to `job_cache` IDs being used in the main job table +- Fixed SyncJobs bug causing start job hooks to be called with wrong (cache) IDs +- Fixed 404 handler route for sub-routers ## Configuration changes @@ -167,6 +240,20 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus "interval": "48h", "directory": "./var/archive" } + }, + "archive": { + "retention": { + "policy": "delete", + "age": "6months", + "target-format": "parquet" + } + }, + "nodestate": { + "retention": { + "policy": "archive", + "age": "30d", + "archive-path": "./var/nodestate-archive" + } } } ``` @@ -178,11 +265,13 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus - If using S3 archive backend, configure the new `archive` section options - Test the new public dashboard at `/public` route - Review cron worker configuration if you need different frequencies +- If using the archive retention feature, configure the `target-format` option + to choose between `json` (default) and `parquet` output formats +- Consider enabling nodestate retention if you track node states over time ## Known issues - Currently energy footprint metrics of type energy are ignored for calculating total energy. 
-- Resampling for running jobs only works with cc-metric-store - With energy footprint metrics of type power the unit is ignored and it is assumed the metric has the unit Watt. diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index fde95fd3..81d397d2 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -369,13 +369,11 @@ func runServer(ctx context.Context) error { errChan := make(chan error, 1) // Start HTTP server - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { if err := srv.Start(ctx); err != nil { errChan <- err } - }() + }) // Handle shutdown signals wg.Add(1) diff --git a/configs/tagger/jobclasses/highMemoryUsage.json b/configs/tagger/jobclasses/highMemoryUsage.json new file mode 100644 index 00000000..3c10b06f --- /dev/null +++ b/configs/tagger/jobclasses/highMemoryUsage.json @@ -0,0 +1,25 @@ +{ + "name": "High memory usage", + "tag": "highmemory", + "parameters": [ + "highmemoryusage_threshold_factor", + "job_min_duration_seconds" + ], + "metrics": ["mem_used"], + "requirements": [ + "job.shared == \"none\"", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "memory_threshold", + "expr": "mem_used.limits.peak * highmemoryusage_threshold_factor" + }, + { + "name": "memory_usage_pct", + "expr": "mem_used.max / mem_used.limits.peak * 100.0" + } + ], + "rule": "mem_used.max > memory_threshold", + "hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions." +} diff --git a/configs/tagger/jobclasses/highload.json b/configs/tagger/jobclasses/highload.json index 9667011b..a442a3ac 100644 --- a/configs/tagger/jobclasses/highload.json +++ b/configs/tagger/jobclasses/highload.json @@ -3,8 +3,7 @@ "tag": "excessiveload", "parameters": [ "excessivecpuload_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" + "job_min_duration_seconds" ], "metrics": ["cpu_load"], "requirements": [ @@ -15,12 +14,8 @@ { "name": "load_threshold", "expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor" - }, - { - "name": "load_perc", - "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)" } ], "rule": "cpu_load.avg > load_threshold", - "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}." + "hint": "This job was detected as having excessive CPU load: average cpu load {{.cpu_load.avg}} exceeds the oversubscription threshold {{.load_threshold}} ({{.excessivecpuload_threshold_factor}} \u00d7 {{.cpu_load.limits.peak}} peak cores), indicating CPU contention." 
} diff --git a/configs/tagger/jobclasses/lowUtilization.json b/configs/tagger/jobclasses/lowUtilization.json index e84b81da..1d365150 100644 --- a/configs/tagger/jobclasses/lowUtilization.json +++ b/configs/tagger/jobclasses/lowUtilization.json @@ -1,5 +1,5 @@ { - "name": "Low ressource utilization", + "name": "Low resource utilization", "tag": "lowutilization", "parameters": ["job_min_duration_seconds"], "metrics": ["flops_any", "mem_bw"], @@ -9,14 +9,14 @@ ], "variables": [ { - "name": "mem_bw_perc", - "expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)" + "name": "mem_bw_pct", + "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0" }, { - "name": "flops_any_perc", - "expr": "1.0 - (flops_any.avg / flops_any.limits.peak)" + "name": "flops_any_pct", + "expr": "flops_any.avg / flops_any.limits.peak * 100.0" } ], "rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert", - "hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}." + "hint": "This job shows low resource utilization: FLOP rate {{.flops_any.avg}} GF/s ({{.flops_any_pct}}% of peak) and memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) are both below their alert thresholds." } diff --git a/configs/tagger/jobclasses/lowload.json b/configs/tagger/jobclasses/lowload.json index f952da59..7fa3ca3b 100644 --- a/configs/tagger/jobclasses/lowload.json +++ b/configs/tagger/jobclasses/lowload.json @@ -3,8 +3,7 @@ "tag": "lowload", "parameters": [ "lowcpuload_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" + "job_min_duration_seconds" ], "metrics": ["cpu_load"], "requirements": [ @@ -15,12 +14,8 @@ { "name": "load_threshold", "expr": "job.numCores * lowcpuload_threshold_factor" - }, - { - "name": "load_perc", - "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)" } ], - "rule": "cpu_load.avg < cpu_load.limits.caution", - "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}." + "rule": "cpu_load.avg < load_threshold", + "hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)." } diff --git a/configs/tagger/jobclasses/memoryBound.json b/configs/tagger/jobclasses/memoryBound.json new file mode 100644 index 00000000..01368c08 --- /dev/null +++ b/configs/tagger/jobclasses/memoryBound.json @@ -0,0 +1,22 @@ +{ + "name": "Memory bandwidth bound", + "tag": "memorybound", + "parameters": ["membound_bw_threshold_factor", "job_min_duration_seconds"], + "metrics": ["mem_bw"], + "requirements": [ + "job.shared == \"none\"", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "mem_bw_threshold", + "expr": "mem_bw.limits.peak * membound_bw_threshold_factor" + }, + { + "name": "mem_bw_pct", + "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0" + } + ], + "rule": "mem_bw.avg > mem_bw_threshold", + "hint": "This job is memory bandwidth bound: memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) is within {{.membound_bw_threshold_factor}} of peak bandwidth. Consider improving data reuse or compute intensity." 
+} diff --git a/configs/tagger/jobclasses/parameters.json b/configs/tagger/jobclasses/parameters.json index 39e94c1c..c3fb5cdc 100644 --- a/configs/tagger/jobclasses/parameters.json +++ b/configs/tagger/jobclasses/parameters.json @@ -1,11 +1,12 @@ { - "lowcpuload_threshold_factor": 0.9, - "excessivecpuload_threshold_factor": 1.1, + "lowcpuload_threshold_factor": 0.85, + "excessivecpuload_threshold_factor": 1.2, "highmemoryusage_threshold_factor": 0.9, "node_load_imbalance_threshold_factor": 0.1, "core_load_imbalance_threshold_factor": 0.1, "high_memory_load_threshold_factor": 0.9, "lowgpuload_threshold_factor": 0.7, + "membound_bw_threshold_factor": 0.8, "memory_leak_slope_threshold": 0.1, "job_min_duration_seconds": 600.0, "sampling_interval_seconds": 30.0, diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 09fc4c7f..a8aef889 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) { } }) } + +// TestStopJobWithReusedJobId verifies that stopping a recently started job works +// even when an older job with the same jobId exists in the job table (e.g. with +// state "failed"). This is a regression test for the bug where Find() on the job +// table would match the old job instead of the new one still in job_cache. +func TestStopJobWithReusedJobId(t *testing.T) { + restapi := setup(t) + t.Cleanup(cleanup) + + testData := schema.JobData{ + "load_one": map[schema.MetricScope]*schema.JobMetric{ + schema.MetricScopeNode: { + Unit: schema.Unit{Base: "load"}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "host123", + Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, + Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, + }, + }, + }, + }, + } + + metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { + return testData, nil + } + + r := chi.NewRouter() + restapi.MountAPIRoutes(r) + + const contextUserKey repository.ContextKey = "user" + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"user"}, + AuthType: 0, + AuthSource: 2, + } + + // Step 1: Start the first job (jobId=999) + const startJobBody1 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 200000000 + }` + + if ok := t.Run("StartFirstJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 2: Sync to move job from cache to job table, then stop it as "failed" + time.Sleep(1 * time.Second) + restapi.JobRepository.SyncJobs() + + const stopJobBody1 string = `{ + "jobId": 999, + "startTime": 200000000, + "cluster": "testcluster", + "jobState": "failed", + "stopTime": 200001000 + }` + + if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) { + req := 
httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + + jobid, cluster := int64(999), "testcluster" + job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) + if err != nil { + t.Fatal(err) + } + if job.State != schema.JobStateFailed { + t.Fatalf("expected first job to be failed, got: %s", job.State) + } + }); !ok { + return + } + + // Wait for archiving to complete + time.Sleep(1 * time.Second) + + // Step 3: Start a NEW job with the same jobId=999 but different startTime. + // This job will sit in job_cache (not yet synced). + const startJobBody2 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 300000000 + }` + + if ok := t.Run("StartSecondJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 4: Stop the second job WITHOUT syncing first. + // Before the fix, this would fail because Find() on the job table would + // match the old failed job (jobId=999) and reject with "already stopped". + const stopJobBody2 string = `{ + "jobId": 999, + "startTime": 300000000, + "cluster": "testcluster", + "jobState": "completed", + "stopTime": 300001000 + }` + + t.Run("StopSecondJobBeforeSync", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatalf("expected stop to succeed for cached job, got: %s %s", + recorder.Result().Status, recorder.Body.String()) + } + }) +} diff --git a/internal/api/job.go b/internal/api/job.go index 66258668..62410001 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) { } } - id, err := api.JobRepository.Start(&req) + // When tags are present, insert directly into the job table so that the + // returned ID can be used with AddTagOrCreate (which queries the job table). + // Jobs without tags use the cache path as before. 
+ var id int64 + if len(req.Tags) > 0 { + id, err = api.JobRepository.StartDirect(&req) + } else { + id, err = api.JobRepository.Start(&req) + } if err != nil { handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) return @@ -755,16 +763,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { } isCached := false - job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - // Try cached jobs if not found in main repository - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - // Combine both errors for better debugging - handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw) return } - job = cachedJob + } else { isCached = true } @@ -897,11 +904,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { } // Check for omit-tagged query parameter - omitTagged := false + omitTagged := "none" if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" { - omitTagged, e = strconv.ParseBool(omitTaggedStr) - if e != nil { - handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw) + switch omitTaggedStr { + case "none", "all", "user": + omitTagged = omitTaggedStr + default: + handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw) return } } diff --git a/internal/api/nats.go b/internal/api/nats.go index 0e929426..02a03fae 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) { } } - id, err := api.JobRepository.Start(&req) + // When tags are present, insert directly into the job table so that the + // returned ID can be used with AddTagOrCreate (which queries the job table). 
+ var id int64 + if len(req.Tags) > 0 { + id, err = api.JobRepository.StartDirect(&req) + } else { + id, err = api.JobRepository.Start(&req) + } if err != nil { cclog.Errorf("NATS start job: insert into database failed: %v", err) return @@ -252,15 +259,15 @@ func (api *NatsAPI) handleStopJob(payload string) { } isCached := false - job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)", - err, cachedErr) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + cclog.Errorf("NATS job stop: finding job failed: %v", err) return } - job = cachedJob + } else { isCached = true } diff --git a/internal/api/node.go b/internal/api/node.go index cab33452..5032ed7b 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -113,8 +113,6 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { } } - fmt.Printf("Result: %#v\n", healthResults) - cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) startDB := time.Now() diff --git a/internal/api/rest.go b/internal/api/rest.go index fe722511..4c964b19 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -22,6 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/tagger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/util" @@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) { r.Delete("/config/users/", api.deleteUser) r.Post("/config/user/{id}", api.updateUser) r.Post("/config/notice/", api.editNotice) + r.Get("/config/taggers/", api.getTaggers) + r.Post("/config/taggers/run/", api.runTagger) } } @@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) { } } +func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw) + return + } + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil { + cclog.Errorf("Failed to encode tagger list: %v", err) + } +} + +func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw) + return + } + + name := r.FormValue("name") + if name == "" { + handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw) + return + } + + if err := tagger.RunTaggerByName(name); err != nil { + handleError(err, http.StatusConflict, rw) + return + } + + rw.Header().Set("Content-Type", "text/plain") + rw.WriteHeader(http.StatusOK) + if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil { + cclog.Errorf("Failed to write response: %v", err) + } +} + // 
getJWT godoc // @summary Generate JWT token // @tags Frontend diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index 257b5fec..68b6db9c 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error { return err } - id, err := r.InsertJob(&job) + id, err := r.InsertJobDirect(&job) if err != nil { cclog.Warn("Error while job db insert") return err diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index f53e3a9d..cb4dca89 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) { } result := readResult(t, testname) - job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime) + job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime) if err != nil { t.Fatal(err) } diff --git a/internal/repository/job.go b/internal/repository/job.go index a1cd9719..8055ca37 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 // // Parameters: // - startTime: Unix timestamp, jobs with start_time < this value will be deleted -// - omitTagged: If true, skip jobs that have associated tags (jobtag entries) +// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs, +// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns the count of deleted jobs or an error if the operation fails. -func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) { +func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) { var cnt int q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { @@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, var jobIds []int64 selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } rows, err := selectQuery.RunWith(r.DB).Query() @@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, qd := sq.Delete("job").Where("job.start_time < ?", startTime) - if omitTagged { + switch omitTagged { + case "all": qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } _, err := qd.RunWith(r.DB).Exec() @@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error { // Parameters: // - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start) // 
- startTimeEnd: Unix timestamp for range end -// - omitTagged: If true, exclude jobs with associated tags +// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs, +// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass") // // Returns a slice of jobs or an error if the time range is invalid or query fails. -func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) { var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { @@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd) } - if omitTagged { + switch omitTagged { + case "all": query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)") + case "user": + query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))") } query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC") diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 9f4f366d..07c8ce11 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job ( :shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data );` +// InsertJobDirect inserts a job directly into the job table (not job_cache). +// Use this when the returned ID will be used for operations on the job table +// (e.g., adding tags), or for imported jobs that are already completed. +func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) { + r.Mutex.Lock() + defer r.Mutex.Unlock() + + res, err := r.DB.NamedExec(NamedJobInsert, job) + if err != nil { + cclog.Warn("Error while NamedJobInsert (direct)") + return 0, err + } + id, err := res.LastInsertId() + if err != nil { + cclog.Warn("Error while getting last insert ID (direct)") + return 0, err + } + + return id, nil +} + func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) { r.Mutex.Lock() defer r.Mutex.Unlock() @@ -85,6 +106,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { return nil, err } + // Resolve correct job.id from the job table. The IDs read from job_cache + // are from a different auto-increment sequence and must not be used to + // query the job table. + for _, job := range jobs { + var newID int64 + if err := sq.Select("job.id").From("job"). + Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?", + job.JobID, job.Cluster, job.StartTime). + RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil { + cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v", + job.JobID, job.Cluster, err) + continue + } + job.ID = &newID + } + return jobs, nil } @@ -132,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) { return r.InsertJob(job) } +// StartDirect inserts a new job directly into the job table (not job_cache). +// Use this when the returned ID will immediately be used for job table +// operations such as adding tags. 
+func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) { + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err) + } + + job.RawResources, err = json.Marshal(job.Resources) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err) + } + + job.RawMetaData, err = json.Marshal(job.MetaData) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err) + } + + return r.InsertJobDirect(job) +} + // Stop updates the job with the database id jobId using the provided arguments. func (r *JobRepository) Stop( jobID int64, diff --git a/internal/repository/jobCreate_test.go b/internal/repository/jobCreate_test.go index 9e72555f..3f2ee6fa 100644 --- a/internal/repository/jobCreate_test.go +++ b/internal/repository/jobCreate_test.go @@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) { require.NoError(t, err) }) + t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) { + // Ensure cache is empty first + _, err := r.DB.Exec("DELETE FROM job_cache") + require.NoError(t, err) + + // Insert a job into job_cache + job := createTestJob(999015, "testcluster") + cacheID, err := r.Start(job) + require.NoError(t, err) + + // Sync jobs + jobs, err := r.SyncJobs() + require.NoError(t, err) + require.Equal(t, 1, len(jobs)) + + // The returned ID must refer to the job table, not job_cache + var jobTableID int64 + err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?", + jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID) + require.NoError(t, err) + assert.Equal(t, jobTableID, *jobs[0].ID, + "returned ID should match the job table row, not the cache ID (%d)", cacheID) + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster) + require.NoError(t, err) + }) + t.Run("sync with empty cache returns empty list", func(t *testing.T) { // Ensure cache is empty _, err := r.DB.Exec("DELETE FROM job_cache") @@ -500,3 +528,80 @@ func TestSyncJobs(t *testing.T) { assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty") }) } + +func TestInsertJobDirect(t *testing.T) { + r := setup(t) + + t.Run("inserts into job table not cache", func(t *testing.T) { + job := createTestJob(999020, "testcluster") + job.RawResources, _ = json.Marshal(job.Resources) + job.RawFootprint, _ = json.Marshal(job.Footprint) + job.RawMetaData, _ = json.Marshal(job.MetaData) + + id, err := r.InsertJobDirect(job) + require.NoError(t, err, "InsertJobDirect should succeed") + assert.Greater(t, id, int64(0), "Should return valid insert ID") + + // Verify job is in job table + var count int + err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count) + require.NoError(t, err) + assert.Equal(t, 1, count, "Job should be in job table") + + // Verify job is NOT in job_cache + err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? 
AND cluster = ?", + job.JobID, job.Cluster).Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count, "Job should NOT be in job_cache") + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) + + t.Run("returned ID works for tag operations", func(t *testing.T) { + job := createTestJob(999021, "testcluster") + job.RawResources, _ = json.Marshal(job.Resources) + job.RawFootprint, _ = json.Marshal(job.Footprint) + job.RawMetaData, _ = json.Marshal(job.MetaData) + + id, err := r.InsertJobDirect(job) + require.NoError(t, err) + + // Adding a tag using the returned ID should succeed (FK constraint on jobtag) + err = r.ImportTag(id, "test_type", "test_name", "global") + require.NoError(t, err, "ImportTag should succeed with direct insert ID") + + // Clean up + _, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id) + require.NoError(t, err) + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) +} + +func TestStartDirect(t *testing.T) { + r := setup(t) + + t.Run("inserts into job table with JSON encoding", func(t *testing.T) { + job := createTestJob(999022, "testcluster") + + id, err := r.StartDirect(job) + require.NoError(t, err, "StartDirect should succeed") + assert.Greater(t, id, int64(0)) + + // Verify job is in job table with encoded JSON + var rawResources []byte + err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources) + require.NoError(t, err) + + var resources []*schema.Resource + err = json.Unmarshal(rawResources, &resources) + require.NoError(t, err, "Resources should be valid JSON") + assert.Equal(t, "node01", resources[0].Hostname) + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) +} diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index 9f4871fd..992251af 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) { // 1. Find a job to use (Find all jobs) // We use a large time range to ensure we get something if it exists - jobs, err := r.FindJobsBetween(0, 9999999999, false) + jobs, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) { targetJob := jobs[0] - // 2. Create a tag - tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano()) - tagID, err := r.CreateTag("testtype", tagName, "global") + // 2. Create an auto-tagger tag (type "app") + appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano()) + appTagID, err := r.CreateTag("app", appTagName, "global") if err != nil { t.Fatal(err) } - // 3. Link Tag (Manually to avoid archive dependency side-effects in unit test) - _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID) + // 3. Link auto-tagger tag to job + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID) if err != nil { t.Fatal(err) } - // 4. Search with omitTagged = false (Should find the job) - jobsFound, err := r.FindJobsBetween(0, 9999999999, false) + // 4. 
Search with omitTagged = "none" (Should find the job) + jobsFound, err := r.FindJobsBetween(0, 9999999999, "none") if err != nil { t.Fatal(err) } @@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) { } } if !found { - t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID) + t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID) } - // 5. Search with omitTagged = true (Should NOT find the job) - jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true) + // 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag) + jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all") if err != nil { t.Fatal(err) } for _, j := range jobsFiltered { if *j.ID == *targetJob.ID { - t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID) + t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID) + } + } + + // 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job + jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + found = false + for _, j := range jobsUserFilter { + if *j.ID == *targetJob.ID { + found = true + break + } + } + if !found { + t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID) + } + + // 7. Add a user-created tag (type "testtype") to the same job + userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano()) + userTagID, err := r.CreateTag("testtype", userTagName, "global") + if err != nil { + t.Fatal(err) + } + _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID) + if err != nil { + t.Fatal(err) + } + + // 8. Now omitTagged = "user" should exclude the job (has a user-created tag) + jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user") + if err != nil { + t.Fatal(err) + } + + for _, j := range jobsUserFilter2 { + if *j.ID == *targetJob.ID { + t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID) } } } diff --git a/internal/repository/node.go b/internal/repository/node.go index 4b10aea3..09415bef 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -274,7 +274,7 @@ type NodeStateWithNode struct { func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) { rows, err := sq.Select( "node_state.id", "node_state.time_stamp", "node_state.node_state", - "node_state.health_state", "COALESCE(node_state.health_metrics, '')", + "node_state.health_state", "node_state.health_metrics", "node_state.cpus_allocated", "node_state.memory_allocated", "node_state.gpus_allocated", "node_state.jobs_running", "node.hostname", "node.cluster", "node.subcluster", @@ -293,13 +293,15 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode var result []NodeStateWithNode for rows.Next() { var ns NodeStateWithNode + var healthMetrics sql.NullString if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState, - &ns.HealthState, &ns.HealthMetrics, + &ns.HealthState, &healthMetrics, &ns.CpusAllocated, &ns.MemoryAllocated, &ns.GpusAllocated, &ns.JobsRunning, &ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil { return nil, err } + ns.HealthMetrics = healthMetrics.String result = append(result, ns) } return result, nil diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index 21667a27..c82c87bc 100644 --- a/internal/tagger/detectApp.go +++ 
b/internal/tagger/detectApp.go @@ -19,6 +19,14 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/util" ) +func metadataKeys(m map[string]string) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + const ( // defaultConfigPath is the default path for application tagging configuration defaultConfigPath = "./var/tagger/apps" @@ -158,31 +166,54 @@ func (t *AppTagger) Register() error { // Only the first matching application is tagged. func (t *AppTagger) Match(job *schema.Job) { r := repository.GetJobRepository() + + if len(t.apps) == 0 { + cclog.Warn("AppTagger: no app patterns loaded, skipping match") + return + } + metadata, err := r.FetchMetadata(job) if err != nil { - cclog.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster) + cclog.Infof("AppTagger: cannot fetch metadata for job %d on %s: %v", job.JobID, job.Cluster, err) + return + } + + if metadata == nil { + cclog.Infof("AppTagger: metadata is nil for job %d on %s", job.JobID, job.Cluster) return } jobscript, ok := metadata["jobScript"] - if ok { - id := *job.ID - jobscriptLower := strings.ToLower(jobscript) + if !ok { + cclog.Infof("AppTagger: no 'jobScript' key in metadata for job %d on %s (keys: %v)", + job.JobID, job.Cluster, metadataKeys(metadata)) + return + } - out: - for _, a := range t.apps { - for _, re := range a.patterns { - if re.MatchString(jobscriptLower) { - if !r.HasTag(id, t.tagType, a.tag) { - if _, err := r.AddTagOrCreateDirect(id, t.tagType, a.tag); err != nil { - cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err) - } + if len(jobscript) == 0 { + cclog.Infof("AppTagger: empty jobScript for job %d on %s", job.JobID, job.Cluster) + return + } + + id := *job.ID + jobscriptLower := strings.ToLower(jobscript) + cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps)) + + for _, a := range t.apps { + for _, re := range a.patterns { + if re.MatchString(jobscriptLower) { + if r.HasTag(id, t.tagType, a.tag) { + cclog.Debugf("AppTagger: job %d already has tag %s:%s, skipping", id, t.tagType, a.tag) + } else { + cclog.Infof("AppTagger: pattern '%s' matched for app '%s' on job %d", re.String(), a.tag, id) + if _, err := r.AddTagOrCreateDirect(id, t.tagType, a.tag); err != nil { + cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err) } - break out } + return } } - } else { - cclog.Infof("Cannot extract job script for job: %d on %s", job.JobID, job.Cluster) } + + cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster) } diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index bde3817d..5ee27e08 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -10,6 +10,7 @@ package tagger import ( + "fmt" "sync" "github.com/ClusterCockpit/cc-backend/internal/repository" @@ -29,11 +30,31 @@ type Tagger interface { Match(job *schema.Job) } +// TaggerInfo holds metadata about a tagger for JSON serialization. 
+type TaggerInfo struct { + Name string `json:"name"` + Type string `json:"type"` + Running bool `json:"running"` +} + var ( - initOnce sync.Once - jobTagger *JobTagger + initOnce sync.Once + jobTagger *JobTagger + statusMu sync.Mutex + taggerStatus = map[string]bool{} ) +// Known tagger definitions: name -> (type, factory) +type taggerDef struct { + ttype string + factory func() Tagger +} + +var knownTaggers = map[string]taggerDef{ + "AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }}, + "JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }}, +} + // JobTagger coordinates multiple taggers that run at different job lifecycle events. // It maintains separate lists of taggers that run when jobs start and when they stop. type JobTagger struct { @@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) { } } +// ListTaggers returns information about all known taggers with their current running status. +func ListTaggers() []TaggerInfo { + statusMu.Lock() + defer statusMu.Unlock() + + result := make([]TaggerInfo, 0, len(knownTaggers)) + for name, def := range knownTaggers { + result = append(result, TaggerInfo{ + Name: name, + Type: def.ttype, + Running: taggerStatus[name], + }) + } + return result +} + +// RunTaggerByName starts a tagger by name asynchronously on all jobs. +// Returns an error if the name is unknown or the tagger is already running. +func RunTaggerByName(name string) error { + def, ok := knownTaggers[name] + if !ok { + return fmt.Errorf("unknown tagger: %s", name) + } + + statusMu.Lock() + if taggerStatus[name] { + statusMu.Unlock() + return fmt.Errorf("tagger %s is already running", name) + } + taggerStatus[name] = true + statusMu.Unlock() + + go func() { + defer func() { + statusMu.Lock() + taggerStatus[name] = false + statusMu.Unlock() + }() + + t := def.factory() + if err := t.Register(); err != nil { + cclog.Errorf("Failed to register tagger %s: %s", name, err) + return + } + + r := repository.GetJobRepository() + jl, err := r.GetJobList(0, 0) + if err != nil { + cclog.Errorf("Error getting job list for tagger %s: %s", name, err) + return + } + + cclog.Infof("Running tagger %s on %d jobs", name, len(jl)) + for _, id := range jl { + job, err := r.FindByIDDirect(id) + if err != nil { + cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err) + continue + } + t.Match(job) + } + cclog.Infof("Tagger %s completed", name) + }() + + return nil +} + // RunTaggers applies all configured taggers to all existing jobs in the repository. // This is useful for retroactively applying tags to jobs that were created before // the tagger system was initialized or when new tagging rules are added. 
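For reference, a minimal sketch (in Go) of driving the on-demand tagger API added above. `RunTaggerByName`, `ListTaggers`, and the `TaggerInfo` fields come from this diff; the polling loop, the bare `main`, and the assumption that the job repository is already initialized are illustrative only. The same operations are exposed to admins over REST via `GET /config/taggers/` and `POST /config/taggers/run/` (see `internal/api/rest.go` above).

```go
package main

import (
	"log"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/tagger"
)

func main() {
	// Start the AppTagger asynchronously over all existing jobs.
	if err := tagger.RunTaggerByName("AppTagger"); err != nil {
		log.Fatalf("could not start tagger: %v", err)
	}

	// Poll ListTaggers() until the tagger reports Running == false.
	for {
		running := false
		for _, t := range tagger.ListTaggers() {
			if t.Name == "AppTagger" && t.Running {
				running = true
			}
		}
		if !running {
			break
		}
		time.Sleep(time.Second)
	}
	log.Println("AppTagger finished")
}
```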
diff --git a/internal/taskmanager/compressionService.go b/internal/taskmanager/compressionService.go index ab01ce8f..353fcb65 100644 --- a/internal/taskmanager/compressionService.go +++ b/internal/taskmanager/compressionService.go @@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) { lastTime := ar.CompressLast(startTime) if startTime == lastTime { cclog.Info("Compression Service - Complete archive run") - jobs, err = jobRepo.FindJobsBetween(0, startTime, false) + jobs, err = jobRepo.FindJobsBetween(0, startTime, "none") } else { - jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false) + jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none") } if err != nil { diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index eda452e6..48e5c042 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target } // cleanupAfterTransfer removes jobs from archive and optionally from DB. -func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { +func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) { archive.GetHandle().CleanUp(jobs) if includeDB { diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index b25b2a93..d758ee52 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -26,8 +26,8 @@ type Retention struct { Policy string `json:"policy"` Format string `json:"format"` Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` + IncludeDB bool `json:"include-db"` + OmitTagged string `json:"omit-tagged"` TargetKind string `json:"target-kind"` TargetPath string `json:"target-path"` TargetEndpoint string `json:"target-endpoint"` diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index cb9b16bc..1c2b7fe1 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -68,6 +68,11 @@ var configSchema = ` "description": "Also remove jobs from database", "type": "boolean" }, + "omit-tagged": { + "description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)", + "type": "string", + "enum": ["none", "all", "user"] + }, "age": { "description": "Act on jobs with startTime older than age (in days)", "type": "integer" diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json index 6962dc1b..e293d650 100644 --- a/web/frontend/package-lock.json +++ b/web/frontend/package-lock.json @@ -609,6 +609,12 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/trusted-types": { + "version": "2.0.7", + "resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz", + "integrity": "sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==", + "license": "MIT" + }, "node_modules/@urql/core": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/@urql/core/-/core-5.2.0.tgz", @@ -745,9 +751,9 @@ } }, "node_modules/devalue": { - "version": "5.6.2", - "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz", - "integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==", + "version": "5.6.3", + 
"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz", + "integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==", "license": "MIT" }, "node_modules/escape-latex": { @@ -763,9 +769,9 @@ "license": "MIT" }, "node_modules/esrap": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.1.tgz", - "integrity": "sha512-GiYWG34AN/4CUyaWAgunGt0Rxvr1PTMlGC0vvEov/uOQYWne2bpN03Um+k8jT+q3op33mKouP2zeJ6OlM+qeUg==", + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.3.tgz", + "integrity": "sha512-8fOS+GIGCQZl/ZIlhl59htOlms6U8NvX6ZYgYHpRU/b6tVSh3uHkOHZikl3D4cMbYM0JlpBe+p/BkZEi8J9XIQ==", "license": "MIT", "dependencies": { "@jridgewell/sourcemap-codec": "^1.4.15" @@ -1176,22 +1182,23 @@ } }, "node_modules/svelte": { - "version": "5.46.4", - "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.4.tgz", - "integrity": "sha512-VJwdXrmv9L8L7ZasJeWcCjoIuMRVbhuxbss0fpVnR8yorMmjNDwcjIH08vS6wmSzzzgAG5CADQ1JuXPS2nwt9w==", + "version": "5.53.2", + "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.2.tgz", + "integrity": "sha512-yGONuIrcl/BMmqbm6/52Q/NYzfkta7uVlos5NSzGTfNJTTFtPPzra6rAQoQIwAqupeM3s9uuTf5PvioeiCdg9g==", "license": "MIT", "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", + "@types/trusted-types": "^2.0.7", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", - "devalue": "^5.6.2", + "devalue": "^5.6.3", "esm-env": "^1.2.1", - "esrap": "^2.2.1", + "esrap": "^2.2.2", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte index 46958963..5d61e66c 100644 --- a/web/frontend/src/config/AdminSettings.svelte +++ b/web/frontend/src/config/AdminSettings.svelte @@ -15,6 +15,7 @@ import ShowUsers from "./admin/ShowUsers.svelte"; import Options from "./admin/Options.svelte"; import NoticeEdit from "./admin/NoticeEdit.svelte"; + import RunTaggers from "./admin/RunTaggers.svelte"; /* Svelte 5 Props */ let { @@ -70,4 +71,5 @@ + diff --git a/web/frontend/src/config/admin/RunTaggers.svelte b/web/frontend/src/config/admin/RunTaggers.svelte new file mode 100644 index 00000000..0df3e961 --- /dev/null +++ b/web/frontend/src/config/admin/RunTaggers.svelte @@ -0,0 +1,142 @@ + + + + + + + + Job Taggers +

+ Run individual taggers on all existing jobs.
+ {#if taggers.length === 0}
+   No taggers available.
+ {:else}
+   Name | Type | Status
+   {#each taggers as tagger}
+     {tagger.name} | {tagger.type} | {#if tagger.running}Running{:else}Idle{/if} | Run
+   {/each}
+ {/if}
+ {#if displayMessage}{message.msg}{/if}
+ diff --git a/web/frontend/src/generic/joblist/JobListRow.svelte b/web/frontend/src/generic/joblist/JobListRow.svelte index 9db340d4..3963708f 100644 --- a/web/frontend/src/generic/joblist/JobListRow.svelte +++ b/web/frontend/src/generic/joblist/JobListRow.svelte @@ -20,6 +20,7 @@ import { queryStore, gql, getContextClient } from "@urql/svelte"; import { Card, Spinner } from "@sveltestrap/sveltestrap"; import { maxScope, checkMetricAvailability } from "../utils.js"; + import uPlot from "uplot"; import JobInfo from "./JobInfo.svelte"; import MetricPlot from "../plots/MetricPlot.svelte"; import JobFootprint from "../helper/JobFootprint.svelte"; @@ -74,13 +75,17 @@ } `; + /* Var Init*/ + // svelte-ignore state_referenced_locally + let plotSync = uPlot.sync(`jobMetricStack-${job.cluster}-${job.id}`); + /* State Init */ let zoomStates = $state({}); let thresholdStates = $state({}); /* Derived */ const resampleDefault = $derived(resampleConfig ? Math.max(...resampleConfig.resolutions) : 0); - const jobId = $derived(job?.id); + const jobId = $derived(job.id); const scopes = $derived.by(() => { if (job.numNodes == 1) { if (job.numAcc >= 1) return ["core", "accelerator"]; @@ -233,6 +238,7 @@ numaccs={job.numAcc} zoomState={zoomStates[metric.data.name] || null} thresholdState={thresholdStates[metric.data.name] || null} + {plotSync} /> {:else} diff --git a/web/frontend/src/status/dashdetails/HealthDash.svelte b/web/frontend/src/status/dashdetails/HealthDash.svelte index 9bf0d5af..11f1ef31 100644 --- a/web/frontend/src/status/dashdetails/HealthDash.svelte +++ b/web/frontend/src/status/dashdetails/HealthDash.svelte @@ -32,12 +32,28 @@ /* Const Init */ const client = getContextClient(); + const stateOptions = [ + "all", + "allocated", + "idle", + "down", + "mixed", + "reserved", + "unknown", + ]; + const healthOptions = [ + "all", + "full", + "partial", + "failed", + ]; /* State Init */ let pieWidth = $state(0); + let querySorting = $state({ field: "startTime", type: "col", order: "DESC" }) let tableHostFilter = $state(""); - let tableStateFilter = $state(""); - let tableHealthFilter = $state(""); + let tableStateFilter = $state(stateOptions[0]); + let tableHealthFilter = $state(healthOptions[0]); let healthTableSorting = $state( { schedulerState: { dir: "down", active: true }, @@ -78,7 +94,7 @@ `, variables: { nodeFilter: { cluster: { eq: cluster }}, - sorting: { field: "startTime", type: "col", order: "DESC" }, + sorting: querySorting, }, requestPolicy: "network-only" })); @@ -98,10 +114,10 @@ if (tableHostFilter != "") { pendingTableData = pendingTableData.filter((e) => e.hostname.includes(tableHostFilter)) } - if (tableStateFilter != "") { + if (tableStateFilter != "all") { pendingTableData = pendingTableData.filter((e) => e.schedulerState.includes(tableStateFilter)) } - if (tableHealthFilter != "") { + if (tableHealthFilter != "all") { pendingTableData = pendingTableData.filter((e) => e.healthState.includes(tableHealthFilter)) } return pendingTableData @@ -148,7 +164,7 @@ { - sorting = { field: "startTime", type: "col", order: "DESC" } + querySorting = { field: "startTime", type: "col", order: "DESC" }; }} /> @@ -280,8 +296,8 @@ - sortBy('hostname')}> - Host + sortBy('hostname')}> + Hosts ({filteredTableData.length}) - sortBy('schedulerState')}> + sortBy('schedulerState')}> Scheduler State - sortBy('healthState')}> + sortBy('healthState')}> Health State - + + {#each stateOptions as so} + + {/each} + @@ -330,7 +350,11 @@ - + + {#each healthOptions as ho} + + {/each} + diff --git 
a/web/frontend/src/systems/nodelist/NodeListRow.svelte b/web/frontend/src/systems/nodelist/NodeListRow.svelte index 4689ed21..558d0642 100644 --- a/web/frontend/src/systems/nodelist/NodeListRow.svelte +++ b/web/frontend/src/systems/nodelist/NodeListRow.svelte @@ -211,6 +211,7 @@ timestep={metricData.data.metric.timestep} series={metricData.data.metric.series} height={375} + {plotSync} forNode /> {/if}
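To make the retention change concrete: `omitTagged` switches from a bool to a string with the values `none`, `all`, and `user` (see `internal/repository/job.go` and the `omit-tagged` schema entry above; the REST handler in `internal/api/job.go` accepts the same values via the `omit-tagged` query parameter). A minimal sketch, assuming an already-initialized `*repository.JobRepository`; the six-month cutoff and the helper name `pruneOldJobs` are illustrative:

```go
package retentionexample

import (
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
)

// pruneOldJobs deletes jobs that started more than six months ago while
// keeping every job that carries a user-created tag. Semantics per this PR:
//   "none" - delete all matching jobs
//   "all"  - skip any job that has at least one tag
//   "user" - skip jobs with user-created tags; auto-tagger tags of type
//            "app" or "jobClass" do not protect a job
func pruneOldJobs(r *repository.JobRepository) (int, error) {
	cutoff := time.Now().AddDate(0, -6, 0).Unix()
	return r.DeleteJobsBefore(cutoff, "user")
}
```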