Merge pull request #495 from ClusterCockpit/dev

Fix more bugs related to job_cache ids used in job table
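The common thread in the changes below is lookup order: a job that was just started may still live only in the job_cache table, so the stop paths now consult the cache before the main job table, and start paths that need a job-table ID bypass the cache entirely. A minimal sketch of the lookup pattern, using the repository method names from the diff (the wrapper itself is illustrative, not part of the codebase):

// findJobCacheFirst sketches the lookup order the stop-job handlers adopt
// below. FindCached and Find are the real repository methods; this helper
// exists only for illustration.
func findJobCacheFirst(repo *repository.JobRepository, jobID *int64, cluster *string, startTime *int64) (*schema.Job, bool, error) {
	// A freshly started job may still sit in job_cache, so look there first.
	if job, err := repo.FindCached(jobID, cluster, startTime); err == nil {
		return job, true, nil // true: the job came from the cache
	}
	// Otherwise fall back to the main job table for already-synced jobs.
	job, err := repo.Find(jobID, cluster, startTime)
	if err != nil {
		return nil, false, err
	}
	return job, false, nil
}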
@@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) {
 		}
 	})
 }
+
+// TestStopJobWithReusedJobId verifies that stopping a recently started job works
+// even when an older job with the same jobId exists in the job table (e.g. with
+// state "failed"). This is a regression test for the bug where Find() on the job
+// table would match the old job instead of the new one still in job_cache.
+func TestStopJobWithReusedJobId(t *testing.T) {
+	restapi := setup(t)
+	t.Cleanup(cleanup)
+
+	testData := schema.JobData{
+		"load_one": map[schema.MetricScope]*schema.JobMetric{
+			schema.MetricScopeNode: {
+				Unit:     schema.Unit{Base: "load"},
+				Timestep: 60,
+				Series: []schema.Series{
+					{
+						Hostname:   "host123",
+						Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
+						Data:       []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
+					},
+				},
+			},
+		},
+	}
+
+	metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
+		return testData, nil
+	}
+
+	r := chi.NewRouter()
+	restapi.MountAPIRoutes(r)
+
+	const contextUserKey repository.ContextKey = "user"
+	contextUserValue := &schema.User{
+		Username:   "testuser",
+		Projects:   make([]string, 0),
+		Roles:      []string{"user"},
+		AuthType:   0,
+		AuthSource: 2,
+	}
+
+	// Step 1: Start the first job (jobId=999)
+	const startJobBody1 string = `{
+		"jobId": 999,
+		"user": "testuser",
+		"project": "testproj",
+		"cluster": "testcluster",
+		"partition": "default",
+		"walltime": 3600,
+		"numNodes": 1,
+		"numHwthreads": 8,
+		"numAcc": 0,
+		"shared": "none",
+		"monitoringStatus": 1,
+		"smt": 1,
+		"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
+		"startTime": 200000000
+	}`
+
+	if ok := t.Run("StartFirstJob", func(t *testing.T) {
+		req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1)))
+		recorder := httptest.NewRecorder()
+		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+		r.ServeHTTP(recorder, req.WithContext(ctx))
+		if recorder.Result().StatusCode != http.StatusCreated {
+			t.Fatal(recorder.Result().Status, recorder.Body.String())
+		}
+	}); !ok {
+		return
+	}
+
+	// Step 2: Sync to move job from cache to job table, then stop it as "failed"
+	time.Sleep(1 * time.Second)
+	restapi.JobRepository.SyncJobs()
+
+	const stopJobBody1 string = `{
+		"jobId": 999,
+		"startTime": 200000000,
+		"cluster": "testcluster",
+		"jobState": "failed",
+		"stopTime": 200001000
+	}`
+
+	if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) {
+		req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1)))
+		recorder := httptest.NewRecorder()
+		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+		r.ServeHTTP(recorder, req.WithContext(ctx))
+		if recorder.Result().StatusCode != http.StatusOK {
+			t.Fatal(recorder.Result().Status, recorder.Body.String())
+		}
+
+		jobid, cluster := int64(999), "testcluster"
+		job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if job.State != schema.JobStateFailed {
+			t.Fatalf("expected first job to be failed, got: %s", job.State)
+		}
+	}); !ok {
+		return
+	}
+
+	// Wait for archiving to complete
+	time.Sleep(1 * time.Second)
+
+	// Step 3: Start a NEW job with the same jobId=999 but different startTime.
+	// This job will sit in job_cache (not yet synced).
+	const startJobBody2 string = `{
+		"jobId": 999,
+		"user": "testuser",
+		"project": "testproj",
+		"cluster": "testcluster",
+		"partition": "default",
+		"walltime": 3600,
+		"numNodes": 1,
+		"numHwthreads": 8,
+		"numAcc": 0,
+		"shared": "none",
+		"monitoringStatus": 1,
+		"smt": 1,
+		"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
+		"startTime": 300000000
+	}`
+
+	if ok := t.Run("StartSecondJob", func(t *testing.T) {
+		req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2)))
+		recorder := httptest.NewRecorder()
+		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+		r.ServeHTTP(recorder, req.WithContext(ctx))
+		if recorder.Result().StatusCode != http.StatusCreated {
+			t.Fatal(recorder.Result().Status, recorder.Body.String())
+		}
+	}); !ok {
+		return
+	}
+
+	// Step 4: Stop the second job WITHOUT syncing first.
+	// Before the fix, this would fail because Find() on the job table would
+	// match the old failed job (jobId=999) and reject with "already stopped".
+	const stopJobBody2 string = `{
+		"jobId": 999,
+		"startTime": 300000000,
+		"cluster": "testcluster",
+		"jobState": "completed",
+		"stopTime": 300001000
+	}`
+
+	t.Run("StopSecondJobBeforeSync", func(t *testing.T) {
+		req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2)))
+		recorder := httptest.NewRecorder()
+		ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
+		r.ServeHTTP(recorder, req.WithContext(ctx))
+		if recorder.Result().StatusCode != http.StatusOK {
+			t.Fatalf("expected stop to succeed for cached job, got: %s %s",
+				recorder.Result().Status, recorder.Body.String())
+		}
+	})
+}
@@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	id, err := api.JobRepository.Start(&req)
+	// When tags are present, insert directly into the job table so that the
+	// returned ID can be used with AddTagOrCreate (which queries the job table).
+	// Jobs without tags use the cache path as before.
+	var id int64
+	if len(req.Tags) > 0 {
+		id, err = api.JobRepository.StartDirect(&req)
+	} else {
+		id, err = api.JobRepository.Start(&req)
+	}
 	if err != nil {
 		handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
 		return
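Why tags force the direct path: tag links live in the jobtag table and reference job-table IDs, so an ID returned from the cache path is unusable for AddTagOrCreate until the next sync. A small illustration of the constraint; the jobtag table and its job_id column are confirmed by the repository tests later in this diff, while the tag_id column name and the plain database/sql usage are assumptions:

// Illustration only: a jobtag link must reference an existing job.id.
// With the cache path, Start() returns a job_cache rowid, so an insert
// like this would fail its foreign key until SyncJobs() has run.
// tag_id is a hypothetical column name.
func linkTag(db *sql.DB, jobTableID, tagID int64) error {
	_, err := db.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobTableID, tagID)
	return err // FK violation if jobTableID is not present in the job table
}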
@@ -755,16 +763,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
 	}
 
 	isCached := false
-	job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+	job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
 	if err != nil {
-		// Try cached jobs if not found in main repository
-		cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
-		if cachedErr != nil {
-			// Combine both errors for better debugging
-			handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
+		// Not in cache, try main job table
+		job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+		if err != nil {
+			handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
 			return
 		}
-		job = cachedJob
+	} else {
 		isCached = true
 	}
 
@@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) {
 		}
 	}
 
-	id, err := api.JobRepository.Start(&req)
+	// When tags are present, insert directly into the job table so that the
+	// returned ID can be used with AddTagOrCreate (which queries the job table).
+	var id int64
+	if len(req.Tags) > 0 {
+		id, err = api.JobRepository.StartDirect(&req)
+	} else {
+		id, err = api.JobRepository.Start(&req)
+	}
 	if err != nil {
 		cclog.Errorf("NATS start job: insert into database failed: %v", err)
 		return
@@ -252,15 +259,15 @@ func (api *NatsAPI) handleStopJob(payload string) {
 	}
 
 	isCached := false
-	job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+	job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
 	if err != nil {
-		cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
-		if cachedErr != nil {
-			cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
-				err, cachedErr)
+		// Not in cache, try main job table
+		job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
+		if err != nil {
+			cclog.Errorf("NATS job stop: finding job failed: %v", err)
 			return
 		}
-		job = cachedJob
+	} else {
 		isCached = true
 	}
 
@@ -22,6 +22,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/internal/tagger"
 	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
 	"github.com/ClusterCockpit/cc-lib/v2/util"
@@ -152,6 +153,8 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) {
 		r.Delete("/config/users/", api.deleteUser)
 		r.Post("/config/user/{id}", api.updateUser)
 		r.Post("/config/notice/", api.editNotice)
+		r.Get("/config/taggers/", api.getTaggers)
+		r.Post("/config/taggers/run/", api.runTagger)
 	}
 }
 
@@ -268,6 +271,42 @@ func (api *RestAPI) editNotice(rw http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func (api *RestAPI) getTaggers(rw http.ResponseWriter, r *http.Request) {
+	if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
+		handleError(fmt.Errorf("only admins are allowed to list taggers"), http.StatusForbidden, rw)
+		return
+	}
+
+	rw.Header().Set("Content-Type", "application/json")
+	if err := json.NewEncoder(rw).Encode(tagger.ListTaggers()); err != nil {
+		cclog.Errorf("Failed to encode tagger list: %v", err)
+	}
+}
+
+func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
+	if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
+		handleError(fmt.Errorf("only admins are allowed to run taggers"), http.StatusForbidden, rw)
+		return
+	}
+
+	name := r.FormValue("name")
+	if name == "" {
+		handleError(fmt.Errorf("missing required parameter: name"), http.StatusBadRequest, rw)
+		return
+	}
+
+	if err := tagger.RunTaggerByName(name); err != nil {
+		handleError(err, http.StatusConflict, rw)
+		return
+	}
+
+	rw.Header().Set("Content-Type", "text/plain")
+	rw.WriteHeader(http.StatusOK)
+	if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil {
+		cclog.Errorf("Failed to write response: %v", err)
+	}
+}
+
 // getJWT godoc
 // @summary Generate JWT token
 // @tags Frontend
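For reference, the two new admin endpoints can be exercised with any HTTP client. A minimal Go sketch; the base URL is an assumption, and authentication (both handlers require the admin role) is omitted here:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	base := "http://localhost:8080" // assumption: local cc-backend, admin auth handled elsewhere

	// List known taggers and their running state: a JSON array of {name, type, running}.
	resp, err := http.Get(base + "/config/taggers/")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body))

	// Start one tagger by name; a second call while it runs yields 409 Conflict.
	resp, err = http.PostForm(base+"/config/taggers/run/", url.Values{"name": {"AppTagger"}})
	if err != nil {
		panic(err)
	}
	body, _ = io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(resp.Status, string(body))
}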
@@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error {
 			return err
 		}
 
-		id, err := r.InsertJob(&job)
+		id, err := r.InsertJobDirect(&job)
 		if err != nil {
 			cclog.Warn("Error while job db insert")
 			return err
@@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) {
 		}
 
 		result := readResult(t, testname)
-		job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime)
+		job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job (
 	:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
 );`
 
+// InsertJobDirect inserts a job directly into the job table (not job_cache).
+// Use this when the returned ID will be used for operations on the job table
+// (e.g., adding tags), or for imported jobs that are already completed.
+func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) {
+	r.Mutex.Lock()
+	defer r.Mutex.Unlock()
+
+	res, err := r.DB.NamedExec(NamedJobInsert, job)
+	if err != nil {
+		cclog.Warn("Error while NamedJobInsert (direct)")
+		return 0, err
+	}
+	id, err := res.LastInsertId()
+	if err != nil {
+		cclog.Warn("Error while getting last insert ID (direct)")
+		return 0, err
+	}
+
+	return id, nil
+}
+
 func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
 	r.Mutex.Lock()
 	defer r.Mutex.Unlock()
@@ -148,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
 	return r.InsertJob(job)
 }
 
+// StartDirect inserts a new job directly into the job table (not job_cache).
+// Use this when the returned ID will immediately be used for job table
+// operations such as adding tags.
+func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) {
+	job.RawFootprint, err = json.Marshal(job.Footprint)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
+	}
+
+	job.RawResources, err = json.Marshal(job.Resources)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
+	}
+
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
+	}
+
+	return r.InsertJobDirect(job)
+}
+
 // Stop updates the job with the database id jobId using the provided arguments.
 func (r *JobRepository) Stop(
 	jobID int64,
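In short, the repository now offers two insert paths: Start() keeps the high-throughput job_cache route, while StartDirect() trades that for an ID that is immediately valid against the job table. A hedged usage sketch (the helper name is illustrative; only the two repository calls are real):

// insertForRequest sketches how callers choose between the two paths
// added in this diff.
func insertForRequest(r *JobRepository, job *schema.Job, hasTags bool) (int64, error) {
	if hasTags {
		// ID references the job table, so jobtag foreign keys hold immediately.
		return r.StartDirect(job)
	}
	// ID references job_cache; the row reaches the job table on the next SyncJobs().
	return r.Start(job)
}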
@@ -528,3 +528,80 @@ func TestSyncJobs(t *testing.T) {
 		assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty")
 	})
 }
+
+func TestInsertJobDirect(t *testing.T) {
+	r := setup(t)
+
+	t.Run("inserts into job table not cache", func(t *testing.T) {
+		job := createTestJob(999020, "testcluster")
+		job.RawResources, _ = json.Marshal(job.Resources)
+		job.RawFootprint, _ = json.Marshal(job.Footprint)
+		job.RawMetaData, _ = json.Marshal(job.MetaData)
+
+		id, err := r.InsertJobDirect(job)
+		require.NoError(t, err, "InsertJobDirect should succeed")
+		assert.Greater(t, id, int64(0), "Should return valid insert ID")
+
+		// Verify job is in job table
+		var count int
+		err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count)
+		require.NoError(t, err)
+		assert.Equal(t, 1, count, "Job should be in job table")
+
+		// Verify job is NOT in job_cache
+		err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? AND cluster = ?",
+			job.JobID, job.Cluster).Scan(&count)
+		require.NoError(t, err)
+		assert.Equal(t, 0, count, "Job should NOT be in job_cache")
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+
+	t.Run("returned ID works for tag operations", func(t *testing.T) {
+		job := createTestJob(999021, "testcluster")
+		job.RawResources, _ = json.Marshal(job.Resources)
+		job.RawFootprint, _ = json.Marshal(job.Footprint)
+		job.RawMetaData, _ = json.Marshal(job.MetaData)
+
+		id, err := r.InsertJobDirect(job)
+		require.NoError(t, err)
+
+		// Adding a tag using the returned ID should succeed (FK constraint on jobtag)
+		err = r.ImportTag(id, "test_type", "test_name", "global")
+		require.NoError(t, err, "ImportTag should succeed with direct insert ID")
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id)
+		require.NoError(t, err)
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+}
+
+func TestStartDirect(t *testing.T) {
+	r := setup(t)
+
+	t.Run("inserts into job table with JSON encoding", func(t *testing.T) {
+		job := createTestJob(999022, "testcluster")
+
+		id, err := r.StartDirect(job)
+		require.NoError(t, err, "StartDirect should succeed")
+		assert.Greater(t, id, int64(0))
+
+		// Verify job is in job table with encoded JSON
+		var rawResources []byte
+		err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources)
+		require.NoError(t, err)
+
+		var resources []*schema.Resource
+		err = json.Unmarshal(rawResources, &resources)
+		require.NoError(t, err, "Resources should be valid JSON")
+		assert.Equal(t, "node01", resources[0].Hostname)
+
+		// Clean up
+		_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
+		require.NoError(t, err)
+	})
+}
@@ -10,6 +10,7 @@
 package tagger
 
 import (
+	"fmt"
 	"sync"
 
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -29,11 +30,31 @@ type Tagger interface {
 	Match(job *schema.Job)
 }
 
+// TaggerInfo holds metadata about a tagger for JSON serialization.
+type TaggerInfo struct {
+	Name    string `json:"name"`
+	Type    string `json:"type"`
+	Running bool   `json:"running"`
+}
+
 var (
-	initOnce  sync.Once
-	jobTagger *JobTagger
+	initOnce     sync.Once
+	jobTagger    *JobTagger
+	statusMu     sync.Mutex
+	taggerStatus = map[string]bool{}
 )
 
+// Known tagger definitions: name -> (type, factory)
+type taggerDef struct {
+	ttype   string
+	factory func() Tagger
+}
+
+var knownTaggers = map[string]taggerDef{
+	"AppTagger":      {ttype: "start", factory: func() Tagger { return &AppTagger{} }},
+	"JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }},
+}
+
 // JobTagger coordinates multiple taggers that run at different job lifecycle events.
 // It maintains separate lists of taggers that run when jobs start and when they stop.
 type JobTagger struct {
@@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) {
 	}
 }
 
+// ListTaggers returns information about all known taggers with their current running status.
+func ListTaggers() []TaggerInfo {
+	statusMu.Lock()
+	defer statusMu.Unlock()
+
+	result := make([]TaggerInfo, 0, len(knownTaggers))
+	for name, def := range knownTaggers {
+		result = append(result, TaggerInfo{
+			Name:    name,
+			Type:    def.ttype,
+			Running: taggerStatus[name],
+		})
+	}
+	return result
+}
+
+// RunTaggerByName starts a tagger by name asynchronously on all jobs.
+// Returns an error if the name is unknown or the tagger is already running.
+func RunTaggerByName(name string) error {
+	def, ok := knownTaggers[name]
+	if !ok {
+		return fmt.Errorf("unknown tagger: %s", name)
+	}
+
+	statusMu.Lock()
+	if taggerStatus[name] {
+		statusMu.Unlock()
+		return fmt.Errorf("tagger %s is already running", name)
+	}
+	taggerStatus[name] = true
+	statusMu.Unlock()
+
+	go func() {
+		defer func() {
+			statusMu.Lock()
+			taggerStatus[name] = false
+			statusMu.Unlock()
+		}()
+
+		t := def.factory()
+		if err := t.Register(); err != nil {
+			cclog.Errorf("Failed to register tagger %s: %s", name, err)
+			return
+		}
+
+		r := repository.GetJobRepository()
+		jl, err := r.GetJobList(0, 0)
+		if err != nil {
+			cclog.Errorf("Error getting job list for tagger %s: %s", name, err)
+			return
+		}
+
+		cclog.Infof("Running tagger %s on %d jobs", name, len(jl))
+		for _, id := range jl {
+			job, err := r.FindByIDDirect(id)
+			if err != nil {
+				cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err)
+				continue
+			}
+			t.Match(job)
+		}
+		cclog.Infof("Tagger %s completed", name)
+	}()
+
+	return nil
+}
+
 // RunTaggers applies all configured taggers to all existing jobs in the repository.
 // This is useful for retroactively applying tags to jobs that were created before
 // the tagger system was initialized or when new tagging rules are added.
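The statusMu/taggerStatus pair gives RunTaggerByName a cheap per-name run-once guard. The same pattern in isolation, as a self-contained sketch (generic code, not taken from cc-backend):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	mu      sync.Mutex
	running = map[string]bool{}
)

// tryRun starts fn in the background unless a task with the same name is
// already running, mirroring the guard in RunTaggerByName above.
func tryRun(name string, fn func()) error {
	mu.Lock()
	if running[name] {
		mu.Unlock()
		return errors.New(name + " is already running")
	}
	running[name] = true
	mu.Unlock()

	go func() {
		// Always clear the flag when fn returns.
		defer func() {
			mu.Lock()
			running[name] = false
			mu.Unlock()
		}()
		fn()
	}()
	return nil
}

func main() {
	_ = tryRun("demo", func() { time.Sleep(time.Second) })
	fmt.Println(tryRun("demo", func() {})) // "demo is already running"
	time.Sleep(2 * time.Second)
}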
web/frontend/package-lock.json (generated, 29 changed lines)
@@ -609,6 +609,12 @@
 			"dev": true,
 			"license": "MIT"
 		},
+		"node_modules/@types/trusted-types": {
+			"version": "2.0.7",
+			"resolved": "https://registry.npmjs.org/@types/trusted-types/-/trusted-types-2.0.7.tgz",
+			"integrity": "sha512-ScaPdn1dQczgbl0QFTeTOmVHFULt394XJgOQNoyVhZ6r2vLnMLJfBPd53SB52T/3G36VI1/g2MZaX0cwDuXsfw==",
+			"license": "MIT"
+		},
 		"node_modules/@urql/core": {
 			"version": "5.2.0",
 			"resolved": "https://registry.npmjs.org/@urql/core/-/core-5.2.0.tgz",
@@ -745,9 +751,9 @@
 			}
 		},
 		"node_modules/devalue": {
-			"version": "5.6.2",
-			"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.2.tgz",
-			"integrity": "sha512-nPRkjWzzDQlsejL1WVifk5rvcFi/y1onBRxjaFMjZeR9mFpqu2gmAZ9xUB9/IEanEP/vBtGeGganC/GO1fmufg==",
+			"version": "5.6.3",
+			"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz",
+			"integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==",
 			"license": "MIT"
 		},
 		"node_modules/escape-latex": {
@@ -763,9 +769,9 @@
 			"license": "MIT"
 		},
 		"node_modules/esrap": {
-			"version": "2.2.1",
-			"resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.1.tgz",
-			"integrity": "sha512-GiYWG34AN/4CUyaWAgunGt0Rxvr1PTMlGC0vvEov/uOQYWne2bpN03Um+k8jT+q3op33mKouP2zeJ6OlM+qeUg==",
+			"version": "2.2.3",
+			"resolved": "https://registry.npmjs.org/esrap/-/esrap-2.2.3.tgz",
+			"integrity": "sha512-8fOS+GIGCQZl/ZIlhl59htOlms6U8NvX6ZYgYHpRU/b6tVSh3uHkOHZikl3D4cMbYM0JlpBe+p/BkZEi8J9XIQ==",
 			"license": "MIT",
 			"dependencies": {
 				"@jridgewell/sourcemap-codec": "^1.4.15"
@@ -1176,22 +1182,23 @@
 			}
 		},
 		"node_modules/svelte": {
-			"version": "5.46.4",
-			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.4.tgz",
-			"integrity": "sha512-VJwdXrmv9L8L7ZasJeWcCjoIuMRVbhuxbss0fpVnR8yorMmjNDwcjIH08vS6wmSzzzgAG5CADQ1JuXPS2nwt9w==",
+			"version": "5.53.2",
+			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.2.tgz",
+			"integrity": "sha512-yGONuIrcl/BMmqbm6/52Q/NYzfkta7uVlos5NSzGTfNJTTFtPPzra6rAQoQIwAqupeM3s9uuTf5PvioeiCdg9g==",
 			"license": "MIT",
 			"dependencies": {
 				"@jridgewell/remapping": "^2.3.4",
 				"@jridgewell/sourcemap-codec": "^1.5.0",
 				"@sveltejs/acorn-typescript": "^1.0.5",
 				"@types/estree": "^1.0.5",
+				"@types/trusted-types": "^2.0.7",
 				"acorn": "^8.12.1",
 				"aria-query": "^5.3.1",
 				"axobject-query": "^4.1.0",
 				"clsx": "^2.1.1",
-				"devalue": "^5.6.2",
+				"devalue": "^5.6.3",
 				"esm-env": "^1.2.1",
-				"esrap": "^2.2.1",
+				"esrap": "^2.2.2",
 				"is-reference": "^3.0.3",
 				"locate-character": "^3.0.0",
 				"magic-string": "^0.30.11",
@@ -15,6 +15,7 @@
 	import ShowUsers from "./admin/ShowUsers.svelte";
 	import Options from "./admin/Options.svelte";
 	import NoticeEdit from "./admin/NoticeEdit.svelte";
+	import RunTaggers from "./admin/RunTaggers.svelte";
 
 	/* Svelte 5 Props */
 	let {
@@ -70,4 +71,5 @@
 	</Col>
 	<Options config={ccconfig} {clusterNames}/>
 	<NoticeEdit {ncontent}/>
+	<RunTaggers />
 </Row>
web/frontend/src/config/admin/RunTaggers.svelte (new file, 142 lines)
@@ -0,0 +1,142 @@
+<!--
+    @component Admin card for running individual job taggers on all jobs
+-->
+
+<script>
+  import {
+    Col,
+    Card,
+    CardTitle,
+    CardBody,
+    Spinner,
+    Badge,
+  } from "@sveltestrap/sveltestrap";
+  import { fade } from "svelte/transition";
+  import { onMount, onDestroy } from "svelte";
+
+  /* State Init */
+  let taggers = $state([]);
+  let message = $state({ msg: "", color: "#d63384" });
+  let displayMessage = $state(false);
+  let pollTimer = $state(null);
+
+  /* Functions */
+  async function fetchTaggers() {
+    try {
+      const res = await fetch("/config/taggers/");
+      if (res.ok) {
+        taggers = await res.json();
+      }
+    } catch (err) {
+      console.error("Failed to fetch taggers:", err);
+    }
+  }
+
+  async function runTagger(name) {
+    let formData = new FormData();
+    formData.append("name", name);
+
+    try {
+      const res = await fetch("/config/taggers/run/", {
+        method: "POST",
+        body: formData,
+      });
+      if (res.ok) {
+        let text = await res.text();
+        popMessage(text, "#048109");
+        startPolling();
+        await fetchTaggers();
+      } else {
+        let text = await res.text();
+        throw new Error("Response Code " + res.status + " -> " + text);
+      }
+    } catch (err) {
+      popMessage(err, "#d63384");
+    }
+  }
+
+  function startPolling() {
+    if (pollTimer) return;
+    pollTimer = setInterval(async () => {
+      await fetchTaggers();
+      const anyRunning = taggers.some((t) => t.running);
+      if (!anyRunning) {
+        clearInterval(pollTimer);
+        pollTimer = null;
+      }
+    }, 3000);
+  }
+
+  function popMessage(response, rescolor) {
+    message = { msg: response, color: rescolor };
+    displayMessage = true;
+    setTimeout(function () {
+      displayMessage = false;
+    }, 3500);
+  }
+
+  /* Lifecycle */
+  onMount(async () => {
+    await fetchTaggers();
+    const anyRunning = taggers.some((t) => t.running);
+    if (anyRunning) startPolling();
+  });
+
+  onDestroy(() => {
+    if (pollTimer) clearInterval(pollTimer);
+  });
+</script>
+
+<Col>
+  <Card class="h-100">
+    <CardBody>
+      <CardTitle class="mb-3">Job Taggers</CardTitle>
+      <p>Run individual taggers on all existing jobs.</p>
+      {#if taggers.length === 0}
+        <p class="text-muted">No taggers available.</p>
+      {:else}
+        <table class="table table-sm mb-3">
+          <thead>
+            <tr>
+              <th>Name</th>
+              <th>Type</th>
+              <th>Status</th>
+              <th></th>
+            </tr>
+          </thead>
+          <tbody>
+            {#each taggers as tagger}
+              <tr>
+                <td>{tagger.name}</td>
+                <td><Badge color="secondary">{tagger.type}</Badge></td>
+                <td>
+                  {#if tagger.running}
+                    <Spinner size="sm" color="primary" /> Running
+                  {:else}
+                    <span class="text-muted">Idle</span>
+                  {/if}
+                </td>
+                <td>
+                  <button
+                    class="btn btn-sm btn-primary"
+                    disabled={tagger.running}
+                    onclick={() => runTagger(tagger.name)}
+                  >
+                    Run
+                  </button>
+                </td>
+              </tr>
+            {/each}
+          </tbody>
+        </table>
+      {/if}
+      <p>
+        {#if displayMessage}<b
+            ><code style="color: {message.color};" out:fade
+              >{message.msg}</code
+            ></b
+          >{/if}
+      </p>
+    </CardBody>
+  </Card>
+</Col>
Block a user