mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-24 11:27:30 +01:00
Fix more bugs related to job_cache ids used in job table
This commit is contained in:
@@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := api.JobRepository.Start(&req)
|
// When tags are present, insert directly into the job table so that the
|
||||||
|
// returned ID can be used with AddTagOrCreate (which queries the job table).
|
||||||
|
// Jobs without tags use the cache path as before.
|
||||||
|
var id int64
|
||||||
|
if len(req.Tags) > 0 {
|
||||||
|
id, err = api.JobRepository.StartDirect(&req)
|
||||||
|
} else {
|
||||||
|
id, err = api.JobRepository.Start(&req)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := api.JobRepository.Start(&req)
|
// When tags are present, insert directly into the job table so that the
|
||||||
|
// returned ID can be used with AddTagOrCreate (which queries the job table).
|
||||||
|
var id int64
|
||||||
|
if len(req.Tags) > 0 {
|
||||||
|
id, err = api.JobRepository.StartDirect(&req)
|
||||||
|
} else {
|
||||||
|
id, err = api.JobRepository.Start(&req)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("NATS start job: insert into database failed: %v", err)
|
cclog.Errorf("NATS start job: insert into database failed: %v", err)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := r.InsertJob(&job)
|
id, err := r.InsertJobDirect(&job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warn("Error while job db insert")
|
cclog.Warn("Error while job db insert")
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := readResult(t, testname)
|
result := readResult(t, testname)
|
||||||
job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime)
|
job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job (
|
|||||||
:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
|
:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
|
||||||
);`
|
);`
|
||||||
|
|
||||||
|
// InsertJobDirect inserts a job directly into the job table (not job_cache).
|
||||||
|
// Use this when the returned ID will be used for operations on the job table
|
||||||
|
// (e.g., adding tags), or for imported jobs that are already completed.
|
||||||
|
func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) {
|
||||||
|
r.Mutex.Lock()
|
||||||
|
defer r.Mutex.Unlock()
|
||||||
|
|
||||||
|
res, err := r.DB.NamedExec(NamedJobInsert, job)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Warn("Error while NamedJobInsert (direct)")
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
id, err := res.LastInsertId()
|
||||||
|
if err != nil {
|
||||||
|
cclog.Warn("Error while getting last insert ID (direct)")
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
|
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
|
||||||
r.Mutex.Lock()
|
r.Mutex.Lock()
|
||||||
defer r.Mutex.Unlock()
|
defer r.Mutex.Unlock()
|
||||||
@@ -148,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
|
|||||||
return r.InsertJob(job)
|
return r.InsertJob(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartDirect inserts a new job directly into the job table (not job_cache).
|
||||||
|
// Use this when the returned ID will immediately be used for job table
|
||||||
|
// operations such as adding tags.
|
||||||
|
func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) {
|
||||||
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
job.RawResources, err = json.Marshal(job.Resources)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.InsertJobDirect(job)
|
||||||
|
}
|
||||||
|
|
||||||
// Stop updates the job with the database id jobId using the provided arguments.
|
// Stop updates the job with the database id jobId using the provided arguments.
|
||||||
func (r *JobRepository) Stop(
|
func (r *JobRepository) Stop(
|
||||||
jobID int64,
|
jobID int64,
|
||||||
|
|||||||
@@ -528,3 +528,80 @@ func TestSyncJobs(t *testing.T) {
|
|||||||
assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty")
|
assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInsertJobDirect(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
t.Run("inserts into job table not cache", func(t *testing.T) {
|
||||||
|
job := createTestJob(999020, "testcluster")
|
||||||
|
job.RawResources, _ = json.Marshal(job.Resources)
|
||||||
|
job.RawFootprint, _ = json.Marshal(job.Footprint)
|
||||||
|
job.RawMetaData, _ = json.Marshal(job.MetaData)
|
||||||
|
|
||||||
|
id, err := r.InsertJobDirect(job)
|
||||||
|
require.NoError(t, err, "InsertJobDirect should succeed")
|
||||||
|
assert.Greater(t, id, int64(0), "Should return valid insert ID")
|
||||||
|
|
||||||
|
// Verify job is in job table
|
||||||
|
var count int
|
||||||
|
err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, count, "Job should be in job table")
|
||||||
|
|
||||||
|
// Verify job is NOT in job_cache
|
||||||
|
err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? AND cluster = ?",
|
||||||
|
job.JobID, job.Cluster).Scan(&count)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 0, count, "Job should NOT be in job_cache")
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("returned ID works for tag operations", func(t *testing.T) {
|
||||||
|
job := createTestJob(999021, "testcluster")
|
||||||
|
job.RawResources, _ = json.Marshal(job.Resources)
|
||||||
|
job.RawFootprint, _ = json.Marshal(job.Footprint)
|
||||||
|
job.RawMetaData, _ = json.Marshal(job.MetaData)
|
||||||
|
|
||||||
|
id, err := r.InsertJobDirect(job)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Adding a tag using the returned ID should succeed (FK constraint on jobtag)
|
||||||
|
err = r.ImportTag(id, "test_type", "test_name", "global")
|
||||||
|
require.NoError(t, err, "ImportTag should succeed with direct insert ID")
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
_, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStartDirect(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
t.Run("inserts into job table with JSON encoding", func(t *testing.T) {
|
||||||
|
job := createTestJob(999022, "testcluster")
|
||||||
|
|
||||||
|
id, err := r.StartDirect(job)
|
||||||
|
require.NoError(t, err, "StartDirect should succeed")
|
||||||
|
assert.Greater(t, id, int64(0))
|
||||||
|
|
||||||
|
// Verify job is in job table with encoded JSON
|
||||||
|
var rawResources []byte
|
||||||
|
err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var resources []*schema.Resource
|
||||||
|
err = json.Unmarshal(rawResources, &resources)
|
||||||
|
require.NoError(t, err, "Resources should be valid JSON")
|
||||||
|
assert.Equal(t, "node01", resources[0].Hostname)
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user