From 270750a78d630e087f810259256e3eb11d93be51 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 16 May 2023 12:42:06 +0200 Subject: [PATCH] Move concurrentJobs as field to main job query --- api/schema.graphqls | 2 +- internal/graph/generated/generated.go | 209 ++++++++++-------------- internal/graph/schema.resolvers.go | 50 ++++-- internal/repository/query.go | 7 +- pkg/schema/job.go | 11 ++ web/frontend/src/Job.root.svelte | 23 ++- web/frontend/src/joblist/JobInfo.svelte | 35 ---- 7 files changed, 153 insertions(+), 184 deletions(-) diff --git a/api/schema.graphqls b/api/schema.graphqls index 60d4ac7..c9aa2a8 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -27,6 +27,7 @@ type Job { state: JobState! tags: [Tag!]! resources: [Resource!]! + concurrentJobs: JobLinkResultList metaData: Any userData: User @@ -192,7 +193,6 @@ type Query { jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! - jobsParallel(filter: [JobFilter!]): JobLinkResultList! jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index c7bf095..3c8073d 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -85,6 +85,7 @@ type ComplexityRoot struct { Job struct { ArrayJobId func(childComplexity int) int Cluster func(childComplexity int) int + ConcurrentJobs func(childComplexity int) int Duration func(childComplexity int) int Exclusive func(childComplexity int) int ID func(childComplexity int) int @@ -200,7 +201,6 @@ type ComplexityRoot struct { Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int - JobsParallel func(childComplexity int, filter []*model.JobFilter) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int @@ -291,6 +291,7 @@ type JobResolver interface { Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) + ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) UserData(ctx context.Context, obj *schema.Job) (*model.User, error) } @@ -310,7 +311,6 @@ type QueryResolver interface { JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) - JobsParallel(ctx context.Context, filter []*model.JobFilter) (*model.JobLinkResultList, error) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) 
([]*model.JobsStatistics, error) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) @@ -454,6 +454,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Job.Cluster(childComplexity), true + case "Job.concurrentJobs": + if e.complexity.Job.ConcurrentJobs == nil { + break + } + + return e.complexity.Job.ConcurrentJobs(childComplexity), true + case "Job.duration": if e.complexity.Job.Duration == nil { break @@ -1041,18 +1048,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.JobsFootprints(childComplexity, args["filter"].([]*model.JobFilter), args["metrics"].([]string)), true - case "Query.jobsParallel": - if e.complexity.Query.JobsParallel == nil { - break - } - - args, err := ec.field_Query_jobsParallel_args(context.TODO(), rawArgs) - if err != nil { - return 0, false - } - - return e.complexity.Query.JobsParallel(childComplexity, args["filter"].([]*model.JobFilter)), true - case "Query.jobsStatistics": if e.complexity.Query.JobsStatistics == nil { break @@ -1520,6 +1515,7 @@ type Job { state: JobState! tags: [Tag!]! resources: [Resource!]! + concurrentJobs: JobLinkResultList metaData: Any userData: User @@ -1685,7 +1681,6 @@ type Query { jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! - jobsParallel(filter: [JobFilter!]): JobLinkResultList! jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! 
@@ -2055,21 +2050,6 @@ func (ec *executionContext) field_Query_jobsFootprints_args(ctx context.Context, return args, nil } -func (ec *executionContext) field_Query_jobsParallel_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { - var err error - args := map[string]interface{}{} - var arg0 []*model.JobFilter - if tmp, ok := rawArgs["filter"]; ok { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("filter")) - arg0, err = ec.unmarshalOJobFilter2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobFilterᚄ(ctx, tmp) - if err != nil { - return nil, err - } - } - args["filter"] = arg0 - return args, nil -} - func (ec *executionContext) field_Query_jobsStatistics_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -3960,6 +3940,53 @@ func (ec *executionContext) fieldContext_Job_resources(ctx context.Context, fiel return fc, nil } +func (ec *executionContext) _Job_concurrentJobs(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Job_concurrentJobs(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Job().ConcurrentJobs(rctx, obj) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*model.JobLinkResultList) + fc.Result = res + return ec.marshalOJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Job_concurrentJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Job", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "items": + return ec.fieldContext_JobLinkResultList_items(ctx, field) + case "count": + return ec.fieldContext_JobLinkResultList_count(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type JobLinkResultList", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Job_metaData(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Job_metaData(ctx, field) if err != nil { @@ -4643,6 +4670,8 @@ func (ec *executionContext) fieldContext_JobResultList_items(ctx context.Context return ec.fieldContext_Job_tags(ctx, field) case "resources": return ec.fieldContext_Job_resources(ctx, field) + case "concurrentJobs": + return ec.fieldContext_Job_concurrentJobs(ctx, field) case "metaData": return ec.fieldContext_Job_metaData(ctx, field) case "userData": @@ -6640,6 +6669,8 @@ func (ec *executionContext) fieldContext_Query_job(ctx context.Context, field gr return ec.fieldContext_Job_tags(ctx, field) case "resources": return ec.fieldContext_Job_resources(ctx, field) + case "concurrentJobs": + return ec.fieldContext_Job_concurrentJobs(ctx, field) case "metaData": return ec.fieldContext_Job_metaData(ctx, field) case "userData": @@ 
-6845,66 +6876,6 @@ func (ec *executionContext) fieldContext_Query_jobs(ctx context.Context, field g return fc, nil } -func (ec *executionContext) _Query_jobsParallel(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Query_jobsParallel(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().JobsParallel(rctx, fc.Args["filter"].([]*model.JobFilter)) - }) - if err != nil { - ec.Error(ctx, err) - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(*model.JobLinkResultList) - fc.Result = res - return ec.marshalNJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Query_jobsParallel(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Query", - Field: field, - IsMethod: true, - IsResolver: true, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - switch field.Name { - case "items": - return ec.fieldContext_JobLinkResultList_items(ctx, field) - case "count": - return ec.fieldContext_JobLinkResultList_count(ctx, field) - } - return nil, fmt.Errorf("no field named %q was found under type JobLinkResultList", field.Name) - }, - } - defer func() { - if r := recover(); r != nil { - err = ec.Recover(ctx, r) - ec.Error(ctx, err) - } - }() - ctx = graphql.WithFieldContext(ctx, fc) - if fc.Args, err = ec.field_Query_jobsParallel_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { - ec.Error(ctx, err) - return - } - return fc, nil -} - func (ec *executionContext) _Query_jobsStatistics(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_jobsStatistics(ctx, field) if err != nil { @@ -11882,6 +11853,23 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { atomic.AddUint32(&invalids, 1) } + case "concurrentJobs": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Job_concurrentJobs(ctx, field, obj) + return res + } + + out.Concurrently(i, func() graphql.Marshaler { + return innerFunc(ctx) + + }) case "metaData": field := field @@ -12663,26 +12651,6 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) } - out.Concurrently(i, func() graphql.Marshaler { - return rrm(innerCtx) - }) - case "jobsParallel": - field := field - - innerFunc := func(ctx context.Context) (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._Query_jobsParallel(ctx, field) - return res - } - - rrm := func(ctx context.Context) graphql.Marshaler { - return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) - } - out.Concurrently(i, func() graphql.Marshaler { return 
rrm(innerCtx) }) @@ -14241,20 +14209,6 @@ func (ec *executionContext) marshalNJobLink2ᚖgithubᚗcomᚋClusterCockpitᚋc return ec._JobLink(ctx, sel, v) } -func (ec *executionContext) marshalNJobLinkResultList2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx context.Context, sel ast.SelectionSet, v model.JobLinkResultList) graphql.Marshaler { - return ec._JobLinkResultList(ctx, sel, &v) -} - -func (ec *executionContext) marshalNJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx context.Context, sel ast.SelectionSet, v *model.JobLinkResultList) graphql.Marshaler { - if v == nil { - if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { - ec.Errorf(ctx, "the requested element is null which the schema does not allow") - } - return graphql.Null - } - return ec._JobLinkResultList(ctx, sel, v) -} - func (ec *executionContext) marshalNJobMetric2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐJobMetric(ctx context.Context, sel ast.SelectionSet, v *schema.JobMetric) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { @@ -15534,6 +15488,13 @@ func (ec *executionContext) unmarshalOJobFilter2ᚖgithubᚗcomᚋClusterCockpit return &res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) marshalOJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx context.Context, sel ast.SelectionSet, v *model.JobLinkResultList) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._JobLinkResultList(ctx, sel, v) +} + func (ec *executionContext) unmarshalOJobState2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐJobStateᚄ(ctx context.Context, v interface{}) ([]schema.JobState, error) { if v == nil { return nil, nil diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index c4e86e0..5cf63be 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -36,6 +36,39 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, return r.Repo.GetTags(&obj.ID) } +// ConcurrentJobs is the resolver for the concurrentJobs field. +func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { + + exc := int(obj.Exclusive) // 1 == node-exclusive; only non-exclusive jobs can share their nodes + if exc != 1 { + filter := []*model.JobFilter{} + jid := fmt.Sprint(obj.JobID) // format the numeric job id for the string-based SelfJobID filter + jdu := int(obj.Duration) + filter = append(filter, &model.JobFilter{Exclusive: &exc}) + filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}}) + filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}}) + filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu}) + + jobLinks, err := r.Repo.QueryJobLinks(ctx, filter) + if err != nil { + log.Warn("Error while querying jobLinks") + return nil, err + } + + count, err := r.Repo.CountJobs(ctx, filter) + if err != nil { + log.Warn("Error while counting jobLinks") + return nil, err + } + + result := &model.JobLinkResultList{Items: jobLinks, Count: &count} + + return result, nil + } + + return nil, nil +} + // MetaData is the resolver for the metaData field.
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) { return r.Repo.FetchMetadata(obj) @@ -234,23 +267,6 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag return &model.JobResultList{Items: jobs, Count: &count}, nil } -// JobsShared is the resolver for the jobsShared field. -func (r *queryResolver) JobsParallel(ctx context.Context, filter []*model.JobFilter) (*model.JobLinkResultList, error) { - jobLinks, err := r.Repo.QueryJobLinks(ctx, filter) - if err != nil { - log.Warn("Error while querying jobLinks") - return nil, err - } - - count, err := r.Repo.CountJobs(ctx, filter) - if err != nil { - log.Warn("Error while counting jobLinks") - return nil, err - } - - return &model.JobLinkResultList{Items: jobLinks, Count: &count}, nil -} - // JobsStatistics is the resolver for the jobsStatistics field. func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { return r.Repo.JobsStatistics(ctx, filter, groupBy) diff --git a/internal/repository/query.go b/internal/repository/query.go index d43bb7c..50585b3 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -239,10 +239,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select if filter.SelfJobID != nil { query = buildStringCondition("job.job_id", filter.SelfJobID, query) } - if filter.SelfStartTime != nil && filter.SelfDuration != nil { // Offset of 30 minutes? - log.Debug("SET SELFTIME FILTERS") - start := filter.SelfStartTime.Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs. - end := start + int64(*filter.SelfDuration) + if filter.SelfStartTime != nil && filter.SelfDuration != nil { + start := filter.SelfStartTime.Unix() + 10 // There does not seem to be a portable way to get the current unix timestamp across different DBs. + end := start + int64(*filter.SelfDuration) - 20 query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end) } return query diff --git a/pkg/schema/job.go b/pkg/schema/job.go index ecb1a2b..702f992 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -37,6 +37,7 @@ type BaseJob struct { Resources []*Resource `json:"resources"` // Resources used by job RawMetaData []byte `json:"-" db:"meta_data"` // Additional information about the job [As Bytes] MetaData map[string]string `json:"metaData"` // Additional information about the job + ConcurrentJobs JobLinkResultList `json:"concurrentJobs"` } // Non-Swaggered Comment: Job @@ -60,6 +61,16 @@ type Job struct { FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 } +type JobLink struct { + ID int64 `json:"id"` + JobID int64 `json:"jobId"` +} + +type JobLinkResultList struct { + Items []*JobLink `json:"items"` + Count int `json:"count"` +} + // Non-Swaggered Comment: JobMeta // Non-Swaggered Comment: When reading from the database or sending data via GraphQL, the start time can be in the much more // Non-Swaggered Comment: convenient time.Time type. In the `meta.json` files, the start time is encoded as a unix epoch timestamp.
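Taken together, the backend changes above mean a client can now fetch a job's concurrent jobs in the same round trip as the job itself, instead of issuing a separate jobsParallel request. A minimal sketch of such a query, assuming the existing job(id) entry point and using the same selection set the updated Job.root.svelte below asks for ($id stands in for a concrete database id):

    query ($id: ID!) {
      job(id: $id) {
        jobId
        concurrentJobs {
          items { id, jobId }
          count
        }
      }
    }

The server decides per job whether the field is populated: the new ConcurrentJobs resolver only builds the shared-node filters for jobs that are not node-exclusive and returns null otherwise, which is why the schema declares concurrentJobs as a nullable JobLinkResultList.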
diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index bbd02fa..29b577d 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -23,8 +23,9 @@ monitoringStatus, state, walltime, tags { id, type, name }, resources { hostname, hwthreads, accelerators }, - metaData - userData { name, email } + metaData, + userData { name, email }, + concurrentJobs { items { id, jobId }, count } } `) @@ -95,12 +96,22 @@ {#if $initq.error} {$initq.error.message} {:else if $initq.data} - + {:else} {/if} {#if $jobMetrics.data && $initq.data} + {#if $initq.data.job.concurrentJobs != null} + +
Concurrent Jobs
+
    + {#each $initq.data.job.concurrentJobs.items as pjob, index} +
  • {pjob.jobId}
  • + {/each} +
+ + {/if} diff --git a/web/frontend/src/joblist/JobInfo.svelte b/web/frontend/src/joblist/JobInfo.svelte index dd651e2..171d199 100644 --- a/web/frontend/src/joblist/JobInfo.svelte +++ b/web/frontend/src/joblist/JobInfo.svelte @@ -12,11 +12,9 @@
@@ -80,19 +58,6 @@ {scrambleNames ? scramble(job.project) : job.project} {/if} - {#if showParallelJobs && $parallelJobs.data != null} -
- - {#if $parallelJobs.data.jobsParallel.count == 0} - No Parallel Jobs - {:else} - {#each $parallelJobs.data.jobsParallel.items as pjob, index} - - {pjob.jobId}{#if index != $parallelJobs.data.jobsParallel.count - 1},{/if} - - {/each} - {/if} - {/if}
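For comparison, a rough sketch of the call pattern this patch removes: the frontend previously issued a second, top-level jobsParallel query whose overlap filters had to be supplied by the client. Those are the same exclusive, shared-node, self-job-id and self-time filters that the new ConcurrentJobs resolver now assembles server-side; the concrete filter values are therefore omitted here:

    query ($filter: [JobFilter!]) {
      jobsParallel(filter: $filter) {
        items { id, jobId }
        count
      }
    }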