Merge branch 'dev' into optimize-checkpoint-loading

2026-02-23 08:53:26 +01:00
32 changed files with 990 additions and 126 deletions

View File

@@ -16,19 +16,25 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Removed `disable-archive` option**: This obsolete configuration option has been removed.
- **Removed `clusters` config section**: The separate clusters configuration section
has been removed. Cluster information is now derived from the job archive.
- **`apiAllowedIPs` is now optional**: If not specified, defaults to secure settings.
- **`apiAllowedIPs` is now optional**: If not specified, defaults to not
restricted.
### Architecture changes
- **Web framework replaced**: Migrated from `gorilla/mux` to `chi` as the HTTP
router. This should be transparent to users but affects how middleware and
routes are composed. A proper 404 handler is now in place (see the sketch after
this list).
- **MetricStore moved**: The `metricstore` package has been moved from `internal/`
to `pkg/` as it is now part of the public API.
- **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend.
- **Archive to Cleanup renaming**: Archive-related functions have been refactored
and renamed to "Cleanup" for clarity.
- **`minRunningFor` filter removed**: This undocumented filter has been removed
from the API and frontend.
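
The router migration noted above is internal, but for orientation here is a minimal sketch of a `chi` router with an explicit 404 handler and a mounted sub-router; the paths and handler bodies are illustrative only and not taken from cc-backend.

```go
package main

import (
	"net/http"

	"github.com/go-chi/chi/v5"
)

func main() {
	r := chi.NewRouter()

	// Explicit 404 handler on the top-level router.
	r.NotFound(func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, "404 - not found", http.StatusNotFound)
	})

	// Sub-routers are mounted much like the backend mounts its API routes.
	api := chi.NewRouter()
	api.Get("/ping", func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte("pong")) // illustrative endpoint, not part of cc-backend
	})
	r.Mount("/api", api)

	http.ListenAndServe(":8080", r)
}
```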
### Dependency changes
- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs
- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1)
- **cclib NATS client**: Now using the cclib NATS client implementation
- Removed obsolete `util.Float` usage from cclib
@@ -51,13 +57,30 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Node state tracking**: New node table in database with timestamp tracking
- **Node state filtering**: Filter jobs by node state in systems view
- **Node metrics improvements**: Better handling of node-level metrics and data
- **Node list enhancements**: Improved paging, filtering, and continuous scroll support
- **Nodestate retention and archiving**: Node state data is now subject to configurable
retention policies and can be archived to Parquet format for long-term storage
- **Faulty node metric tracking**: Faulty node state metric lists are persisted to the database
### Health Monitoring
- **Health status dashboard**: New dedicated "Health" tab in the status details view
showing per-node metric health across the cluster
- **CCMS health check**: Support for querying health status of external
cc-metric-store (CCMS) instances via the API
- **GraphQL health endpoints**: New GraphQL queries and resolvers for health data
- **Cluster/subcluster filter**: Filter health status view by cluster or subcluster
### Log Viewer
- **Web-based log viewer**: New log viewer page in the admin interface for inspecting
backend log output directly from the browser without shell access
- **Accessible from header**: Quick access link from the navigation header
### MetricStore Improvements
- **Memory tracking worker**: New worker for CCMS memory usage tracking
- **Dynamic retention**: Support for cluster/subcluster-specific retention times
- **Dynamic retention**: Support for job specific dynamic retention times
- **Improved compression**: Transparent compression for job archive imports
- **Parallel processing**: Parallelized Iter function in all archive backends
@@ -65,15 +88,32 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job tagger option**: Enable automatic job tagging via configuration flag
- **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.)
- **Job classifaction**: Automatic detection of pathological jobs
- **Job classification**: Automatic detection of pathological jobs
- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations
- **Admin UI trigger**: Taggers can be run on-demand from the admin web interface
without restarting the backend
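
A minimal client sketch for the on-demand tagger trigger, using the `GET /config/taggers/` and `POST /config/taggers/run/` endpoints added in this commit; the base URL is a placeholder, and a real deployment additionally requires an authenticated admin session.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// TaggerInfo mirrors the JSON returned by GET /config/taggers/.
type TaggerInfo struct {
	Name    string `json:"name"`
	Type    string `json:"type"`
	Running bool   `json:"running"`
}

func main() {
	base := "http://localhost:8080" // placeholder; admin authentication is required in practice

	// List known taggers and their current running state.
	resp, err := http.Get(base + "/config/taggers/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var taggers []TaggerInfo
	if err := json.NewDecoder(resp.Body).Decode(&taggers); err != nil {
		panic(err)
	}
	fmt.Println("taggers:", taggers)

	// Start the AppTagger asynchronously on all jobs.
	res, err := http.PostForm(base+"/config/taggers/run/", url.Values{"name": {"AppTagger"}})
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println("run status:", res.Status)
}
```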
### Archive Backends
- **Parquet archive format**: New Parquet file format for job archiving, providing
columnar storage with efficient compression for analytical workloads
- **S3 backend**: Full support for S3-compatible object storage
- **SQLite backend**: Full support for SQLite backend using blobs
- **Performance improvements**: Fixed performance bugs in archive backends
- **Better error handling**: Improved error messages and fallback handling
- **Zstd compression**: Parquet writers use zstd compression for better
compression ratios compared to the previous snappy default
- **Optimized sort order**: Job and nodestate Parquet files are sorted by
cluster, subcluster, and start time for efficient range queries
### Unified Archive Retention and Format Conversion
- **Uniform retention policy**: Job archive retention now supports both JSON and
Parquet as target formats under a single, consistent policy configuration
- **Archive manager tool**: The `tools/archive-manager` utility now supports
format conversion between JSON and Parquet job archives
- **Parquet reader**: Full Parquet archive reader implementation for reading back
archived job data
## New features and improvements
@@ -85,6 +125,14 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Filter presets**: List filter presets are now encoded in the URL for easy sharing
- **Job comparison**: Improved job comparison views and plots
- **Subcluster reactivity**: Job list now reacts to subcluster filter changes
- **Short jobs quick selection**: New "Short jobs" quick-filter button in job lists
replaces the removed undocumented `minRunningFor` filter
- **Row plot cursor sync**: Cursor position is now synchronized across all metric
plots in a job list row for easier cross-metric comparison
- **Disabled metrics handling**: Improved handling and display of disabled metrics
across job view, node view, and list rows
- **"Not configured" info cards**: Informational cards shown when optional features
are not yet configured
- **Frontend dependencies**: Bumped frontend dependencies to latest versions
- **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues
@@ -95,6 +143,15 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues
- **Configuration defaults**: Sensible defaults for most configuration options
- **Documentation**: Extensive documentation improvements across packages
- **Server flag in systemd unit**: Example systemd unit now includes the `-server` flag
### Security
- **LDAP security hardening**: Improved input validation, connection handling, and
error reporting in the LDAP authenticator
- **OIDC security hardening**: Stricter token validation and improved error handling
in the OIDC authenticator
- **Auth schema extensions**: Additional schema fields for improved auth configuration
### API improvements
@@ -102,6 +159,8 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job exclusivity filter**: New filter for exclusive vs. shared jobs
- **Improved error messages**: Better error messages and documentation in REST API
- **GraphQL enhancements**: Improved GraphQL queries and resolvers
- **Stop job lookup order**: Reversed lookup order in stop job requests for
more reliable job matching (the job cache is checked first, then the main job table)
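
For illustration, a stop-job request using the field set and the `/jobs/stop_job/` path exercised by the regression test added in this commit; the base URL is a placeholder, and a deployed instance will normally expect an API prefix and a JWT.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Field set as used by the stop_job regression test in this commit.
	body := []byte(`{
		"jobId": 999,
		"cluster": "testcluster",
		"startTime": 300000000,
		"jobState": "completed",
		"stopTime": 300001000
	}`)

	base := "http://localhost:8080" // placeholder; real deployments need auth and an API prefix
	resp, err := http.Post(base+"/jobs/stop_job/", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```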
### Performance
@@ -109,13 +168,17 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- **Job cache**: Introduced caching table for faster job inserts
- **Parallel imports**: Archive imports now run in parallel where possible
- **External tool integration**: Optimized use of external tools (fd) for better performance
- **Node repository queries**: Reviewed and optimized node repository SQL queries
- **Buffer pool**: Resized and pooled internal buffers for better memory reuse
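
The buffer pool item refers to internal changes; as a generic illustration of the pattern only (not the project's actual code), a `sync.Pool` of reusable byte buffers can look like this:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers so hot paths avoid repeated allocations.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

func render(payload string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // clear any previous contents before reuse
	defer bufPool.Put(buf) // return the buffer to the pool when done

	buf.WriteString("data: ")
	buf.WriteString(payload)
	return buf.String()
}

func main() {
	fmt.Println(render("example"))
}
```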
### Developer experience
- **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md)
- **Example API payloads**: Added example JSON API payloads for testing
- **Unit tests**: Added more unit tests for NATS API and other components
- **Test improvements**: Better test coverage and test data
- **Unit tests**: Added more unit tests for NATS API, node repository, and other components
- **Test improvements**: Better test coverage; the test DB is now copied before unit
tests to avoid state pollution between runs (see the sketch after this list)
- **Parquet writer tests**: Comprehensive tests for Parquet archive writing and conversion
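
As mentioned for the test improvements above, copying a pristine database before tests keeps runs isolated; the sketch below illustrates the idea with hypothetical file paths, not the project's actual test setup.

```go
package repository_test

import (
	"os"
	"path/filepath"
	"testing"
)

// copyTestDB copies a pristine SQLite template into a per-test temp directory
// so that no test run mutates the shared fixture.
func copyTestDB(t *testing.T) string {
	t.Helper()

	src, err := os.ReadFile("testdata/test.db") // hypothetical template path
	if err != nil {
		t.Fatal(err)
	}

	dst := filepath.Join(t.TempDir(), "test.db")
	if err := os.WriteFile(dst, src, 0o644); err != nil {
		t.Fatal(err)
	}
	return dst
}

func TestWithIsolatedDB(t *testing.T) {
	dbPath := copyTestDB(t)
	_ = dbPath // open the repository against dbPath here
}
```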
## Bug fixes
@@ -132,6 +195,16 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- Fixed polar plot data query decoupling
- Fixed missing resolution parameter handling
- Fixed node table initialization fallback
- Fixed reactivity key placement in nodeList
- Fixed nodeList resolver data handling and increased nodestate filter cutoff
- Fixed job always being transferred to main job table before archiving
- Fixed AppTagger error handling and logging
- Fixed log endpoint formatting and correctness
- Fixed automatic refresh in metric status tab
- Fixed NULL value handling in `health_state` and `health_metrics` columns
- Fixed bugs related to `job_cache` IDs being used in the main job table
- Fixed SyncJobs bug causing start job hooks to be called with wrong (cache) IDs
- Fixed 404 handler route for sub-routers
## Configuration changes
@@ -167,6 +240,20 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
"interval": "48h",
"directory": "./var/archive"
}
},
"archive": {
"retention": {
"policy": "delete",
"age": "6months",
"target-format": "parquet"
}
},
"nodestate": {
"retention": {
"policy": "archive",
"age": "30d",
"archive-path": "./var/nodestate-archive"
}
}
}
```
@@ -178,11 +265,13 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
- If using S3 archive backend, configure the new `archive` section options
- Test the new public dashboard at `/public` route
- Review cron worker configuration if you need different frequencies
- If using the archive retention feature, configure the `target-format` option
to choose between `json` (default) and `parquet` output formats
- Consider enabling nodestate retention if you track node states over time
## Known issues
- Energy footprint metrics of type energy are currently ignored when calculating
the total energy.
- Resampling for running jobs only works with cc-metric-store
- For energy footprint metrics of type power, the unit is ignored and assumed
to be Watt.

View File

@@ -369,13 +369,11 @@ func runServer(ctx context.Context) error {
errChan := make(chan error, 1)
// Start HTTP server
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
if err := srv.Start(ctx); err != nil {
errChan <- err
}
}()
})
// Handle shutdown signals
wg.Add(1)

View File

@@ -0,0 +1,25 @@
{
"name": "High memory usage",
"tag": "highmemory",
"parameters": [
"highmemoryusage_threshold_factor",
"job_min_duration_seconds"
],
"metrics": ["mem_used"],
"requirements": [
"job.shared == \"none\"",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "memory_threshold",
"expr": "mem_used.limits.peak * highmemoryusage_threshold_factor"
},
{
"name": "memory_usage_pct",
"expr": "mem_used.max / mem_used.limits.peak * 100.0"
}
],
"rule": "mem_used.max > memory_threshold",
"hint": "This job used high memory: peak memory usage {{.mem_used.max}} GB ({{.memory_usage_pct}}% of {{.mem_used.limits.peak}} GB node capacity), exceeding the {{.highmemoryusage_threshold_factor}} utilization threshold. Risk of out-of-memory conditions."
}

View File

@@ -3,8 +3,7 @@
"tag": "excessiveload",
"parameters": [
"excessivecpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
"job_min_duration_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
@@ -15,12 +14,8 @@
{
"name": "load_threshold",
"expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
"rule": "cpu_load.avg > load_threshold",
"hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}."
"hint": "This job was detected as having excessive CPU load: average cpu load {{.cpu_load.avg}} exceeds the oversubscription threshold {{.load_threshold}} ({{.excessivecpuload_threshold_factor}} \u00d7 {{.cpu_load.limits.peak}} peak cores), indicating CPU contention."
}

View File

@@ -1,5 +1,5 @@
{
"name": "Low ressource utilization",
"name": "Low resource utilization",
"tag": "lowutilization",
"parameters": ["job_min_duration_seconds"],
"metrics": ["flops_any", "mem_bw"],
@@ -9,14 +9,14 @@
],
"variables": [
{
"name": "mem_bw_perc",
"expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)"
"name": "mem_bw_pct",
"expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
},
{
"name": "flops_any_perc",
"expr": "1.0 - (flops_any.avg / flops_any.limits.peak)"
"name": "flops_any_pct",
"expr": "flops_any.avg / flops_any.limits.peak * 100.0"
}
],
"rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert",
"hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}."
"hint": "This job shows low resource utilization: FLOP rate {{.flops_any.avg}} GF/s ({{.flops_any_pct}}% of peak) and memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) are both below their alert thresholds."
}

View File

@@ -3,8 +3,7 @@
"tag": "lowload",
"parameters": [
"lowcpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
"job_min_duration_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
@@ -15,12 +14,8 @@
{
"name": "load_threshold",
"expr": "job.numCores * lowcpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
"rule": "cpu_load.avg < cpu_load.limits.caution",
"hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}."
"rule": "cpu_load.avg < load_threshold",
"hint": "This job was detected as low CPU load: average cpu load {{.cpu_load.avg}} is below the threshold {{.load_threshold}} ({{.lowcpuload_threshold_factor}} \u00d7 {{.job.numCores}} allocated cores)."
}

View File

@@ -0,0 +1,22 @@
{
"name": "Memory bandwidth bound",
"tag": "memorybound",
"parameters": ["membound_bw_threshold_factor", "job_min_duration_seconds"],
"metrics": ["mem_bw"],
"requirements": [
"job.shared == \"none\"",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "mem_bw_threshold",
"expr": "mem_bw.limits.peak * membound_bw_threshold_factor"
},
{
"name": "mem_bw_pct",
"expr": "mem_bw.avg / mem_bw.limits.peak * 100.0"
}
],
"rule": "mem_bw.avg > mem_bw_threshold",
"hint": "This job is memory bandwidth bound: memory bandwidth {{.mem_bw.avg}} GB/s ({{.mem_bw_pct}}% of peak) is within {{.membound_bw_threshold_factor}} of peak bandwidth. Consider improving data reuse or compute intensity."
}

View File

@@ -1,11 +1,12 @@
{
"lowcpuload_threshold_factor": 0.9,
"excessivecpuload_threshold_factor": 1.1,
"lowcpuload_threshold_factor": 0.85,
"excessivecpuload_threshold_factor": 1.2,
"highmemoryusage_threshold_factor": 0.9,
"node_load_imbalance_threshold_factor": 0.1,
"core_load_imbalance_threshold_factor": 0.1,
"high_memory_load_threshold_factor": 0.9,
"lowgpuload_threshold_factor": 0.7,
"membound_bw_threshold_factor": 0.8,
"memory_leak_slope_threshold": 0.1,
"job_min_duration_seconds": 600.0,
"sampling_interval_seconds": 30.0,

View File

@@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) {
}
})
}
// TestStopJobWithReusedJobId verifies that stopping a recently started job works
// even when an older job with the same jobId exists in the job table (e.g. with
// state "failed"). This is a regression test for the bug where Find() on the job
// table would match the old job instead of the new one still in job_cache.
func TestStopJobWithReusedJobId(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)
testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
Unit: schema.Unit{Base: "load"},
Timestep: 60,
Series: []schema.Series{
{
Hostname: "host123",
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
},
},
},
},
}
metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}
r := chi.NewRouter()
restapi.MountAPIRoutes(r)
const contextUserKey repository.ContextKey = "user"
contextUserValue := &schema.User{
Username: "testuser",
Projects: make([]string, 0),
Roles: []string{"user"},
AuthType: 0,
AuthSource: 2,
}
// Step 1: Start the first job (jobId=999)
const startJobBody1 string = `{
"jobId": 999,
"user": "testuser",
"project": "testproj",
"cluster": "testcluster",
"partition": "default",
"walltime": 3600,
"numNodes": 1,
"numHwthreads": 8,
"numAcc": 0,
"shared": "none",
"monitoringStatus": 1,
"smt": 1,
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
"startTime": 200000000
}`
if ok := t.Run("StartFirstJob", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1)))
recorder := httptest.NewRecorder()
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
if recorder.Result().StatusCode != http.StatusCreated {
t.Fatal(recorder.Result().Status, recorder.Body.String())
}
}); !ok {
return
}
// Step 2: Sync to move job from cache to job table, then stop it as "failed"
time.Sleep(1 * time.Second)
restapi.JobRepository.SyncJobs()
const stopJobBody1 string = `{
"jobId": 999,
"startTime": 200000000,
"cluster": "testcluster",
"jobState": "failed",
"stopTime": 200001000
}`
if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1)))
recorder := httptest.NewRecorder()
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
if recorder.Result().StatusCode != http.StatusOK {
t.Fatal(recorder.Result().Status, recorder.Body.String())
}
jobid, cluster := int64(999), "testcluster"
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
if err != nil {
t.Fatal(err)
}
if job.State != schema.JobStateFailed {
t.Fatalf("expected first job to be failed, got: %s", job.State)
}
}); !ok {
return
}
// Wait for archiving to complete
time.Sleep(1 * time.Second)
// Step 3: Start a NEW job with the same jobId=999 but different startTime.
// This job will sit in job_cache (not yet synced).
const startJobBody2 string = `{
"jobId": 999,
"user": "testuser",
"project": "testproj",
"cluster": "testcluster",
"partition": "default",
"walltime": 3600,
"numNodes": 1,
"numHwthreads": 8,
"numAcc": 0,
"shared": "none",
"monitoringStatus": 1,
"smt": 1,
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
"startTime": 300000000
}`
if ok := t.Run("StartSecondJob", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2)))
recorder := httptest.NewRecorder()
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
if recorder.Result().StatusCode != http.StatusCreated {
t.Fatal(recorder.Result().Status, recorder.Body.String())
}
}); !ok {
return
}
// Step 4: Stop the second job WITHOUT syncing first.
// Before the fix, this would fail because Find() on the job table would
// match the old failed job (jobId=999) and reject with "already stopped".
const stopJobBody2 string = `{
"jobId": 999,
"startTime": 300000000,
"cluster": "testcluster",
"jobState": "completed",
"stopTime": 300001000
}`
t.Run("StopSecondJobBeforeSync", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2)))
recorder := httptest.NewRecorder()
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
if recorder.Result().StatusCode != http.StatusOK {
t.Fatalf("expected stop to succeed for cached job, got: %s %s",
recorder.Result().Status, recorder.Body.String())
}
})
}

View File

@@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
}
}
id, err := api.JobRepository.Start(&req)
// When tags are present, insert directly into the job table so that the
// returned ID can be used with AddTagOrCreate (which queries the job table).
// Jobs without tags use the cache path as before.
var id int64
if len(req.Tags) > 0 {
id, err = api.JobRepository.StartDirect(&req)
} else {
id, err = api.JobRepository.Start(&req)
}
if err != nil {
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
return
@@ -755,16 +763,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
}
isCached := false
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if err != nil {
// Try cached jobs if not found in main repository
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if cachedErr != nil {
// Combine both errors for better debugging
handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
// Not in cache, try main job table
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
return
}
job = cachedJob
} else {
isCached = true
}
@@ -897,11 +904,13 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
}
// Check for omit-tagged query parameter
omitTagged := false
omitTagged := "none"
if omitTaggedStr := r.URL.Query().Get("omit-tagged"); omitTaggedStr != "" {
omitTagged, e = strconv.ParseBool(omitTaggedStr)
if e != nil {
handleError(fmt.Errorf("boolean expected for omit-tagged parameter: %w", e), http.StatusBadRequest, rw)
switch omitTaggedStr {
case "none", "all", "user":
omitTagged = omitTaggedStr
default:
handleError(fmt.Errorf("omit-tagged must be one of: none, all, user"), http.StatusBadRequest, rw)
return
}
}

View File

@@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) {
}
}
id, err := api.JobRepository.Start(&req)
// When tags are present, insert directly into the job table so that the
// returned ID can be used with AddTagOrCreate (which queries the job table).
var id int64
if len(req.Tags) > 0 {
id, err = api.JobRepository.StartDirect(&req)
} else {
id, err = api.JobRepository.Start(&req)
}
if err != nil {
cclog.Errorf("NATS start job: insert into database failed: %v", err)
return
@@ -252,15 +259,15 @@ func (api *NatsAPI) handleStopJob(payload string) {
}
isCached := false
job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if err != nil {
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if cachedErr != nil {
cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
err, cachedErr)
// Not in cache, try main job table
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
if err != nil {
cclog.Errorf("NATS job stop: finding job failed: %v", err)
return
}
job = cachedJob
} else {
isCached = true
}

View File

@@ -113,8 +113,6 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
}
}
fmt.Printf("Result: %#v\n", healthResults)
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
startDB := time.Now()

View File

@@ -22,6 +22,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/tagger"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
"github.com/ClusterCockpit/cc-lib/v2/util"
@@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) {
r.Delete("/config/users/", api.deleteUser)
r.Post("/config/user/{id}", api.updateUser)
r.Post("/config/notice/", api.editNotice)
r.Get("/config/taggers/", api.getTaggers)
r.Post("/config/taggers/run/", api.runTagger)
}
}
@@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) {
}
}
func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw)
return
}
rw.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil {
cclog.Errorf("Failed to encode tagger list: %v", err)
}
}
func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw)
return
}
name := r.FormValue("name")
if name == "" {
handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw)
return
}
if err := tagger.RunTaggerByName(name); err != nil {
handleError(err, http.StatusConflict, rw)
return
}
rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(http.StatusOK)
if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil {
cclog.Errorf("Failed to write response: %v", err)
}
}
// getJWT godoc
// @summary Generate JWT token
// @tags Frontend

View File

@@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error {
return err
}
id, err := r.InsertJob(&job)
id, err := r.InsertJobDirect(&job)
if err != nil {
cclog.Warn("Error while job db insert")
return err

View File

@@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) {
}
result := readResult(t, testname)
job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime)
job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
if err != nil {
t.Fatal(err)
}

View File

@@ -392,15 +392,19 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
//
// Parameters:
// - startTime: Unix timestamp, jobs with start_time < this value will be deleted
// - omitTagged: If true, skip jobs that have associated tags (jobtag entries)
// - omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs,
// "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
//
// Returns the count of deleted jobs or an error if the operation fails.
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int, error) {
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error) {
var cnt int
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
if omitTagged {
switch omitTagged {
case "all":
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
q = q.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
if err := q.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
@@ -413,8 +417,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
var jobIds []int64
selectQuery := sq.Select("id").From("job").Where("job.start_time < ?", startTime)
if omitTagged {
switch omitTagged {
case "all":
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
selectQuery = selectQuery.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
rows, err := selectQuery.RunWith(r.DB).Query()
@@ -436,8 +443,11 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
if omitTagged {
switch omitTagged {
case "all":
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
qd = qd.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
_, err := qd.RunWith(r.DB).Exec()
@@ -822,10 +832,11 @@ func (r *JobRepository) UpdateDuration() error {
// Parameters:
// - startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
// - startTimeEnd: Unix timestamp for range end
// - omitTagged: If true, exclude jobs with associated tags
// - omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs,
// "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
//
// Returns a slice of jobs or an error if the time range is invalid or query fails.
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged bool) ([]*schema.Job, error) {
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error) {
var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
@@ -840,8 +851,11 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
query = sq.Select(jobColumns...).From("job").Where("job.start_time BETWEEN ? AND ?", startTimeBegin, startTimeEnd)
}
if omitTagged {
switch omitTagged {
case "all":
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag WHERE jobtag.job_id = job.id)")
case "user":
query = query.Where("NOT EXISTS (SELECT 1 FROM jobtag JOIN tag ON tag.id = jobtag.tag_id WHERE jobtag.job_id = job.id AND tag.tag_type NOT IN ('app', 'jobClass'))")
}
query = query.OrderBy("job.cluster ASC", "job.subcluster ASC", "job.project ASC", "job.start_time ASC")

View File

@@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job (
:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`
// InsertJobDirect inserts a job directly into the job table (not job_cache).
// Use this when the returned ID will be used for operations on the job table
// (e.g., adding tags), or for imported jobs that are already completed.
func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
cclog.Warn("Error while NamedJobInsert (direct)")
return 0, err
}
id, err := res.LastInsertId()
if err != nil {
cclog.Warn("Error while getting last insert ID (direct)")
return 0, err
}
return id, nil
}
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
@@ -85,6 +106,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
return nil, err
}
// Resolve correct job.id from the job table. The IDs read from job_cache
// are from a different auto-increment sequence and must not be used to
// query the job table.
for _, job := range jobs {
var newID int64
if err := sq.Select("job.id").From("job").
Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?",
job.JobID, job.Cluster, job.StartTime).
RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil {
cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v",
job.JobID, job.Cluster, err)
continue
}
job.ID = &newID
}
return jobs, nil
}
@@ -132,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
return r.InsertJob(job)
}
// StartDirect inserts a new job directly into the job table (not job_cache).
// Use this when the returned ID will immediately be used for job table
// operations such as adding tags.
func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) {
job.RawFootprint, err = json.Marshal(job.Footprint)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
}
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
return r.InsertJobDirect(job)
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Stop(
jobID int64,

View File

@@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) {
require.NoError(t, err)
})
t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) {
// Ensure cache is empty first
_, err := r.DB.Exec("DELETE FROM job_cache")
require.NoError(t, err)
// Insert a job into job_cache
job := createTestJob(999015, "testcluster")
cacheID, err := r.Start(job)
require.NoError(t, err)
// Sync jobs
jobs, err := r.SyncJobs()
require.NoError(t, err)
require.Equal(t, 1, len(jobs))
// The returned ID must refer to the job table, not job_cache
var jobTableID int64
err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?",
jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID)
require.NoError(t, err)
assert.Equal(t, jobTableID, *jobs[0].ID,
"returned ID should match the job table row, not the cache ID (%d)", cacheID)
// Clean up
_, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster)
require.NoError(t, err)
})
t.Run("sync with empty cache returns empty list", func(t *testing.T) {
// Ensure cache is empty
_, err := r.DB.Exec("DELETE FROM job_cache")
@@ -500,3 +528,80 @@ func TestSyncJobs(t *testing.T) {
assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty")
})
}
func TestInsertJobDirect(t *testing.T) {
r := setup(t)
t.Run("inserts into job table not cache", func(t *testing.T) {
job := createTestJob(999020, "testcluster")
job.RawResources, _ = json.Marshal(job.Resources)
job.RawFootprint, _ = json.Marshal(job.Footprint)
job.RawMetaData, _ = json.Marshal(job.MetaData)
id, err := r.InsertJobDirect(job)
require.NoError(t, err, "InsertJobDirect should succeed")
assert.Greater(t, id, int64(0), "Should return valid insert ID")
// Verify job is in job table
var count int
err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count)
require.NoError(t, err)
assert.Equal(t, 1, count, "Job should be in job table")
// Verify job is NOT in job_cache
err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? AND cluster = ?",
job.JobID, job.Cluster).Scan(&count)
require.NoError(t, err)
assert.Equal(t, 0, count, "Job should NOT be in job_cache")
// Clean up
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
require.NoError(t, err)
})
t.Run("returned ID works for tag operations", func(t *testing.T) {
job := createTestJob(999021, "testcluster")
job.RawResources, _ = json.Marshal(job.Resources)
job.RawFootprint, _ = json.Marshal(job.Footprint)
job.RawMetaData, _ = json.Marshal(job.MetaData)
id, err := r.InsertJobDirect(job)
require.NoError(t, err)
// Adding a tag using the returned ID should succeed (FK constraint on jobtag)
err = r.ImportTag(id, "test_type", "test_name", "global")
require.NoError(t, err, "ImportTag should succeed with direct insert ID")
// Clean up
_, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id)
require.NoError(t, err)
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
require.NoError(t, err)
})
}
func TestStartDirect(t *testing.T) {
r := setup(t)
t.Run("inserts into job table with JSON encoding", func(t *testing.T) {
job := createTestJob(999022, "testcluster")
id, err := r.StartDirect(job)
require.NoError(t, err, "StartDirect should succeed")
assert.Greater(t, id, int64(0))
// Verify job is in job table with encoded JSON
var rawResources []byte
err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources)
require.NoError(t, err)
var resources []*schema.Resource
err = json.Unmarshal(rawResources, &resources)
require.NoError(t, err, "Resources should be valid JSON")
assert.Equal(t, "node01", resources[0].Hostname)
// Clean up
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
require.NoError(t, err)
})
}

View File

@@ -78,7 +78,7 @@ func TestFindJobsBetween(t *testing.T) {
// 1. Find a job to use (Find all jobs)
// We use a large time range to ensure we get something if it exists
jobs, err := r.FindJobsBetween(0, 9999999999, false)
jobs, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil {
t.Fatal(err)
}
@@ -88,21 +88,21 @@ func TestFindJobsBetween(t *testing.T) {
targetJob := jobs[0]
// 2. Create a tag
tagName := fmt.Sprintf("testtag_%d", time.Now().UnixNano())
tagID, err := r.CreateTag("testtype", tagName, "global")
// 2. Create an auto-tagger tag (type "app")
appTagName := fmt.Sprintf("apptag_%d", time.Now().UnixNano())
appTagID, err := r.CreateTag("app", appTagName, "global")
if err != nil {
t.Fatal(err)
}
// 3. Link Tag (Manually to avoid archive dependency side-effects in unit test)
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, tagID)
// 3. Link auto-tagger tag to job
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, appTagID)
if err != nil {
t.Fatal(err)
}
// 4. Search with omitTagged = false (Should find the job)
jobsFound, err := r.FindJobsBetween(0, 9999999999, false)
// 4. Search with omitTagged = "none" (Should find the job)
jobsFound, err := r.FindJobsBetween(0, 9999999999, "none")
if err != nil {
t.Fatal(err)
}
@@ -115,18 +115,58 @@ func TestFindJobsBetween(t *testing.T) {
}
}
if !found {
t.Errorf("Target job %d should be found when omitTagged=false", *targetJob.ID)
t.Errorf("Target job %d should be found when omitTagged=none", *targetJob.ID)
}
// 5. Search with omitTagged = true (Should NOT find the job)
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, true)
// 5. Search with omitTagged = "all" (Should NOT find the job — it has a tag)
jobsFiltered, err := r.FindJobsBetween(0, 9999999999, "all")
if err != nil {
t.Fatal(err)
}
for _, j := range jobsFiltered {
if *j.ID == *targetJob.ID {
t.Errorf("Target job %d should NOT be found when omitTagged=true", *targetJob.ID)
t.Errorf("Target job %d should NOT be found when omitTagged=all", *targetJob.ID)
}
}
// 6. Search with omitTagged = "user": auto-tagger tag ("app") should NOT exclude the job
jobsUserFilter, err := r.FindJobsBetween(0, 9999999999, "user")
if err != nil {
t.Fatal(err)
}
found = false
for _, j := range jobsUserFilter {
if *j.ID == *targetJob.ID {
found = true
break
}
}
if !found {
t.Errorf("Target job %d should be found when omitTagged=user (only has auto-tagger tag)", *targetJob.ID)
}
// 7. Add a user-created tag (type "testtype") to the same job
userTagName := fmt.Sprintf("usertag_%d", time.Now().UnixNano())
userTagID, err := r.CreateTag("testtype", userTagName, "global")
if err != nil {
t.Fatal(err)
}
_, err = r.DB.Exec("INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)", *targetJob.ID, userTagID)
if err != nil {
t.Fatal(err)
}
// 8. Now omitTagged = "user" should exclude the job (has a user-created tag)
jobsUserFilter2, err := r.FindJobsBetween(0, 9999999999, "user")
if err != nil {
t.Fatal(err)
}
for _, j := range jobsUserFilter2 {
if *j.ID == *targetJob.ID {
t.Errorf("Target job %d should NOT be found when omitTagged=user (has user-created tag)", *targetJob.ID)
}
}
}

View File

@@ -274,7 +274,7 @@ type NodeStateWithNode struct {
func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) {
rows, err := sq.Select(
"node_state.id", "node_state.time_stamp", "node_state.node_state",
"node_state.health_state", "COALESCE(node_state.health_metrics, '')",
"node_state.health_state", "node_state.health_metrics",
"node_state.cpus_allocated", "node_state.memory_allocated",
"node_state.gpus_allocated", "node_state.jobs_running",
"node.hostname", "node.cluster", "node.subcluster",
@@ -293,13 +293,15 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode
var result []NodeStateWithNode
for rows.Next() {
var ns NodeStateWithNode
var healthMetrics sql.NullString
if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState,
&ns.HealthState, &ns.HealthMetrics,
&ns.HealthState, &healthMetrics,
&ns.CpusAllocated, &ns.MemoryAllocated,
&ns.GpusAllocated, &ns.JobsRunning,
&ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil {
return nil, err
}
ns.HealthMetrics = healthMetrics.String
result = append(result, ns)
}
return result, nil

View File

@@ -19,6 +19,14 @@ import (
"github.com/ClusterCockpit/cc-lib/v2/util"
)
func metadataKeys(m map[string]string) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}
const (
// defaultConfigPath is the default path for application tagging configuration
defaultConfigPath = "./var/tagger/apps"
@@ -158,31 +166,54 @@ func (t *AppTagger) Register() error {
// Only the first matching application is tagged.
func (t *AppTagger) Match(job *schema.Job) {
r := repository.GetJobRepository()
if len(t.apps) == 0 {
cclog.Warn("AppTagger: no app patterns loaded, skipping match")
return
}
metadata, err := r.FetchMetadata(job)
if err != nil {
cclog.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster)
cclog.Infof("AppTagger: cannot fetch metadata for job %d on %s: %v", job.JobID, job.Cluster, err)
return
}
if metadata == nil {
cclog.Infof("AppTagger: metadata is nil for job %d on %s", job.JobID, job.Cluster)
return
}
jobscript, ok := metadata["jobScript"]
if ok {
id := *job.ID
jobscriptLower := strings.ToLower(jobscript)
if !ok {
cclog.Infof("AppTagger: no 'jobScript' key in metadata for job %d on %s (keys: %v)",
job.JobID, job.Cluster, metadataKeys(metadata))
return
}
out:
for _, a := range t.apps {
for _, re := range a.patterns {
if re.MatchString(jobscriptLower) {
if !r.HasTag(id, t.tagType, a.tag) {
if _, err := r.AddTagOrCreateDirect(id, t.tagType, a.tag); err != nil {
cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
}
if len(jobscript) == 0 {
cclog.Infof("AppTagger: empty jobScript for job %d on %s", job.JobID, job.Cluster)
return
}
id := *job.ID
jobscriptLower := strings.ToLower(jobscript)
cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps))
for _, a := range t.apps {
for _, re := range a.patterns {
if re.MatchString(jobscriptLower) {
if r.HasTag(id, t.tagType, a.tag) {
cclog.Debugf("AppTagger: job %d already has tag %s:%s, skipping", id, t.tagType, a.tag)
} else {
cclog.Infof("AppTagger: pattern '%s' matched for app '%s' on job %d", re.String(), a.tag, id)
if _, err := r.AddTagOrCreateDirect(id, t.tagType, a.tag); err != nil {
cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
}
break out
}
return
}
}
} else {
cclog.Infof("Cannot extract job script for job: %d on %s", job.JobID, job.Cluster)
}
cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster)
}

View File

@@ -10,6 +10,7 @@
package tagger
import (
"fmt"
"sync"
"github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -29,11 +30,31 @@ type Tagger interface {
Match(job *schema.Job)
}
// TaggerInfo holds metadata about a tagger for JSON serialization.
type TaggerInfo struct {
Name string `json:"name"`
Type string `json:"type"`
Running bool `json:"running"`
}
var (
initOnce sync.Once
jobTagger *JobTagger
initOnce sync.Once
jobTagger *JobTagger
statusMu sync.Mutex
taggerStatus = map[string]bool{}
)
// Known tagger definitions: name -> (type, factory)
type taggerDef struct {
ttype string
factory func() Tagger
}
var knownTaggers = map[string]taggerDef{
"AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }},
"JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }},
}
// JobTagger coordinates multiple taggers that run at different job lifecycle events.
// It maintains separate lists of taggers that run when jobs start and when they stop.
type JobTagger struct {
@@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) {
}
}
// ListTaggers returns information about all known taggers with their current running status.
func ListTaggers() []TaggerInfo {
statusMu.Lock()
defer statusMu.Unlock()
result := make([]TaggerInfo, 0, len(knownTaggers))
for name, def := range knownTaggers {
result = append(result, TaggerInfo{
Name: name,
Type: def.ttype,
Running: taggerStatus[name],
})
}
return result
}
// RunTaggerByName starts a tagger by name asynchronously on all jobs.
// Returns an error if the name is unknown or the tagger is already running.
func RunTaggerByName(name string) error {
def, ok := knownTaggers[name]
if !ok {
return fmt.Errorf("unknown tagger: %s", name)
}
statusMu.Lock()
if taggerStatus[name] {
statusMu.Unlock()
return fmt.Errorf("tagger %s is already running", name)
}
taggerStatus[name] = true
statusMu.Unlock()
go func() {
defer func() {
statusMu.Lock()
taggerStatus[name] = false
statusMu.Unlock()
}()
t := def.factory()
if err := t.Register(); err != nil {
cclog.Errorf("Failed to register tagger %s: %s", name, err)
return
}
r := repository.GetJobRepository()
jl, err := r.GetJobList(0, 0)
if err != nil {
cclog.Errorf("Error getting job list for tagger %s: %s", name, err)
return
}
cclog.Infof("Running tagger %s on %d jobs", name, len(jl))
for _, id := range jl {
job, err := r.FindByIDDirect(id)
if err != nil {
cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err)
continue
}
t.Match(job)
}
cclog.Infof("Tagger %s completed", name)
}()
return nil
}
// RunTaggers applies all configured taggers to all existing jobs in the repository.
// This is useful for retroactively applying tags to jobs that were created before
// the tagger system was initialized or when new tagging rules are added.

View File

@@ -28,10 +28,10 @@ func RegisterCompressionService(compressOlderThan int) {
lastTime := ar.CompressLast(startTime)
if startTime == lastTime {
cclog.Info("Compression Service - Complete archive run")
jobs, err = jobRepo.FindJobsBetween(0, startTime, false)
jobs, err = jobRepo.FindJobsBetween(0, startTime, "none")
} else {
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, false)
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime, "none")
}
if err != nil {

View File

@@ -149,7 +149,7 @@ func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target
}
// cleanupAfterTransfer removes jobs from archive and optionally from DB.
func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) {
func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged string) {
archive.GetHandle().CleanUp(jobs)
if includeDB {

View File

@@ -26,8 +26,8 @@ type Retention struct {
Policy string `json:"policy"`
Format string `json:"format"`
Age int `json:"age"`
IncludeDB bool `json:"includeDB"`
OmitTagged bool `json:"omitTagged"`
IncludeDB bool `json:"include-db"`
OmitTagged string `json:"omit-tagged"`
TargetKind string `json:"target-kind"`
TargetPath string `json:"target-path"`
TargetEndpoint string `json:"target-endpoint"`

View File

@@ -68,6 +68,11 @@ var configSchema = `
"description": "Also remove jobs from database",
"type": "boolean"
},
"omit-tagged": {
"description": "Omit tagged jobs from retention: none = include all, all = omit any tagged job, user = omit jobs with user-created tags (auto-tagger types 'app'/'jobClass' are not considered user tags)",
"type": "string",
"enum": ["none", "all", "user"]
},
"age": {
"description": "Act on jobs with startTime older than age (in days)",
"type": "integer"

View File

@@ -609,6 +609,12 @@
"dev": true,
"license": "MIT"
},
"node_modules/@types/trusted-types": {
"version": "2.0.7",
"resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz",
"integrity": "sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==",
"license": "MIT"
},
"node_modules/@urql/core": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/@urql/core/-/core-5.2.0.tgz",
@@ -745,9 +751,9 @@
}
},
"node_modules/devalue": {
"version": "5.6.2",
"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz",
"integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==",
"version": "5.6.3",
"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz",
"integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==",
"license": "MIT"
},
"node_modules/escape-latex": {
@@ -763,9 +769,9 @@
"license": "MIT"
},
"node_modules/esrap": {
"version": "2.2.1",
"resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.1.tgz",
"integrity": "sha512-GiYWG34AN/4CUyaWAgunGt0Rxvr1PTMlGC0vvEov/uOQYWne2bpN03Um+k8jT+q3op33mKouP2zeJ6OlM+qeUg==",
"version": "2.2.3",
"resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.3.tgz",
"integrity": "sha512-8fOS+GIGCQZl/ZIlhl59htOlms6U8NvX6ZYgYHpRU/b6tVSh3uHkOHZikl3D4cMbYM0JlpBe+p/BkZEi8J9XIQ==",
"license": "MIT",
"dependencies": {
"@jridgewell/sourcemap-codec": "^1.4.15"
@@ -1176,22 +1182,23 @@
}
},
"node_modules/svelte": {
"version": "5.46.4",
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.4.tgz",
"integrity": "sha512-VJwdXrmv9L8L7ZasJeWcCjoIuMRVbhuxbss0fpVnR8yorMmjNDwcjIH08vS6wmSzzzgAG5CADQ1JuXPS2nwt9w==",
"version": "5.53.2",
"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.2.tgz",
"integrity": "sha512-yGONuIrcl/BMmqbm6/52Q/NYzfkta7uVlos5NSzGTfNJTTFtPPzra6rAQoQIwAqupeM3s9uuTf5PvioeiCdg9g==",
"license": "MIT",
"dependencies": {
"@jridgewell/remapping": "^2.3.4",
"@jridgewell/sourcemap-codec": "^1.5.0",
"@sveltejs/acorn-typescript": "^1.0.5",
"@types/estree": "^1.0.5",
"@types/trusted-types": "^2.0.7",
"acorn": "^8.12.1",
"aria-query": "^5.3.1",
"axobject-query": "^4.1.0",
"clsx": "^2.1.1",
"devalue": "^5.6.2",
"devalue": "^5.6.3",
"esm-env": "^1.2.1",
"esrap": "^2.2.1",
"esrap": "^2.2.2",
"is-reference": "^3.0.3",
"locate-character": "^3.0.0",
"magic-string": "^0.30.11",

View File

@@ -15,6 +15,7 @@
import ShowUsers from "./admin/ShowUsers.svelte";
import Options from "./admin/Options.svelte";
import NoticeEdit from "./admin/NoticeEdit.svelte";
import RunTaggers from "./admin/RunTaggers.svelte";
/* Svelte 5 Props */
let {
@@ -70,4 +71,5 @@
</Col>
<Options config={ccconfig} {clusterNames}/>
<NoticeEdit {ncontent}/>
<RunTaggers />
</Row>

View File

@@ -0,0 +1,142 @@
<!--
@component Admin card for running individual job taggers on all jobs
-->
<script>
import {
Col,
Card,
CardTitle,
CardBody,
Spinner,
Badge,
} from "@sveltestrap/sveltestrap";
import { fade } from "svelte/transition";
import { onMount, onDestroy } from "svelte";
/* State Init */
let taggers = $state([]);
let message = $state({ msg: "", color: "#d63384" });
let displayMessage = $state(false);
let pollTimer = $state(null);
/* Functions */
async function fetchTaggers() {
try {
const res = await fetch("/config/taggers/");
if (res.ok) {
taggers = await res.json();
}
} catch (err) {
console.error("Failed to fetch taggers:", err);
}
}
async function runTagger(name) {
let formData = new FormData();
formData.append("name", name);
try {
const res = await fetch("/config/taggers/run/", {
method: "POST",
body: formData,
});
if (res.ok) {
let text = await res.text();
popMessage(text, "#048109");
startPolling();
await fetchTaggers();
} else {
let text = await res.text();
throw new Error("Response Code " + res.status + " -> " + text);
}
} catch (err) {
popMessage(err, "#d63384");
}
}
function startPolling() {
if (pollTimer) return;
pollTimer = setInterval(async () => {
await fetchTaggers();
const anyRunning = taggers.some((t) => t.running);
if (!anyRunning) {
clearInterval(pollTimer);
pollTimer = null;
}
}, 3000);
}
function popMessage(response, rescolor) {
message = { msg: response, color: rescolor };
displayMessage = true;
setTimeout(function () {
displayMessage = false;
}, 3500);
}
/* Lifecycle */
onMount(async () => {
await fetchTaggers();
const anyRunning = taggers.some((t) => t.running);
if (anyRunning) startPolling();
});
onDestroy(() => {
if (pollTimer) clearInterval(pollTimer);
});
</script>
<Col>
<Card class="h-100">
<CardBody>
<CardTitle class="mb-3">Job Taggers</CardTitle>
<p>Run individual taggers on all existing jobs.</p>
{#if taggers.length === 0}
<p class="text-muted">No taggers available.</p>
{:else}
<table class="table table-sm mb-3">
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Status</th>
<th></th>
</tr>
</thead>
<tbody>
{#each taggers as tagger}
<tr>
<td>{tagger.name}</td>
<td><Badge color="secondary">{tagger.type}</Badge></td>
<td>
{#if tagger.running}
<Spinner size="sm" color="primary" /> Running
{:else}
<span class="text-muted">Idle</span>
{/if}
</td>
<td>
<button
class="btn btn-sm btn-primary"
disabled={tagger.running}
onclick={() => runTagger(tagger.name)}
>
Run
</button>
</td>
</tr>
{/each}
</tbody>
</table>
{/if}
<p>
{#if displayMessage}<b
><code style="color: {message.color};" out:fade
>{message.msg}</code
></b
>{/if}
</p>
</CardBody>
</Card>
</Col>

View File

@@ -20,6 +20,7 @@
import { queryStore, gql, getContextClient } from "@urql/svelte";
import { Card, Spinner } from "@sveltestrap/sveltestrap";
import { maxScope, checkMetricAvailability } from "../utils.js";
import uPlot from "uplot";
import JobInfo from "./JobInfo.svelte";
import MetricPlot from "../plots/MetricPlot.svelte";
import JobFootprint from "../helper/JobFootprint.svelte";
@@ -74,13 +75,17 @@
}
`;
/* Var Init*/
// svelte-ignore state_referenced_locally
let plotSync = uPlot.sync(`jobMetricStack-${job.cluster}-${job.id}`);
/* State Init */
let zoomStates = $state({});
let thresholdStates = $state({});
/* Derived */
const resampleDefault = $derived(resampleConfig ? Math.max(...resampleConfig.resolutions) : 0);
const jobId = $derived(job?.id);
const jobId = $derived(job.id);
const scopes = $derived.by(() => {
if (job.numNodes == 1) {
if (job.numAcc >= 1) return ["core", "accelerator"];
@@ -233,6 +238,7 @@
numaccs={job.numAcc}
zoomState={zoomStates[metric.data.name] || null}
thresholdState={thresholdStates[metric.data.name] || null}
{plotSync}
/>
{:else}
<Card body class="mx-2" color="warning">

View File

@@ -32,12 +32,28 @@
/* Const Init */
const client = getContextClient();
const stateOptions = [
"all",
"allocated",
"idle",
"down",
"mixed",
"reserved",
"unknown",
];
const healthOptions = [
"all",
"full",
"partial",
"failed",
];
/* State Init */
let pieWidth = $state(0);
let querySorting = $state({ field: "startTime", type: "col", order: "DESC" })
let tableHostFilter = $state("");
let tableStateFilter = $state("");
let tableHealthFilter = $state("");
let tableStateFilter = $state(stateOptions[0]);
let tableHealthFilter = $state(healthOptions[0]);
let healthTableSorting = $state(
{
schedulerState: { dir: "down", active: true },
@@ -78,7 +94,7 @@
`,
variables: {
nodeFilter: { cluster: { eq: cluster }},
sorting: { field: "startTime", type: "col", order: "DESC" },
sorting: querySorting,
},
requestPolicy: "network-only"
}));
@@ -98,10 +114,10 @@
if (tableHostFilter != "") {
pendingTableData = pendingTableData.filter((e) => e.hostname.includes(tableHostFilter))
}
if (tableStateFilter != "") {
if (tableStateFilter != "all") {
pendingTableData = pendingTableData.filter((e) => e.schedulerState.includes(tableStateFilter))
}
if (tableHealthFilter != "") {
if (tableHealthFilter != "all") {
pendingTableData = pendingTableData.filter((e) => e.healthState.includes(tableHealthFilter))
}
return pendingTableData
@@ -148,7 +164,7 @@
<Refresher
initially={120}
onRefresh={(interval) => {
sorting = { field: "startTime", type: "col", order: "DESC" }
querySorting = { field: "startTime", type: "col", order: "DESC" };
}}
/>
</Col>
@@ -280,8 +296,8 @@
<thead>
<!-- Header Row 1: Titles and Sorting -->
<tr>
<th style="width: 7.5%; min-width: 100px; max-width:10%;" onclick={() => sortBy('hostname')}>
Host
<th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('hostname')}>
Hosts ({filteredTableData.length})
<Icon
name="caret-{healthTableSorting['hostname'].dir}{healthTableSorting['hostname']
.active
@@ -289,7 +305,7 @@
: ''}"
/>
</th>
<th style="width: 8.5%; min-width: 100px; max-width:10%;" onclick={() => sortBy('schedulerState')}>
<th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('schedulerState')}>
Scheduler State
<Icon
name="caret-{healthTableSorting['schedulerState'].dir}{healthTableSorting['schedulerState']
@@ -298,7 +314,7 @@
: ''}"
/>
</th>
<th style="width: 7.5%; min-width: 100px; max-width:10%;" onclick={() => sortBy('healthState')}>
<th style="width: 9%; min-width: 100px; max-width:10%;" onclick={() => sortBy('healthState')}>
Health State
<Icon
name="caret-{healthTableSorting['healthState'].dir}{healthTableSorting['healthState']
@@ -322,7 +338,11 @@
</th>
<th>
<InputGroup size="sm">
<Input type="text" bind:value={tableStateFilter}/>
<Input type="select" bind:value={tableStateFilter}>
{#each stateOptions as so}
<option value={so}>{so}</option>
{/each}
</Input>
<InputGroupText>
<Icon name="search"></Icon>
</InputGroupText>
@@ -330,7 +350,11 @@
</th>
<th>
<InputGroup size="sm">
<Input type="text" bind:value={tableHealthFilter}/>
<Input type="select" bind:value={tableHealthFilter}>
{#each healthOptions as ho}
<option value={ho}>{ho}</option>
{/each}
</Input>
<InputGroupText>
<Icon name="search"></Icon>
</InputGroupText>

View File

@@ -211,6 +211,7 @@
timestep={metricData.data.metric.timestep}
series={metricData.data.metric.series}
height={375}
{plotSync}
forNode
/>
{/if}