Wrap SyncJobs in transaction

Entire-Checkpoint: d4f6c79a8dc1
This commit is contained in:
2026-03-16 11:25:49 +01:00
parent 51517f8031
commit e4f3fa9ba0

View File

@@ -141,7 +141,13 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
// TransferCachedJobToMain moves a job from job_cache to the job table.
// Caller must hold r.Mutex. Returns the new job table ID.
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
res, err := r.DB.Exec(
tx, err := r.DB.Beginx()
if err != nil {
return 0, fmt.Errorf("TransferCachedJobToMain: begin transaction: %w", err)
}
defer tx.Rollback()
res, err := tx.Exec(
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
cacheID)
if err != nil {
@@ -153,11 +159,15 @@ func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
}
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
_, err = tx.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
if err != nil {
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("TransferCachedJobToMain: commit: %w", err)
}
return newID, nil
}