diff --git a/internal/api/job.go b/internal/api/job.go index 66258668..59136b02 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -697,7 +697,15 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) { } } - id, err := api.JobRepository.Start(&req) + // When tags are present, insert directly into the job table so that the + // returned ID can be used with AddTagOrCreate (which queries the job table). + // Jobs without tags use the cache path as before. + var id int64 + if len(req.Tags) > 0 { + id, err = api.JobRepository.StartDirect(&req) + } else { + id, err = api.JobRepository.Start(&req) + } if err != nil { handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) return diff --git a/internal/api/nats.go b/internal/api/nats.go index 0e929426..f7ad0f40 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -211,7 +211,14 @@ func (api *NatsAPI) handleStartJob(payload string) { } } - id, err := api.JobRepository.Start(&req) + // When tags are present, insert directly into the job table so that the + // returned ID can be used with AddTagOrCreate (which queries the job table). + var id int64 + if len(req.Tags) > 0 { + id, err = api.JobRepository.StartDirect(&req) + } else { + id, err = api.JobRepository.Start(&req) + } if err != nil { cclog.Errorf("NATS start job: insert into database failed: %v", err) return diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index 257b5fec..68b6db9c 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -102,7 +102,7 @@ func HandleImportFlag(flag string) error { return err } - id, err := r.InsertJob(&job) + id, err := r.InsertJobDirect(&job) if err != nil { cclog.Warn("Error while job db insert") return err diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index f53e3a9d..cb4dca89 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -165,7 +165,7 @@ func TestHandleImportFlag(t *testing.T) { } result := readResult(t, testname) - job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime) + job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime) if err != nil { t.Fatal(err) } diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 1ae05d8d..07c8ce11 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -30,6 +30,27 @@ const NamedJobInsert string = `INSERT INTO job ( :shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data );` +// InsertJobDirect inserts a job directly into the job table (not job_cache). +// Use this when the returned ID will be used for operations on the job table +// (e.g., adding tags), or for imported jobs that are already completed. +func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error) { + r.Mutex.Lock() + defer r.Mutex.Unlock() + + res, err := r.DB.NamedExec(NamedJobInsert, job) + if err != nil { + cclog.Warn("Error while NamedJobInsert (direct)") + return 0, err + } + id, err := res.LastInsertId() + if err != nil { + cclog.Warn("Error while getting last insert ID (direct)") + return 0, err + } + + return id, nil +} + func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) { r.Mutex.Lock() defer r.Mutex.Unlock() @@ -148,6 +169,28 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) { return r.InsertJob(job) } +// StartDirect inserts a new job directly into the job table (not job_cache). +// Use this when the returned ID will immediately be used for job table +// operations such as adding tags. +func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error) { + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err) + } + + job.RawResources, err = json.Marshal(job.Resources) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err) + } + + job.RawMetaData, err = json.Marshal(job.MetaData) + if err != nil { + return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err) + } + + return r.InsertJobDirect(job) +} + // Stop updates the job with the database id jobId using the provided arguments. func (r *JobRepository) Stop( jobID int64, diff --git a/internal/repository/jobCreate_test.go b/internal/repository/jobCreate_test.go index 5bc0c0ee..3f2ee6fa 100644 --- a/internal/repository/jobCreate_test.go +++ b/internal/repository/jobCreate_test.go @@ -528,3 +528,80 @@ func TestSyncJobs(t *testing.T) { assert.Equal(t, 0, len(jobs), "Should return empty list when cache is empty") }) } + +func TestInsertJobDirect(t *testing.T) { + r := setup(t) + + t.Run("inserts into job table not cache", func(t *testing.T) { + job := createTestJob(999020, "testcluster") + job.RawResources, _ = json.Marshal(job.Resources) + job.RawFootprint, _ = json.Marshal(job.Footprint) + job.RawMetaData, _ = json.Marshal(job.MetaData) + + id, err := r.InsertJobDirect(job) + require.NoError(t, err, "InsertJobDirect should succeed") + assert.Greater(t, id, int64(0), "Should return valid insert ID") + + // Verify job is in job table + var count int + err = r.DB.QueryRow("SELECT COUNT(*) FROM job WHERE id = ?", id).Scan(&count) + require.NoError(t, err) + assert.Equal(t, 1, count, "Job should be in job table") + + // Verify job is NOT in job_cache + err = r.DB.QueryRow("SELECT COUNT(*) FROM job_cache WHERE job_id = ? AND cluster = ?", + job.JobID, job.Cluster).Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count, "Job should NOT be in job_cache") + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) + + t.Run("returned ID works for tag operations", func(t *testing.T) { + job := createTestJob(999021, "testcluster") + job.RawResources, _ = json.Marshal(job.Resources) + job.RawFootprint, _ = json.Marshal(job.Footprint) + job.RawMetaData, _ = json.Marshal(job.MetaData) + + id, err := r.InsertJobDirect(job) + require.NoError(t, err) + + // Adding a tag using the returned ID should succeed (FK constraint on jobtag) + err = r.ImportTag(id, "test_type", "test_name", "global") + require.NoError(t, err, "ImportTag should succeed with direct insert ID") + + // Clean up + _, err = r.DB.Exec("DELETE FROM jobtag WHERE job_id = ?", id) + require.NoError(t, err) + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) +} + +func TestStartDirect(t *testing.T) { + r := setup(t) + + t.Run("inserts into job table with JSON encoding", func(t *testing.T) { + job := createTestJob(999022, "testcluster") + + id, err := r.StartDirect(job) + require.NoError(t, err, "StartDirect should succeed") + assert.Greater(t, id, int64(0)) + + // Verify job is in job table with encoded JSON + var rawResources []byte + err = r.DB.QueryRow("SELECT resources FROM job WHERE id = ?", id).Scan(&rawResources) + require.NoError(t, err) + + var resources []*schema.Resource + err = json.Unmarshal(rawResources, &resources) + require.NoError(t, err, "Resources should be valid JSON") + assert.Equal(t, "node01", resources[0].Hostname) + + // Clean up + _, err = r.DB.Exec("DELETE FROM job WHERE id = ?", id) + require.NoError(t, err) + }) +}