diff --git a/graph/generated/generated.go b/graph/generated/generated.go index cc179cc..2d98de2 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -179,6 +179,7 @@ type ComplexityRoot struct { } Query struct { + AllocatedNodes func(childComplexity int, cluster string) int Clusters func(childComplexity int) int Job func(childComplexity int, id string) int JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int @@ -272,6 +273,7 @@ type QueryResolver interface { Clusters(ctx context.Context) ([]*model.Cluster, error) Tags(ctx context.Context) ([]*schema.Tag, error) User(ctx context.Context, username string) (*model.User, error) + AllocatedNodes(ctx context.Context, cluster string) ([]string, error) Job(ctx context.Context, id string) (*schema.Job, error) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) @@ -882,6 +884,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.NodeMetrics.Metrics(childComplexity), true + case "Query.allocatedNodes": + if e.complexity.Query.AllocatedNodes == nil { + break + } + + args, err := ec.field_Query_allocatedNodes_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.AllocatedNodes(childComplexity, args["cluster"].(string)), true + case "Query.clusters": if e.complexity.Query.Clusters == nil { break @@ -1472,6 +1486,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User + allocatedNodes(cluster: String!): [String!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! @@ -1709,6 +1724,21 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_allocatedNodes_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["cluster"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("cluster")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["cluster"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -4979,6 +5009,48 @@ func (ec *executionContext) _Query_user(ctx context.Context, field graphql.Colle return ec.marshalOUser2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐUser(ctx, field.Selections, res) } +func (ec *executionContext) _Query_allocatedNodes(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Query", + Field: field, + Args: nil, + IsMethod: true, + IsResolver: true, + } + + ctx = graphql.WithFieldContext(ctx, fc) + rawArgs := field.ArgumentMap(ec.Variables) + args, err := ec.field_Query_allocatedNodes_args(ctx, rawArgs) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + fc.Args = args + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().AllocatedNodes(rctx, args["cluster"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]string) + fc.Result = res + return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) +} + func (ec *executionContext) _Query_job(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -8852,6 +8924,20 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr res = ec._Query_user(ctx, field) return res }) + case "allocatedNodes": + field := field + out.Concurrently(i, func() (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_allocatedNodes(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + }) case "job": field := field out.Concurrently(i, func() (res graphql.Marshaler) { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index f3209e3..8ff12b2 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -157,6 +157,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User + allocatedNodes(cluster: String!): [String!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 58fd99d..2f4ffce 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -110,6 +110,10 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User, return auth.FetchUser(ctx, r.DB, username) } +func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]string, error) { + return r.Repo.AllocatedNodes(cluster) +} + func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) { numericId, err := strconv.ParseInt(id, 10, 64) if err != nil { diff --git a/repository/job.go b/repository/job.go index f80672a..2538ce7 100644 --- a/repository/job.go +++ b/repository/job.go @@ -318,3 +318,33 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { } return partitions.([]string), nil } + +func (r *JobRepository) AllocatedNodes(cluster string) ([]string, error) { + nodes := make(map[string]int) + rows, err := sq.Select("resources").From("job"). + Where("job.job_state = 'running'"). + Where("job.cluster = ?", cluster). + RunWith(r.stmtCache).Query() + if err != nil { + return nil, err + } + + var raw []byte + defer rows.Close() + for rows.Next() { + raw = raw[0:0] + var resources []*schema.Resource + if err := rows.Scan(&raw); err != nil { + return nil, err + } + if err := json.Unmarshal(raw, &resources); err != nil { + return nil, err + } + + for _, resource := range resources { + nodes[resource.Hostname] += 1 + } + } + + return nil, nil +}