From aa6336ea1eccd00592f2557cb81e7d3dec640a92 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 6 Mar 2024 14:50:08 +0100 Subject: [PATCH] Refactor Reformat. Convert to query builder. Add descriptive error log messages. --- internal/repository/job.go | 46 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index e1a997a..db02283 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -223,8 +223,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er func (r *JobRepository) Find( jobId *int64, cluster *string, - startTime *int64) (*schema.Job, error) { - + startTime *int64, +) (*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -248,8 +248,8 @@ func (r *JobRepository) Find( func (r *JobRepository) FindAll( jobId *int64, cluster *string, - startTime *int64) ([]*schema.Job, error) { - + startTime *int64, +) ([]*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -292,7 +292,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { func (r *JobRepository) FindConcurrentJobs( ctx context.Context, - job *schema.Job) (*model.JobLinkResultList, error) { + job *schema.Job, +) (*model.JobLinkResultList, error) { if job == nil { return nil, nil } @@ -420,8 +421,8 @@ func (r *JobRepository) Stop( jobId int64, duration int32, state schema.JobState, - monitoringStatus int32) (err error) { - + monitoringStatus int32, +) (err error) { stmt := sq.Update("job"). Set("job_state", state). Set("duration", duration). @@ -434,11 +435,14 @@ func (r *JobRepository) Stop( func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { var cnt int - qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime) - err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement - _, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime) + q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) + q.RunWith(r.DB).QueryRow().Scan(cnt) + qd := sq.Delete("job").Where("job.start_time < ?", startTime) + _, err := qd.RunWith(r.DB).Exec() + if err != nil { - log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err) + s, _, _ := qd.ToSql() + log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err) } else { log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt) } @@ -446,9 +450,12 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { } func (r *JobRepository) DeleteJobById(id int64) error { - _, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id) + qd := sq.Delete("job").Where("job.id = ?", id) + _, err := qd.RunWith(r.DB).Exec() + if err != nil { - log.Errorf("DeleteJobById(%d): error %#v", id, err) + s, _, _ := qd.ToSql() + log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err) } else { log.Debugf("DeleteJobById(%d): Success", id) } @@ -468,8 +475,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 func (r *JobRepository) MarkArchived( jobId int64, monitoringStatus int32, - metricStats map[string]schema.JobStatistics) error { - + metricStats map[string]schema.JobStatistics, +) error { stmt := sq.Update("job"). Set("monitoring_status", monitoringStatus). Where("job.id = ?", jobId) @@ -578,8 +585,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm } } -var ErrNotFound = errors.New("no such jobname, project or user") -var ErrForbidden = errors.New("not authorized") +var ( + ErrNotFound = errors.New("no such jobname, project or user") + ErrForbidden = errors.New("not authorized") +) func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) { compareStr := " = ?" @@ -663,7 +672,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // Hosts with zero jobs running on them will not show up! func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { - start := time.Now() subclusters := make(map[string]map[string]int) rows, err := sq.Select("resources", "subcluster").From("job"). @@ -706,7 +714,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in } func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { - start := time.Now() res, err := sq.Update("job"). Set("monitoring_status", schema.MonitoringStatusArchivingFailed). @@ -735,7 +742,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { } func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { - var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {