mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-28 13:27:30 +01:00
Reverse Lookup order in stop job request
This commit is contained in:
@@ -488,3 +488,163 @@ func TestRestApi(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestStopJobWithReusedJobId verifies that stopping a recently started job works
|
||||||
|
// even when an older job with the same jobId exists in the job table (e.g. with
|
||||||
|
// state "failed"). This is a regression test for the bug where Find() on the job
|
||||||
|
// table would match the old job instead of the new one still in job_cache.
|
||||||
|
func TestStopJobWithReusedJobId(t *testing.T) {
|
||||||
|
restapi := setup(t)
|
||||||
|
t.Cleanup(cleanup)
|
||||||
|
|
||||||
|
testData := schema.JobData{
|
||||||
|
"load_one": map[schema.MetricScope]*schema.JobMetric{
|
||||||
|
schema.MetricScopeNode: {
|
||||||
|
Unit: schema.Unit{Base: "load"},
|
||||||
|
Timestep: 60,
|
||||||
|
Series: []schema.Series{
|
||||||
|
{
|
||||||
|
Hostname: "host123",
|
||||||
|
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
|
||||||
|
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
|
||||||
|
return testData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r := chi.NewRouter()
|
||||||
|
restapi.MountAPIRoutes(r)
|
||||||
|
|
||||||
|
const contextUserKey repository.ContextKey = "user"
|
||||||
|
contextUserValue := &schema.User{
|
||||||
|
Username: "testuser",
|
||||||
|
Projects: make([]string, 0),
|
||||||
|
Roles: []string{"user"},
|
||||||
|
AuthType: 0,
|
||||||
|
AuthSource: 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 1: Start the first job (jobId=999)
|
||||||
|
const startJobBody1 string = `{
|
||||||
|
"jobId": 999,
|
||||||
|
"user": "testuser",
|
||||||
|
"project": "testproj",
|
||||||
|
"cluster": "testcluster",
|
||||||
|
"partition": "default",
|
||||||
|
"walltime": 3600,
|
||||||
|
"numNodes": 1,
|
||||||
|
"numHwthreads": 8,
|
||||||
|
"numAcc": 0,
|
||||||
|
"shared": "none",
|
||||||
|
"monitoringStatus": 1,
|
||||||
|
"smt": 1,
|
||||||
|
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
|
||||||
|
"startTime": 200000000
|
||||||
|
}`
|
||||||
|
|
||||||
|
if ok := t.Run("StartFirstJob", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody1)))
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||||
|
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||||
|
if recorder.Result().StatusCode != http.StatusCreated {
|
||||||
|
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||||
|
}
|
||||||
|
}); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Sync to move job from cache to job table, then stop it as "failed"
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
restapi.JobRepository.SyncJobs()
|
||||||
|
|
||||||
|
const stopJobBody1 string = `{
|
||||||
|
"jobId": 999,
|
||||||
|
"startTime": 200000000,
|
||||||
|
"cluster": "testcluster",
|
||||||
|
"jobState": "failed",
|
||||||
|
"stopTime": 200001000
|
||||||
|
}`
|
||||||
|
|
||||||
|
if ok := t.Run("StopFirstJobAsFailed", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody1)))
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||||
|
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||||
|
if recorder.Result().StatusCode != http.StatusOK {
|
||||||
|
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
jobid, cluster := int64(999), "testcluster"
|
||||||
|
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if job.State != schema.JobStateFailed {
|
||||||
|
t.Fatalf("expected first job to be failed, got: %s", job.State)
|
||||||
|
}
|
||||||
|
}); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for archiving to complete
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
// Step 3: Start a NEW job with the same jobId=999 but different startTime.
|
||||||
|
// This job will sit in job_cache (not yet synced).
|
||||||
|
const startJobBody2 string = `{
|
||||||
|
"jobId": 999,
|
||||||
|
"user": "testuser",
|
||||||
|
"project": "testproj",
|
||||||
|
"cluster": "testcluster",
|
||||||
|
"partition": "default",
|
||||||
|
"walltime": 3600,
|
||||||
|
"numNodes": 1,
|
||||||
|
"numHwthreads": 8,
|
||||||
|
"numAcc": 0,
|
||||||
|
"shared": "none",
|
||||||
|
"monitoringStatus": 1,
|
||||||
|
"smt": 1,
|
||||||
|
"resources": [{"hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]}],
|
||||||
|
"startTime": 300000000
|
||||||
|
}`
|
||||||
|
|
||||||
|
if ok := t.Run("StartSecondJob", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(startJobBody2)))
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||||
|
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||||
|
if recorder.Result().StatusCode != http.StatusCreated {
|
||||||
|
t.Fatal(recorder.Result().Status, recorder.Body.String())
|
||||||
|
}
|
||||||
|
}); !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 4: Stop the second job WITHOUT syncing first.
|
||||||
|
// Before the fix, this would fail because Find() on the job table would
|
||||||
|
// match the old failed job (jobId=999) and reject with "already stopped".
|
||||||
|
const stopJobBody2 string = `{
|
||||||
|
"jobId": 999,
|
||||||
|
"startTime": 300000000,
|
||||||
|
"cluster": "testcluster",
|
||||||
|
"jobState": "completed",
|
||||||
|
"stopTime": 300001000
|
||||||
|
}`
|
||||||
|
|
||||||
|
t.Run("StopSecondJobBeforeSync", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody2)))
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
|
||||||
|
r.ServeHTTP(recorder, req.WithContext(ctx))
|
||||||
|
if recorder.Result().StatusCode != http.StatusOK {
|
||||||
|
t.Fatalf("expected stop to succeed for cached job, got: %s %s",
|
||||||
|
recorder.Result().Status, recorder.Body.String())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -755,16 +755,15 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
isCached := false
|
isCached := false
|
||||||
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
job, err = api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Try cached jobs if not found in main repository
|
// Not in cache, try main job table
|
||||||
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||||
if cachedErr != nil {
|
if err != nil {
|
||||||
// Combine both errors for better debugging
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
||||||
handleError(fmt.Errorf("finding job failed: %w (cached lookup also failed: %v)", err, cachedErr), http.StatusNotFound, rw)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
job = cachedJob
|
} else {
|
||||||
isCached = true
|
isCached = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -252,15 +252,15 @@ func (api *NatsAPI) handleStopJob(payload string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
isCached := false
|
isCached := false
|
||||||
job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
job, err := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
// Not in cache, try main job table
|
||||||
if cachedErr != nil {
|
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||||
cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
|
if err != nil {
|
||||||
err, cachedErr)
|
cclog.Errorf("NATS job stop: finding job failed: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
job = cachedJob
|
} else {
|
||||||
isCached = true
|
isCached = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user