Upon start job, compare start time against all matching jobs.

For requeued jobs, we might have multiple jobs with matching cluster and job id, but differing start time. So check all of the matching ones against the start time.
This commit is contained in:
Joachim Meyer 2022-11-09 11:49:36 +01:00
parent d50b777d7b
commit 574d941c6c
2 changed files with 49 additions and 11 deletions

View File

@ -363,16 +363,18 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
} }
// Check if combination of (job_id, cluster_id, start_time) already exists: // Check if combination of (job_id, cluster_id, start_time) already exists:
job, err := api.JobRepository.Find(&req.JobID, &req.Cluster, nil) jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
return return
} else if err == nil { } else if err == nil {
for _, job := range jobs {
if (req.StartTime - job.StartTimeUnix) < 86400 { if (req.StartTime - job.StartTimeUnix) < 86400 {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
return return
} }
} }
}
id, err := api.JobRepository.Start(&req) id, err := api.JobRepository.Start(&req)
if err != nil { if err != nil {
@ -723,13 +725,13 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
delrole := r.FormValue("remove-role") delrole := r.FormValue("remove-role")
// TODO: Handle anything but roles... // TODO: Handle anything but roles...
if (newrole != "") { if newrole != "" {
if err := api.Authentication.AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil { if err := api.Authentication.AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity) http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return return
} }
rw.Write([]byte("Add Role Success")) rw.Write([]byte("Add Role Success"))
} else if (delrole != "") { } else if delrole != "" {
if err := api.Authentication.RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil { if err := api.Authentication.RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity) http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return return

View File

@ -156,6 +156,42 @@ func (r *JobRepository) Find(
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRow())
} }
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64) ([]*schema.Job, error) {
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job. // FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id. // The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable. // It returns a pointer to a schema.Job data structure and an error variable.