Merge pull request #64 from fodinabor/fix/sanitize-all-matching-jobs

Upon start job, compare start time against all matching jobs.
This commit is contained in:
Jan Eitzinger 2022-11-10 06:11:45 +01:00 committed by GitHub
commit c4072ff4c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 11 deletions

View File

@ -99,12 +99,12 @@ type StartJobApiResponse struct {
// @Description Request to stop running job using stoptime and final state. // @Description Request to stop running job using stoptime and final state.
// @Description They are only required if no database id was provided with endpoint. // @Description They are only required if no database id was provided with endpoint.
type StopJobApiRequest struct { type StopJobApiRequest struct {
// Stop Time of job as epoch // Stop Time of job as epoch
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
State schema.JobState `json:"jobState" validate:"required" example:"completed" enums:"completed,failed,cancelled,stopped,timeout"` // Final job state State schema.JobState `json:"jobState" validate:"required" example:"completed" enums:"completed,failed,cancelled,stopped,timeout"` // Final job state
JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
} }
// ErrorResponse model // ErrorResponse model
@ -363,14 +363,16 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
} }
// Check if combination of (job_id, cluster_id, start_time) already exists: // Check if combination of (job_id, cluster_id, start_time) already exists:
job, err := api.JobRepository.Find(&req.JobID, &req.Cluster, nil) jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
return return
} else if err == nil { } else if err == nil {
if (req.StartTime - job.StartTimeUnix) < 86400 { for _, job := range jobs {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw) if (req.StartTime - job.StartTimeUnix) < 86400 {
return handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
return
}
} }
} }
@ -723,13 +725,13 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
delrole := r.FormValue("remove-role") delrole := r.FormValue("remove-role")
// TODO: Handle anything but roles... // TODO: Handle anything but roles...
if (newrole != "") { if newrole != "" {
if err := api.Authentication.AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil { if err := api.Authentication.AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity) http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return return
} }
rw.Write([]byte("Add Role Success")) rw.Write([]byte("Add Role Success"))
} else if (delrole != "") { } else if delrole != "" {
if err := api.Authentication.RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil { if err := api.Authentication.RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity) http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return return

View File

@ -156,6 +156,42 @@ func (r *JobRepository) Find(
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRow())
} }
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64) ([]*schema.Job, error) {
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job. // FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id. // The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable. // It returns a pointer to a schema.Job data structure and an error variable.