Staged error handling for job cache

This commit is contained in:
Jan Eitzinger 2025-05-16 17:37:24 +02:00
parent d76b1ae75d
commit 2e781b900d

View File

@ -820,7 +820,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
} }
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
rw.Write([]byte(fmt.Sprintf("Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount))) fmt.Fprintf(rw, "Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount)
} }
// startJob godoc // startJob godoc
@ -846,6 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
log.Printf("REST: %s\n", req.GoString())
req.State = schema.JobStateRunning req.State = schema.JobStateRunning
if err := importer.SanityChecks(&req.BaseJob); err != nil { if err := importer.SanityChecks(&req.BaseJob); err != nil {
@ -931,8 +932,12 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req) // log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req)
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil { if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime)
return // FIXME: Previous error is hidden
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
} }
api.checkAndHandleStopJob(rw, job, req) api.checkAndHandleStopJob(rw, job, req)
@ -1097,10 +1102,15 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
// Mark job as stopped in the database (update state and duration) // Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix()) job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State job.State = req.State
api.JobRepository.Mutex.Lock()
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
return api.JobRepository.Mutex.Unlock()
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return
}
} }
api.JobRepository.Mutex.Unlock()
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
@ -1116,6 +1126,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
return return
} }
repository.CallJobStopHooks()
// Trigger async archiving // Trigger async archiving
archiver.TriggerArchiving(job) archiver.TriggerArchiving(job)
} }