From d93610f70007d12418c04e27b613ff71c5639e3b Mon Sep 17 00:00:00 2001
From: Christoph Kluge
Date: Fri, 28 Apr 2023 12:34:40 +0200
Subject: [PATCH] List parallel jobs on node for jobs on shared node

- Relates to issue #97
- Required GQL schema extension and regeneration
- Works for archived jobs as well
---
 api/schema.graphqls                     |  18 +
 internal/graph/generated/generated.go   | 531 +++++++++++++++++++++++-
 internal/graph/model/models_gen.go      |  16 +
 internal/graph/schema.resolvers.go      |  17 +
 internal/repository/job.go              |  13 +-
 internal/repository/query.go            |  63 ++-
 web/frontend/src/Job.root.svelte        |   2 +-
 web/frontend/src/joblist/JobInfo.svelte |  36 ++
 8 files changed, 691 insertions(+), 5 deletions(-)

diff --git a/api/schema.graphqls b/api/schema.graphqls
index d078123..60d4ac7 100644
--- a/api/schema.graphqls
+++ b/api/schema.graphqls
@@ -32,6 +32,11 @@ type Job {
   userData: User
 }
 
+type JobLink {
+  id: ID!
+  jobId: Int!
+}
+
 type Cluster {
   name: String!
   partitions: [String!]! # Slurm partitions
@@ -187,6 +192,7 @@ type Query {
   jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints
 
   jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
+  jobsParallel(filter: [JobFilter!]): JobLinkResultList!
   jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]!
   jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]!
 
@@ -230,6 +236,12 @@ input JobFilter {
   memBwAvg: FloatRange
   loadAvg: FloatRange
   memUsedMax: FloatRange
+
+  exclusive: Int
+  sharedNode: StringInput
+  selfJobId: StringInput
+  selfStartTime: Time
+  selfDuration: Int
 }
 
 input OrderByInput {
@@ -244,6 +256,7 @@ enum SortDirectionEnum {
 
 input StringInput {
   eq: String
+  neq: String
   contains: String
   startsWith: String
   endsWith: String
@@ -261,6 +274,11 @@ type JobResultList {
   count: Int
 }
 
+type JobLinkResultList {
+  items: [JobLink!]!
+  count: Int
+}
+
 type HistoPoint {
   count: Int!
   value: Int!
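A client could exercise the new query roughly as follows. This is a minimal sketch with hypothetical values (job id, hostname, time window), assuming the Time scalar accepts RFC3339 strings as in the default gqlgen time mapping:

    query {
      jobsParallel(filter: [{
        exclusive: 0                           # hypothetical: job did not run node-exclusive
        sharedNode: { contains: "node0042" }   # hostname taken from the job's resources
        selfJobId: { neq: "123456" }           # exclude the job itself via the new neq operator
        selfStartTime: "2023-04-28T10:00:00Z"  # own start time and duration,
        selfDuration: 3600                     # used for the runtime-overlap check
      }]) {
        items { id jobId }
        count
      }
    }

The returned items carry only the database id and the user-facing jobId, which is enough for the frontend to build links to the parallel jobs.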
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index db6902a..c7bf095 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -108,6 +108,16 @@ type ComplexityRoot struct { Walltime func(childComplexity int) int } + JobLink struct { + ID func(childComplexity int) int + JobID func(childComplexity int) int + } + + JobLinkResultList struct { + Count func(childComplexity int) int + Items func(childComplexity int) int + } + JobMetric struct { Series func(childComplexity int) int StatisticsSeries func(childComplexity int) int @@ -190,6 +200,7 @@ type ComplexityRoot struct { Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int + JobsParallel func(childComplexity int, filter []*model.JobFilter) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int @@ -299,6 +310,7 @@ type QueryResolver interface { JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) + JobsParallel(ctx context.Context, filter []*model.JobFilter) (*model.JobLinkResultList, error) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) @@ -589,6 +601,34 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Job.Walltime(childComplexity), true + case "JobLink.id": + if e.complexity.JobLink.ID == nil { + break + } + + return e.complexity.JobLink.ID(childComplexity), true + + case "JobLink.jobId": + if e.complexity.JobLink.JobID == nil { + break + } + + return e.complexity.JobLink.JobID(childComplexity), true + + case "JobLinkResultList.count": + if e.complexity.JobLinkResultList.Count == nil { + break + } + + return e.complexity.JobLinkResultList.Count(childComplexity), true + + case "JobLinkResultList.items": + if e.complexity.JobLinkResultList.Items == nil { + break + } + + return e.complexity.JobLinkResultList.Items(childComplexity), true + case "JobMetric.series": if e.complexity.JobMetric.Series == nil { break @@ -1001,6 +1041,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.JobsFootprints(childComplexity, args["filter"].([]*model.JobFilter), args["metrics"].([]string)), true + case "Query.jobsParallel": + if e.complexity.Query.JobsParallel == nil { 
+ break + } + + args, err := ec.field_Query_jobsParallel_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.JobsParallel(childComplexity, args["filter"].([]*model.JobFilter)), true + case "Query.jobsStatistics": if e.complexity.Query.JobsStatistics == nil { break @@ -1473,6 +1525,11 @@ type Job { userData: User } +type JobLink { + id: ID! + jobId: Int! +} + type Cluster { name: String! partitions: [String!]! # Slurm partitions @@ -1628,6 +1685,7 @@ type Query { jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! + jobsParallel(filter: [JobFilter!]): JobLinkResultList! jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! @@ -1671,6 +1729,12 @@ input JobFilter { memBwAvg: FloatRange loadAvg: FloatRange memUsedMax: FloatRange + + exclusive: Int + sharedNode: StringInput + selfJobId: StringInput + selfStartTime: Time + selfDuration: Int } input OrderByInput { @@ -1685,6 +1749,7 @@ enum SortDirectionEnum { input StringInput { eq: String + neq: String contains: String startsWith: String endsWith: String @@ -1702,6 +1767,11 @@ type JobResultList { count: Int } +type JobLinkResultList { + items: [JobLink!]! + count: Int +} + type HistoPoint { count: Int! value: Int! @@ -1985,6 +2055,21 @@ func (ec *executionContext) field_Query_jobsFootprints_args(ctx context.Context, return args, nil } +func (ec *executionContext) field_Query_jobsParallel_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 []*model.JobFilter + if tmp, ok := rawArgs["filter"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("filter")) + arg0, err = ec.unmarshalOJobFilter2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobFilterᚄ(ctx, tmp) + if err != nil { + return nil, err + } + } + args["filter"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_jobsStatistics_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -3965,6 +4050,185 @@ func (ec *executionContext) fieldContext_Job_userData(ctx context.Context, field return fc, nil } +func (ec *executionContext) _JobLink_id(ctx context.Context, field graphql.CollectedField, obj *model.JobLink) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobLink_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNID2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobLink_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobLink", + Field: field, + IsMethod: false, + IsResolver: 
false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type ID does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _JobLink_jobId(ctx context.Context, field graphql.CollectedField, obj *model.JobLink) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobLink_jobId(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.JobID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobLink_jobId(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobLink", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _JobLinkResultList_items(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobLinkResultList_items(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Items, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]*model.JobLink) + fc.Result = res + return ec.marshalNJobLink2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobLinkResultList_items(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobLinkResultList", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_JobLink_id(ctx, field) + case "jobId": + return ec.fieldContext_JobLink_jobId(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type JobLink", field.Name) + }, + } + return fc, nil +} + +func (ec *executionContext) _JobLinkResultList_count(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobLinkResultList_count(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, 
ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Count, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*int) + fc.Result = res + return ec.marshalOInt2ᚖint(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobLinkResultList_count(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobLinkResultList", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobMetric_unit(ctx context.Context, field graphql.CollectedField, obj *schema.JobMetric) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobMetric_unit(ctx, field) if err != nil { @@ -6581,6 +6845,66 @@ func (ec *executionContext) fieldContext_Query_jobs(ctx context.Context, field g return fc, nil } +func (ec *executionContext) _Query_jobsParallel(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_jobsParallel(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().JobsParallel(rctx, fc.Args["filter"].([]*model.JobFilter)) + }) + if err != nil { + ec.Error(ctx, err) + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*model.JobLinkResultList) + fc.Result = res + return ec.marshalNJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_jobsParallel(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "items": + return ec.fieldContext_JobLinkResultList_items(ctx, field) + case "count": + return ec.fieldContext_JobLinkResultList_count(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type JobLinkResultList", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_jobsParallel_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Query_jobsStatistics(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_jobsStatistics(ctx, field) if err != nil { @@ -10741,7 +11065,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int asMap[k] = v } - 
fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax"} + fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "sharedNode", "selfJobId", "selfStartTime", "selfDuration"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -10900,6 +11224,46 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int if err != nil { return it, err } + case "exclusive": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("exclusive")) + it.Exclusive, err = ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + case "sharedNode": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sharedNode")) + it.SharedNode, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) + if err != nil { + return it, err + } + case "selfJobId": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfJobId")) + it.SelfJobID, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) + if err != nil { + return it, err + } + case "selfStartTime": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfStartTime")) + it.SelfStartTime, err = ec.unmarshalOTime2ᚖtimeᚐTime(ctx, v) + if err != nil { + return it, err + } + case "selfDuration": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfDuration")) + it.SelfDuration, err = ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } } } @@ -10989,7 +11353,7 @@ func (ec *executionContext) unmarshalInputStringInput(ctx context.Context, obj i asMap[k] = v } - fieldsInOrder := [...]string{"eq", "contains", "startsWith", "endsWith", "in"} + fieldsInOrder := [...]string{"eq", "neq", "contains", "startsWith", "endsWith", "in"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -11004,6 +11368,14 @@ func (ec *executionContext) unmarshalInputStringInput(ctx context.Context, obj i if err != nil { return it, err } + case "neq": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("neq")) + it.Neq, err = ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } case "contains": var err error @@ -11555,6 +11927,73 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj return out } +var jobLinkImplementors = []string{"JobLink"} + +func (ec *executionContext) _JobLink(ctx context.Context, sel ast.SelectionSet, obj *model.JobLink) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, jobLinkImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("JobLink") + case "id": + + out.Values[i] = ec._JobLink_id(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "jobId": + + out.Values[i] = ec._JobLink_jobId(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + default: + 
panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + +var jobLinkResultListImplementors = []string{"JobLinkResultList"} + +func (ec *executionContext) _JobLinkResultList(ctx context.Context, sel ast.SelectionSet, obj *model.JobLinkResultList) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, jobLinkResultListImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("JobLinkResultList") + case "items": + + out.Values[i] = ec._JobLinkResultList_items(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "count": + + out.Values[i] = ec._JobLinkResultList_count(ctx, field, obj) + + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var jobMetricImplementors = []string{"JobMetric"} func (ec *executionContext) _JobMetric(ctx context.Context, sel ast.SelectionSet, obj *schema.JobMetric) graphql.Marshaler { @@ -12224,6 +12663,26 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) } + out.Concurrently(i, func() graphql.Marshaler { + return rrm(innerCtx) + }) + case "jobsParallel": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_jobsParallel(ctx, field) + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) + } + out.Concurrently(i, func() graphql.Marshaler { return rrm(innerCtx) }) @@ -13728,6 +14187,74 @@ func (ec *executionContext) unmarshalNJobFilter2ᚖgithubᚗcomᚋClusterCockpit return &res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) marshalNJobLink2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobLink) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNJobLink2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLink(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNJobLink2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLink(ctx context.Context, sel ast.SelectionSet, v *model.JobLink) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._JobLink(ctx, sel, v) +} + +func (ec *executionContext) marshalNJobLinkResultList2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx context.Context, sel ast.SelectionSet, v 
model.JobLinkResultList) graphql.Marshaler {
+    return ec._JobLinkResultList(ctx, sel, &v)
+}
+
+func (ec *executionContext) marshalNJobLinkResultList2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobLinkResultList(ctx context.Context, sel ast.SelectionSet, v *model.JobLinkResultList) graphql.Marshaler {
+    if v == nil {
+        if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+            ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+        }
+        return graphql.Null
+    }
+    return ec._JobLinkResultList(ctx, sel, v)
+}
+
 func (ec *executionContext) marshalNJobMetric2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐJobMetric(ctx context.Context, sel ast.SelectionSet, v *schema.JobMetric) graphql.Marshaler {
     if v == nil {
         if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go
index 5d3aedd..263db8f 100644
--- a/internal/graph/model/models_gen.go
+++ b/internal/graph/model/models_gen.go
@@ -56,6 +56,21 @@ type JobFilter struct {
     MemBwAvg      *FloatRange  `json:"memBwAvg"`
     LoadAvg       *FloatRange  `json:"loadAvg"`
     MemUsedMax    *FloatRange  `json:"memUsedMax"`
+    Exclusive     *int         `json:"exclusive"`
+    SharedNode    *StringInput `json:"sharedNode"`
+    SelfJobID     *StringInput `json:"selfJobId"`
+    SelfStartTime *time.Time   `json:"selfStartTime"`
+    SelfDuration  *int         `json:"selfDuration"`
+}
+
+type JobLink struct {
+    ID    string `json:"id"`
+    JobID int    `json:"jobId"`
+}
+
+type JobLinkResultList struct {
+    Items []*JobLink `json:"items"`
+    Count *int       `json:"count"`
 }
 
 type JobMetricWithName struct {
@@ -105,6 +120,7 @@ type PageRequest struct {
 
 type StringInput struct {
     Eq         *string `json:"eq"`
+    Neq        *string `json:"neq"`
     Contains   *string `json:"contains"`
     StartsWith *string `json:"startsWith"`
     EndsWith   *string `json:"endsWith"`
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index e3cbd08..c4e86e0 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -234,6 +234,23 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
     return &model.JobResultList{Items: jobs, Count: &count}, nil
 }
 
+// JobsParallel is the resolver for the jobsParallel field.
+func (r *queryResolver) JobsParallel(ctx context.Context, filter []*model.JobFilter) (*model.JobLinkResultList, error) {
+    jobLinks, err := r.Repo.QueryJobLinks(ctx, filter)
+    if err != nil {
+        log.Warn("Error while querying jobLinks")
+        return nil, err
+    }
+
+    count, err := r.Repo.CountJobs(ctx, filter)
+    if err != nil {
+        log.Warn("Error while counting jobLinks")
+        return nil, err
+    }
+
+    return &model.JobLinkResultList{Items: jobLinks, Count: &count}, nil
+}
+
 // JobsStatistics is the resolver for the jobsStatistics field.
 func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
     return r.Repo.JobsStatistics(ctx, filter, groupBy)
diff --git a/internal/repository/job.go b/internal/repository/job.go
index 81a19ec..50041d8 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -74,7 +74,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
         &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
         &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc,
         &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, &job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil {
-        log.Warn("Error while scanning rows")
+        log.Warn("Error while scanning rows (Job)")
         return nil, err
     }
 
@@ -96,6 +96,17 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
     return job, nil
 }
 
+func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
+    jobLink := &model.JobLink{}
+    if err := row.Scan(
+        &jobLink.ID, &jobLink.JobID); err != nil {
+        log.Warn("Error while scanning rows (jobLink)")
+        return nil, err
+    }
+
+    return jobLink, nil
+}
+
 func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
     start := time.Now()
     cachekey := fmt.Sprintf("metadata:%d", job.ID)
diff --git a/internal/repository/query.go b/internal/repository/query.go
index fd90faf..d43bb7c 100644
--- a/internal/repository/query.go
+++ b/internal/repository/query.go
@@ -70,7 +70,7 @@ func (r *JobRepository) QueryJobs(
         job, err := scanJob(rows)
         if err != nil {
             rows.Close()
-            log.Warn("Error while scanning rows")
+            log.Warn("Error while scanning rows (Jobs)")
             return nil, err
         }
         jobs = append(jobs, job)
@@ -79,6 +79,48 @@ func (r *JobRepository) QueryJobs(
     return jobs, nil
 }
 
+// QueryJobLinks returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building, based on the provided filters.
+func (r *JobRepository) QueryJobLinks(
+    ctx context.Context,
+    filters []*model.JobFilter) ([]*model.JobLink, error) {
+
+    query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
+
+    if qerr != nil {
+        return nil, qerr
+    }
+
+    for _, f := range filters {
+        query = BuildWhereClause(f, query)
+    }
+
+    sql, args, err := query.ToSql()
+    if err != nil {
+        log.Warn("Error while converting query to sql")
+        return nil, err
+    }
+
+    log.Debugf("SQL query: `%s`, args: %#v", sql, args)
+    rows, err := query.RunWith(r.stmtCache).Query()
+    if err != nil {
+        log.Error("Error while running query")
+        return nil, err
+    }
+
+    jobLinks := make([]*model.JobLink, 0, 50)
+    for rows.Next() {
+        jobLink, err := scanJobLink(rows)
+        if err != nil {
+            rows.Close()
+            log.Warn("Error while scanning rows (JobLinks)")
+            return nil, err
+        }
+        jobLinks = append(jobLinks, jobLink)
+    }
+
+    return jobLinks, nil
+}
+
 // CountJobs counts the number of jobs matching the filters.
 func (r *JobRepository) CountJobs(
     ctx context.Context,
@@ -187,6 +229,22 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
     if filter.MemUsedMax != nil {
         query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
     }
+    // Shared Jobs Query
+    if filter.Exclusive != nil {
+        query = query.Where("job.exclusive = ?", *filter.Exclusive)
+    }
+    if filter.SharedNode != nil {
+        query = buildStringCondition("job.resources", filter.SharedNode, query)
+    }
+    if filter.SelfJobID != nil {
+        query = buildStringCondition("job.job_id", filter.SelfJobID, query)
+    }
+    if filter.SelfStartTime != nil && filter.SelfDuration != nil { // Offset of 30 minutes?
+        log.Debug("SET SELFTIME FILTERS")
+        start := filter.SelfStartTime.Unix() // There does not seem to be a portable way to get the current unix timestamp across different DBs.
+        end := start + int64(*filter.SelfDuration)
+        query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end)
+    }
     return query
 }
 
@@ -214,6 +272,9 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select
     if cond.Eq != nil {
         return query.Where(field+" = ?", *cond.Eq)
     }
+    if cond.Neq != nil {
+        return query.Where(field+" != ?", *cond.Neq)
+    }
     if cond.StartsWith != nil {
         return query.Where(field+" LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
     }
diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte
index efccadf..62ce2b7 100644
--- a/web/frontend/src/Job.root.svelte
+++ b/web/frontend/src/Job.root.svelte
@@ -95,7 +95,7 @@
     {#if $initq.error}
         {$initq.error.message}
     {:else if $initq.data}
-
+
     {:else}
 
     {/if}
diff --git a/web/frontend/src/joblist/JobInfo.svelte b/web/frontend/src/joblist/JobInfo.svelte
index db0361e..dd651e2 100644
--- a/web/frontend/src/joblist/JobInfo.svelte
+++ b/web/frontend/src/joblist/JobInfo.svelte
@@ -12,9 +12,11 @@
@@ -57,6 +80,19 @@ {scrambleNames ? scramble(job.project) : job.project} {/if} + {#if showParallelJobs && $parallelJobs.data != null} +
+ + {#if $parallelJobs.data.jobsParallel.count == 0} + No Parallel Jobs + {:else} + {#each $parallelJobs.data.jobsParallel.items as pjob, index} + + {pjob.jobId}{#if index != $parallelJobs.data.jobsParallel.count - 1},{/if} + + {/each} + {/if} + {/if}
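For reference, the pieces added above fit together on the backend roughly as follows. This is a minimal Go sketch, assuming a *repository.JobRepository named repo; the helper name and all concrete values are hypothetical, and in practice the filter arrives from the frontend via the jobsParallel resolver:

    // Illustrative only: combining the new JobFilter fields with QueryJobLinks.
    package example

    import (
        "context"
        "time"

        "github.com/ClusterCockpit/cc-backend/internal/graph/model"
        "github.com/ClusterCockpit/cc-backend/internal/repository"
    )

    // parallelJobLinks is a hypothetical helper returning links to jobs that ran
    // concurrently with the described job on the same (shared) node.
    func parallelJobLinks(ctx context.Context, repo *repository.JobRepository) ([]*model.JobLink, error) {
        exclusive := 0                    // hypothetical: only non-exclusive (shared) jobs
        host := "node0042"                // matched against job.resources via the contains condition
        selfJobId := "123456"             // the job itself, excluded via the new neq condition
        start := time.Unix(1682671200, 0) // start of the job's own runtime window
        duration := 3600                  // duration in seconds, bounds the overlap check

        filter := []*model.JobFilter{{
            Exclusive:     &exclusive,
            SharedNode:    &model.StringInput{Contains: &host},
            SelfJobID:     &model.StringInput{Neq: &selfJobId},
            SelfStartTime: &start,
            SelfDuration:  &duration,
        }}

        // QueryJobLinks applies BuildWhereClause per filter (including the
        // start/end overlap condition added above) and returns (id, job_id) pairs.
        return repo.QueryJobLinks(ctx, filter)
    }

The count displayed next to the links comes from CountJobs over the same filter, as in the jobsParallel resolver above.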