From 33613cdda0cac4ec07e64facdf3d33588bbb9f4c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 20 Jun 2023 10:38:53 +0200 Subject: [PATCH] Sync commit --- internal/graph/schema.resolvers.go | 31 +++++++----------------------- internal/repository/job.go | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index bf9c6e7..03e74df 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -32,32 +32,15 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, } // ConcurrentJobs is the resolver for the concurrentJobs field. -func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - exc := int(obj.Exclusive) - if exc != 1 { - filter := []*model.JobFilter{} - jid := fmt.Sprint(obj.JobID) - jdu := int(obj.Duration) - filter = append(filter, &model.JobFilter{Exclusive: &exc}) - filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}}) - filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}}) - filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu}) +func (r *jobResolver) ConcurrentJobs( + ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - jobLinks, err := r.Repo.QueryJobLinks(ctx, filter) - if err != nil { - log.Warn("Error while querying jobLinks") - return nil, err - } + if obj.State == schema.JobStateRunning { + obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix) + } - count, err := r.Repo.CountJobs(ctx, filter) - if err != nil { - log.Warn("Error while counting jobLinks") - return nil, err - } - - result := &model.JobLinkResultList{Items: jobLinks, Count: &count} - - return result, nil + if obj.Exclusive != 1 && obj.Duration > 600 { + return r.Repo.FindConcurrentJobs(obj) } return nil, nil diff --git a/internal/repository/job.go b/internal/repository/job.go index 0f280cd..b696b70 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -292,6 +292,26 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +func (r *JobRepository) FindConcurrentJobs( + job *schema.Job) (*model.JobLinkResultList, error) { + + query := sq.Select("job.id","job.job_id").From("job").Where("cluster = ?",job.Cluster) + var startTime := job.StartTimeUnix + var stopTime int64 + + if job.State == schema.JobStateRunning { + stopTime = time.Now().Unix() + } else { + stopTime = startTime + int64(job.Duration) + } + + // Add 5m overlap for jobs start time at the end + stopTime -= 300 + + query = query.Where("start_time BETWEEN ? AND ?", startTime, stopTime) + + } + // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {