diff --git a/internal/api/rest.go b/internal/api/rest.go index e689f56..d37e871 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -57,6 +57,7 @@ type RestApi struct { Authentication *auth.Authentication MachineStateDir string OngoingArchivings sync.WaitGroup + RepositoryMutex sync.Mutex } func (api *RestApi) MountRoutes(r *mux.Router) { @@ -362,6 +363,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } + // aquire lock to avoid race condition between API calls + var unlockOnce sync.Once + api.RepositoryMutex.Lock() + defer unlockOnce.Do(api.RepositoryMutex.Unlock) + // Check if combination of (job_id, cluster_id, start_time) already exists: jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { @@ -381,6 +387,8 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) return } + // unlock here, adding Tags can be async + unlockOnce.Do(api.RepositoryMutex.Unlock) for _, tag := range req.Tags { if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil { @@ -509,7 +517,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo if req.State != "" && !req.State.Valid() { handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw) return - } else { + } else if req.State == "" { req.State = schema.JobStateCompleted } diff --git a/test/integration_test.go b/test/integration_test.go index a7753fd..3442051 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -566,8 +566,8 @@ func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) { t.Fatal(err) } - if job.State != schema.JobStateCompleted { - t.Fatal("expected job to be completed") + if job.State != schema.JobStateFailed { + t.Fatal("expected job to be failed") } }) if !ok {