diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index ff2c27aa..8f6daeb4 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -27,6 +27,10 @@ func (r *JobRepository) Find( cluster *string, startTime *int64, ) (*schema.Job, error) { + if jobID == nil { + return nil, fmt.Errorf("jobID cannot be nil") + } + start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobID) @@ -38,17 +42,27 @@ func (r *JobRepository) Find( q = q.Where("job.start_time = ?", *startTime) } - q = q.OrderBy("job.id DESC") // always use newest matching job by db id if more than one match + q = q.OrderBy("job.id DESC").Limit(1) // always use newest matching job by db id if more than one match cclog.Debugf("Timer Find %s", time.Since(start)) return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +// FindCached executes a SQL query to find a specific batch job from the job_cache table. +// The job is queried using the batch job id, and optionally filtered by cluster name +// and start time (UNIX epoch time seconds). This method uses cached job data which +// may be stale but provides faster access than Find(). +// It returns a pointer to a schema.Job data structure and an error variable. +// To check if no job was found test err == sql.ErrNoRows func (r *JobRepository) FindCached( jobID *int64, cluster *string, startTime *int64, ) (*schema.Job, error) { + if jobID == nil { + return nil, fmt.Errorf("jobID cannot be nil") + } + q := sq.Select(jobCacheColumns...).From("job_cache"). 
Where("job_cache.job_id = ?", *jobID) @@ -59,7 +73,7 @@ func (r *JobRepository) FindCached( q = q.Where("job_cache.start_time = ?", *startTime) } - q = q.OrderBy("job_cache.id DESC") // always use newest matching job by db id if more than one match + q = q.OrderBy("job_cache.id DESC").Limit(1) // always use newest matching job by db id if more than one match return scanJob(q.RunWith(r.stmtCache).QueryRow()) } @@ -74,6 +88,10 @@ func (r *JobRepository) FindAll( cluster *string, startTime *int64, ) ([]*schema.Job, error) { + if jobID == nil { + return nil, fmt.Errorf("jobID cannot be nil") + } + start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobID) @@ -87,8 +105,8 @@ func (r *JobRepository) FindAll( rows, err := q.RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running FindAll query for jobID=%d: %v", *jobID, err) + return nil, fmt.Errorf("failed to execute FindAll query: %w", err) } defer rows.Close() @@ -96,8 +114,8 @@ func (r *JobRepository) FindAll( for rows.Next() { job, err := scanJob(rows) if err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning rows in FindAll: %v", err) + return nil, fmt.Errorf("failed to scan job row: %w", err) } jobs = append(jobs, job) } @@ -120,8 +138,8 @@ func (r *JobRepository) GetJobList(limit int, offset int) ([]int64, error) { rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running GetJobList query (limit=%d, offset=%d): %v", limit, offset, err) + return nil, fmt.Errorf("failed to execute GetJobList query: %w", err) } defer rows.Close() @@ -130,8 +148,8 @@ func (r *JobRepository) GetJobList(limit int, offset int) ([]int64, error) { var id int64 err := rows.Scan(&id) if err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + 
cclog.Warnf("Error while scanning rows in GetJobList: %v", err) + return nil, fmt.Errorf("failed to scan job ID: %w", err) } jl = append(jl, id) } @@ -202,10 +220,10 @@ func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime return scanJob(q.RunWith(r.stmtCache).QueryRow()) } -// IsJobOwner executes a SQL query to find a specific batch job. -// The job is queried using the slurm id,a username and the cluster. -// It returns a bool. -// If job was found, user is owner: test err != sql.ErrNoRows +// IsJobOwner checks if the specified user owns the batch job identified by jobID, +// startTime, and cluster. It returns false only when the query matches no row +// (sql.ErrNoRows), i.e. for non-existent jobs and jobs owned by other users. +// It returns no error: any other failure is logged as a warning and yields true. func (r *JobRepository) IsJobOwner(jobID int64, startTime int64, user string, cluster string) bool { q := sq.Select("id"). From("job"). @@ -215,6 +233,9 @@ func (r *JobRepository) IsJobOwner(jobID int64, startTime int64, user string, cl Where("job.start_time = ?", startTime) _, err := scanJob(q.RunWith(r.stmtCache).QueryRow()) + if err != nil && err != sql.ErrNoRows { + cclog.Warnf("IsJobOwner: unexpected error for jobID=%d, user=%s, cluster=%s: %v", jobID, user, cluster, err) + } return err != sql.ErrNoRows } @@ -232,6 +253,11 @@ func (r *JobRepository) FindConcurrentJobs( } query = query.Where("cluster = ?", job.Cluster) + + if len(job.Resources) == 0 { + return nil, fmt.Errorf("job has no resources defined") + } + var startTime int64 var stopTime int64 @@ -244,10 +270,15 @@ func (r *JobRepository) FindConcurrentJobs( stopTime = startTime + int64(job.Duration) } - // Add 200s overlap for jobs start time at the end - startTimeTail := startTime + 10 - stopTimeTail := stopTime - 200 - startTimeFront := startTime + 200 + // Time buffer constants for finding overlapping jobs + // overlapBufferStart: 10s grace period at job start to catch jobs starting just after + // 
overlapBufferEnd: 200s buffer at job end to account for scheduling/cleanup overlap; also added to the start time to form startTimeFront + const overlapBufferStart = 10 + const overlapBufferEnd = 200 + + startTimeTail := startTime + overlapBufferStart + stopTimeTail := stopTime - overlapBufferEnd + startTimeFront := startTime + overlapBufferEnd queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)", "running", startTimeTail, stopTimeTail, startTime) @@ -261,8 +292,8 @@ func (r *JobRepository) FindConcurrentJobs( rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - cclog.Errorf("Error while running query: %v", err) - return nil, err + cclog.Errorf("Error while running concurrent jobs query: %v", err) + return nil, fmt.Errorf("failed to execute concurrent jobs query: %w", err) } defer rows.Close() @@ -273,8 +304,8 @@ func (r *JobRepository) FindConcurrentJobs( var id, jobID, startTime sql.NullInt64 if err = rows.Scan(&id, &jobID, &startTime); err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning concurrent job rows: %v", err) + return nil, fmt.Errorf("failed to scan concurrent job row: %w", err) } if id.Valid { @@ -289,8 +320,8 @@ func (r *JobRepository) FindConcurrentJobs( rows, err = queryRunning.RunWith(r.stmtCache).Query() if err != nil { - cclog.Errorf("Error while running query: %v", err) - return nil, err + cclog.Errorf("Error while running concurrent running jobs query: %v", err) + return nil, fmt.Errorf("failed to execute concurrent running jobs query: %w", err) } defer rows.Close() @@ -298,8 +329,8 @@ func (r *JobRepository) FindConcurrentJobs( var id, jobID, startTime sql.NullInt64 if err := rows.Scan(&id, &jobID, &startTime); err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning running concurrent job rows: %v", err) + return nil, fmt.Errorf("failed to scan running concurrent job row: %w", err) } if id.Valid {