diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index bf9c6e7..571a8b9 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -32,32 +32,15 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, } // ConcurrentJobs is the resolver for the concurrentJobs field. -func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - exc := int(obj.Exclusive) - if exc != 1 { - filter := []*model.JobFilter{} - jid := fmt.Sprint(obj.JobID) - jdu := int(obj.Duration) - filter = append(filter, &model.JobFilter{Exclusive: &exc}) - filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}}) - filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}}) - filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu}) +func (r *jobResolver) ConcurrentJobs( + ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - jobLinks, err := r.Repo.QueryJobLinks(ctx, filter) - if err != nil { - log.Warn("Error while querying jobLinks") - return nil, err - } + if obj.State == schema.JobStateRunning { + obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix) + } - count, err := r.Repo.CountJobs(ctx, filter) - if err != nil { - log.Warn("Error while counting jobLinks") - return nil, err - } - - result := &model.JobLinkResultList{Items: jobLinks, Count: &count} - - return result, nil + if obj.Exclusive != 1 && obj.Duration > 600 { + return r.Repo.FindConcurrentJobs(ctx, obj) } return nil, nil diff --git a/internal/repository/job.go b/internal/repository/job.go index 2ceab95..504e34e 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -291,6 +291,97 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +func (r *JobRepository) FindConcurrentJobs( + ctx context.Context, + job *schema.Job) (*model.JobLinkResultList, error) { + if job == nil { + return nil, nil + } + + query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job")) + if qerr != nil { + return nil, qerr + } + + query = query.Where("cluster = ?", job.Cluster) + var startTime int64 + var stopTime int64 + + startTime = job.StartTimeUnix + + if job.State == schema.JobStateRunning { + stopTime = time.Now().Unix() + } else { + stopTime = startTime + int64(job.Duration) + } + + // Add 5m overlap for jobs start time at the end + stopTimeTail := stopTime - 300 + startTimeTail := startTime + 10 + startTimeFront := startTime + 300 + + queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ?) OR (job.start_time < ?))", + "running", startTimeTail, stopTimeTail, startTime) + + query = query.Where("job.job_state != ?").Where("(job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?) OR ((job.start_time < ?) AND ((job.start_time + job.duration)) > ?)", + "running", startTimeTail, stopTimeTail, startTimeFront, stopTime, startTime, stopTime) + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query: %v", err) + return nil, err + } + + items := make([]*model.JobLink, 0, 10) + + for rows.Next() { + var id, jobId sql.NullInt64 + + if err = rows.Scan(&id, &jobId); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + items = append(items, + &model.JobLink{ + ID: fmt.Sprint(id), + JobID: int(jobId.Int64), + }) + } + } + + rows, err = queryRunning.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query: %v", err) + return nil, err + } + + for rows.Next() { + var id, jobId sql.NullInt64 + + if err := rows.Scan(&id, &jobId); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + items = append(items, + &model.JobLink{ + ID: fmt.Sprint(id), + JobID: int(jobId.Int64), + }) + } + } + + cnt := len(items) + + return &model.JobLinkResultList{ + Items: items, + Count: &cnt, + }, nil +} + // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { diff --git a/internal/repository/query.go b/internal/repository/query.go index 56ba173..5cebf1d 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -48,16 +48,9 @@ func (r *JobRepository) queryJobs( query = BuildWhereClause(f, query) } - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - log.Error("Error while running query") + log.Errorf("Error while running query: %v", err) return nil, err } @@ -92,7 +85,6 @@ func (r *JobRepository) QueryJobs( order *model.OrderByInput) ([]*schema.Job, error) { query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job")) - if qerr != nil { return nil, qerr } @@ -100,62 +92,6 @@ func (r *JobRepository) QueryJobs( return r.queryJobs(query, filters, page, order) } -// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters. -func (r *JobRepository) queryJobLinks( - query sq.SelectBuilder, - filters []*model.JobFilter) ([]*model.JobLink, error) { - - for _, f := range filters { - query = BuildWhereClause(f, query) - } - - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) - rows, err := query.RunWith(r.stmtCache).Query() - if err != nil { - log.Error("Error while running query") - return nil, err - } - - jobLinks := make([]*model.JobLink, 0, 50) - for rows.Next() { - jobLink, err := scanJobLink(rows) - if err != nil { - rows.Close() - log.Warn("Error while scanning rows (JobLinks)") - return nil, err - } - jobLinks = append(jobLinks, jobLink) - } - - return jobLinks, nil -} - -// testFunction for queryJobLinks -func (r *JobRepository) testQueryJobLinks( - filters []*model.JobFilter) ([]*model.JobLink, error) { - - return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters) -} - -func (r *JobRepository) QueryJobLinks( - ctx context.Context, - filters []*model.JobFilter) ([]*model.JobLink, error) { - - query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job")) - - if qerr != nil { - return nil, qerr - } - - return r.queryJobLinks(query, filters) -} - // SecurityCheck-less, private: Returns the number of jobs matching the filters func (r *JobRepository) countJobs(query sq.SelectBuilder, filters []*model.JobFilter) (int, error) { @@ -164,13 +100,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder, query = BuildWhereClause(f, query) } - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return 0, nil - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) var count int if err := query.RunWith(r.DB).Scan(&count); err != nil { return 0, err @@ -291,21 +220,6 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select if filter.MemUsedMax != nil { query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query) } - // Shared Jobs Query - if filter.Exclusive != nil { - query = query.Where("job.exclusive = ?", *filter.Exclusive) - } - if filter.SharedNode != nil { - query = buildStringCondition("job.resources", filter.SharedNode, query) - } - if filter.SelfJobID != nil { - query = buildStringCondition("job.job_id", filter.SelfJobID, query) - } - if filter.SelfStartTime != nil && filter.SelfDuration != nil { - start := filter.SelfStartTime.Unix() + 10 // There does not seem to be a portable way to get the current unix timestamp accross different DBs. - end := start + int64(*filter.SelfDuration) - 20 - query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end) - } return query }