Refactor query logic and move to job.go

This commit is contained in:
Jan Eitzinger 2023-06-20 15:52:16 +02:00
parent 33613cdda0
commit bb7c1005c9
2 changed files with 79 additions and 84 deletions

View File

@ -294,11 +294,16 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
func (r *JobRepository) FindConcurrentJobs( func (r *JobRepository) FindConcurrentJobs(
job *schema.Job) (*model.JobLinkResultList, error) { job *schema.Job) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
query := sq.Select("job.id","job.job_id").From("job").Where("cluster = ?",job.Cluster) query := sq.Select("job.id", "job.job_id").From("job").Where("cluster = ?", job.Cluster)
var startTime := job.StartTimeUnix var startTime int64
var stopTime int64 var stopTime int64
startTime = job.StartTimeUnix
if job.State == schema.JobStateRunning { if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix() stopTime = time.Now().Unix()
} else { } else {
@ -306,12 +311,72 @@ func (r *JobRepository) FindConcurrentJobs(
} }
// Add 5m overlap for jobs start time at the end // Add 5m overlap for jobs start time at the end
stopTime -= 300 stopTimeTail := stopTime - 300
startTimeTail := startTime + 10
startTimeFront := startTime + 300
query = query.Where("start_time BETWEEN ? AND ?", startTime, stopTime) queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ?) OR (job.start_time < ?))",
"running", startTimeTail, stopTimeTail, startTime)
query = query.Where("job.job_state != ?").Where("(job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?) OR ((job.start_time < ?) AND ((job.start_time + job.duration)) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTime, startTime, stopTime)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
} }
items := make([]*model.JobLink, 0, 10)
for rows.Next() {
var id, jobId sql.NullInt64
if err = rows.Scan(&id, &jobId); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id),
JobID: int(jobId.Int64),
})
}
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
for rows.Next() {
var id, jobId sql.NullInt64
if err := rows.Scan(&id, &jobId); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id),
JobID: int(jobId.Int64),
})
}
}
cnt := len(items)
return &model.JobLinkResultList{
Items: items,
Count: &cnt,
}, nil
}
// Start inserts a new job in the table, returning the unique job ID. // Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered! // Statistics are not transfered!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {

View File

@ -48,16 +48,9 @@ func (r *JobRepository) queryJobs(
query = BuildWhereClause(f, query) query = BuildWhereClause(f, query)
} }
sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return nil, err
}
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
rows, err := query.RunWith(r.stmtCache).Query() rows, err := query.RunWith(r.stmtCache).Query()
if err != nil { if err != nil {
log.Error("Error while running query") log.Errorf("Error while running query: %v", err)
return nil, err return nil, err
} }
@ -102,62 +95,6 @@ func (r *JobRepository) QueryJobs(
filters, page, order) filters, page, order)
} }
// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters.
func (r *JobRepository) queryJobLinks(
query sq.SelectBuilder,
filters []*model.JobFilter) ([]*model.JobLink, error) {
for _, f := range filters {
query = BuildWhereClause(f, query)
}
sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return nil, err
}
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobLinks := make([]*model.JobLink, 0, 50)
for rows.Next() {
jobLink, err := scanJobLink(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows (JobLinks)")
return nil, err
}
jobLinks = append(jobLinks, jobLink)
}
return jobLinks, nil
}
// testFunction for queryJobLinks
func (r *JobRepository) testQueryJobLinks(
filters []*model.JobFilter) ([]*model.JobLink, error) {
return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters)
}
func (r *JobRepository) QueryJobLinks(
ctx context.Context,
filters []*model.JobFilter) ([]*model.JobLink, error) {
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
if qerr != nil {
return nil, qerr
}
return r.queryJobLinks(query, filters)
}
// SecurityCheck-less, private: Returns the number of jobs matching the filters // SecurityCheck-less, private: Returns the number of jobs matching the filters
func (r *JobRepository) countJobs(query sq.SelectBuilder, func (r *JobRepository) countJobs(query sq.SelectBuilder,
filters []*model.JobFilter) (int, error) { filters []*model.JobFilter) (int, error) {
@ -166,13 +103,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder,
query = BuildWhereClause(f, query) query = BuildWhereClause(f, query)
} }
sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return 0, nil
}
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
var count int var count int
if err := query.RunWith(r.DB).Scan(&count); err != nil { if err := query.RunWith(r.DB).Scan(&count); err != nil {
return 0, err return 0, err