Merge pull request #72 from giesselmann/fix_api_state_and_racecond

Preserve job state in stopJob and secure startJob by Mutex.
Fixes #46 and #68
This commit is contained in:
Jan Eitzinger 2022-11-25 14:25:50 +01:00 committed by GitHub
commit 3cc006d4ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 3 deletions

View File

@ -57,6 +57,7 @@ type RestApi struct {
Authentication *auth.Authentication Authentication *auth.Authentication
MachineStateDir string MachineStateDir string
OngoingArchivings sync.WaitGroup OngoingArchivings sync.WaitGroup
RepositoryMutex sync.Mutex
} }
func (api *RestApi) MountRoutes(r *mux.Router) { func (api *RestApi) MountRoutes(r *mux.Router) {
@ -362,6 +363,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
// aquire lock to avoid race condition between API calls
var unlockOnce sync.Once
api.RepositoryMutex.Lock()
defer unlockOnce.Do(api.RepositoryMutex.Unlock)
// Check if combination of (job_id, cluster_id, start_time) already exists: // Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
@ -381,6 +387,8 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
return return
} }
// unlock here, adding Tags can be async
unlockOnce.Do(api.RepositoryMutex.Unlock)
for _, tag := range req.Tags { for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil { if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
@ -509,7 +517,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
if req.State != "" && !req.State.Valid() { if req.State != "" && !req.State.Valid() {
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw) handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
return return
} else { } else if req.State == "" {
req.State = schema.JobStateCompleted req.State = schema.JobStateCompleted
} }

View File

@ -566,8 +566,8 @@ func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) {
t.Fatal(err) t.Fatal(err)
} }
if job.State != schema.JobStateCompleted { if job.State != schema.JobStateFailed {
t.Fatal("expected job to be completed") t.Fatal("expected job to be failed")
} }
}) })
if !ok { if !ok {