From 33613cdda0cac4ec07e64facdf3d33588bbb9f4c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 20 Jun 2023 10:38:53 +0200 Subject: [PATCH 1/3] Sync commit --- internal/graph/schema.resolvers.go | 31 +++++++----------------------- internal/repository/job.go | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index bf9c6e7..03e74df 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -32,32 +32,15 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, } // ConcurrentJobs is the resolver for the concurrentJobs field. -func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - exc := int(obj.Exclusive) - if exc != 1 { - filter := []*model.JobFilter{} - jid := fmt.Sprint(obj.JobID) - jdu := int(obj.Duration) - filter = append(filter, &model.JobFilter{Exclusive: &exc}) - filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}}) - filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}}) - filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu}) +func (r *jobResolver) ConcurrentJobs( + ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - jobLinks, err := r.Repo.QueryJobLinks(ctx, filter) - if err != nil { - log.Warn("Error while querying jobLinks") - return nil, err - } + if obj.State == schema.JobStateRunning { + obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix) + } - count, err := r.Repo.CountJobs(ctx, filter) - if err != nil { - log.Warn("Error while counting jobLinks") - return nil, err - } - - result := &model.JobLinkResultList{Items: jobLinks, Count: &count} - - return result, nil + if obj.Exclusive != 1 && obj.Duration > 600 { + return r.Repo.FindConcurrentJobs(obj) } return nil, nil diff --git a/internal/repository/job.go b/internal/repository/job.go index 0f280cd..b696b70 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -292,6 +292,26 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +func (r *JobRepository) FindConcurrentJobs( + job *schema.Job) (*model.JobLinkResultList, error) { + + query := sq.Select("job.id","job.job_id").From("job").Where("cluster = ?",job.Cluster) + var startTime := job.StartTimeUnix + var stopTime int64 + + if job.State == schema.JobStateRunning { + stopTime = time.Now().Unix() + } else { + stopTime = startTime + int64(job.Duration) + } + + // Add 5m overlap for jobs start time at the end + stopTime -= 300 + + query = query.Where("start_time BETWEEN ? AND ?", startTime, stopTime) + + } + // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { From bb7c1005c90ef6aff908579b1555c5c29a9fa190 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 20 Jun 2023 15:52:16 +0200 Subject: [PATCH 2/3] Refactor query logic and move to job.go --- internal/repository/job.go | 91 ++++++++++++++++++++++++++++++------ internal/repository/query.go | 72 +--------------------------- 2 files changed, 79 insertions(+), 84 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index b696b70..922db07 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -293,24 +293,89 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { } func (r *JobRepository) FindConcurrentJobs( - job *schema.Job) (*model.JobLinkResultList, error) { + job *schema.Job) (*model.JobLinkResultList, error) { + if job == nil { + return nil, nil + } - query := sq.Select("job.id","job.job_id").From("job").Where("cluster = ?",job.Cluster) - var startTime := job.StartTimeUnix - var stopTime int64 + query := sq.Select("job.id", "job.job_id").From("job").Where("cluster = ?", job.Cluster) + var startTime int64 + var stopTime int64 - if job.State == schema.JobStateRunning { - stopTime = time.Now().Unix() - } else { - stopTime = startTime + int64(job.Duration) - } + startTime = job.StartTimeUnix - // Add 5m overlap for jobs start time at the end - stopTime -= 300 + if job.State == schema.JobStateRunning { + stopTime = time.Now().Unix() + } else { + stopTime = startTime + int64(job.Duration) + } - query = query.Where("start_time BETWEEN ? AND ?", startTime, stopTime) + // Add 5m overlap for jobs start time at the end + stopTimeTail := stopTime - 300 + startTimeTail := startTime + 10 + startTimeFront := startTime + 300 - } + queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ?) OR (job.start_time < ?))", + "running", startTimeTail, stopTimeTail, startTime) + + query = query.Where("job.job_state != ?").Where("(job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?) OR ((job.start_time < ?) AND ((job.start_time + job.duration)) > ?)", + "running", startTimeTail, stopTimeTail, startTimeFront, stopTime, startTime, stopTime) + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query: %v", err) + return nil, err + } + + items := make([]*model.JobLink, 0, 10) + + for rows.Next() { + var id, jobId sql.NullInt64 + + if err = rows.Scan(&id, &jobId); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + items = append(items, + &model.JobLink{ + ID: fmt.Sprint(id), + JobID: int(jobId.Int64), + }) + } + } + + rows, err = queryRunning.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query: %v", err) + return nil, err + } + + for rows.Next() { + var id, jobId sql.NullInt64 + + if err := rows.Scan(&id, &jobId); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + items = append(items, + &model.JobLink{ + ID: fmt.Sprint(id), + JobID: int(jobId.Int64), + }) + } + } + + cnt := len(items) + + return &model.JobLinkResultList{ + Items: items, + Count: &cnt, + }, nil +} // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! diff --git a/internal/repository/query.go b/internal/repository/query.go index 32ecea0..f047da2 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -48,16 +48,9 @@ func (r *JobRepository) queryJobs( query = BuildWhereClause(f, query) } - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - log.Error("Error while running query") + log.Errorf("Error while running query: %v", err) return nil, err } @@ -102,62 +95,6 @@ func (r *JobRepository) QueryJobs( filters, page, order) } -// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters. -func (r *JobRepository) queryJobLinks( - query sq.SelectBuilder, - filters []*model.JobFilter) ([]*model.JobLink, error) { - - for _, f := range filters { - query = BuildWhereClause(f, query) - } - - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) - rows, err := query.RunWith(r.stmtCache).Query() - if err != nil { - log.Error("Error while running query") - return nil, err - } - - jobLinks := make([]*model.JobLink, 0, 50) - for rows.Next() { - jobLink, err := scanJobLink(rows) - if err != nil { - rows.Close() - log.Warn("Error while scanning rows (JobLinks)") - return nil, err - } - jobLinks = append(jobLinks, jobLink) - } - - return jobLinks, nil -} - -// testFunction for queryJobLinks -func (r *JobRepository) testQueryJobLinks( - filters []*model.JobFilter) ([]*model.JobLink, error) { - - return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters) -} - -func (r *JobRepository) QueryJobLinks( - ctx context.Context, - filters []*model.JobFilter) ([]*model.JobLink, error) { - - query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job")) - - if qerr != nil { - return nil, qerr - } - - return r.queryJobLinks(query, filters) -} - // SecurityCheck-less, private: Returns the number of jobs matching the filters func (r *JobRepository) countJobs(query sq.SelectBuilder, filters []*model.JobFilter) (int, error) { @@ -166,13 +103,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder, query = BuildWhereClause(f, query) } - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return 0, nil - } - - log.Debugf("SQL query: `%s`, args: %#v", sql, args) var count int if err := query.RunWith(r.DB).Scan(&count); err != nil { return 0, err From feba722a539c8d8063ce139889db4923cb0c70f0 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 22 Jun 2023 06:26:19 +0200 Subject: [PATCH 3/3] Refactor and Cleanup Add SecurityCheck --- internal/graph/schema.resolvers.go | 2 +- internal/repository/job.go | 8 +++++++- internal/repository/query.go | 16 ---------------- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 03e74df..571a8b9 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -40,7 +40,7 @@ func (r *jobResolver) ConcurrentJobs( } if obj.Exclusive != 1 && obj.Duration > 600 { - return r.Repo.FindConcurrentJobs(obj) + return r.Repo.FindConcurrentJobs(ctx, obj) } return nil, nil diff --git a/internal/repository/job.go b/internal/repository/job.go index 922db07..c58748b 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -293,12 +293,18 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { } func (r *JobRepository) FindConcurrentJobs( + ctx context.Context, job *schema.Job) (*model.JobLinkResultList, error) { if job == nil { return nil, nil } - query := sq.Select("job.id", "job.job_id").From("job").Where("cluster = ?", job.Cluster) + query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job")) + if qerr != nil { + return nil, qerr + } + + query = query.Where("cluster = ?", job.Cluster) var startTime int64 var stopTime int64 diff --git a/internal/repository/query.go b/internal/repository/query.go index f047da2..dd43dd7 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -86,7 +86,6 @@ func (r *JobRepository) QueryJobs( order *model.OrderByInput) ([]*schema.Job, error) { query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job")) - if qerr != nil { return nil, qerr } @@ -223,21 +222,6 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select if filter.MemUsedMax != nil { query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query) } - // Shared Jobs Query - if filter.Exclusive != nil { - query = query.Where("job.exclusive = ?", *filter.Exclusive) - } - if filter.SharedNode != nil { - query = buildStringCondition("job.resources", filter.SharedNode, query) - } - if filter.SelfJobID != nil { - query = buildStringCondition("job.job_id", filter.SelfJobID, query) - } - if filter.SelfStartTime != nil && filter.SelfDuration != nil { - start := filter.SelfStartTime.Unix() + 10 // There does not seem to be a portable way to get the current unix timestamp accross different DBs. - end := start + int64(*filter.SelfDuration) - 20 - query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end) - } return query }