From 82e79b074a3d4b5576331f180855353ba36cefb3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sat, 21 Feb 2026 13:51:31 +0100 Subject: [PATCH] Reverse Lookup order in stop job request --- internal/api/api_test.go | 160 +++++++++++++++++++++++++++++++++++++++ internal/api/job.go | 13 ++-- internal/api/nats.go | 12 +-- 3 files changed, 172 insertions(+), 13 deletions(-) diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 09fc4c7f..a8aef889 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) { } }) } + +// TestStopJobWithReusedJobId verifies that stopping a recently started job works +// even when an older job with the same jobId exists in the job table (e.g. with +// state "failed"). This is a regression test for the bug where Find() on the job +// table would match the old job instead of the new one still in job_cache. +func TestStopJobWithReusedJobId(t *testing.T) { + restapi := setup(t) + t.Cleanup(cleanup) + + testData := schema.JobData{ + "load_one": map[schema.MetricScope]*schema.JobMetric{ + schema.MetricScopeNode: { + Unit: schema.Unit{Base: "load"}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "host123", + Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, + Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, + }, + }, + }, + }, + } + + metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { + return testData, nil + } + + r := chi.NewRouter() + restapi.MountAPIRoutes(r) + + const contextUserKey repository.ContextKey = "user" + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"user"}, + AuthType: 0, + AuthSource: 2, + } + + // Step 1: Start the first job (jobId=999) + const startJobBody1 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 200000000 + }` + + if ok := t.Run("StartFirstJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 2: Sync to move job from cache to job table, then stop it as "failed" + time.Sleep(1 * time.Second) + restapi.JobRepository.SyncJobs() + + const stopJobBody1 string = `{ + "jobId": 999, + "startTime": 200000000, + "cluster": "testcluster", + "jobState": "failed", + "stopTime": 200001000 + }` + + if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + + jobid, cluster := int64(999), 
"testcluster" + job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) + if err != nil { + t.Fatal(err) + } + if job.State != schema.JobStateFailed { + t.Fatalf("expected first job to be failed, got: %s", job.State) + } + }); !ok { + return + } + + // Wait for archiving to complete + time.Sleep(1 * time.Second) + + // Step 3: Start a NEW job with the same jobId=999 but different startTime. + // This job will sit in job_cache (not yet synced). + const startJobBody2 string = `{ + "jobId": 999, + "user": "testuser", + "project": "testproj", + "cluster": "testcluster", + "partition": "default", + "walltime": 3600, + "numNodes": 1, + "numHwthreads": 8, + "numAcc": 0, + "shared": "none", + "monitoringStatus": 1, + "smt": 1, + "resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}], + "startTime": 300000000 + }` + + if ok := t.Run("StartSecondJob", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusCreated { + t.Fatal(recorder.Result().Status, recorder.Body.String()) + } + }); !ok { + return + } + + // Step 4: Stop the second job WITHOUT syncing first. + // Before the fix, this would fail because Find() on the job table would + // match the old failed job (jobId=999) and reject with "already stopped". + const stopJobBody2 string = `{ + "jobId": 999, + "startTime": 300000000, + "cluster": "testcluster", + "jobState": "completed", + "stopTime": 300001000 + }` + + t.Run("StopSecondJobBeforeSync", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2))) + recorder := httptest.NewRecorder() + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + r.ServeHTTP(recorder, req.WithContext(ctx)) + if recorder.Result().StatusCode != http.StatusOK { + t.Fatalf("expected stop to succeed for cached job, got: %s %s", + recorder.Result().Status, recorder.Body.String()) + } + }) +} diff --git a/internal/api/job.go b/internal/api/job.go index 66258668..c4a81cf2 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -755,16 +755,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { } isCached := false - job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - // Try cached jobs if not found in main repository - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - // Combine both errors for better debugging - handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw) return } - job = cachedJob + } else { isCached = true } diff --git a/internal/api/nats.go b/internal/api/nats.go index 0e929426..f1684f20 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -252,15 +252,15 @@ func (api *NatsAPI) handleStopJob(payload string) { } isCached := false - job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + job, 
err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if err != nil { - cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) - if cachedErr != nil { - cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)", - err, cachedErr) + // Not in cache, try main job table + job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) + if err != nil { + cclog.Errorf("NATS job stop: finding job failed: %v", err) return } - job = cachedJob + } else { isCached = true }
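
Note on the shared logic: after this change, stopJobByRequest in job.go and handleStopJob
in nats.go perform the identical two-step lookup, probing job_cache via FindCached first
and only falling back to the job table via Find. A minimal sketch of that order factored
into one helper follows. It is illustrative only: the helper name and its free-function
form are hypothetical, and it assumes only the *repository.JobRepository receiver, the
*schema.Job result, and the pointer argument types already used by the Find/FindCached
calls in the hunks above.

// findJobForStopRequest is a sketch (not part of this patch) of the cache-first lookup
// order introduced here. job_cache is checked first because a freshly started job stays
// there until SyncJobs() moves it into the job table, so an older, already stopped job
// that reuses the same jobId cannot shadow the new one.
func findJobForStopRequest(repo *repository.JobRepository, jobID *int64, cluster *string, startTime *int64) (*schema.Job, bool, error) {
	// Newest jobs live in job_cache until the next sync.
	if job, err := repo.FindCached(jobID, cluster, startTime); err == nil {
		return job, true, nil // found in cache; caller treats this as isCached = true
	}
	// Fall back to the main job table for jobs that have already been synced.
	job, err := repo.Find(jobID, cluster, startTime)
	return job, false, err
}

Both handlers could then reduce to a single call such as
job, isCached, err := findJobForStopRequest(api.JobRepository, req.JobID, req.Cluster, req.StartTime),
keeping the lookup order in one place should it ever need to change again.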