Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2026-02-24 11:27:30 +01:00)
Merge pull request #499 from ClusterCockpit/dev
Fix and extend jobclass rules
Makefile (2 changed lines)
@@ -84,4 +84,4 @@ $(VAR):
 
 $(SVELTE_TARGETS): $(SVELTE_SRC)
 	$(info ===> BUILD frontend)
-	cd web/frontend && npm install && npm run build
+	cd web/frontend && npm ci && npm run build

ReleaseNotes.md (107 changed lines)
@@ -16,19 +16,25 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - **Removed `disable-archive` option**: This obsolete configuration option has been removed.
 - **Removed `clusters` config section**: The separate clusters configuration section
   has been removed. Cluster information is now derived from the job archive.
-- **`apiAllowedIPs` is now optional**: If not specified, defaults to secure settings.
+- **`apiAllowedIPs` is now optional**: If not specified, defaults to not
+  restricted.
 
 ### Architecture changes
 
+- **Web framework replaced**: Migrated from `gorilla/mux` to `chi` as the HTTP
+  router. This should be transparent to users but affects how middleware and
+  routes are composed. A proper 404 handler is now in place.
 - **MetricStore moved**: The `metricstore` package has been moved from `internal/`
   to `pkg/` as it is now part of the public API.
 - **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend.
 - **Archive to Cleanup renaming**: Archive-related functions have been refactored
   and renamed to "Cleanup" for clarity.
+- **`minRunningFor` filter removed**: This undocumented filter has been removed
+  from the API and frontend.
 
 ### Dependency changes
 
-- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs
+- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1)
 - **cclib NATS client**: Now using the cclib NATS client implementation
 - Removed obsolete `util.Float` usage from cclib
 
@@ -51,13 +57,30 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 
 - **Node state tracking**: New node table in database with timestamp tracking
 - **Node state filtering**: Filter jobs by node state in systems view
-- **Node metrics improvements**: Better handling of node-level metrics and data
 - **Node list enhancements**: Improved paging, filtering, and continuous scroll support
+- **Nodestate retention and archiving**: Node state data is now subject to configurable
+  retention policies and can be archived to Parquet format for long-term storage
+- **Faulty node metric tracking**: Faulty node state metric lists are persisted to the database
+
+### Health Monitoring
+
+- **Health status dashboard**: New dedicated "Health" tab in the status details view
+  showing per-node metric health across the cluster
+- **CCMS health check**: Support for querying health status of external
+  cc-metric-store (CCMS) instances via the API
+- **GraphQL health endpoints**: New GraphQL queries and resolvers for health data
+- **Cluster/subcluster filter**: Filter health status view by cluster or subcluster
+
+### Log Viewer
+
+- **Web-based log viewer**: New log viewer page in the admin interface for inspecting
+  backend log output directly from the browser without shell access
+- **Accessible from header**: Quick access link from the navigation header
 
 ### MetricStore Improvements
 
 - **Memory tracking worker**: New worker for CCMS memory usage tracking
-- **Dynamic retention**: Support for cluster/subcluster-specific retention times
+- **Dynamic retention**: Support for job specific dynamic retention times
 - **Improved compression**: Transparent compression for job archive imports
 - **Parallel processing**: Parallelized Iter function in all archive backends
 
@@ -65,15 +88,32 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 
 - **Job tagger option**: Enable automatic job tagging via configuration flag
 - **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.)
-- **Job classifaction**: Automatic detection of pathological jobs
-- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations
+- **Job classification**: Automatic detection of pathological jobs
+- **omit-tagged**: Option to exclude tagged jobs from retention/cleanup operations (`none`, `all`, or `user`)
+- **Admin UI trigger**: Taggers can be run on-demand from the admin web interface
+  without restarting the backend
 
 ### Archive Backends
 
+- **Parquet archive format**: New Parquet file format for job archiving, providing
+  columnar storage with efficient compression for analytical workloads
 - **S3 backend**: Full support for S3-compatible object storage
 - **SQLite backend**: Full support for SQLite backend using blobs
 - **Performance improvements**: Fixed performance bugs in archive backends
 - **Better error handling**: Improved error messages and fallback handling
+- **Zstd compression**: Parquet writers use zstd compression for better
+  compression ratios compared to the previous snappy default
+- **Optimized sort order**: Job and nodestate Parquet files are sorted by
+  cluster, subcluster, and start time for efficient range queries
+
+### Unified Archive Retention and Format Conversion
+
+- **Uniform retention policy**: Job archive retention now supports both JSON and
+  Parquet as target formats under a single, consistent policy configuration
+- **Archive manager tool**: The `tools/archive-manager` utility now supports
+  format conversion between JSON and Parquet job archives
+- **Parquet reader**: Full Parquet archive reader implementation for reading back
+  archived job data
 
 ## New features and improvements
 
@@ -85,6 +125,14 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - **Filter presets**: Move list filter preset to URL for easy sharing
 - **Job comparison**: Improved job comparison views and plots
 - **Subcluster reactivity**: Job list now reacts to subcluster filter changes
+- **Short jobs quick selection**: New "Short jobs" quick-filter button in job lists
+  replaces the removed undocumented `minRunningFor` filter
+- **Row plot cursor sync**: Cursor position is now synchronized across all metric
+  plots in a job list row for easier cross-metric comparison
+- **Disabled metrics handling**: Improved handling and display of disabled metrics
+  across job view, node view, and list rows
+- **"Not configured" info cards**: Informational cards shown when optional features
+  are not yet configured
 - **Frontend dependencies**: Bumped frontend dependencies to latest versions
 - **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues
 
@@ -95,6 +143,15 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues
 - **Configuration defaults**: Sensible defaults for most configuration options
 - **Documentation**: Extensive documentation improvements across packages
+- **Server flag in systemd unit**: Example systemd unit now includes the `-server` flag
+
+### Security
+
+- **LDAP security hardening**: Improved input validation, connection handling, and
+  error reporting in the LDAP authenticator
+- **OIDC security hardening**: Stricter token validation and improved error handling
+  in the OIDC authenticator
+- **Auth schema extensions**: Additional schema fields for improved auth configuration
 
 ### API improvements
 
@@ -102,6 +159,8 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - **Job exclusivity filter**: New filter for exclusive vs. shared jobs
 - **Improved error messages**: Better error messages and documentation in REST API
 - **GraphQL enhancements**: Improved GraphQL queries and resolvers
+- **Stop job lookup order**: Reversed lookup order in stop job requests for
+  more reliable job matching (cluster+jobId first, then jobId alone)
 
 ### Performance
 
@@ -109,13 +168,17 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - **Job cache**: Introduced caching table for faster job inserts
 - **Parallel imports**: Archive imports now run in parallel where possible
 - **External tool integration**: Optimized use of external tools (fd) for better performance
+- **Node repository queries**: Reviewed and optimized node repository SQL queries
+- **Buffer pool**: Resized and pooled internal buffers for better memory reuse
 
 ### Developer experience
 
 - **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md)
 - **Example API payloads**: Added example JSON API payloads for testing
-- **Unit tests**: Added more unit tests for NATS API and other components
-- **Test improvements**: Better test coverage and test data
+- **Unit tests**: Added more unit tests for NATS API, node repository, and other components
+- **Test improvements**: Better test coverage; test DB is now copied before unit tests
+  to avoid state pollution between test runs
+- **Parquet writer tests**: Comprehensive tests for Parquet archive writing and conversion
 
 ## Bug fixes
 
@@ -132,6 +195,16 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - Fixed polar plot data query decoupling
 - Fixed missing resolution parameter handling
 - Fixed node table initialization fallback
+- Fixed reactivity key placement in nodeList
+- Fixed nodeList resolver data handling and increased nodestate filter cutoff
+- Fixed job always being transferred to main job table before archiving
+- Fixed AppTagger error handling and logging
+- Fixed log endpoint formatting and correctness
+- Fixed automatic refresh in metric status tab
+- Fixed NULL value handling in `health_state` and `health_metrics` columns
+- Fixed bugs related to `job_cache` IDs being used in the main job table
+- Fixed SyncJobs bug causing start job hooks to be called with wrong (cache) IDs
+- Fixed 404 handler route for sub-routers
 
 ## Configuration changes
 
@@ -167,6 +240,20 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
       "interval": "48h",
       "directory": "./var/archive"
     }
+  },
+  "archive": {
+    "retention": {
+      "policy": "delete",
+      "age": "6months",
+      "target-format": "parquet"
+    }
+  },
+  "nodestate": {
+    "retention": {
+      "policy": "archive",
+      "age": "30d",
+      "archive-path": "./var/nodestate-archive"
+    }
   }
 }
 ```
@@ -178,11 +265,13 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
 - If using S3 archive backend, configure the new `archive` section options
 - Test the new public dashboard at `/public` route
 - Review cron worker configuration if you need different frequencies
+- If using the archive retention feature, configure the `target-format` option
+  to choose between `json` (default) and `parquet` output formats
+- Consider enabling nodestate retention if you track node states over time
 
 ## Known issues
 
 - Currently energy footprint metrics of type energy are ignored for calculating
   total energy.
-- Resampling for running jobs only works with cc-metric-store
 - With energy footprint metrics of type power the unit is ignored and it is
   assumed the metric has the unit Watt.

@@ -389,8 +389,71 @@
                 ]
             }
         },
+        "/api/jobs/edit_meta/": {
+            "patch": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten",
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "Job add and modify"
+                ],
+                "summary": "Edit meta-data json by request",
+                "parameters": [
+                    {
+                        "description": "Specifies job and payload to add or update",
+                        "name": "request",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/api.JobMetaRequest"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "Updated job resource",
+                        "schema": {
+                            "$ref": "#/definitions/schema.Job"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad Request",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "401": {
+                        "description": "Unauthorized",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "404": {
+                        "description": "Job does not exist",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "500": {
+                        "description": "Internal Server Error",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    }
+                }
+            }
+        },
         "/api/jobs/edit_meta/{id}": {
-            "post": {
+            "patch": {
                 "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
                 "consumes": [
                     "application/json"

@@ -102,6 +102,27 @@ definitions:
         description: Page id returned
         type: integer
     type: object
+  api.JobMetaRequest:
+    properties:
+      cluster:
+        description: Cluster of job
+        example: fritz
+        type: string
+      jobId:
+        description: Cluster Job ID of job
+        example: 123000
+        type: integer
+      payload:
+        allOf:
+        - $ref: '#/definitions/api.EditMetaRequest'
+        description: Content to Add to Job Meta_Data
+      startTime:
+        description: Start Time of job as epoch
+        example: 1649723812
+        type: integer
+    required:
+    - jobId
+    type: object
   api.JobMetricWithName:
     properties:
       metric:
@@ -1091,8 +1112,50 @@ paths:
       summary: Remove a job from the sql database
       tags:
       - Job remove
+  /api/jobs/edit_meta/:
+    patch:
+      consumes:
+      - application/json
+      description: |-
+        Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster
+        If a key already exists its content will be overwritten
+      parameters:
+      - description: Specifies job and payload to add or update
+        in: body
+        name: request
+        required: true
+        schema:
+          $ref: '#/definitions/api.JobMetaRequest'
+      produces:
+      - application/json
+      responses:
+        "200":
+          description: Updated job resource
+          schema:
+            $ref: '#/definitions/schema.Job'
+        "400":
+          description: Bad Request
+          schema:
+            $ref: '#/definitions/api.ErrorResponse'
+        "401":
+          description: Unauthorized
+          schema:
+            $ref: '#/definitions/api.ErrorResponse'
+        "404":
+          description: Job does not exist
+          schema:
+            $ref: '#/definitions/api.ErrorResponse'
+        "500":
+          description: Internal Server Error
+          schema:
+            $ref: '#/definitions/api.ErrorResponse'
+      security:
+      - ApiKeyAuth: []
+      summary: Edit meta-data json by request
+      tags:
+      - Job add and modify
   /api/jobs/edit_meta/{id}:
-    post:
+    patch:
       consumes:
       - application/json
       description: |-

@@ -369,13 +369,11 @@ func runServer(ctx context.Context) error {
 	errChan := make(chan error, 1)
 
 	// Start HTTP server
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	wg.Go(func() {
 		if err := srv.Start(ctx); err != nil {
 			errChan <- err
 		}
-	}()
+	})
 
 	// Handle shutdown signals
 	wg.Add(1)

configs/tagger/jobclasses/highMemoryUsage.json (new file, 25 lines)
@@ -0,0 +1,25 @@
+{
+  "name": "High memory usage",
+  "tag": "highmemory",
+  "parameters": [
+    "highmemoryusage_threshold_factor",
+    "job_min_duration_seconds"
+  ],
+  "metrics": ["mem_used"],
+  "requirements": [
+    "job.shared == \"none\"",
+    "job.duration > job_min_duration_seconds"
+  ],
+  "variables": [
+    {
+      "name": "memory_threshold",
+      "expr": "mem_used.limits.peak * highmemoryusage_threshold_factor"
+    },
+    {
+      "name": "memory_usage_pct",
+      "expr": "mem_used.max / mem_used.limits.peak * 100.0"
+    }
+  ],
+  "rule": "mem_used.max > memory_threshold",
+  "hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions."
+}

@@ -3,8 +3,7 @@
   "tag": "excessiveload",
   "parameters": [
     "excessivecpuload_threshold_factor",
-    "job_min_duration_seconds",
-    "sampling_interval_seconds"
+    "job_min_duration_seconds"
   ],
   "metrics": ["cpu_load"],
   "requirements": [
@@ -15,12 +14,8 @@
     {
       "name": "load_threshold",
       "expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor"
-    },
-    {
-      "name": "load_perc",
-      "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
     }
   ],
   "rule": "cpu_load.avg > load_threshold",
-  "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}."
+  "hint": "This job was detected as having excessive CPU load: average cpu load {{.cpu_load.avg}} exceeds the oversubscription threshold {{.load_threshold}} ({{.excessivecpuload_threshold_factor}} \u00d7 {{.cpu_load.limits.peak}} peak cores), indicating CPU contention."
 }

@@ -1,5 +1,5 @@
 {
-  "name": "Low ressource utilization",
+  "name": "Low resource utilization",
   "tag": "lowutilization",
   "parameters": ["job_min_duration_seconds"],
   "metrics": ["flops_any", "mem_bw"],
@@ -9,14 +9,14 @@
   ],
   "variables": [
     {
-      "name": "mem_bw_perc",
-      "expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)"
+      "name": "mem_bw_pct",
+      "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
     },
     {
-      "name": "flops_any_perc",
-      "expr": "1.0 - (flops_any.avg / flops_any.limits.peak)"
+      "name": "flops_any_pct",
+      "expr": "flops_any.avg / flops_any.limits.peak * 100.0"
     }
   ],
   "rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert",
-  "hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}."
+  "hint": "This job shows low resource utilization: FLOP rate {{.flops_any.avg}} GF/s ({{.flops_any_pct}}% of peak) and memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) are both below their alert thresholds."
 }

@@ -3,8 +3,7 @@
   "tag": "lowload",
   "parameters": [
     "lowcpuload_threshold_factor",
-    "job_min_duration_seconds",
-    "sampling_interval_seconds"
+    "job_min_duration_seconds"
   ],
   "metrics": ["cpu_load"],
   "requirements": [
@@ -15,12 +14,8 @@
     {
       "name": "load_threshold",
       "expr": "job.numCores * lowcpuload_threshold_factor"
-    },
-    {
-      "name": "load_perc",
-      "expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
     }
   ],
-  "rule": "cpu_load.avg < cpu_load.limits.caution",
-  "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}."
+  "rule": "cpu_load.avg < load_threshold",
+  "hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)."
 }

configs/tagger/jobclasses/memoryBound.json (new file, 22 lines)
@@ -0,0 +1,22 @@
+{
+  "name": "Memory bandwidth bound",
+  "tag": "memorybound",
+  "parameters": ["membound_bw_threshold_factor", "job_min_duration_seconds"],
+  "metrics": ["mem_bw"],
+  "requirements": [
+    "job.shared == \"none\"",
+    "job.duration > job_min_duration_seconds"
+  ],
+  "variables": [
+    {
+      "name": "mem_bw_threshold",
+      "expr": "mem_bw.limits.peak * membound_bw_threshold_factor"
+    },
+    {
+      "name": "mem_bw_pct",
+      "expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
+    }
+  ],
+  "rule": "mem_bw.avg > mem_bw_threshold",
+  "hint": "This job is memory bandwidth bound: memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) is within {{.membound_bw_threshold_factor}} of peak bandwidth. Consider improving data reuse or compute intensity."
+}

@@ -1,11 +1,12 @@
 {
-  "lowcpuload_threshold_factor": 0.9,
-  "excessivecpuload_threshold_factor": 1.1,
+  "lowcpuload_threshold_factor": 0.85,
+  "excessivecpuload_threshold_factor": 1.2,
   "highmemoryusage_threshold_factor": 0.9,
   "node_load_imbalance_threshold_factor": 0.1,
   "core_load_imbalance_threshold_factor": 0.1,
   "high_memory_load_threshold_factor": 0.9,
   "lowgpuload_threshold_factor": 0.7,
+  "membound_bw_threshold_factor": 0.8,
   "memory_leak_slope_threshold": 0.1,
   "job_min_duration_seconds": 600.0,
   "sampling_interval_seconds": 30.0,

@@ -396,8 +396,71 @@ const docTemplate = `{
                 ]
             }
         },
+        "/api/jobs/edit_meta/": {
+            "patch": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "description": "Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster\nIf a key already exists its content will be overwritten",
+                "consumes": [
+                    "application/json"
+                ],
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "Job add and modify"
+                ],
+                "summary": "Edit meta-data json by request",
+                "parameters": [
+                    {
+                        "description": "Specifies job and payload to add or update",
+                        "name": "request",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/api.JobMetaRequest"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "Updated job resource",
+                        "schema": {
+                            "$ref": "#/definitions/schema.Job"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad Request",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "401": {
+                        "description": "Unauthorized",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "404": {
+                        "description": "Job does not exist",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    },
+                    "500": {
+                        "description": "Internal Server Error",
+                        "schema": {
+                            "$ref": "#/definitions/api.ErrorResponse"
+                        }
+                    }
+                }
+            }
+        },
         "/api/jobs/edit_meta/{id}": {
-            "post": {
+            "patch": {
                 "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
                 "consumes": [
                     "application/json"

@@ -72,6 +72,14 @@ type EditMetaRequest struct {
 	Value string `json:"value" example:"bash script"`
 }
 
+// JobMetaRequest model
+type JobMetaRequest struct {
+	JobId     *int64          `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
+	Cluster   *string         `json:"cluster" example:"fritz"`                    // Cluster of job
+	StartTime *int64          `json:"startTime" example:"1649723812"`             // Start Time of job as epoch
+	Payload   EditMetaRequest `json:"payload"`                                    // Content to Add to Job Meta_Data
+}
+
 type TagJobAPIRequest []*APITag
 
 type GetJobAPIRequest []string
@@ -423,21 +431,21 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
 }
 
 // editMeta godoc
-// @summary Edit meta-data json
+// @summary Edit meta-data json of job identified by database id
 // @tags Job add and modify
-// @description Edit key value pairs in job metadata json
+// @description Edit key value pairs in job metadata json of job specified by database id
 // @description If a key already exists its content will be overwritten
 // @accept json
 // @produce json
 // @param id path int true "Job Database ID"
-// @param request body api.EditMetaRequest true "Kay value pair to add"
+// @param request body api.EditMetaRequest true "Metadata Key value pair to add or update"
 // @success 200 {object} schema.Job "Updated job resource"
 // @failure 400 {object} api.ErrorResponse "Bad Request"
 // @failure 401 {object} api.ErrorResponse "Unauthorized"
 // @failure 404 {object} api.ErrorResponse "Job does not exist"
 // @failure 500 {object} api.ErrorResponse "Internal Server Error"
 // @security ApiKeyAuth
-// @router /api/jobs/edit_meta/{id} [post]
+// @router /api/jobs/edit_meta/{id} [patch]
 func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
 	id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
 	if err != nil {
@@ -469,6 +477,54 @@ func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
 	}
 }
 
+// editMetaByRequest godoc
+// @summary Edit meta-data json of job identified by request
+// @tags Job add and modify
+// @description Edit key value pairs in metadata json of job specified by jobID, StartTime and Cluster
+// @description If a key already exists its content will be overwritten
+// @accept json
+// @produce json
+// @param request body api.JobMetaRequest true "Specifies job and payload to add or update"
+// @success 200 {object} schema.Job "Updated job resource"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 404 {object} api.ErrorResponse "Job does not exist"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /api/jobs/edit_meta/ [patch]
+func (api *RestAPI) editMetaByRequest(rw http.ResponseWriter, r *http.Request) {
+	// Parse request body
+	req := JobMetaRequest{}
+	if err := decode(r.Body, &req); err != nil {
+		handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
+		return
+	}
+
+	// Fetch job (that will have its meta_data edited) from db
+	var job *schema.Job
+	var err error
+	if req.JobId == nil {
+		handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
+		return
+	}
+
+	// log.Printf("loading db job for editMetaByRequest... : JobMetaRequest=%v", req)
+	job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
+	if err != nil {
+		handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
+		return
+	}
+
+	if err := api.JobRepository.UpdateMetadata(job, req.Payload.Key, req.Payload.Value); err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	rw.Header().Add("Content-Type", "application/json")
+	rw.WriteHeader(http.StatusOK)
+	json.NewEncoder(rw).Encode(job)
+}
+
 // tagJob godoc
 // @summary Adds one or more tags to a job
 // @tags Job add and modify
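With the new by-request route, a client identifies the job in the request body instead of the URL. The following is a hedged sketch of building such a body using local mirror types; the `key` JSON tag on the payload is an assumption (only the `value` tag is visible in the diff), and the host in the comment is a placeholder:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the request types shown in the diff. The "key" JSON tag
// is an assumption: only the "value" tag appears in the diff above.
type editMetaPayload struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type jobMetaRequest struct {
	JobId     int64           `json:"jobId"`
	Cluster   string          `json:"cluster"`
	StartTime int64           `json:"startTime"`
	Payload   editMetaPayload `json:"payload"`
}

// buildBody marshals an example request body for PATCH /api/jobs/edit_meta/.
func buildBody() string {
	b, _ := json.Marshal(jobMetaRequest{
		JobId:     123000,
		Cluster:   "fritz",
		StartTime: 1649723812,
		Payload:   editMetaPayload{Key: "jobScript", Value: "bash script"},
	})
	return string(b)
}

func main() {
	// Send with something like (host and token are placeholders):
	//   curl -X PATCH -H "Authorization: Bearer $JWT" \
	//        --data "<body>" https://<cc-backend-host>/api/jobs/edit_meta/
	fmt.Println(buildBody())
}
```

Unlike `/api/jobs/edit_meta/{id}`, which takes the database id, this variant resolves the job via `JobRepository.Find` from the cluster-level job id, cluster, and start time.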
@@ -904,11 +960,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
 	}
 
 	// Check for omit-tagged query parameter
-	omitTagged := false
+	omitTagged := "none"
 	if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
-		omitTagged, e = strconv.ParseBool(omitTaggedStr)
-		if e != nil {
-			handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw)
+		switch omitTaggedStr {
+		case "none", "all", "user":
+			omitTagged = omitTaggedStr
+		default:
+			handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw)
 			return
 		}
 	}
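The `omit-tagged` query parameter changes here from a boolean to a three-valued mode. A standalone sketch of the accepted values (the helper function name is illustrative, not part of the handler):

```go
package main

import (
	"errors"
	"fmt"
)

// validateOmitTagged mirrors the handler's check: an empty parameter falls
// back to the default "none"; any other value must be one of the three modes.
func validateOmitTagged(raw string) (string, error) {
	if raw == "" {
		return "none", nil
	}
	switch raw {
	case "none", "all", "user":
		return raw, nil
	default:
		return "", errors.New("omit-tagged must be one of: none, all, user")
	}
}

func main() {
	for _, v := range []string{"", "user", "true"} {
		mode, err := validateOmitTagged(v)
		fmt.Printf("%q -> mode=%q err=%v\n", v, mode, err)
	}
}
```

Note that this is a breaking change for API clients: a request that previously sent `omit-tagged=true` is now rejected and presumably needs `omit-tagged=all` instead.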
@@ -96,8 +96,8 @@ func (api *RestAPI) MountAPIRoutes(r chi.Router) {
 	r.Post("/jobs/tag_job/{id}", api.tagJob)
 	r.Patch("/jobs/tag_job/{id}", api.tagJob)
 	r.Delete("/jobs/tag_job/{id}", api.removeTagJob)
-	r.Post("/jobs/edit_meta/{id}", api.editMeta)
 	r.Patch("/jobs/edit_meta/{id}", api.editMeta)
+	r.Patch("/jobs/edit_meta/", api.editMetaByRequest)
 	r.Get("/jobs/metrics/{id}", api.getJobMetrics)
 	r.Delete("/jobs/delete_job/", api.deleteJobByRequest)
 	r.Delete("/jobs/delete_job/{id}", api.deleteJobByID)
@@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
 //
 // Parameters:
 //   - startTime: Unix timestamp, jobs with start_time < this value will be deleted
-//   - omitTagged: If true, skip jobs that have associated tags (jobtag entries)
+//   - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs,
+//     "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
 //
 // Returns the count of deleted jobs or an error if the operation fails.
-func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
+func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) {
 	var cnt int
 	q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
 
-	if omitTagged {
+	switch omitTagged {
+	case "all":
 		q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+	case "user":
+		q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
 	}
 
 	if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
@@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
 	var jobIds []int64
 	selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
 
-	if omitTagged {
+	switch omitTagged {
+	case "all":
 		selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+	case "user":
+		selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
 	}
 
 	rows, err := selectQuery.RunWith(r.DB).Query()
@@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
 
 	qd := sq.Delete("job").Where("job.start_time < ?", startTime)
 
-	if omitTagged {
+	switch omitTagged {
+	case "all":
 		qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+	case "user":
+		qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
 	}
 	_, err := qd.RunWith(r.DB).Exec()
 
@@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error {
 // Parameters:
 //   - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
 //   - startTimeEnd: Unix timestamp for range end
-//   - omitTagged: If true, exclude jobs with associated tags
+//   - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs,
+//     "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
 //
 // Returns a slice of jobs or an error if the time range is invalid or query fails.
-func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
+func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) {
 	var query sq.SelectBuilder
 
 	if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
@@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
 		query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
 	}
 
-	if omitTagged {
+	switch omitTagged {
+	case "all":
 		query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
+	case "user":
+		query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
 	}
 
 	query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")
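The repeated `NOT EXISTS` subquery in these hunks encodes which tag types count as user-created: everything except the auto-tagger types `app` and `jobClass`. A minimal sketch of that classification (the helper is illustrative; the repository expresses the rule directly in SQL):

```go
package main

import "fmt"

// isUserTag mirrors the SQL filter's rule: the auto-tagger tag types
// "app" and "jobClass" are not user-created; every other type is.
func isUserTag(tagType string) bool {
	return tagType != "app" && tagType != "jobClass"
}

func main() {
	for _, tt := range []string{"app", "jobClass", "testtype"} {
		fmt.Printf("%s: user-created=%v\n", tt, isUserTag(tt))
	}
}
```

This is why `omit-tagged=user` still deletes (or excludes) jobs whose only tags were attached automatically by the tagger.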
@@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) {
 
 	// 1. Find a job to use (Find all jobs)
 	// We use a large time range to ensure we get something if it exists
-	jobs, err := r.FindJobsBetween(0, 9999999999, false)
+	jobs, err := r.FindJobsBetween(0, 9999999999, "none")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) {
 
 	targetJob := jobs[0]
 
-	// 2. Create a tag
-	tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano())
-	tagID, err := r.CreateTag("testtype", tagName, "global")
+	// 2. Create an auto-tagger tag (type "app")
+	appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano())
+	appTagID, err := r.CreateTag("app", appTagName, "global")
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// 3. Link Tag (Manually to avoid archive dependency side-effects in unit test)
-	_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID)
+	// 3. Link auto-tagger tag to job
+	_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// 4. Search with omitTagged = false (Should find the job)
-	jobsFound, err := r.FindJobsBetween(0, 9999999999, false)
+	// 4. Search with omitTagged = "none" (Should find the job)
+	jobsFound, err := r.FindJobsBetween(0, 9999999999, "none")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) {
 		}
 	}
 	if !found {
-		t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID)
+		t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID)
 	}
 
-	// 5. Search with omitTagged = true (Should NOT find the job)
-	jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true)
+	// 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag)
+	jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all")
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	for _, j := range jobsFiltered {
 		if *j.ID == *targetJob.ID {
-			t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID)
+			t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID)
+		}
+	}
+
+	// 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job
+	jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	found = false
+	for _, j := range jobsUserFilter {
+		if *j.ID == *targetJob.ID {
+			found = true
+			break
+		}
+	}
+	if !found {
+		t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID)
+	}
+
+	// 7. Add a user-created tag (type "testtype") to the same job
+	userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano())
+	userTagID, err := r.CreateTag("testtype", userTagName, "global")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// 8. Now omitTagged = "user" should exclude the job (has a user-created tag)
+	jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, j := range jobsUserFilter2 {
+		if *j.ID == *targetJob.ID {
+			t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID)
 		}
 	}
 }
@@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
 	lastTime := ar.CompressLast(startTime)
 	if startTime == lastTime {
 		cclog.Info("Compression Service - Complete archive run")
-		jobs, err = jobRepo.FindJobsBetween(0, startTime, false)
+		jobs, err = jobRepo.FindJobsBetween(0, startTime, "none")
 	} else {
-		jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false)
+		jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none")
 	}
 
 	if err != nil {
@@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target
 }
 
 // cleanupAfterTransfer removes jobs from archive and optionally from DB.
-func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) {
+func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) {
 	archive.GetHandle().CleanUp(jobs)
 
 	if includeDB {
@@ -26,8 +26,8 @@ type Retention struct {
 	Policy string `json:"policy"`
 	Format string `json:"format"`
 	Age    int    `json:"age"`
-	IncludeDB  bool `json:"includeDB"`
-	OmitTagged bool `json:"omitTagged"`
+	IncludeDB  bool   `json:"include-db"`
+	OmitTagged string `json:"omit-tagged"`
 	TargetKind string `json:"target-kind"`
 	TargetPath string `json:"target-path"`
 	TargetEndpoint string `json:"target-endpoint"`
@@ -68,6 +68,11 @@ var configSchema = `
 	"description": "Also remove jobs from database",
 	"type": "boolean"
 },
+"omit-tagged": {
+	"description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)",
+	"type": "string",
+	"enum": ["none", "all", "user"]
+},
 "age": {
 	"description": "Act on jobs with startTime older than age (in days)",
 	"type": "integer"
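Under the extended schema, a retention entry in the configuration could look like the following. This is an illustrative fragment only: the `policy` value and the numbers are assumptions, while the key names follow the struct tags shown above.

```json
{
  "policy": "delete",
  "age": 180,
  "include-db": true,
  "omit-tagged": "user"
}
```

With `"omit-tagged": "user"`, retention would still act on jobs carrying only auto-tagger tags, but skip any job a user has tagged manually.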
@@ -32,15 +32,6 @@
 
   /* Const Init */
   const client = getContextClient();
-  const stateOptions = [
-    "all",
-    "allocated",
-    "idle",
-    "down",
-    "mixed",
-    "reserved",
-    "unknown",
-  ];
   const healthOptions = [
     "all",
     "full",
@@ -52,12 +43,10 @@
   let pieWidth = $state(0);
   let querySorting = $state({ field: "startTime", type: "col", order: "DESC" })
   let tableHostFilter = $state("");
-  let tableStateFilter = $state(stateOptions[0]);
   let tableHealthFilter = $state(healthOptions[0]);
   let healthTableSorting = $state(
     {
-      schedulerState: { dir: "down", active: true },
-      healthState: { dir: "down", active: false },
+      healthState: { dir: "up", active: true },
       hostname: { dir: "down", active: false },
     }
   );
@@ -79,9 +68,7 @@
         hostname
         cluster
         subCluster
-        schedulerState
         healthState
-        metaData
         healthData
       }
     }
@@ -102,7 +89,7 @@
   let healthTableData = $derived.by(() => {
     if ($statusQuery?.data) {
       return [...$statusQuery.data.nodes.items].sort((n1, n2) => {
-        return n1['schedulerState'].localeCompare(n2['schedulerState'])
+        return n1['healthState'].localeCompare(n2['healthState'])
       });
     } else {
       return [];
@@ -114,21 +101,12 @@
     if (tableHostFilter != "") {
       pendingTableData = pendingTableData.filter((e) => e.hostname.includes(tableHostFilter))
     }
-    if (tableStateFilter != "all") {
-      pendingTableData = pendingTableData.filter((e) => e.schedulerState.includes(tableStateFilter))
-    }
     if (tableHealthFilter != "all") {
       pendingTableData = pendingTableData.filter((e) => e.healthState.includes(tableHealthFilter))
     }
     return pendingTableData
   });
 
-  const refinedStateData = $derived.by(() => {
-    return $statusQuery?.data?.nodeStates.
-      filter((e) => ['allocated', 'reserved', 'idle', 'mixed','down', 'unknown'].includes(e.state)).
-      sort((a, b) => b.count - a.count)
-  });
-
   const refinedHealthData = $derived.by(() => {
     return $statusQuery?.data?.nodeStates.
       filter((e) => ['full', 'partial', 'failed'].includes(e.state)).
@@ -296,7 +274,7 @@
       <thead>
         <!-- Header Row 1: Titles and Sorting -->
         <tr>
-          <th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('hostname')}>
+          <th style="width: 10%; min-width: 100px; max-width:12%;" onclick={() => sortBy('hostname')}>
             Hosts ({filteredTableData.length})
             <Icon
               name="caret-{healthTableSorting['hostname'].dir}{healthTableSorting['hostname']
@@ -304,17 +282,8 @@
                 ? '-fill'
                 : ''}"
             />
-          </th>
-          <th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('schedulerState')}>
-            Scheduler State
-            <Icon
-              name="caret-{healthTableSorting['schedulerState'].dir}{healthTableSorting['schedulerState']
-                .active
-                ? '-fill'
-                : ''}"
-            />
           </th>
-          <th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('healthState')}>
+          <th style="width: 10%; min-width: 100px; max-width:12%;" onclick={() => sortBy('healthState')}>
             Health State
             <Icon
               name="caret-{healthTableSorting['healthState'].dir}{healthTableSorting['healthState']
@@ -324,7 +293,6 @@
             />
           </th>
           <th>Metric Availability</th>
-          <th>Meta Information</th>
         </tr>
         <!-- Header Row 2: Filters -->
         <tr>
@@ -337,53 +305,27 @@
             </InputGroup>
           </th>
           <th>
-            <InputGroup size="sm">
-              <Input type="select" bind:value={tableStateFilter}>
-                {#each stateOptions as so}
-                  <option value={so}>{so}</option>
-                {/each}
-              </Input>
-              <InputGroupText>
-                <Icon name="search"></Icon>
-              </InputGroupText>
-            </InputGroup>
-          </th>
-          <th>
-            <InputGroup size="sm">
-              <Input type="select" bind:value={tableHealthFilter}>
+            <Input size="sm" type="select" bind:value={tableHealthFilter}>
               {#each healthOptions as ho}
                 <option value={ho}>{ho}</option>
               {/each}
             </Input>
-              <InputGroupText>
-                <Icon name="search"></Icon>
-              </InputGroupText>
-            </InputGroup>
           </th>
           <th></th>
-          <th></th>
         </tr>
       </thead>
       <tbody>
         {#each filteredTableData as host (host.hostname)}
           <tr>
             <th scope="row"><b><a href="/monitoring/node/{cluster}/{host.hostname}" target="_blank">{host.hostname}</a></b></th>
-            <td>{host.schedulerState}</td>
             <td>{host.healthState}</td>
-            <td style="max-width: 250px;">
+            <td style="max-width: 76%;">
               {#each Object.keys(host.healthData) as hkey}
                 <p>
                   <b>{hkey}</b>: {host.healthData[hkey]}
                 </p>
               {/each}
             </td>
-            <td style="max-width: 250px;">
-              {#each Object.keys(host.metaData) as mkey}
-                <p>
-                  <b>{mkey}</b>: {host.metaData[mkey]}
-                </p>
-              {/each}
-            </td>
           </tr>
         {/each}
       </tbody>