diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 9f4f366d..1ae05d8d 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -85,6 +85,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { return nil, err } + // Resolve correct job.id from the job table. The IDs read from job_cache + // are from a different auto-increment sequence and must not be used to + // query the job table. + for _, job := range jobs { + var newID int64 + if err := sq.Select("job.id").From("job"). + Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?", + job.JobID, job.Cluster, job.StartTime). + RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil { + cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v", + job.JobID, job.Cluster, err) + continue + } + job.ID = &newID + } + return jobs, nil } diff --git a/internal/repository/jobCreate_test.go b/internal/repository/jobCreate_test.go index 9e72555f..5bc0c0ee 100644 --- a/internal/repository/jobCreate_test.go +++ b/internal/repository/jobCreate_test.go @@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) { require.NoError(t, err) }) + t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) { + // Ensure cache is empty first + _, err := r.DB.Exec("DELETE FROM job_cache") + require.NoError(t, err) + + // Insert a job into job_cache + job := createTestJob(999015, "testcluster") + cacheID, err := r.Start(job) + require.NoError(t, err) + + // Sync jobs + jobs, err := r.SyncJobs() + require.NoError(t, err) + require.Equal(t, 1, len(jobs)) + + // The returned ID must refer to the job table, not job_cache + var jobTableID int64 + err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?", + jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID) + require.NoError(t, err) + assert.Equal(t, jobTableID, *jobs[0].ID, + "returned ID should match the job table row, not the cache ID (%d)", cacheID) + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster) + require.NoError(t, err) + }) + t.Run("sync with empty cache returns empty list", func(t *testing.T) { // Ensure cache is empty _, err := r.DB.Exec("DELETE FROM job_cache") diff --git a/internal/repository/node.go b/internal/repository/node.go index 4b10aea3..09415bef 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -274,7 +274,7 @@ type NodeStateWithNode struct { func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) { rows, err := sq.Select( "node_state.id", "node_state.time_stamp", "node_state.node_state", - "node_state.health_state", "COALESCE(node_state.health_metrics, '')", + "node_state.health_state", "node_state.health_metrics", "node_state.cpus_allocated", "node_state.memory_allocated", "node_state.gpus_allocated", "node_state.jobs_running", "node.hostname", "node.cluster", "node.subcluster", @@ -293,13 +293,15 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode var result []NodeStateWithNode for rows.Next() { var ns NodeStateWithNode + var healthMetrics sql.NullString if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState, - &ns.HealthState, &ns.HealthMetrics, + &ns.HealthState, &healthMetrics, &ns.CpusAllocated, &ns.MemoryAllocated, &ns.GpusAllocated, &ns.JobsRunning, &ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil { return nil, err } + ns.HealthMetrics = healthMetrics.String result = append(result, ns) } return result, nil