From ac7eb93141d081ca083e8e576e8bc61268f2671e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 9 Feb 2026 19:57:46 +0100 Subject: [PATCH] fix: Transfer always to main job table before archiving --- internal/api/job.go | 22 +++++++--- internal/api/nats.go | 22 +++++++--- internal/repository/jobCreate.go | 43 +++++++++++-------- internal/repository/jobCreate_test.go | 62 ++++++++++++++------------- 4 files changed, 91 insertions(+), 58 deletions(-) diff --git a/internal/api/job.go b/internal/api/job.go index d67dbb93..9bd93b1c 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -754,6 +754,7 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { return } + isCached := false job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) if err != nil { // Try cached jobs if not found in main repository @@ -764,9 +765,10 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { return } job = cachedJob + isCached = true } - api.checkAndHandleStopJob(rw, job, req) + api.checkAndHandleStopJob(rw, job, req, isCached) } // deleteJobByID godoc @@ -923,7 +925,7 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { } } -func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { +func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest, isCached bool) { // Sanity checks if job.State != schema.JobStateRunning { handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) @@ -948,11 +950,21 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo api.JobRepository.Mutex.Lock() defer api.JobRepository.Mutex.Unlock() - if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + // If the job is still in job_cache, transfer it to the job table first + // so that job.ID always points to the job table for downstream code + if isCached { + newID, err := api.JobRepository.TransferCachedJobToMain(*job.ID) + if err != nil { + handleError(fmt.Errorf("jobId %d (id %d) on %s : transferring cached job failed: %w", job.JobID, *job.ID, job.Cluster, err), http.StatusInternalServerError, rw) return } + cclog.Infof("transferred cached job to main table: old id %d -> new id %d (jobId=%d)", *job.ID, newID, job.JobID) + job.ID = &newID + } + + if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + return } cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) diff --git a/internal/api/nats.go b/internal/api/nats.go index c0a8c174..0e929426 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -251,6 +251,7 @@ func (api *NatsAPI) handleStopJob(payload string) { return } + isCached := false job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) if err != nil { cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) @@ -260,6 +261,7 @@ func (api *NatsAPI) handleStopJob(payload string) { return } job = cachedJob + isCached = true } if job.State != schema.JobStateRunning { @@ -287,16 +289,26 @@ func (api *NatsAPI) handleStopJob(payload string) { api.JobRepository.Mutex.Lock() defer api.JobRepository.Mutex.Unlock() - if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v", - job.JobID, job.ID, job.Cluster, job.State, err) + // If the job is still in job_cache, transfer it to the job table first + if isCached { + newID, err := api.JobRepository.TransferCachedJobToMain(*job.ID) + if err != nil { + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: transferring cached job failed: %v", + job.JobID, *job.ID, job.Cluster, err) return } + cclog.Infof("NATS: transferred cached job to main table: old id %d -> new id %d (jobId=%d)", *job.ID, newID, job.JobID) + job.ID = &newID + } + + if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v", + job.JobID, *job.ID, job.Cluster, job.State, err) + return } cclog.Infof("NATS: archiving job (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", - job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) if job.MonitoringStatus == schema.MonitoringStatusDisabled { return diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 6114ae5e..9f4f366d 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -71,8 +71,9 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { jobs = append(jobs, job) } + // Use INSERT OR IGNORE to skip jobs already transferred by the stop path _, err = r.DB.Exec( - "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") + "INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") if err != nil { cclog.Warnf("Error while Job sync: %v", err) return nil, err @@ -87,6 +88,29 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { return jobs, nil } +// TransferCachedJobToMain moves a job from job_cache to the job table. +// Caller must hold r.Mutex. Returns the new job table ID. +func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) { + res, err := r.DB.Exec( + "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?", + cacheID) + if err != nil { + return 0, fmt.Errorf("transferring cached job %d to main table failed: %w", cacheID, err) + } + + newID, err := res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err) + } + + _, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID) + if err != nil { + return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err) + } + + return newID, nil +} + // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.Job) (id int64, err error) { @@ -129,20 +153,3 @@ func (r *JobRepository) Stop( return err } -func (r *JobRepository) StopCached( - jobID int64, - duration int32, - state schema.JobState, - monitoringStatus int32, -) (err error) { - // Note: StopCached updates job_cache table, not the main job table - // Cache invalidation happens when job is synced to main table - stmt := sq.Update("job_cache"). - Set("job_state", state). - Set("duration", duration). - Set("monitoring_status", monitoringStatus). - Where("job_cache.id = ?", jobID) - - _, err = stmt.RunWith(r.stmtCache).Exec() - return err -} diff --git a/internal/repository/jobCreate_test.go b/internal/repository/jobCreate_test.go index 3a586482..9e72555f 100644 --- a/internal/repository/jobCreate_test.go +++ b/internal/repository/jobCreate_test.go @@ -331,58 +331,60 @@ func TestStop(t *testing.T) { }) } -func TestStopCached(t *testing.T) { +func TestTransferCachedJobToMain(t *testing.T) { r := setup(t) - t.Run("successful stop cached job", func(t *testing.T) { + t.Run("successful transfer from cache to main", func(t *testing.T) { // Insert a job in job_cache job := createTestJob(999009, "testcluster") - id, err := r.Start(job) + cacheID, err := r.Start(job) require.NoError(t, err) - // Stop the cached job - duration := int32(3600) - state := schema.JobStateCompleted - monitoringStatus := int32(schema.MonitoringStatusArchivingSuccessful) + // Transfer the cached job to the main table + r.Mutex.Lock() + newID, err := r.TransferCachedJobToMain(cacheID) + r.Mutex.Unlock() + require.NoError(t, err, "TransferCachedJobToMain should succeed") + assert.NotEqual(t, cacheID, newID, "New ID should differ from cache ID") - err = r.StopCached(id, duration, state, monitoringStatus) - require.NoError(t, err, "StopCached should succeed") - - // Verify job was updated in job_cache table - var retrievedDuration int32 - var retrievedState string - var retrievedMonStatus int32 - err = r.DB.QueryRow(`SELECT duration, job_state, monitoring_status FROM job_cache WHERE id = ?`, id).Scan( - &retrievedDuration, &retrievedState, &retrievedMonStatus) + // Verify job exists in job table + var count int + err = r.DB.QueryRow(`SELECT COUNT(*) FROM job WHERE id = ?`, newID).Scan(&count) require.NoError(t, err) - assert.Equal(t, duration, retrievedDuration) - assert.Equal(t, string(state), retrievedState) - assert.Equal(t, monitoringStatus, retrievedMonStatus) + assert.Equal(t, 1, count, "Job should exist in main table") + + // Verify job was removed from job_cache + err = r.DB.QueryRow(`SELECT COUNT(*) FROM job_cache WHERE id = ?`, cacheID).Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count, "Job should be removed from cache") // Clean up - _, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", id) + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", newID) require.NoError(t, err) }) - t.Run("stop cached job does not affect job table", func(t *testing.T) { + t.Run("transfer preserves job data", func(t *testing.T) { // Insert a job in job_cache job := createTestJob(999010, "testcluster") - id, err := r.Start(job) + cacheID, err := r.Start(job) require.NoError(t, err) - // Stop the cached job - err = r.StopCached(id, 3600, schema.JobStateCompleted, int32(schema.MonitoringStatusArchivingSuccessful)) + // Transfer the cached job + r.Mutex.Lock() + newID, err := r.TransferCachedJobToMain(cacheID) + r.Mutex.Unlock() require.NoError(t, err) - // Verify job table was not affected - var count int - err = r.DB.QueryRow(`SELECT COUNT(*) FROM job WHERE job_id = ? AND cluster = ?`, - job.JobID, job.Cluster).Scan(&count) + // Verify the transferred job has the correct data + var jobID int64 + var cluster string + err = r.DB.QueryRow(`SELECT job_id, cluster FROM job WHERE id = ?`, newID).Scan(&jobID, &cluster) require.NoError(t, err) - assert.Equal(t, 0, count, "Job table should not be affected by StopCached") + assert.Equal(t, job.JobID, jobID) + assert.Equal(t, job.Cluster, cluster) // Clean up - _, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", id) + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", newID) require.NoError(t, err) }) }