From 574d941c6caca4796e6c60ec535256d5be8f9de7 Mon Sep 17 00:00:00 2001 From: Joachim Meyer Date: Wed, 9 Nov 2022 11:49:36 +0100 Subject: [PATCH] Upon start job, compare start time against all matching jobs. For requeued jobs, we might have multiple jobs with matching cluster and job id, but differing start time. So check all of the matching ones against the start time. --- internal/api/rest.go | 24 +++++++++++++----------- internal/repository/job.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/internal/api/rest.go b/internal/api/rest.go index 3525fba..e689f56 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -99,12 +99,12 @@ type StartJobApiResponse struct { // @Description Request to stop running job using stoptime and final state. // @Description They are only required if no database id was provided with endpoint. type StopJobApiRequest struct { - // Stop Time of job as epoch + // Stop Time of job as epoch StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` - State schema.JobState `json:"jobState" validate:"required" example:"completed" enums:"completed,failed,cancelled,stopped,timeout"` // Final job state - JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job - Cluster *string `json:"cluster" example:"fritz"` // Cluster of job - StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch + State schema.JobState `json:"jobState" validate:"required" example:"completed" enums:"completed,failed,cancelled,stopped,timeout"` // Final job state + JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job + Cluster *string `json:"cluster" example:"fritz"` // Cluster of job + StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch } // ErrorResponse model @@ -363,14 +363,16 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } // Check if combination of (job_id, cluster_id, start_time) already exists: - job, err := api.JobRepository.Find(&req.JobID, &req.Cluster, nil) + jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw) return } else if err == nil { - if (req.StartTime - job.StartTimeUnix) < 86400 { - handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw) - return + for _, job := range jobs { + if (req.StartTime - job.StartTimeUnix) < 86400 { + handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw) + return + } } } @@ -723,13 +725,13 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { delrole := r.FormValue("remove-role") // TODO: Handle anything but roles... - if (newrole != "") { + if newrole != "" { if err := api.Authentication.AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil { http.Error(rw, err.Error(), http.StatusUnprocessableEntity) return } rw.Write([]byte("Add Role Success")) - } else if (delrole != "") { + } else if delrole != "" { if err := api.Authentication.RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil { http.Error(rw, err.Error(), http.StatusUnprocessableEntity) return diff --git a/internal/repository/job.go b/internal/repository/job.go index 0496698..a568e11 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -156,6 +156,42 @@ func (r *JobRepository) Find( return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +// Find executes a SQL query to find a specific batch job. +// The job is queried using the batch job id, the cluster name, +// and the start time of the job in UNIX epoch time seconds. +// It returns a pointer to a schema.Job data structure and an error variable. +// To check if no job was found test err == sql.ErrNoRows +func (r *JobRepository) FindAll( + jobId *int64, + cluster *string, + startTime *int64) ([]*schema.Job, error) { + + q := sq.Select(jobColumns...).From("job"). + Where("job.job_id = ?", *jobId) + + if cluster != nil { + q = q.Where("job.cluster = ?", *cluster) + } + if startTime != nil { + q = q.Where("job.start_time = ?", *startTime) + } + + rows, err := q.RunWith(r.stmtCache).Query() + if err != nil { + return nil, err + } + + jobs := make([]*schema.Job, 0, 10) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, err + } + jobs = append(jobs, job) + } + return jobs, nil +} + // FindById executes a SQL query to find a specific batch job. // The job is queried using the database id. // It returns a pointer to a schema.Job data structure and an error variable.