From c9740e587db29ffe394d80471fc6348bfaee8f89 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 24 Mar 2022 10:32:08 +0100 Subject: [PATCH 1/6] Add allocatedNodes to the GraphQL API --- graph/generated/generated.go | 86 ++++++++++++++++++++++++++++++++++++ graph/schema.graphqls | 1 + graph/schema.resolvers.go | 4 ++ repository/job.go | 30 +++++++++++++ 4 files changed, 121 insertions(+) diff --git a/graph/generated/generated.go b/graph/generated/generated.go index cc179cc..2d98de2 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -179,6 +179,7 @@ type ComplexityRoot struct { } Query struct { + AllocatedNodes func(childComplexity int, cluster string) int Clusters func(childComplexity int) int Job func(childComplexity int, id string) int JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int @@ -272,6 +273,7 @@ type QueryResolver interface { Clusters(ctx context.Context) ([]*model.Cluster, error) Tags(ctx context.Context) ([]*schema.Tag, error) User(ctx context.Context, username string) (*model.User, error) + AllocatedNodes(ctx context.Context, cluster string) ([]string, error) Job(ctx context.Context, id string) (*schema.Job, error) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) @@ -882,6 +884,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.NodeMetrics.Metrics(childComplexity), true + case "Query.allocatedNodes": + if e.complexity.Query.AllocatedNodes == nil { + break + } + + args, err := ec.field_Query_allocatedNodes_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.AllocatedNodes(childComplexity, args["cluster"].(string)), true + case "Query.clusters": if e.complexity.Query.Clusters == nil { break @@ -1472,6 +1486,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User + allocatedNodes(cluster: String!): [String!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! 
@@ -1709,6 +1724,21 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_allocatedNodes_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["cluster"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("cluster")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["cluster"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -4979,6 +5009,48 @@ func (ec *executionContext) _Query_user(ctx context.Context, field graphql.Colle return ec.marshalOUser2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐUser(ctx, field.Selections, res) } +func (ec *executionContext) _Query_allocatedNodes(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Query", + Field: field, + Args: nil, + IsMethod: true, + IsResolver: true, + } + + ctx = graphql.WithFieldContext(ctx, fc) + rawArgs := field.ArgumentMap(ec.Variables) + args, err := ec.field_Query_allocatedNodes_args(ctx, rawArgs) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + fc.Args = args + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().AllocatedNodes(rctx, args["cluster"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]string) + fc.Result = res + return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) +} + func (ec *executionContext) _Query_job(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -8852,6 +8924,20 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr res = ec._Query_user(ctx, field) return res }) + case "allocatedNodes": + field := field + out.Concurrently(i, func() (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_allocatedNodes(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + }) case "job": field := field out.Concurrently(i, func() (res graphql.Marshaler) { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index f3209e3..8ff12b2 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -157,6 +157,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User + allocatedNodes(cluster: String!): [String!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! 
diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 58fd99d..2f4ffce 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -110,6 +110,10 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User, return auth.FetchUser(ctx, r.DB, username) } +func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]string, error) { + return r.Repo.AllocatedNodes(cluster) +} + func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) { numericId, err := strconv.ParseInt(id, 10, 64) if err != nil { diff --git a/repository/job.go b/repository/job.go index f80672a..2538ce7 100644 --- a/repository/job.go +++ b/repository/job.go @@ -318,3 +318,33 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { } return partitions.([]string), nil } + +func (r *JobRepository) AllocatedNodes(cluster string) ([]string, error) { + nodes := make(map[string]int) + rows, err := sq.Select("resources").From("job"). + Where("job.job_state = 'running'"). + Where("job.cluster = ?", cluster). + RunWith(r.stmtCache).Query() + if err != nil { + return nil, err + } + + var raw []byte + defer rows.Close() + for rows.Next() { + raw = raw[0:0] + var resources []*schema.Resource + if err := rows.Scan(&raw); err != nil { + return nil, err + } + if err := json.Unmarshal(raw, &resources); err != nil { + return nil, err + } + + for _, resource := range resources { + nodes[resource.Hostname] += 1 + } + } + + return nil, nil +} From b572ef2aefb7cf63674a593817c2113f9d493183 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 24 Mar 2022 10:35:52 +0100 Subject: [PATCH 2/6] Add template for status page --- routes.go | 3 +++ server.go | 4 ++++ templates/monitoring/status.tmpl | 14 ++++++++++++++ templates/templates.go | 1 + 4 files changed, 22 insertions(+) create mode 100644 templates/monitoring/status.tmpl diff --git a/routes.go b/routes.go index aa51895..63350e5 100644 --- a/routes.go +++ b/routes.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "net/http" "net/url" "strconv" @@ -39,6 +40,7 @@ var routes []Route = []Route{ {"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster - ClusterCockpit", false, setupClusterRoute}, {"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node - ClusterCockpit", false, setupNodeRoute}, {"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute}, + {"/monitoring/status/{cluster}", "monitoring/status.tmpl", "Status of - ClusterCockpit", false, setupClusterRoute}, } func setupHomeRoute(i InfoType, r *http.Request) InfoType { @@ -118,6 +120,7 @@ func setupNodeRoute(i InfoType, r *http.Request) InfoType { vars := mux.Vars(r) i["cluster"] = vars["cluster"] i["hostname"] = vars["hostname"] + i["id"] = fmt.Sprintf("%s (%s)", vars["cluster"], vars["hostname"]) from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to") if from != "" || to != "" { i["from"] = from diff --git a/server.go b/server.go index 197ed6e..28f210a 100644 --- a/server.go +++ b/server.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "os/signal" + "runtime/debug" "strings" "sync" "syscall" @@ -476,6 +477,9 @@ func main() { api.OngoingArchivings.Wait() }() + if os.Getenv("GOGC") == "" { + debug.SetGCPercent(25) + } systemdNotifiy(true, "running") wg.Wait() log.Print("Gracefull shutdown completed!") diff --git a/templates/monitoring/status.tmpl b/templates/monitoring/status.tmpl new file mode 100644 index 0000000..15aff69 --- /dev/null +++ 
b/templates/monitoring/status.tmpl @@ -0,0 +1,14 @@ +{{define "content"}} +
+{{end}} + +{{define "stylesheets"}} + +{{end}} +{{define "javascript"}} + + +{{end}} diff --git a/templates/templates.go b/templates/templates.go index c5e248d..0d0b956 100644 --- a/templates/templates.go +++ b/templates/templates.go @@ -48,6 +48,7 @@ func init() { "monitoring/list.tmpl", "monitoring/user.tmpl", "monitoring/systems.tmpl", + "monitoring/status.tmpl", "monitoring/node.tmpl", "monitoring/analysis.tmpl", } From 0b83917294666c63db35f9cb372f5dfb2f94a62b Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 24 Mar 2022 14:34:42 +0100 Subject: [PATCH 3/6] Update nodeMetrics query; Add numberOfNodes to SubCluster type --- config/config.go | 19 ++++ graph/generated/generated.go | 165 ++++++++++++++++++++++++++-------- graph/model/models_gen.go | 6 +- graph/schema.graphqls | 8 +- graph/schema.resolvers.go | 9 +- metricdata/cc-metric-store.go | 2 +- metricdata/metricdata.go | 6 +- metricdata/utils.go | 2 +- 8 files changed, 163 insertions(+), 54 deletions(-) diff --git a/config/config.go b/config/config.go index ef49a12..76adeb0 100644 --- a/config/config.go +++ b/config/config.go @@ -277,3 +277,22 @@ func AssignSubCluster(job *schema.BaseJob) error { return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0) } + +func GetSubClusterByNode(cluster, hostname string) (string, error) { + for sc, nl := range nodeLists[cluster] { + if nl != nil && nl.Contains(hostname) { + return sc, nil + } + } + + c := GetCluster(cluster) + if c == nil { + return "", fmt.Errorf("unkown cluster: %#v", cluster) + } + + if c.SubClusters[0].Nodes == "" { + return c.SubClusters[0].Name, nil + } + + return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname) +} diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 2d98de2..7e01446 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -174,8 +174,9 @@ type ComplexityRoot struct { } NodeMetrics struct { - Host func(childComplexity int) int - Metrics func(childComplexity int) int + Host func(childComplexity int) int + Metrics func(childComplexity int) int + SubCluster func(childComplexity int) int } Query struct { @@ -187,7 +188,7 @@ type ComplexityRoot struct { JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int - NodeMetrics func(childComplexity int, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int + NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int Tags func(childComplexity int) int User func(childComplexity int, username string) int @@ -220,6 +221,7 @@ type ComplexityRoot struct { MemoryBandwidth func(childComplexity int) int Name func(childComplexity int) int Nodes func(childComplexity int) int + NumberOfNodes func(childComplexity int) int ProcessorType func(childComplexity int) int SocketsPerNode func(childComplexity int) int ThreadsPerCore func(childComplexity int) int @@ -281,7 +283,7 @@ type QueryResolver interface { JobsStatistics(ctx context.Context, filter []*model.JobFilter, 
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) ([]*model.Count, error) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) - NodeMetrics(ctx context.Context, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) + NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) } type executableSchema struct { @@ -884,6 +886,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.NodeMetrics.Metrics(childComplexity), true + case "NodeMetrics.subCluster": + if e.complexity.NodeMetrics.SubCluster == nil { + break + } + + return e.complexity.NodeMetrics.SubCluster(childComplexity), true + case "Query.allocatedNodes": if e.complexity.Query.AllocatedNodes == nil { break @@ -985,7 +994,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.NodeMetrics(childComplexity, args["cluster"].(string), args["partition"].(*string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)), true + return e.complexity.Query.NodeMetrics(childComplexity, args["cluster"].(string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)), true case "Query.rooflineHeatmap": if e.complexity.Query.RooflineHeatmap == nil { @@ -1137,6 +1146,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.SubCluster.Nodes(childComplexity), true + case "SubCluster.numberOfNodes": + if e.complexity.SubCluster.NumberOfNodes == nil { + break + } + + return e.complexity.SubCluster.NumberOfNodes(childComplexity), true + case "SubCluster.processorType": if e.complexity.SubCluster.ProcessorType == nil { break @@ -1371,6 +1387,7 @@ type Cluster { type SubCluster { name: String! nodes: String! + numberOfNodes: Int! processorType: String! socketsPerNode: Int! coresPerSocket: Int! @@ -1466,8 +1483,9 @@ type Footprints { enum Aggregate { USER, PROJECT, CLUSTER } type NodeMetrics { - host: String! - metrics: [JobMetricWithName!]! + host: String! + subCluster: String! + metrics: [JobMetricWithName!]! } type Count { @@ -1498,7 +1516,7 @@ type Query { rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! - nodeMetrics(cluster: String!, partition: String, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]! + nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]! 
} type Mutation { @@ -1913,60 +1931,51 @@ func (ec *executionContext) field_Query_nodeMetrics_args(ctx context.Context, ra } } args["cluster"] = arg0 - var arg1 *string - if tmp, ok := rawArgs["partition"]; ok { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("partition")) - arg1, err = ec.unmarshalOString2ᚖstring(ctx, tmp) - if err != nil { - return nil, err - } - } - args["partition"] = arg1 - var arg2 []string + var arg1 []string if tmp, ok := rawArgs["nodes"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodes")) - arg2, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) + arg1, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) if err != nil { return nil, err } } - args["nodes"] = arg2 - var arg3 []schema.MetricScope + args["nodes"] = arg1 + var arg2 []schema.MetricScope if tmp, ok := rawArgs["scopes"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("scopes")) - arg3, err = ec.unmarshalOMetricScope2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐMetricScopeᚄ(ctx, tmp) + arg2, err = ec.unmarshalOMetricScope2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐMetricScopeᚄ(ctx, tmp) if err != nil { return nil, err } } - args["scopes"] = arg3 - var arg4 []string + args["scopes"] = arg2 + var arg3 []string if tmp, ok := rawArgs["metrics"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("metrics")) - arg4, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) + arg3, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) if err != nil { return nil, err } } - args["metrics"] = arg4 - var arg5 time.Time + args["metrics"] = arg3 + var arg4 time.Time if tmp, ok := rawArgs["from"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("from")) + arg4, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp) + if err != nil { + return nil, err + } + } + args["from"] = arg4 + var arg5 time.Time + if tmp, ok := rawArgs["to"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("to")) arg5, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp) if err != nil { return nil, err } } - args["from"] = arg5 - var arg6 time.Time - if tmp, ok := rawArgs["to"]; ok { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("to")) - arg6, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp) - if err != nil { - return nil, err - } - } - args["to"] = arg6 + args["to"] = arg5 return args, nil } @@ -4865,6 +4874,41 @@ func (ec *executionContext) _NodeMetrics_host(ctx context.Context, field graphql return ec.marshalNString2string(ctx, field.Selections, res) } +func (ec *executionContext) _NodeMetrics_subCluster(ctx context.Context, field graphql.CollectedField, obj *model.NodeMetrics) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "NodeMetrics", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.SubCluster, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + func (ec *executionContext) _NodeMetrics_metrics(ctx context.Context, field graphql.CollectedField, obj 
*model.NodeMetrics) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -5364,7 +5408,7 @@ func (ec *executionContext) _Query_nodeMetrics(ctx context.Context, field graphq fc.Args = args resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().NodeMetrics(rctx, args["cluster"].(string), args["partition"].(*string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)) + return ec.resolvers.Query().NodeMetrics(rctx, args["cluster"].(string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)) }) if err != nil { ec.Error(ctx, err) @@ -5892,6 +5936,41 @@ func (ec *executionContext) _SubCluster_nodes(ctx context.Context, field graphql return ec.marshalNString2string(ctx, field.Selections, res) } +func (ec *executionContext) _SubCluster_numberOfNodes(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.NumberOfNodes, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + func (ec *executionContext) _SubCluster_processorType(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -8854,6 +8933,11 @@ func (ec *executionContext) _NodeMetrics(ctx context.Context, sel ast.SelectionS if out.Values[i] == graphql.Null { invalids++ } + case "subCluster": + out.Values[i] = ec._NodeMetrics_subCluster(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } case "metrics": out.Values[i] = ec._NodeMetrics_metrics(ctx, field, obj) if out.Values[i] == graphql.Null { @@ -9186,6 +9270,11 @@ func (ec *executionContext) _SubCluster(ctx context.Context, sel ast.SelectionSe if out.Values[i] == graphql.Null { invalids++ } + case "numberOfNodes": + out.Values[i] = ec._SubCluster_numberOfNodes(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } case "processorType": out.Values[i] = ec._SubCluster_processorType(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/graph/model/models_gen.go b/graph/model/models_gen.go index acd50bb..16e6590 100644 --- a/graph/model/models_gen.go +++ b/graph/model/models_gen.go @@ -114,8 +114,9 @@ type MetricFootprints struct { } type NodeMetrics struct { - Host string `json:"host"` - Metrics []*JobMetricWithName `json:"metrics"` + Host string `json:"host"` + SubCluster string `json:"subCluster"` + Metrics []*JobMetricWithName `json:"metrics"` } type OrderByInput struct { @@ -138,6 +139,7 @@ type StringInput struct { type SubCluster struct { Name string `json:"name"` Nodes string `json:"nodes"` + 
NumberOfNodes int `json:"numberOfNodes"` ProcessorType string `json:"processorType"` SocketsPerNode int `json:"socketsPerNode"` CoresPerSocket int `json:"coresPerSocket"` diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 8ff12b2..c9f2cf5 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -42,6 +42,7 @@ type Cluster { type SubCluster { name: String! nodes: String! + numberOfNodes: Int! processorType: String! socketsPerNode: Int! coresPerSocket: Int! @@ -137,8 +138,9 @@ type Footprints { enum Aggregate { USER, PROJECT, CLUSTER } type NodeMetrics { - host: String! - metrics: [JobMetricWithName!]! + host: String! + subCluster: String! + metrics: [JobMetricWithName!]! } type Count { @@ -169,7 +171,7 @@ type Query { rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! - nodeMetrics(cluster: String!, partition: String, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]! + nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]! } type Mutation { diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 2f4ffce..d886cfe 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -209,23 +209,19 @@ func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.Job return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY) } -func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) { +func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) { user := auth.GetUser(ctx) if user != nil && !user.HasRole(auth.RoleAdmin) { return nil, errors.New("you need to be an administrator for this query") } - if partition == nil { - partition = new(string) - } - if metrics == nil { for _, mc := range config.GetCluster(cluster).MetricConfig { metrics = append(metrics, mc.Name) } } - data, err := metricdata.LoadNodeData(cluster, *partition, metrics, nodes, scopes, from, to, ctx) + data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { return nil, err } @@ -236,6 +232,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti Host: hostname, Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)), } + host.SubCluster, _ = config.GetSubClusterByNode(cluster, hostname) for metric, scopedMetrics := range metrics { for _, scopedMetric := range scopedMetrics { diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go index c77d43d..2c3401d 100644 --- a/metricdata/cc-metric-store.go +++ b/metricdata/cc-metric-store.go @@ -499,7 +499,7 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont } // TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known! 
-func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { +func (ccms *CCMetricStore) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { req := ApiQueryRequest{ Cluster: cluster, From: from.Unix(), diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go index 76fdcbe..edbce95 100644 --- a/metricdata/metricdata.go +++ b/metricdata/metricdata.go @@ -23,7 +23,7 @@ type MetricDataRepository interface { LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) // Return a map of hosts to a map of metrics at the requested scopes for that node. - LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) + LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) } var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} @@ -173,7 +173,7 @@ func LoadAverages(job *schema.Job, metrics []string, data [][]schema.Float, ctx } // Used for the node/system view. Returns a map of nodes to a map of metrics. -func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { +func LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { repo, ok := metricDataRepos[cluster] if !ok { return nil, fmt.Errorf("no metric data repository configured for '%s'", cluster) @@ -185,7 +185,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s } } - data, err := repo.LoadNodeData(cluster, partition, metrics, nodes, scopes, from, to, ctx) + data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { if len(data) != 0 { log.Errorf("partial error: %s", err.Error()) diff --git a/metricdata/utils.go b/metricdata/utils.go index de6c573..27c7d10 100644 --- a/metricdata/utils.go +++ b/metricdata/utils.go @@ -32,6 +32,6 @@ func (tmdr *TestMetricDataRepository) LoadStats(job *schema.Job, metrics []strin panic("TODO") } -func (tmdr *TestMetricDataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { +func (tmdr *TestMetricDataRepository) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { panic("TODO") } From f3c2a6cc247b0ed7721b40b4f654a660c4840423 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 24 Mar 2022 16:08:47 +0100 Subject: [PATCH 4/6] Change allocatedNodes; Update frontend --- frontend | 2 +- graph/generated/generated.go | 8 ++++---- graph/schema.graphqls | 2 +- graph/schema.resolvers.go | 17 +++++++++++++++-- repository/job.go | 21 +++++++++++++++------ 5 files changed, 36 insertions(+), 14 deletions(-) diff --git a/frontend 
b/frontend index 03818be..239bf19 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 03818be47032194f1f450ab242eed56a94a3c5d1 +Subproject commit 239bf19c9a018b89db655d17c4e9adb8e64fb08e diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 7e01446..9b76179 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -275,7 +275,7 @@ type QueryResolver interface { Clusters(ctx context.Context) ([]*model.Cluster, error) Tags(ctx context.Context) ([]*schema.Tag, error) User(ctx context.Context, username string) (*model.User, error) - AllocatedNodes(ctx context.Context, cluster string) ([]string, error) + AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) Job(ctx context.Context, id string) (*schema.Job, error) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) @@ -1504,7 +1504,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User - allocatedNodes(cluster: String!): [String!]! + allocatedNodes(cluster: String!): [Count!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! @@ -5090,9 +5090,9 @@ func (ec *executionContext) _Query_allocatedNodes(ctx context.Context, field gra } return graphql.Null } - res := resTmp.([]string) + res := resTmp.([]*model.Count) fc.Result = res - return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) + return ec.marshalNCount2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐCountᚄ(ctx, field.Selections, res) } func (ec *executionContext) _Query_job(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index c9f2cf5..8bab641 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -159,7 +159,7 @@ type Query { tags: [Tag!]! # List of all tags user(username: String!): User - allocatedNodes(cluster: String!): [String!]! + allocatedNodes(cluster: String!): [Count!]! job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! 
diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index d886cfe..ec22f45 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -110,8 +110,21 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User, return auth.FetchUser(ctx, r.DB, username) } -func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]string, error) { - return r.Repo.AllocatedNodes(cluster) +func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) { + data, err := r.Repo.AllocatedNodes(cluster) + if err != nil { + return nil, err + } + + counts := make([]*model.Count, 0, len(data)) + for subcluster, hosts := range data { + counts = append(counts, &model.Count{ + Name: subcluster, + Count: len(hosts), + }) + } + + return counts, nil } func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) { diff --git a/repository/job.go b/repository/job.go index 2538ce7..4118153 100644 --- a/repository/job.go +++ b/repository/job.go @@ -319,9 +319,11 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { return partitions.([]string), nil } -func (r *JobRepository) AllocatedNodes(cluster string) ([]string, error) { - nodes := make(map[string]int) - rows, err := sq.Select("resources").From("job"). +// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. +// Hosts with zero jobs running on them will not show up! +func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { + subclusters := make(map[string]map[string]int) + rows, err := sq.Select("resources", "subcluster").From("job"). Where("job.job_state = 'running'"). Where("job.cluster = ?", cluster). 
RunWith(r.stmtCache).Query() @@ -334,17 +336,24 @@ func (r *JobRepository) AllocatedNodes(cluster string) ([]string, error) { for rows.Next() { raw = raw[0:0] var resources []*schema.Resource - if err := rows.Scan(&raw); err != nil { + var subcluster string + if err := rows.Scan(&raw, &subcluster); err != nil { return nil, err } if err := json.Unmarshal(raw, &resources); err != nil { return nil, err } + hosts, ok := subclusters[subcluster] + if !ok { + hosts = make(map[string]int) + subclusters[subcluster] = hosts + } + for _, resource := range resources { - nodes[resource.Hostname] += 1 + hosts[resource.Hostname] += 1 } } - return nil, nil + return subclusters, nil } From b4c541fb73b8f6307cd5a1d6d63a856e19aee52d Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Fri, 25 Mar 2022 10:20:33 +0100 Subject: [PATCH 5/6] Allow weighting job counts --- graph/generated/generated.go | 46 ++++++++++++++++++++++++++++-------- graph/model/models_gen.go | 41 ++++++++++++++++++++++++++++++++ graph/schema.graphqls | 3 ++- graph/schema.resolvers.go | 4 ++-- repository/job.go | 19 ++++++++++++--- routes.go | 7 +++--- 6 files changed, 100 insertions(+), 20 deletions(-) diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 9b76179..91db1f1 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -185,7 +185,7 @@ type ComplexityRoot struct { Job func(childComplexity int, id string) int JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int - JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) int + JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int @@ -281,7 +281,7 @@ type QueryResolver interface { JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) - JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) ([]*model.Count, error) + JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) } @@ -958,7 +958,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.JobsCount(childComplexity, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["limit"].(*int)), true + return 
e.complexity.Query.JobsCount(childComplexity, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["weight"].(*model.Weights), args["limit"].(*int)), true case "Query.jobsFootprints": if e.complexity.Query.JobsFootprints == nil { @@ -1481,6 +1481,7 @@ type Footprints { } enum Aggregate { USER, PROJECT, CLUSTER } +enum Weights { NODE_COUNT, NODE_HOURS } type NodeMetrics { host: String! @@ -1512,7 +1513,7 @@ type Query { jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! - jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, limit: Int): [Count!]! + jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! @@ -1826,15 +1827,24 @@ func (ec *executionContext) field_Query_jobsCount_args(ctx context.Context, rawA } } args["groupBy"] = arg1 - var arg2 *int - if tmp, ok := rawArgs["limit"]; ok { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("limit")) - arg2, err = ec.unmarshalOInt2ᚖint(ctx, tmp) + var arg2 *model.Weights + if tmp, ok := rawArgs["weight"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("weight")) + arg2, err = ec.unmarshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx, tmp) if err != nil { return nil, err } } - args["limit"] = arg2 + args["weight"] = arg2 + var arg3 *int + if tmp, ok := rawArgs["limit"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("limit")) + arg3, err = ec.unmarshalOInt2ᚖint(ctx, tmp) + if err != nil { + return nil, err + } + } + args["limit"] = arg3 return args, nil } @@ -5324,7 +5334,7 @@ func (ec *executionContext) _Query_jobsCount(ctx context.Context, field graphql. 
fc.Args = args resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().JobsCount(rctx, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["limit"].(*int)) + return ec.resolvers.Query().JobsCount(rctx, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["weight"].(*model.Weights), args["limit"].(*int)) }) if err != nil { ec.Error(ctx, err) @@ -11518,6 +11528,22 @@ func (ec *executionContext) marshalOUser2ᚖgithubᚗcomᚋClusterCockpitᚋcc return ec._User(ctx, sel, v) } +func (ec *executionContext) unmarshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx context.Context, v interface{}) (*model.Weights, error) { + if v == nil { + return nil, nil + } + var res = new(model.Weights) + err := res.UnmarshalGQL(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx context.Context, sel ast.SelectionSet, v *model.Weights) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return v +} + func (ec *executionContext) marshalO__EnumValue2ᚕgithubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐEnumValueᚄ(ctx context.Context, sel ast.SelectionSet, v []introspection.EnumValue) graphql.Marshaler { if v == nil { return graphql.Null diff --git a/graph/model/models_gen.go b/graph/model/models_gen.go index 16e6590..46baa30 100644 --- a/graph/model/models_gen.go +++ b/graph/model/models_gen.go @@ -258,3 +258,44 @@ func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error { func (e SortDirectionEnum) MarshalGQL(w io.Writer) { fmt.Fprint(w, strconv.Quote(e.String())) } + +type Weights string + +const ( + WeightsNodeCount Weights = "NODE_COUNT" + WeightsNodeHours Weights = "NODE_HOURS" +) + +var AllWeights = []Weights{ + WeightsNodeCount, + WeightsNodeHours, +} + +func (e Weights) IsValid() bool { + switch e { + case WeightsNodeCount, WeightsNodeHours: + return true + } + return false +} + +func (e Weights) String() string { + return string(e) +} + +func (e *Weights) UnmarshalGQL(v interface{}) error { + str, ok := v.(string) + if !ok { + return fmt.Errorf("enums must be strings") + } + + *e = Weights(str) + if !e.IsValid() { + return fmt.Errorf("%s is not a valid Weights", str) + } + return nil +} + +func (e Weights) MarshalGQL(w io.Writer) { + fmt.Fprint(w, strconv.Quote(e.String())) +} diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 8bab641..793fe69 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -136,6 +136,7 @@ type Footprints { } enum Aggregate { USER, PROJECT, CLUSTER } +enum Weights { NODE_COUNT, NODE_HOURS } type NodeMetrics { host: String! @@ -167,7 +168,7 @@ type Query { jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! - jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, limit: Int): [Count!]! + jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]! rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! 
diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index ec22f45..46b4d7f 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -202,8 +202,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF return r.jobsStatistics(ctx, filter, groupBy) } -func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) ([]*model.Count, error) { - counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, limit) +func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) { + counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit) if err != nil { return nil, err } diff --git a/repository/job.go b/repository/job.go index 4118153..d6866ca 100644 --- a/repository/job.go +++ b/repository/job.go @@ -193,12 +193,25 @@ func (r *JobRepository) Stop( } // TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC; -func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, limit *int) (map[string]int, error) { +func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) { if !aggreg.IsValid() { return nil, errors.New("invalid aggregate") } - q := sq.Select("job."+string(aggreg), "count(*) as count").From("job").GroupBy("job." + string(aggreg)).OrderBy("count DESC") + runner := (sq.BaseRunner)(r.stmtCache) + count := "count(*) as count" + if weight != nil { + switch *weight { + case model.WeightsNodeCount: + count = "sum(job.num_nodes) as count" + case model.WeightsNodeHours: + now := time.Now().Unix() + count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now) + runner = r.DB + } + } + + q := sq.Select("job."+string(aggreg), count).From("job").GroupBy("job." 
+ string(aggreg)).OrderBy("count DESC") q = SecurityCheck(ctx, q) for _, f := range filters { q = BuildWhereClause(f, q) @@ -208,7 +221,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre } counts := map[string]int{} - rows, err := q.RunWith(r.DB).Query() + rows, err := q.RunWith(runner).Query() if err != nil { return nil, err } diff --git a/routes.go b/routes.go index 63350e5..9885b94 100644 --- a/routes.go +++ b/routes.go @@ -53,22 +53,21 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType { runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ State: []schema.JobState{schema.JobStateRunning}, - }}, nil) + }}, nil, nil) if err != nil { log.Errorf("failed to count jobs: %s", err.Error()) runningJobs = map[string]int{} } - totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil) + totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil) if err != nil { log.Errorf("failed to count jobs: %s", err.Error()) totalJobs = map[string]int{} } - from := time.Now().Add(-24 * time.Hour) recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ StartTime: &model.TimeRange{From: &from, To: nil}, Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration}, - }}, nil) + }}, nil, nil) if err != nil { log.Errorf("failed to count jobs: %s", err.Error()) recentShortJobs = map[string]int{} From 1bdb0cc0b3ade28d65cbcc4d7cd61a8f1a68f630 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Fri, 25 Mar 2022 10:20:49 +0100 Subject: [PATCH 6/6] Update frontend --- frontend | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend b/frontend index 239bf19..0260253 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 239bf19c9a018b89db655d17c4e9adb8e64fb08e +Subproject commit 0260253f4240eb04d76cbc2bceb3cd494557ee80
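
Taken together, patches 1, 4 and 5 leave the Query type with allocatedNodes(cluster: String!): [Count!]! and jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]!. A minimal Go client sketch exercising both follows; the endpoint path, port, cluster name and the commented-out bearer token are assumptions for illustration and are not taken from these patches, and the Count selection assumes the usual name/count fields.

    // A minimal client sketch for the GraphQL queries this series adds or changes.
    // The endpoint URL, port, cluster name and the bearer-token header are
    // placeholders for illustration; they are not taken from the patches.
    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
    )

    // allocatedNodes (patches 1 and 4) now returns [Count!]!, and jobsCount
    // (patch 5) accepts an optional weight; Count is assumed to expose name/count.
    const query = `query($cluster: String!) {
      allocatedNodes(cluster: $cluster) { name count }
      jobsCount(filter: [], groupBy: CLUSTER, weight: NODE_HOURS) { name count }
    }`

    func main() {
        body, err := json.Marshal(map[string]interface{}{
            "query":     query,
            "variables": map[string]string{"cluster": "emmy"}, // hypothetical cluster name
        })
        if err != nil {
            panic(err)
        }

        req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/query", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        req.Header.Set("Content-Type", "application/json")
        // req.Header.Set("Authorization", "Bearer <token>") // if the API requires a token

        res, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer res.Body.Close()

        var decoded map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&decoded); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", decoded)
    }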
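
The NODE_HOURS weighting in patch 5 is computed entirely in SQL: sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN now - job.start_time ELSE job.duration END)). The same per-job arithmetic, written as a standalone Go sketch with illustrative field names rather than the real schema.Job struct:

    // Sketch of the per-job arithmetic behind the NODE_HOURS weighting; the job
    // struct here is illustrative and is not the schema.Job type used by cc-backend.
    package main

    import (
        "fmt"
        "time"
    )

    type exampleJob struct {
        numNodes  int
        running   bool
        startTime int64 // unix seconds, used while the job is running
        duration  int64 // seconds, used once the job has finished
    }

    // nodeSeconds mirrors: num_nodes * (CASE WHEN running THEN now - start_time ELSE duration END)
    func nodeSeconds(j exampleJob, now int64) int64 {
        if j.running {
            return int64(j.numNodes) * (now - j.startTime)
        }
        return int64(j.numNodes) * j.duration
    }

    func main() {
        now := time.Now().Unix()
        jobs := []exampleJob{
            {numNodes: 4, running: true, startTime: now - 2*3600}, // 4 nodes, running for 2h
            {numNodes: 2, running: false, duration: 3600},         // 2 nodes, finished after 1h
        }

        var total int64
        for _, j := range jobs {
            total += nodeSeconds(j, now)
        }
        fmt.Printf("node-hours: %.1f\n", float64(total)/3600.0) // prints 10.0
    }

For these two example jobs (4 nodes running for 2 hours, 2 nodes finished after 1 hour) the sum is 4*2 + 2*1 = 10 node-hours, which is the figure a weighted jobsCount would report for their group.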