Merge pull request #162 from ClusterCockpit/refactor-concurrent-job-resolver

Refactor concurrent job resolver
This commit is contained in:
Jan Eitzinger
2023-06-26 14:28:00 +02:00
committed by GitHub
3 changed files with 99 additions and 111 deletions

View File

@@ -291,6 +291,97 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
if qerr != nil {
return nil, qerr
}
query = query.Where("cluster = ?", job.Cluster)
var startTime int64
var stopTime int64
startTime = job.StartTimeUnix
if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix()
} else {
stopTime = startTime + int64(job.Duration)
}
// Add 5m overlap for jobs start time at the end
stopTimeTail := stopTime - 300
startTimeTail := startTime + 10
startTimeFront := startTime + 300
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ?) OR (job.start_time < ?))",
"running", startTimeTail, stopTimeTail, startTime)
query = query.Where("job.job_state != ?").Where("(job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?) OR ((job.start_time < ?) AND ((job.start_time + job.duration)) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTime, startTime, stopTime)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
items := make([]*model.JobLink, 0, 10)
for rows.Next() {
var id, jobId sql.NullInt64
if err = rows.Scan(&id, &jobId); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id),
JobID: int(jobId.Int64),
})
}
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
for rows.Next() {
var id, jobId sql.NullInt64
if err := rows.Scan(&id, &jobId); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id),
JobID: int(jobId.Int64),
})
}
}
cnt := len(items)
return &model.JobLinkResultList{
Items: items,
Count: &cnt,
}, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {