mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-11 13:31:45 +01:00
fix: Transfer always to main job table before archiving
This commit is contained in:
@@ -754,6 +754,7 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
isCached := false
|
||||
job, err = api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
// Try cached jobs if not found in main repository
|
||||
@@ -764,9 +765,10 @@ func (api *RestAPI) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
job = cachedJob
|
||||
isCached = true
|
||||
}
|
||||
|
||||
api.checkAndHandleStopJob(rw, job, req)
|
||||
api.checkAndHandleStopJob(rw, job, req, isCached)
|
||||
}
|
||||
|
||||
// deleteJobByID godoc
|
||||
@@ -923,7 +925,7 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
|
||||
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest, isCached bool) {
|
||||
// Sanity checks
|
||||
if job.State != schema.JobStateRunning {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
|
||||
@@ -948,11 +950,21 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
||||
api.JobRepository.Mutex.Lock()
|
||||
defer api.JobRepository.Mutex.Unlock()
|
||||
|
||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
|
||||
// If the job is still in job_cache, transfer it to the job table first
|
||||
// so that job.ID always points to the job table for downstream code
|
||||
if isCached {
|
||||
newID, err := api.JobRepository.TransferCachedJobToMain(*job.ID)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : transferring cached job failed: %w", job.JobID, *job.ID, job.Cluster, err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
cclog.Infof("transferred cached job to main table: old id %d -> new id %d (jobId=%d)", *job.ID, newID, job.JobID)
|
||||
job.ID = &newID
|
||||
}
|
||||
|
||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
||||
|
||||
@@ -251,6 +251,7 @@ func (api *NatsAPI) handleStopJob(payload string) {
|
||||
return
|
||||
}
|
||||
|
||||
isCached := false
|
||||
job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
|
||||
@@ -260,6 +261,7 @@ func (api *NatsAPI) handleStopJob(payload string) {
|
||||
return
|
||||
}
|
||||
job = cachedJob
|
||||
isCached = true
|
||||
}
|
||||
|
||||
if job.State != schema.JobStateRunning {
|
||||
@@ -287,16 +289,26 @@ func (api *NatsAPI) handleStopJob(payload string) {
|
||||
api.JobRepository.Mutex.Lock()
|
||||
defer api.JobRepository.Mutex.Unlock()
|
||||
|
||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v",
|
||||
job.JobID, job.ID, job.Cluster, job.State, err)
|
||||
// If the job is still in job_cache, transfer it to the job table first
|
||||
if isCached {
|
||||
newID, err := api.JobRepository.TransferCachedJobToMain(*job.ID)
|
||||
if err != nil {
|
||||
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: transferring cached job failed: %v",
|
||||
job.JobID, *job.ID, job.Cluster, err)
|
||||
return
|
||||
}
|
||||
cclog.Infof("NATS: transferred cached job to main table: old id %d -> new id %d (jobId=%d)", *job.ID, newID, job.JobID)
|
||||
job.ID = &newID
|
||||
}
|
||||
|
||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v",
|
||||
job.JobID, *job.ID, job.Cluster, job.State, err)
|
||||
return
|
||||
}
|
||||
|
||||
cclog.Infof("NATS: archiving job (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s",
|
||||
job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
||||
*job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
||||
|
||||
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
|
||||
return
|
||||
|
||||
@@ -71,8 +71,9 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
|
||||
_, err = r.DB.Exec(
|
||||
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
||||
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
||||
if err != nil {
|
||||
cclog.Warnf("Error while Job sync: %v", err)
|
||||
return nil, err
|
||||
@@ -87,6 +88,29 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// TransferCachedJobToMain moves a job from job_cache to the job table.
|
||||
// Caller must hold r.Mutex. Returns the new job table ID.
|
||||
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
|
||||
res, err := r.DB.Exec(
|
||||
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
|
||||
cacheID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("transferring cached job %d to main table failed: %w", cacheID, err)
|
||||
}
|
||||
|
||||
newID, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
|
||||
}
|
||||
|
||||
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
|
||||
}
|
||||
|
||||
return newID, nil
|
||||
}
|
||||
|
||||
// Start inserts a new job in the table, returning the unique job ID.
|
||||
// Statistics are not transfered!
|
||||
func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
|
||||
@@ -129,20 +153,3 @@ func (r *JobRepository) Stop(
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *JobRepository) StopCached(
|
||||
jobID int64,
|
||||
duration int32,
|
||||
state schema.JobState,
|
||||
monitoringStatus int32,
|
||||
) (err error) {
|
||||
// Note: StopCached updates job_cache table, not the main job table
|
||||
// Cache invalidation happens when job is synced to main table
|
||||
stmt := sq.Update("job_cache").
|
||||
Set("job_state", state).
|
||||
Set("duration", duration).
|
||||
Set("monitoring_status", monitoringStatus).
|
||||
Where("job_cache.id = ?", jobID)
|
||||
|
||||
_, err = stmt.RunWith(r.stmtCache).Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -331,58 +331,60 @@ func TestStop(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStopCached(t *testing.T) {
|
||||
func TestTransferCachedJobToMain(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
t.Run("successful stop cached job", func(t *testing.T) {
|
||||
t.Run("successful transfer from cache to main", func(t *testing.T) {
|
||||
// Insert a job in job_cache
|
||||
job := createTestJob(999009, "testcluster")
|
||||
id, err := r.Start(job)
|
||||
cacheID, err := r.Start(job)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop the cached job
|
||||
duration := int32(3600)
|
||||
state := schema.JobStateCompleted
|
||||
monitoringStatus := int32(schema.MonitoringStatusArchivingSuccessful)
|
||||
// Transfer the cached job to the main table
|
||||
r.Mutex.Lock()
|
||||
newID, err := r.TransferCachedJobToMain(cacheID)
|
||||
r.Mutex.Unlock()
|
||||
require.NoError(t, err, "TransferCachedJobToMain should succeed")
|
||||
assert.NotEqual(t, cacheID, newID, "New ID should differ from cache ID")
|
||||
|
||||
err = r.StopCached(id, duration, state, monitoringStatus)
|
||||
require.NoError(t, err, "StopCached should succeed")
|
||||
|
||||
// Verify job was updated in job_cache table
|
||||
var retrievedDuration int32
|
||||
var retrievedState string
|
||||
var retrievedMonStatus int32
|
||||
err = r.DB.QueryRow(`SELECT duration, job_state, monitoring_status FROM job_cache WHERE id = ?`, id).Scan(
|
||||
&retrievedDuration, &retrievedState, &retrievedMonStatus)
|
||||
// Verify job exists in job table
|
||||
var count int
|
||||
err = r.DB.QueryRow(`SELECT COUNT(*) FROM job WHERE id = ?`, newID).Scan(&count)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, duration, retrievedDuration)
|
||||
assert.Equal(t, string(state), retrievedState)
|
||||
assert.Equal(t, monitoringStatus, retrievedMonStatus)
|
||||
assert.Equal(t, 1, count, "Job should exist in main table")
|
||||
|
||||
// Verify job was removed from job_cache
|
||||
err = r.DB.QueryRow(`SELECT COUNT(*) FROM job_cache WHERE id = ?`, cacheID).Scan(&count)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, count, "Job should be removed from cache")
|
||||
|
||||
// Clean up
|
||||
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", id)
|
||||
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", newID)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("stop cached job does not affect job table", func(t *testing.T) {
|
||||
t.Run("transfer preserves job data", func(t *testing.T) {
|
||||
// Insert a job in job_cache
|
||||
job := createTestJob(999010, "testcluster")
|
||||
id, err := r.Start(job)
|
||||
cacheID, err := r.Start(job)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop the cached job
|
||||
err = r.StopCached(id, 3600, schema.JobStateCompleted, int32(schema.MonitoringStatusArchivingSuccessful))
|
||||
// Transfer the cached job
|
||||
r.Mutex.Lock()
|
||||
newID, err := r.TransferCachedJobToMain(cacheID)
|
||||
r.Mutex.Unlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify job table was not affected
|
||||
var count int
|
||||
err = r.DB.QueryRow(`SELECT COUNT(*) FROM job WHERE job_id = ? AND cluster = ?`,
|
||||
job.JobID, job.Cluster).Scan(&count)
|
||||
// Verify the transferred job has the correct data
|
||||
var jobID int64
|
||||
var cluster string
|
||||
err = r.DB.QueryRow(`SELECT job_id, cluster FROM job WHERE id = ?`, newID).Scan(&jobID, &cluster)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, count, "Job table should not be affected by StopCached")
|
||||
assert.Equal(t, job.JobID, jobID)
|
||||
assert.Equal(t, job.Cluster, cluster)
|
||||
|
||||
// Clean up
|
||||
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", id)
|
||||
_, err = r.DB.Exec("DELETE FROM job WHERE id = ?", newID)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user