Fix bug in SyncJobs causing start job hooks being called with job_cache ID

This commit is contained in:
2026-02-20 08:29:41 +01:00
parent abd11d783b
commit dc161ec421
2 changed files with 44 additions and 0 deletions

View File

@@ -85,6 +85,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
return nil, err return nil, err
} }
// Resolve correct job.id from the job table. The IDs read from job_cache
// are from a different auto-increment sequence and must not be used to
// query the job table.
for _, job := range jobs {
var newID int64
if err := sq.Select("job.id").From("job").
Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?",
job.JobID, job.Cluster, job.StartTime).
RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil {
cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v",
job.JobID, job.Cluster, err)
continue
}
job.ID = &newID
}
return jobs, nil return jobs, nil
} }

View File

@@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}) })
t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) {
// Ensure cache is empty first
_, err := r.DB.Exec("DELETE FROM job_cache")
require.NoError(t, err)
// Insert a job into job_cache
job := createTestJob(999015, "testcluster")
cacheID, err := r.Start(job)
require.NoError(t, err)
// Sync jobs
jobs, err := r.SyncJobs()
require.NoError(t, err)
require.Equal(t, 1, len(jobs))
// The returned ID must refer to the job table, not job_cache
var jobTableID int64
err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?",
jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID)
require.NoError(t, err)
assert.Equal(t, jobTableID, *jobs[0].ID,
"returned ID should match the job table row, not the cache ID (%d)", cacheID)
// Clean up
_, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster)
require.NoError(t, err)
})
t.Run("sync with empty cache returns empty list", func(t *testing.T) { t.Run("sync with empty cache returns empty list", func(t *testing.T) {
// Ensure cache is empty // Ensure cache is empty
_, err := r.DB.Exec("DELETE FROM job_cache") _, err := r.DB.Exec("DELETE FROM job_cache")