diff --git a/Makefile b/Makefile
index 3246538a..5829beae 100644
--- a/Makefile
+++ b/Makefile
@@ -84,4 +84,4 @@ $(VAR):
$(SVELTE_TARGETS): $(SVELTE_SRC)
$(info ===> BUILD frontend)
- cd web/frontend && npm install && npm run build
+ cd web/frontend && npm ci && npm run build
diff --git a/ReleaseNotes.md b/ReleaseNotes.md
index 7ea43620..3d352f20 100644
--- a/ReleaseNotes.md
+++ b/ReleaseNotes.md
@@ -16,19 +16,25 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Removed `disable-archive` option**: This obsolete configuration option has been removed.
- **Removed `clusters` config section**: The separate clusters configuration section
has been removed. Cluster information is now derived from the job archive.
-- **`apiAllowedIPs` is now optional**: If not specified, defaults to secure settings.
+- **`apiAllowedIPs` is now optional**: If not specified, API access is not
+ restricted by client IP (no IP filtering is applied).
### Architecture changes
+- **Web framework replaced**: Migrated from `gorilla/mux` to `chi` as the HTTP
+ router. This should be transparent to users but affects how middleware and
+ routes are composed. A proper 404 handler is now in place.
- **MetricStore moved**: The `metricstore` package has been moved from `internal/`
to `pkg/` as it is now part of the public API.
- **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend.
- **Archive to Cleanup renaming**: Archive-related functions have been refactored
and renamed to "Cleanup" for clarity.
+- **`minRunningFor` filter removed**: This undocumented filter has been removed
+ from the API and frontend.
### Dependency changes
-- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs
+- **cc-lib v2.6.0**: Switched to cc-lib version 2 with updated APIs (currently at v2.6.0)
- **cclib NATS client**: Now using the cclib NATS client implementation
- Removed obsolete `util.Float` usage from cclib
@@ -51,13 +57,30 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Node state tracking**: New node table in database with timestamp tracking
- **Node state filtering**: Filter jobs by node state in systems view
-- **Node metrics improvements**: Better handling of node-level metrics and data
- **Node list enhancements**: Improved paging, filtering, and continuous scroll support
+- **Nodestate retention and archiving**: Node state data is now subject to configurable
+ retention policies and can be archived to Parquet format for long-term storage
+- **Faulty node metric tracking**: Faulty node state metric lists are persisted to the database
+
+### Health Monitoring
+
+- **Health status dashboard**: New dedicated "Health" tab in the status details view
+ showing per-node metric health across the cluster
+- **CCMS health check**: Support for querying health status of external
+ cc-metric-store (CCMS) instances via the API
+- **GraphQL health endpoints**: New GraphQL queries and resolvers for health data
+- **Cluster/subcluster filter**: Filter health status view by cluster or subcluster
+
+### Log Viewer
+
+- **Web-based log viewer**: New log viewer page in the admin interface for inspecting
+ backend log output directly from the browser without shell access
+- **Accessible from header**: Quick access link from the navigation header
### MetricStore Improvements
- **Memory tracking worker**: New worker for CCMS memory usage tracking
-- **Dynamic retention**: Support for cluster/subcluster-specific retention times
+- **Dynamic retention**: Support for job-specific dynamic retention times
- **Improved compression**: Transparent compression for job archive imports
- **Parallel processing**: Parallelized Iter function in all archive backends
@@ -65,15 +88,32 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job tagger option**: Enable automatic job tagging via configuration flag
- **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.)
-- **Job classifaction**: Automatic detection of pathological jobs
-- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations
+- **Job classification**: Automatic detection of pathological jobs
+- **omit-tagged**: Option to exclude tagged jobs from retention/cleanup operations (`none`, `all`, or `user`)
+- **Admin UI trigger**: Taggers can be run on-demand from the admin web interface
+ without restarting the backend
### Archive Backends
+- **Parquet archive format**: New Parquet file format for job archiving, providing
+ columnar storage with efficient compression for analytical workloads
- **S3 backend**: Full support for S3-compatible object storage
- **SQLite backend**: Full support for SQLite backend using blobs
- **Performance improvements**: Fixed performance bugs in archive backends
- **Better error handling**: Improved error messages and fallback handling
+- **Zstd compression**: Parquet writers use zstd compression for better
+ compression ratios compared to the previous snappy default
+- **Optimized sort order**: Job and nodestate Parquet files are sorted by
+ cluster, subcluster, and start time for efficient range queries
+
+### Unified Archive Retention and Format Conversion
+
+- **Uniform retention policy**: Job archive retention now supports both JSON and
+ Parquet as target formats under a single, consistent policy configuration
+- **Archive manager tool**: The `tools/archive-manager` utility now supports
+ format conversion between JSON and Parquet job archives
+- **Parquet reader**: Full Parquet archive reader implementation for reading back
+ archived job data
## New features and improvements
@@ -85,6 +125,14 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Filter presets**: Move list filter preset to URL for easy sharing
- **Job comparison**: Improved job comparison views and plots
- **Subcluster reactivity**: Job list now reacts to subcluster filter changes
+- **Short jobs quick selection**: New "Short jobs" quick-filter button in job lists
+ replaces the removed undocumented `minRunningFor` filter
+- **Row plot cursor sync**: Cursor position is now synchronized across all metric
+ plots in a job list row for easier cross-metric comparison
+- **Disabled metrics handling**: Improved handling and display of disabled metrics
+ across job view, node view, and list rows
+- **"Not configured" info cards**: Informational cards shown when optional features
+ are not yet configured
- **Frontend dependencies**: Bumped frontend dependencies to latest versions
- **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues
@@ -95,6 +143,15 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues
- **Configuration defaults**: Sensible defaults for most configuration options
- **Documentation**: Extensive documentation improvements across packages
+- **Server flag in systemd unit**: Example systemd unit now includes the `-server` flag
+
+### Security
+
+- **LDAP security hardening**: Improved input validation, connection handling, and
+ error reporting in the LDAP authenticator
+- **OIDC security hardening**: Stricter token validation and improved error handling
+ in the OIDC authenticator
+- **Auth schema extensions**: Additional schema fields for improved auth configuration
### API improvements
@@ -102,6 +159,8 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job exclusivity filter**: New filter for exclusive vs. shared jobs
- **Improved error messages**: Better error messages and documentation in REST API
- **GraphQL enhancements**: Improved GraphQL queries and resolvers
+- **Stop job lookup order**: Reversed lookup order in stop job requests for
+ more reliable job matching (cluster+jobId first, then jobId alone)
### Performance
@@ -109,13 +168,17 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job cache**: Introduced caching table for faster job inserts
- **Parallel imports**: Archive imports now run in parallel where possible
- **External tool integration**: Optimized use of external tools (fd) for better performance
+- **Node repository queries**: Reviewed and optimized node repository SQL queries
+- **Buffer pool**: Resized and pooled internal buffers for better memory reuse
### Developer experience
- **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md)
- **Example API payloads**: Added example JSON API payloads for testing
-- **Unit tests**: Added more unit tests for NATS API and other components
-- **Test improvements**: Better test coverage and test data
+- **Unit tests**: Added more unit tests for NATS API, node repository, and other components
+- **Test improvements**: Better test coverage; test DB is now copied before unit tests
+ to avoid state pollution between test runs
+- **Parquet writer tests**: Comprehensive tests for Parquet archive writing and conversion
## Bug fixes
@@ -132,6 +195,16 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- Fixed polar plot data query decoupling
- Fixed missing resolution parameter handling
- Fixed node table initialization fallback
+- Fixed reactivity key placement in nodeList
+- Fixed nodeList resolver data handling and increased nodestate filter cutoff
+- Fixed job always being transferred to main job table before archiving
+- Fixed AppTagger error handling and logging
+- Fixed log endpoint formatting and correctness
+- Fixed automatic refresh in metric status tab
+- Fixed NULL value handling in `health_state` and `health_metrics` columns
+- Fixed bugs related to `job_cache` IDs being used in the main job table
+- Fixed SyncJobs bug causing start job hooks to be called with wrong (cache) IDs
+- Fixed 404 handler route for sub-routers
## Configuration changes
@@ -167,6 +240,20 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
"interval": "48h",
"directory": "./var/archive"
}
+ },
+ "archive": {
+ "retention": {
+ "policy": "delete",
+ "age": "6months",
+ "target-format": "parquet"
+ }
+ },
+ "nodestate": {
+ "retention": {
+ "policy": "archive",
+ "age": "30d",
+ "archive-path": "./var/nodestate-archive"
+ }
}
}
```
@@ -178,11 +265,13 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- If using S3 archive backend, configure the new `archive` section options
- Test the new public dashboard at `/public` route
- Review cron worker configuration if you need different frequencies
+- If using the archive retention feature, configure the `target-format` option
+ to choose between `json` (default) and `parquet` output formats
+- Consider enabling nodestate retention if you track node states over time
## Known issues
- Currently energy footprint metrics of type energy are ignored for calculating
total energy.
-- Resampling for running jobs only works with cc-metric-store
- With energy footprint metrics of type power the unit is ignored and it is
assumed the metric has the unit Watt.
diff --git a/api/swagger.json b/api/swagger.json
index 42ed7bb6..c9c36de1 100644
--- a/api/swagger.json
+++ b/api/swagger.json
@@ -389,8 +389,71 @@
]
}
},
+ "/api/jobs/edit_meta/": {
+ "patch": {
+ "security": [
+ {
+ "ApiKeyAuth": []
+ }
+ ],
+ "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten",
+ "consumes": [
+ "application/json"
+ ],
+ "produces": [
+ "application/json"
+ ],
+ "tags": [
+ "Job add and modify"
+ ],
+ "summary": "Edit meta-data json by request",
+ "parameters": [
+ {
+ "description": "Specifies job and payload to add or update",
+ "name": "request",
+ "in": "body",
+ "required": true,
+ "schema": {
+ "$ref": "#/definitions/api.JobMetaRequest"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Updated job resource",
+ "schema": {
+ "$ref": "#/definitions/schema.Job"
+ }
+ },
+ "400": {
+ "description": "Bad Request",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "401": {
+ "description": "Unauthorized",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "404": {
+ "description": "Job does not exist",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "500": {
+ "description": "Internal Server Error",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ }
+ }
+ }
+ },
"/api/jobs/edit_meta/{id}": {
- "post": {
+ "patch": {
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
"consumes": [
"application/json"
diff --git a/api/swagger.yaml b/api/swagger.yaml
index 0bf60082..def939dd 100644
--- a/api/swagger.yaml
+++ b/api/swagger.yaml
@@ -102,6 +102,27 @@ definitions:
description: Page id returned
type: integer
type: object
+ api.JobMetaRequest:
+ properties:
+ cluster:
+ description: Cluster of job
+ example: fritz
+ type: string
+ jobId:
+ description: Cluster Job ID of job
+ example: 123000
+ type: integer
+ payload:
+ allOf:
+ - $ref: '#/definitions/api.EditMetaRequest'
+ description: Content to add to job metadata
+ startTime:
+ description: Start Time of job as epoch
+ example: 1649723812
+ type: integer
+ required:
+ - jobId
+ type: object
api.JobMetricWithName:
properties:
metric:
@@ -1091,8 +1112,50 @@ paths:
summary: Remove a job from the sql database
tags:
- Job remove
+ /api/jobs/edit_meta/:
+ patch:
+ consumes:
+ - application/json
+ description: |-
+ Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster
+ If a key already exists its content will be overwritten
+ parameters:
+ - description: Specifies job and payload to add or update
+ in: body
+ name: request
+ required: true
+ schema:
+ $ref: '#/definitions/api.JobMetaRequest'
+ produces:
+ - application/json
+ responses:
+ "200":
+ description: Updated job resource
+ schema:
+ $ref: '#/definitions/schema.Job'
+ "400":
+ description: Bad Request
+ schema:
+ $ref: '#/definitions/api.ErrorResponse'
+ "401":
+ description: Unauthorized
+ schema:
+ $ref: '#/definitions/api.ErrorResponse'
+ "404":
+ description: Job does not exist
+ schema:
+ $ref: '#/definitions/api.ErrorResponse'
+ "500":
+ description: Internal Server Error
+ schema:
+ $ref: '#/definitions/api.ErrorResponse'
+ security:
+ - ApiKeyAuth: []
+ summary: Edit meta-data json by request
+ tags:
+ - Job add and modify
/api/jobs/edit_meta/{id}:
- post:
+ patch:
consumes:
- application/json
description: |-
diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go
index fde95fd3..81d397d2 100644
--- a/cmd/cc-backend/main.go
+++ b/cmd/cc-backend/main.go
@@ -369,13 +369,11 @@ func runServer(ctx context.Context) error {
errChan := make(chan error, 1)
// Start HTTP server
- wg.Add(1)
- go func() {
- defer wg.Done()
+ wg.Go(func() {
if err := srv.Start(ctx); err != nil {
errChan <- err
}
- }()
+ })
// Handle shutdown signals
wg.Add(1)
diff --git a/configs/tagger/jobclasses/highMemoryUsage.json b/configs/tagger/jobclasses/highMemoryUsage.json
new file mode 100644
index 00000000..3c10b06f
--- /dev/null
+++ b/configs/tagger/jobclasses/highMemoryUsage.json
@@ -0,0 +1,25 @@
+{
+ "name": "High memory usage",
+ "tag": "highmemory",
+ "parameters": [
+ "highmemoryusage_threshold_factor",
+ "job_min_duration_seconds"
+ ],
+ "metrics": ["mem_used"],
+ "requirements": [
+ "job.shared == \"none\"",
+ "job.duration > job_min_duration_seconds"
+ ],
+ "variables": [
+ {
+ "name": "memory_threshold",
+ "expr": "mem_used.limits.peak * highmemoryusage_threshold_factor"
+ },
+ {
+ "name": "memory_usage_pct",
+ "expr": "mem_used.max / mem_used.limits.peak * 100.0"
+ }
+ ],
+ "rule": "mem_used.max > memory_threshold",
+ "hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions."
+}
diff --git a/configs/tagger/jobclasses/highload.json b/configs/tagger/jobclasses/highload.json
index 9667011b..a442a3ac 100644
--- a/configs/tagger/jobclasses/highload.json
+++ b/configs/tagger/jobclasses/highload.json
@@ -3,8 +3,7 @@
"tag": "excessiveload",
"parameters": [
"excessivecpuload_threshold_factor",
- "job_min_duration_seconds",
- "sampling_interval_seconds"
+ "job_min_duration_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
@@ -15,12 +14,8 @@
{
"name": "load_threshold",
"expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor"
- },
- {
- "name": "load_perc",
- "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
"rule": "cpu_load.avg > load_threshold",
- "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}."
+ "hint": "This job was detected as having excessive CPU load: average cpu load {{.cpu_load.avg}} exceeds the oversubscription threshold {{.load_threshold}} ({{.excessivecpuload_threshold_factor}} \u00d7 {{.cpu_load.limits.peak}} peak cores), indicating CPU contention."
}
diff --git a/configs/tagger/jobclasses/lowUtilization.json b/configs/tagger/jobclasses/lowUtilization.json
index e84b81da..1d365150 100644
--- a/configs/tagger/jobclasses/lowUtilization.json
+++ b/configs/tagger/jobclasses/lowUtilization.json
@@ -1,5 +1,5 @@
{
- "name": "Low ressource utilization",
+ "name": "Low resource utilization",
"tag": "lowutilization",
"parameters": ["job_min_duration_seconds"],
"metrics": ["flops_any", "mem_bw"],
@@ -9,14 +9,14 @@
],
"variables": [
{
- "name": "mem_bw_perc",
- "expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)"
+ "name": "mem_bw_pct",
+ "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
},
{
- "name": "flops_any_perc",
- "expr": "1.0 - (flops_any.avg / flops_any.limits.peak)"
+ "name": "flops_any_pct",
+ "expr": "flops_any.avg / flops_any.limits.peak * 100.0"
}
],
"rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert",
- "hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}."
+ "hint": "This job shows low resource utilization: FLOP rate {{.flops_any.avg}} GF/s ({{.flops_any_pct}}% of peak) and memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) are both below their alert thresholds."
}
diff --git a/configs/tagger/jobclasses/lowload.json b/configs/tagger/jobclasses/lowload.json
index f952da59..7fa3ca3b 100644
--- a/configs/tagger/jobclasses/lowload.json
+++ b/configs/tagger/jobclasses/lowload.json
@@ -3,8 +3,7 @@
"tag": "lowload",
"parameters": [
"lowcpuload_threshold_factor",
- "job_min_duration_seconds",
- "sampling_interval_seconds"
+ "job_min_duration_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
@@ -15,12 +14,8 @@
{
"name": "load_threshold",
"expr": "job.numCores * lowcpuload_threshold_factor"
- },
- {
- "name": "load_perc",
- "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
- "rule": "cpu_load.avg < cpu_load.limits.caution",
- "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}."
+ "rule": "cpu_load.avg < load_threshold",
+ "hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)."
}
diff --git a/configs/tagger/jobclasses/memoryBound.json b/configs/tagger/jobclasses/memoryBound.json
new file mode 100644
index 00000000..01368c08
--- /dev/null
+++ b/configs/tagger/jobclasses/memoryBound.json
@@ -0,0 +1,22 @@
+{
+ "name": "Memory bandwidth bound",
+ "tag": "memorybound",
+ "parameters": ["membound_bw_threshold_factor", "job_min_duration_seconds"],
+ "metrics": ["mem_bw"],
+ "requirements": [
+ "job.shared == \"none\"",
+ "job.duration > job_min_duration_seconds"
+ ],
+ "variables": [
+ {
+ "name": "mem_bw_threshold",
+ "expr": "mem_bw.limits.peak * membound_bw_threshold_factor"
+ },
+ {
+ "name": "mem_bw_pct",
+ "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
+ }
+ ],
+ "rule": "mem_bw.avg > mem_bw_threshold",
+ "hint": "This job is memory bandwidth bound: memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) exceeds {{.membound_bw_threshold_factor}} \u00d7 peak bandwidth. Consider improving data reuse or compute intensity."
+}
diff --git a/configs/tagger/jobclasses/parameters.json b/configs/tagger/jobclasses/parameters.json
index 39e94c1c..c3fb5cdc 100644
--- a/configs/tagger/jobclasses/parameters.json
+++ b/configs/tagger/jobclasses/parameters.json
@@ -1,11 +1,12 @@
{
- "lowcpuload_threshold_factor": 0.9,
- "excessivecpuload_threshold_factor": 1.1,
+ "lowcpuload_threshold_factor": 0.85,
+ "excessivecpuload_threshold_factor": 1.2,
"highmemoryusage_threshold_factor": 0.9,
"node_load_imbalance_threshold_factor": 0.1,
"core_load_imbalance_threshold_factor": 0.1,
"high_memory_load_threshold_factor": 0.9,
"lowgpuload_threshold_factor": 0.7,
+ "membound_bw_threshold_factor": 0.8,
"memory_leak_slope_threshold": 0.1,
"job_min_duration_seconds": 600.0,
"sampling_interval_seconds": 30.0,
diff --git a/go.mod b/go.mod
index b790c0c8..e244062c 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ tool (
require (
github.com/99designs/gqlgen v0.17.86
- github.com/ClusterCockpit/cc-lib/v2 v2.5.1
+ github.com/ClusterCockpit/cc-lib/v2 v2.6.0
github.com/Masterminds/squirrel v1.5.4
github.com/aws/aws-sdk-go-v2 v1.41.1
github.com/aws/aws-sdk-go-v2/config v1.32.8
diff --git a/go.sum b/go.sum
index c319c6ba..f2929454 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
github.com/ClusterCockpit/cc-lib/v2 v2.5.1 h1:s6M9tyPDty+4zTdQGJYKpGJM9Nz7N6ITMdjPvNSLX5g=
github.com/ClusterCockpit/cc-lib/v2 v2.5.1/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0=
+github.com/ClusterCockpit/cc-lib/v2 v2.6.0 h1:Q7zvRAVhfYA9PDB18pfY9A/6Ws4oWpnv8+P9MBRUDzg=
+github.com/ClusterCockpit/cc-lib/v2 v2.6.0/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
diff --git a/internal/api/api_test.go b/internal/api/api_test.go
index 09fc4c7f..a8aef889 100644
--- a/internal/api/api_test.go
+++ b/internal/api/api_test.go
@@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) {
}
})
}
+
+// TestStopJobWithReusedJobId verifies that stopping a recently started job works
+// even when an older job with the same jobId exists in the job table (e.g. with
+// state "failed"). This is a regression test for the bug where Find() on the job
+// table would match the old job instead of the new one still in job_cache.
+func TestStopJobWithReusedJobId(t *testing.T) {
+ restapi := setup(t)
+ t.Cleanup(cleanup)
+
+ testData := schema.JobData{
+ "load_one": map[schema.MetricScope]*schema.JobMetric{
+ schema.MetricScopeNode: {
+ Unit: schema.Unit{Base: "load"},
+ Timestep: 60,
+ Series: []schema.Series{
+ {
+ Hostname: "host123",
+ Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
+ Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
+ },
+ },
+ },
+ },
+ }
+
+ metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
+ return testData, nil
+ }
+
+ r := chi.NewRouter()
+ restapi.MountAPIRoutes(r)
+
+ const contextUserKey repository.ContextKey = "user"
+ contextUserValue := &schema.User{
+ Username: "testuser",
+ Projects: make([]string, 0),
+ Roles: []string{"user"},
+ AuthType: 0,
+ AuthSource: 2,
+ }
+
+ // Step 1: Start the first job (jobId=999)
+ const startJobBody1 string = `{
+ "jobId": 999,
+ "user": "testuser",
+ "project": "testproj",
+ "cluster": "testcluster",
+ "partition": "default",
+ "walltime": 3600,
+ "numNodes": 1,
+ "numHwthreads": 8,
+ "numAcc": 0,
+ "shared": "none",
+ "monitoringStatus": 1,
+ "smt": 1,
+ "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
+ "startTime": 200000000
+ }`
+
+ if ok := t.Run("StartFirstJob", func(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1)))
+ recorder := httptest.NewRecorder()
+ ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+ r.ServeHTTP(recorder, req.WithContext(ctx))
+ if recorder.Result().StatusCode != http.StatusCreated {
+ t.Fatal(recorder.Result().Status, recorder.Body.String())
+ }
+ }); !ok {
+ return
+ }
+
+ // Step 2: Sync to move job from cache to job table, then stop it as "failed"
+ time.Sleep(1 * time.Second)
+ restapi.JobRepository.SyncJobs()
+
+ const stopJobBody1 string = `{
+ "jobId": 999,
+ "startTime": 200000000,
+ "cluster": "testcluster",
+ "jobState": "failed",
+ "stopTime": 200001000
+ }`
+
+ if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1)))
+ recorder := httptest.NewRecorder()
+ ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+ r.ServeHTTP(recorder, req.WithContext(ctx))
+ if recorder.Result().StatusCode != http.StatusOK {
+ t.Fatal(recorder.Result().Status, recorder.Body.String())
+ }
+
+ jobid, cluster := int64(999), "testcluster"
+ job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if job.State != schema.JobStateFailed {
+ t.Fatalf("expected first job to be failed, got: %s", job.State)
+ }
+ }); !ok {
+ return
+ }
+
+ // Wait for archiving to complete
+ time.Sleep(1 * time.Second)
+
+ // Step 3: Start a NEW job with the same jobId=999 but different startTime.
+ // This job will sit in job_cache (not yet synced).
+ const startJobBody2 string = `{
+ "jobId": 999,
+ "user": "testuser",
+ "project": "testproj",
+ "cluster": "testcluster",
+ "partition": "default",
+ "walltime": 3600,
+ "numNodes": 1,
+ "numHwthreads": 8,
+ "numAcc": 0,
+ "shared": "none",
+ "monitoringStatus": 1,
+ "smt": 1,
+ "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
+ "startTime": 300000000
+ }`
+
+ if ok := t.Run("StartSecondJob", func(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2)))
+ recorder := httptest.NewRecorder()
+ ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+ r.ServeHTTP(recorder, req.WithContext(ctx))
+ if recorder.Result().StatusCode != http.StatusCreated {
+ t.Fatal(recorder.Result().Status, recorder.Body.String())
+ }
+ }); !ok {
+ return
+ }
+
+ // Step 4: Stop the second job WITHOUT syncing first.
+ // Before the fix, this would fail because Find() on the job table would
+ // match the old failed job (jobId=999) and reject with "already stopped".
+ const stopJobBody2 string = `{
+ "jobId": 999,
+ "startTime": 300000000,
+ "cluster": "testcluster",
+ "jobState": "completed",
+ "stopTime": 300001000
+ }`
+
+ t.Run("StopSecondJobBeforeSync", func(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2)))
+ recorder := httptest.NewRecorder()
+ ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+ r.ServeHTTP(recorder, req.WithContext(ctx))
+ if recorder.Result().StatusCode != http.StatusOK {
+ t.Fatalf("expected stop to succeed for cached job, got: %s %s",
+ recorder.Result().Status, recorder.Body.String())
+ }
+ })
+}
diff --git a/internal/api/docs.go b/internal/api/docs.go
index 78eecfa3..de3cf506 100644
--- a/internal/api/docs.go
+++ b/internal/api/docs.go
@@ -396,8 +396,71 @@ const docTemplate = `{
]
}
},
+ "/api/jobs/edit_meta/": {
+ "patch": {
+ "security": [
+ {
+ "ApiKeyAuth": []
+ }
+ ],
+ "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten",
+ "consumes": [
+ "application/json"
+ ],
+ "produces": [
+ "application/json"
+ ],
+ "tags": [
+ "Job add and modify"
+ ],
+ "summary": "Edit meta-data json by request",
+ "parameters": [
+ {
+ "description": "Specifies job and payload to add or update",
+ "name": "request",
+ "in": "body",
+ "required": true,
+ "schema": {
+ "$ref": "#/definitions/api.JobMetaRequest"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Updated job resource",
+ "schema": {
+ "$ref": "#/definitions/schema.Job"
+ }
+ },
+ "400": {
+ "description": "Bad Request",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "401": {
+ "description": "Unauthorized",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "404": {
+ "description": "Job does not exist",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ },
+ "500": {
+ "description": "Internal Server Error",
+ "schema": {
+ "$ref": "#/definitions/api.ErrorResponse"
+ }
+ }
+ }
+ }
+ },
"/api/jobs/edit_meta/{id}": {
- "post": {
+ "patch": {
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
"consumes": [
"application/json"
diff --git a/internal/api/job.go b/internal/api/job.go
index 59136b02..76ec3e2a 100644
--- a/internal/api/job.go
+++ b/internal/api/job.go
@@ -72,6 +72,14 @@ type EditMetaRequest struct {
Value string `json:"value" example:"bash script"`
}
+// JobMetaRequest model
+type JobMetaRequest struct {
+ JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
+ Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
+ StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
+ Payload EditMetaRequest `json:"payload"` // Content to add to job metadata
+}
+
type TagJobAPIRequest []*APITag
type GetJobAPIRequest []string
@@ -423,21 +431,21 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
}
// editMeta godoc
-// @summary Edit meta-data json
+// @summary Edit meta-data json of job identified by database id
// @tags Job add and modify
-// @description Edit key value pairs in job metadata json
+// @description Edit key value pairs in job metadata json of job specified by database id
// @description If a key already exists its content will be overwritten
// @accept json
// @produce json
// @param id path int true "Job Database ID"
-// @param request body api.EditMetaRequest true "Kay value pair to add"
+// @param request body api.EditMetaRequest true "Metadata Key value pair to add or update"
// @success 200 {object} schema.Job "Updated job resource"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 404 {object} api.ErrorResponse "Job does not exist"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
-// @router /api/jobs/edit_meta/{id} [post]
+// @router /api/jobs/edit_meta/{id} [patch]
func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
if err != nil {
@@ -469,6 +477,54 @@ func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
}
}
+// editMetaByRequest godoc
+// @summary Edit meta-data json of job identified by request
+// @tags Job add and modify
+// @description Edit key value pairs in metadata json of job specified by jobId, startTime and cluster
+// @description If a key already exists its content will be overwritten
+// @accept json
+// @produce json
+// @param request body api.JobMetaRequest true "Specifies job and payload to add or update"
+// @success 200 {object} schema.Job "Updated job resource"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 404 {object} api.ErrorResponse "Job does not exist"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /api/jobs/edit_meta/ [patch]
+func (api *RestAPI) editMetaByRequest(rw http.ResponseWriter, r *http.Request) {
+ // Parse request body
+ req := JobMetaRequest{}
+ if err := decode(r.Body, &req); err != nil {
+ handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
+ return
+ }
+
+ // Fetch job (that will have its meta_data edited) from db
+ var job *schema.Job
+ var err error
+ if req.JobId == nil {
+ handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
+ return
+ }
+
+ // log.Printf("loading db job for editMetaByRequest... : JobMetaRequest=%v", req)
+ job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
+ if err != nil {
+ handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
+ return
+ }
+
+ if err := api.JobRepository.UpdateMetadata(job, req.Payload.Key, req.Payload.Value); err != nil {
+ http.Error(rw, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ rw.Header().Add("Content-Type", "application/json")
+ rw.WriteHeader(http.StatusOK)
+ json.NewEncoder(rw).Encode(job)
+}
+
// tagJob godoc
// @summary Adds one or more tags to a job
// @tags Job add and modify
@@ -763,16 +819,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
}
isCached := false
- job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+ job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if err != nil {
- // Try cached jobs if not found in main repository
- cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
- if cachedErr != nil {
- // Combine both errors for better debugging
- handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
+ // Not in cache, try main job table
+ job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+ if err != nil {
+ handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
return
}
- job = cachedJob
+ } else {
isCached = true
}
@@ -905,11 +960,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
}
// Check for omit-tagged query parameter
- omitTagged := false
+ omitTagged := "none"
if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
- omitTagged, e = strconv.ParseBool(omitTaggedStr)
- if e != nil {
- handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw)
+ switch omitTaggedStr {
+ case "none", "all", "user":
+ omitTagged = omitTaggedStr
+ default:
+ handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw)
return
}
}
diff --git a/internal/api/nats.go b/internal/api/nats.go
index f7ad0f40..02a03fae 100644
--- a/internal/api/nats.go
+++ b/internal/api/nats.go
@@ -259,15 +259,15 @@ func (api *NatsAPI) handleStopJob(payload string) {
}
isCached := false
- job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+ job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if err != nil {
- cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
- if cachedErr != nil {
- cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
- err, cachedErr)
+ // Not in cache, try main job table
+ job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+ if err != nil {
+ cclog.Errorf("NATS job stop: finding job failed: %v", err)
return
}
- job = cachedJob
+ } else {
isCached = true
}
diff --git a/internal/api/rest.go b/internal/api/rest.go
index fe722511..4d2385e3 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -22,6 +22,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
+ "github.com/ClusterCockpit/cc-backend/internal/tagger"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
"github.com/ClusterCockpit/cc-lib/v2/util"
@@ -95,8 +96,8 @@ func (api *RestAPI) MountAPIRoutes(r chi.Router) {
r.Post("/jobs/tag_job/{id}", api.tagJob)
r.Patch("/jobs/tag_job/{id}", api.tagJob)
r.Delete("/jobs/tag_job/{id}", api.removeTagJob)
- r.Post("/jobs/edit_meta/{id}", api.editMeta)
r.Patch("/jobs/edit_meta/{id}", api.editMeta)
+ r.Patch("/jobs/edit_meta/", api.editMetaByRequest)
r.Get("/jobs/metrics/{id}", api.getJobMetrics)
r.Delete("/jobs/delete_job/", api.deleteJobByRequest)
r.Delete("/jobs/delete_job/{id}", api.deleteJobByID)
@@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) {
r.Delete("/config/users/", api.deleteUser)
r.Post("/config/user/{id}", api.updateUser)
r.Post("/config/notice/", api.editNotice)
+ r.Get("/config/taggers/", api.getTaggers)
+ r.Post("/config/taggers/run/", api.runTagger)
}
}
@@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) {
}
}
+func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) {
+ if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
+ handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw)
+ return
+ }
+
+ rw.Header().Set("Content-Type", "application/json")
+ if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil {
+ cclog.Errorf("Failed to encode tagger list: %v", err)
+ }
+}
+
+func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
+ if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
+ handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw)
+ return
+ }
+
+ name := r.FormValue("name")
+ if name == "" {
+ handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw)
+ return
+ }
+
+ if err := tagger.RunTaggerByName(name); err != nil {
+ handleError(err, http.StatusConflict, rw)
+ return
+ }
+
+ rw.Header().Set("Content-Type", "text/plain")
+ rw.WriteHeader(http.StatusOK)
+ if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil {
+ cclog.Errorf("Failed to write response: %v", err)
+ }
+}
+
// getJWT godoc
// @summary Generate JWT token
// @tags Frontend
diff --git a/internal/repository/job.go b/internal/repository/job.go
index a1cd9719..8055ca37 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
//
// Parameters:
// - startTime: Unix timestamp, jobs with start_time < this value will be deleted
-// - omitTagged: If true, skip jobs that have associated tags (jobtag entries)
+// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs,
+// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
//
// Returns the count of deleted jobs or an error if the operation fails.
-func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
+func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) {
var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
- if omitTagged {
+ switch omitTagged {
+ case "all":
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+ case "user":
+ q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
@@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
var jobIds []int64
selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
- if omitTagged {
+ switch omitTagged {
+ case "all":
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+ case "user":
+ selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
rows, err := selectQuery.RunWith(r.DB).Query()
@@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
- if omitTagged {
+ switch omitTagged {
+ case "all":
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+ case "user":
+ qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
_, err := qd.RunWith(r.DB).Exec()
@@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error {
// Parameters:
// - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
// - startTimeEnd: Unix timestamp for range end
-// - omitTagged: If true, exclude jobs with associated tags
+// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs,
+// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
//
// Returns a slice of jobs or an error if the time range is invalid or query fails.
-func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
+func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) {
var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
@@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
}
- if omitTagged {
+ switch omitTagged {
+ case "all":
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+ case "user":
+ query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")
diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go
index 9f4871fd..992251af 100644
--- a/internal/repository/job_test.go
+++ b/internal/repository/job_test.go
@@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) {
// 1. Find a job to use (Find all jobs)
// We use a large time range to ensure we get something if it exists
- jobs, err := r.FindJobsBetween(0, 9999999999, false)
+ jobs, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil {
t.Fatal(err)
}
@@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) {
targetJob := jobs[0]
- // 2. Create a tag
- tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano())
- tagID, err := r.CreateTag("testtype", tagName, "global")
+ // 2. Create an auto-tagger tag (type "app")
+ appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano())
+ appTagID, err := r.CreateTag("app", appTagName, "global")
if err != nil {
t.Fatal(err)
}
- // 3. Link Tag (Manually to avoid archive dependency side-effects in unit test)
- _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID)
+ // 3. Link auto-tagger tag to job
+ _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID)
if err != nil {
t.Fatal(err)
}
- // 4. Search with omitTagged = false (Should find the job)
- jobsFound, err := r.FindJobsBetween(0, 9999999999, false)
+ // 4. Search with omitTagged = "none" (Should find the job)
+ jobsFound, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil {
t.Fatal(err)
}
@@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) {
}
}
if !found {
- t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID)
+ t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID)
}
- // 5. Search with omitTagged = true (Should NOT find the job)
- jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true)
+ // 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag)
+ jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all")
if err != nil {
t.Fatal(err)
}
for _, j := range jobsFiltered {
if *j.ID == *targetJob.ID {
- t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID)
+ t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID)
+ }
+ }
+
+ // 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job
+ jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ found = false
+ for _, j := range jobsUserFilter {
+ if *j.ID == *targetJob.ID {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID)
+ }
+
+ // 7. Add a user-created tag (type "testtype") to the same job
+ userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano())
+ userTagID, err := r.CreateTag("testtype", userTagName, "global")
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // 8. Now omitTagged = "user" should exclude the job (has a user-created tag)
+ jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for _, j := range jobsUserFilter2 {
+ if *j.ID == *targetJob.ID {
+ t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID)
}
}
}
diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go
index bde3817d..5ee27e08 100644
--- a/internal/tagger/tagger.go
+++ b/internal/tagger/tagger.go
@@ -10,6 +10,7 @@
package tagger
import (
+ "fmt"
"sync"
"github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -29,11 +30,31 @@ type Tagger interface {
Match(job *schema.Job)
}
+// TaggerInfo holds metadata about a tagger for JSON serialization.
+type TaggerInfo struct {
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Running bool `json:"running"`
+}
+
var (
- initOnce sync.Once
- jobTagger *JobTagger
+ initOnce sync.Once
+ jobTagger *JobTagger
+ statusMu sync.Mutex
+ taggerStatus = map[string]bool{}
)
+// Known tagger definitions: name -> (type, factory)
+type taggerDef struct {
+ ttype string
+ factory func() Tagger
+}
+
+var knownTaggers = map[string]taggerDef{
+ "AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }},
+ "JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }},
+}
+
// JobTagger coordinates multiple taggers that run at different job lifecycle events.
// It maintains separate lists of taggers that run when jobs start and when they stop.
type JobTagger struct {
@@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) {
}
}
+// ListTaggers returns information about all known taggers with their current running status.
+func ListTaggers() []TaggerInfo {
+ statusMu.Lock()
+ defer statusMu.Unlock()
+
+ result := make([]TaggerInfo, 0, len(knownTaggers))
+ for name, def := range knownTaggers {
+ result = append(result, TaggerInfo{
+ Name: name,
+ Type: def.ttype,
+ Running: taggerStatus[name],
+ })
+ }
+ return result
+}
+
+// RunTaggerByName starts a tagger by name asynchronously on all jobs.
+// Returns an error if the name is unknown or the tagger is already running.
+func RunTaggerByName(name string) error {
+ def, ok := knownTaggers[name]
+ if !ok {
+ return fmt.Errorf("unknown tagger: %s", name)
+ }
+
+ statusMu.Lock()
+ if taggerStatus[name] {
+ statusMu.Unlock()
+ return fmt.Errorf("tagger %s is already running", name)
+ }
+ taggerStatus[name] = true
+ statusMu.Unlock()
+
+ go func() {
+ defer func() {
+ statusMu.Lock()
+ taggerStatus[name] = false
+ statusMu.Unlock()
+ }()
+
+ t := def.factory()
+ if err := t.Register(); err != nil {
+ cclog.Errorf("Failed to register tagger %s: %s", name, err)
+ return
+ }
+
+ r := repository.GetJobRepository()
+ jl, err := r.GetJobList(0, 0)
+ if err != nil {
+ cclog.Errorf("Error getting job list for tagger %s: %s", name, err)
+ return
+ }
+
+ cclog.Infof("Running tagger %s on %d jobs", name, len(jl))
+ for _, id := range jl {
+ job, err := r.FindByIDDirect(id)
+ if err != nil {
+ cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err)
+ continue
+ }
+ t.Match(job)
+ }
+ cclog.Infof("Tagger %s completed", name)
+ }()
+
+ return nil
+}
+
// RunTaggers applies all configured taggers to all existing jobs in the repository.
// This is useful for retroactively applying tags to jobs that were created before
// the tagger system was initialized or when new tagging rules are added.
diff --git a/internal/taskmanager/compressionService.go b/internal/taskmanager/compressionService.go
index ab01ce8f..353fcb65 100644
--- a/internal/taskmanager/compressionService.go
+++ b/internal/taskmanager/compressionService.go
@@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
lastTime := ar.CompressLast(startTime)
if startTime == lastTime {
cclog.Info("Compression Service - Complete archive run")
- jobs, err = jobRepo.FindJobsBetween(0, startTime, false)
+ jobs, err = jobRepo.FindJobsBetween(0, startTime, "none")
} else {
- jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false)
+ jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none")
}
if err != nil {
diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go
index eda452e6..48e5c042 100644
--- a/internal/taskmanager/retentionService.go
+++ b/internal/taskmanager/retentionService.go
@@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target
}
// cleanupAfterTransfer removes jobs from archive and optionally from DB.
-func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) {
+func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) {
archive.GetHandle().CleanUp(jobs)
if includeDB {
diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go
index b25b2a93..d758ee52 100644
--- a/internal/taskmanager/taskManager.go
+++ b/internal/taskmanager/taskManager.go
@@ -26,8 +26,8 @@ type Retention struct {
Policy string `json:"policy"`
Format string `json:"format"`
Age int `json:"age"`
- IncludeDB bool `json:"includeDB"`
- OmitTagged bool `json:"omitTagged"`
+ IncludeDB bool `json:"include-db"`
+ OmitTagged string `json:"omit-tagged"`
TargetKind string `json:"target-kind"`
TargetPath string `json:"target-path"`
TargetEndpoint string `json:"target-endpoint"`
diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go
index cb9b16bc..1c2b7fe1 100644
--- a/pkg/archive/ConfigSchema.go
+++ b/pkg/archive/ConfigSchema.go
@@ -68,6 +68,11 @@ var configSchema = `
"description": "Also remove jobs from database",
"type": "boolean"
},
+ "omit-tagged": {
+ "description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)",
+ "type": "string",
+ "enum": ["none", "all", "user"]
+ },
"age": {
"description": "Act on jobs with startTime older than age (in days)",
"type": "integer"
diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json
index 6962dc1b..e293d650 100644
--- a/web/frontend/package-lock.json
+++ b/web/frontend/package-lock.json
@@ -609,6 +609,12 @@
"dev": true,
"license": "MIT"
},
+ "node_modules/@types/trusted-types": {
+ "version": "2.0.7",
+ "resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz",
+ "integrity": "sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==",
+ "license": "MIT"
+ },
"node_modules/@urql/core": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/@urql/core/-/core-5.2.0.tgz",
@@ -745,9 +751,9 @@
}
},
"node_modules/devalue": {
- "version": "5.6.2",
- "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz",
- "integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==",
+ "version": "5.6.3",
+ "resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz",
+ "integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==",
"license": "MIT"
},
"node_modules/escape-latex": {
@@ -763,9 +769,9 @@
"license": "MIT"
},
"node_modules/esrap": {
- "version": "2.2.1",
- "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.1.tgz",
- "integrity": "sha512-GiYWG34AN/4CUyaWAgunGt0Rxvr1PTMlGC0vvEov/uOQYWne2bpN03Um+k8jT+q3op33mKouP2zeJ6OlM+qeUg==",
+ "version": "2.2.3",
+ "resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.3.tgz",
+ "integrity": "sha512-8fOS+GIGCQZl/ZIlhl59htOlms6U8NvX6ZYgYHpRU/b6tVSh3uHkOHZikl3D4cMbYM0JlpBe+p/BkZEi8J9XIQ==",
"license": "MIT",
"dependencies": {
"@jridgewell/sourcemap-codec": "^1.4.15"
@@ -1176,22 +1182,23 @@
}
},
"node_modules/svelte": {
- "version": "5.46.4",
- "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.4.tgz",
- "integrity": "sha512-VJwdXrmv9L8L7ZasJeWcCjoIuMRVbhuxbss0fpVnR8yorMmjNDwcjIH08vS6wmSzzzgAG5CADQ1JuXPS2nwt9w==",
+ "version": "5.53.2",
+ "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.2.tgz",
+ "integrity": "sha512-yGONuIrcl/BMmqbm6/52Q/NYzfkta7uVlos5NSzGTfNJTTFtPPzra6rAQoQIwAqupeM3s9uuTf5PvioeiCdg9g==",
"license": "MIT",
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",
"@sveltejs/acorn-typescript": "^1.0.5",
"@types/estree": "^1.0.5",
+ "@types/trusted-types": "^2.0.7",
"acorn": "^8.12.1",
"aria-query": "^5.3.1",
"axobject-query": "^4.1.0",
"clsx": "^2.1.1",
- "devalue": "^5.6.2",
+ "devalue": "^5.6.3",
"esm-env": "^1.2.1",
- "esrap": "^2.2.1",
+ "esrap": "^2.2.2",
"is-reference": "^3.0.3",
"locate-character": "^3.0.0",
"magic-string": "^0.30.11",
diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte
index 46958963..5d61e66c 100644
--- a/web/frontend/src/config/AdminSettings.svelte
+++ b/web/frontend/src/config/AdminSettings.svelte
@@ -15,6 +15,7 @@
import ShowUsers from "./admin/ShowUsers.svelte";
import Options from "./admin/Options.svelte";
import NoticeEdit from "./admin/NoticeEdit.svelte";
+ import RunTaggers from "./admin/RunTaggers.svelte";
/* Svelte 5 Props */
let {
@@ -70,4 +71,5 @@
Run individual taggers on all existing jobs.
+ {#if taggers.length === 0} +No taggers available.
+ {:else} +| Name | +Type | +Status | ++ |
|---|---|---|---|
| {tagger.name} | +
+ {#if tagger.running}
+ |
+ + + | +
+ {#if displayMessage}{message.msg}{/if}
+
{hkey}: {host.healthData[hkey]}
{/each}- {mkey}: {host.metaData[mkey]} -
- {/each} -