diff --git a/api/rest.go b/api/rest.go index a40131e..a967d55 100644 --- a/api/rest.go +++ b/api/rest.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "strconv" "sync" "github.com/ClusterCockpit/cc-backend/auth" @@ -203,16 +204,14 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } // Check if combination of (job_id, cluster_id, start_time) already exists: - rows, err := api.JobRepository.JobExists(req.JobID, req.Cluster, req.StartTime) + job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } - if rows.Next() { - var id int64 = -1 - rows.Scan(&id) - http.Error(rw, fmt.Sprintf("a job with that job_id, cluster_id and start_time already exists (database id: %d)", id), http.StatusUnprocessableEntity) + if job != nil { + http.Error(rw, fmt.Sprintf("a job with that job_id, cluster_id and start_time already exists (database id: %d)", job.ID), http.StatusUnprocessableEntity) return } @@ -226,7 +225,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - res, err := api.JobRepository.Add(req) + res, err := api.JobRepository.Start(req) if err != nil { log.Errorf("insert into job table failed: %s", err.Error()) http.Error(rw, err.Error(), http.StatusInternalServerError) @@ -264,9 +263,14 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { var job *schema.Job var err error if ok { - job, err = api.JobRepository.StopById(id) + id, err := strconv.ParseInt(id, 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + } + + job, err = api.JobRepository.FindById(id) } else { - job, err = api.JobRepository.Stop(*req.JobId, *req.Cluster, *req.StartTime) + job, err = api.JobRepository.Find(*req.JobId, *req.Cluster, *req.StartTime) } if err != nil { @@ -301,7 +305,7 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { return err } - api.JobRepository.Close(job.JobID, job.Duration, req.State, jobMeta.Statistics) + api.JobRepository.Stop(job.JobID, job.Duration, req.State, jobMeta.Statistics) log.Printf("job stopped and archived (dbid: %d)", job.ID) return nil } diff --git a/repository/job.go b/repository/job.go index ce08f97..c0061aa 100644 --- a/repository/job.go +++ b/repository/job.go @@ -13,7 +13,34 @@ type JobRepository struct { DB *sqlx.DB } -func (r *JobRepository) FindJobById( +// Find executes a SQL query to find a specific batch job. +// The job is queried using the batch job id, the cluster name, +// and the start time of the job in UNIX epoch time seconds. +// It returns a pointer to a schema.Job data structure and an error variable. +// If the job was not found nil is returned for the job pointer. +func (r *JobRepository) Find( + jobId int64, + cluster string, + startTime int64) (*schema.Job, error) { + qb := sq.Select(schema.JobColumns...).From("job"). + Where("job.job_id = ?", jobId). + Where("job.cluster = ?", cluster). + Where("job.start_time = ?", startTime) + + sql, args, err := qb.ToSql() + if err != nil { + return nil, err + } + + job, err := schema.ScanJob(r.DB.QueryRowx(sql, args...)) + return job, err +} + +// FindById executes a SQL query to find a specific batch job. +// The job is queried using the database id. +// It returns a pointer to a schema.Job data structure and an error variable. +// If the job was not found nil is returned for the job pointer. +func (r *JobRepository) FindById( jobId int64) (*schema.Job, error) { sql, args, err := sq.Select(schema.JobColumns...). From("job").Where("job.id = ?", jobId).ToSql() @@ -25,36 +52,7 @@ func (r *JobRepository) FindJobById( return job, err } -// func (r *JobRepository) FindJobsByFilter( ) ([]*schema.Job, int, error) { - -// } - -func (r *JobRepository) FindJobByIdWithUser( - jobId int64, - username string) (*schema.Job, error) { - - sql, args, err := sq.Select(schema.JobColumns...). - From("job"). - Where("job.id = ?", jobId). - Where("job.user = ?", username).ToSql() - if err != nil { - return nil, err - } - - job, err := schema.ScanJob(r.DB.QueryRowx(sql, args...)) - return job, err -} - -func (r *JobRepository) JobExists( - jobId int64, - cluster string, - startTime int64) (rows *sql.Rows, err error) { - rows, err = r.DB.Query(`SELECT job.id FROM job WHERE job.job_id = ? AND job.cluster = ? AND job.start_time = ?`, - jobId, cluster, startTime) - return -} - -func (r *JobRepository) Add(job schema.JobMeta) (res sql.Result, err error) { +func (r *JobRepository) Start(job schema.JobMeta) (res sql.Result, err error) { res, err = r.DB.NamedExec(`INSERT INTO job ( job_id, user, project, cluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data @@ -66,39 +64,6 @@ func (r *JobRepository) Add(job schema.JobMeta) (res sql.Result, err error) { } func (r *JobRepository) Stop( - jobId int64, - cluster string, - startTime int64) (job *schema.Job, err error) { - var sql string - var args []interface{} - qb := sq.Select(schema.JobColumns...).From("job"). - Where("job.job_id = ?", jobId). - Where("job.cluster = ?", cluster) - if startTime != 0 { - qb = qb.Where("job.start_time = ?", startTime) - } - sql, args, err = qb.ToSql() - if err != nil { - return - } - job, err = schema.ScanJob(r.DB.QueryRowx(sql, args...)) - return -} - -func (r *JobRepository) StopById(id string) (job *schema.Job, err error) { - var sql string - var args []interface{} - qb := sq.Select(schema.JobColumns...).From("job"). - Where("job.id = ?", id) - sql, args, err = qb.ToSql() - if err != nil { - return - } - job, err = schema.ScanJob(r.DB.QueryRowx(sql, args...)) - return -} - -func (r *JobRepository) Close( jobId int64, duration int32, state schema.JobState, @@ -136,31 +101,6 @@ func (r *JobRepository) Close( } } -func (r *JobRepository) findById(id string) (job *schema.Job, err error) { - var sql string - var args []interface{} - sql, args, err = sq.Select(schema.JobColumns...).From("job").Where("job.id = ?", id).ToSql() - if err != nil { - return - } - job, err = schema.ScanJob(r.DB.QueryRowx(sql, args...)) - return -} - -func (r *JobRepository) Exists(jobId int64) bool { - rows, err := r.DB.Query(`SELECT job.id FROM job WHERE job.job_id = ?`, jobId) - - if err != nil { - return false - } - - if rows.Next() { - return true - } - - return false -} - func (r *JobRepository) AddTag(jobId int64, tagId int64) error { _, err := r.DB.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId) return err