Merge pull request #494 from ClusterCockpit/dev

Dev
This commit is contained in:
Jan Eitzinger
2026-02-20 08:43:54 +01:00
committed by GitHub
3 changed files with 48 additions and 2 deletions

View File

@@ -85,6 +85,22 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
return nil, err return nil, err
} }
// Resolve correct job.id from the job table. The IDs read from job_cache
// are from a different auto-increment sequence and must not be used to
// query the job table.
for _, job := range jobs {
var newID int64
if err := sq.Select("job.id").From("job").
Where("job.job_id = ? AND job.cluster = ? AND job.start_time = ?",
job.JobID, job.Cluster, job.StartTime).
RunWith(r.stmtCache).QueryRow().Scan(&newID); err != nil {
cclog.Warnf("SyncJobs: could not resolve job table id for job %d on %s: %v",
job.JobID, job.Cluster, err)
continue
}
job.ID = &newID
}
return jobs, nil return jobs, nil
} }

View File

@@ -489,6 +489,34 @@ func TestSyncJobs(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}) })
t.Run("sync returns job table IDs not cache IDs", func(t *testing.T) {
// Ensure cache is empty first
_, err := r.DB.Exec("DELETE FROM job_cache")
require.NoError(t, err)
// Insert a job into job_cache
job := createTestJob(999015, "testcluster")
cacheID, err := r.Start(job)
require.NoError(t, err)
// Sync jobs
jobs, err := r.SyncJobs()
require.NoError(t, err)
require.Equal(t, 1, len(jobs))
// The returned ID must refer to the job table, not job_cache
var jobTableID int64
err = r.DB.QueryRow("SELECT id FROM job WHERE job_id = ? AND cluster = ? AND start_time = ?",
jobs[0].JobID, jobs[0].Cluster, jobs[0].StartTime).Scan(&jobTableID)
require.NoError(t, err)
assert.Equal(t, jobTableID, *jobs[0].ID,
"returned ID should match the job table row, not the cache ID (%d)", cacheID)
// Clean up
_, err = r.DB.Exec("DELETE FROM job WHERE job_id = ? AND cluster = ?", job.JobID, job.Cluster)
require.NoError(t, err)
})
t.Run("sync with empty cache returns empty list", func(t *testing.T) { t.Run("sync with empty cache returns empty list", func(t *testing.T) {
// Ensure cache is empty // Ensure cache is empty
_, err := r.DB.Exec("DELETE FROM job_cache") _, err := r.DB.Exec("DELETE FROM job_cache")

View File

@@ -274,7 +274,7 @@ type NodeStateWithNode struct {
func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) { func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) {
rows, err := sq.Select( rows, err := sq.Select(
"node_state.id", "node_state.time_stamp", "node_state.node_state", "node_state.id", "node_state.time_stamp", "node_state.node_state",
"node_state.health_state", "COALESCE(node_state.health_metrics, '')", "node_state.health_state", "node_state.health_metrics",
"node_state.cpus_allocated", "node_state.memory_allocated", "node_state.cpus_allocated", "node_state.memory_allocated",
"node_state.gpus_allocated", "node_state.jobs_running", "node_state.gpus_allocated", "node_state.jobs_running",
"node.hostname", "node.cluster", "node.subcluster", "node.hostname", "node.cluster", "node.subcluster",
@@ -293,13 +293,15 @@ func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode
var result []NodeStateWithNode var result []NodeStateWithNode
for rows.Next() { for rows.Next() {
var ns NodeStateWithNode var ns NodeStateWithNode
var healthMetrics sql.NullString
if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState, if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState,
&ns.HealthState, &ns.HealthMetrics, &ns.HealthState, &healthMetrics,
&ns.CpusAllocated, &ns.MemoryAllocated, &ns.CpusAllocated, &ns.MemoryAllocated,
&ns.GpusAllocated, &ns.JobsRunning, &ns.GpusAllocated, &ns.JobsRunning,
&ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil { &ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil {
return nil, err return nil, err
} }
ns.HealthMetrics = healthMetrics.String
result = append(result, ns) result = append(result, ns)
} }
return result, nil return result, nil