Merge branch 'master' into add-influxdb2-client

This commit is contained in:
Jan Eitzinger 2022-03-29 15:43:53 +02:00
commit f1fd930916
16 changed files with 430 additions and 102 deletions

View File

@ -277,3 +277,22 @@ func AssignSubCluster(job *schema.BaseJob) error {
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
}
func GetSubClusterByNode(cluster, hostname string) (string, error) {
for sc, nl := range nodeLists[cluster] {
if nl != nil && nl.Contains(hostname) {
return sc, nil
}
}
c := GetCluster(cluster)
if c == nil {
return "", fmt.Errorf("unkown cluster: %#v", cluster)
}
if c.SubClusters[0].Nodes == "" {
return c.SubClusters[0].Name, nil
}
return "", fmt.Errorf("no subcluster found for cluster %#v and host %#v", cluster, hostname)
}

@ -1 +1 @@
Subproject commit 184a5079bd53a89442664c28c5cebb733557f1f1
Subproject commit 7344533214256e4441040852f7d8f1af3298804c

View File

@ -176,17 +176,19 @@ type ComplexityRoot struct {
NodeMetrics struct {
Host func(childComplexity int) int
Metrics func(childComplexity int) int
SubCluster func(childComplexity int) int
}
Query struct {
AllocatedNodes func(childComplexity int, cluster string) int
Clusters func(childComplexity int) int
Job func(childComplexity int, id string) int
JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int
Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int
JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) int
JobsCount func(childComplexity int, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) int
JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int
JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int
NodeMetrics func(childComplexity int, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int
NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int
RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int
Tags func(childComplexity int) int
User func(childComplexity int, username string) int
@ -219,6 +221,7 @@ type ComplexityRoot struct {
MemoryBandwidth func(childComplexity int) int
Name func(childComplexity int) int
Nodes func(childComplexity int) int
NumberOfNodes func(childComplexity int) int
ProcessorType func(childComplexity int) int
SocketsPerNode func(childComplexity int) int
ThreadsPerCore func(childComplexity int) int
@ -272,14 +275,15 @@ type QueryResolver interface {
Clusters(ctx context.Context) ([]*model.Cluster, error)
Tags(ctx context.Context) ([]*schema.Tag, error)
User(ctx context.Context, username string) (*model.User, error)
AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error)
Job(ctx context.Context, id string) (*schema.Job, error)
JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error)
JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error)
Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error)
JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) ([]*model.Count, error)
JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error)
RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error)
NodeMetrics(ctx context.Context, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error)
NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error)
}
type executableSchema struct {
@ -882,6 +886,25 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.NodeMetrics.Metrics(childComplexity), true
case "NodeMetrics.subCluster":
if e.complexity.NodeMetrics.SubCluster == nil {
break
}
return e.complexity.NodeMetrics.SubCluster(childComplexity), true
case "Query.allocatedNodes":
if e.complexity.Query.AllocatedNodes == nil {
break
}
args, err := ec.field_Query_allocatedNodes_args(context.TODO(), rawArgs)
if err != nil {
return 0, false
}
return e.complexity.Query.AllocatedNodes(childComplexity, args["cluster"].(string)), true
case "Query.clusters":
if e.complexity.Query.Clusters == nil {
break
@ -935,7 +958,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.JobsCount(childComplexity, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["limit"].(*int)), true
return e.complexity.Query.JobsCount(childComplexity, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["weight"].(*model.Weights), args["limit"].(*int)), true
case "Query.jobsFootprints":
if e.complexity.Query.JobsFootprints == nil {
@ -971,7 +994,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.NodeMetrics(childComplexity, args["cluster"].(string), args["partition"].(*string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)), true
return e.complexity.Query.NodeMetrics(childComplexity, args["cluster"].(string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time)), true
case "Query.rooflineHeatmap":
if e.complexity.Query.RooflineHeatmap == nil {
@ -1123,6 +1146,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.SubCluster.Nodes(childComplexity), true
case "SubCluster.numberOfNodes":
if e.complexity.SubCluster.NumberOfNodes == nil {
break
}
return e.complexity.SubCluster.NumberOfNodes(childComplexity), true
case "SubCluster.processorType":
if e.complexity.SubCluster.ProcessorType == nil {
break
@ -1357,6 +1387,7 @@ type Cluster {
type SubCluster {
name: String!
nodes: String!
numberOfNodes: Int!
processorType: String!
socketsPerNode: Int!
coresPerSocket: Int!
@ -1450,9 +1481,11 @@ type Footprints {
}
enum Aggregate { USER, PROJECT, CLUSTER }
enum Weights { NODE_COUNT, NODE_HOURS }
type NodeMetrics {
host: String!
subCluster: String!
metrics: [JobMetricWithName!]!
}
@ -1472,6 +1505,7 @@ type Query {
tags: [Tag!]! # List of all tags
user(username: String!): User
allocatedNodes(cluster: String!): [Count!]!
job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
@ -1479,11 +1513,11 @@ type Query {
jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]!
jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, limit: Int): [Count!]!
jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]!
rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]!
nodeMetrics(cluster: String!, partition: String, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]!
nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]!
}
type Mutation {
@ -1709,6 +1743,21 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs
return args, nil
}
func (ec *executionContext) field_Query_allocatedNodes_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
var arg0 string
if tmp, ok := rawArgs["cluster"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("cluster"))
arg0, err = ec.unmarshalNString2string(ctx, tmp)
if err != nil {
return nil, err
}
}
args["cluster"] = arg0
return args, nil
}
func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
@ -1778,15 +1827,24 @@ func (ec *executionContext) field_Query_jobsCount_args(ctx context.Context, rawA
}
}
args["groupBy"] = arg1
var arg2 *int
if tmp, ok := rawArgs["limit"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("limit"))
arg2, err = ec.unmarshalOInt2ᚖint(ctx, tmp)
var arg2 *model.Weights
if tmp, ok := rawArgs["weight"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("weight"))
arg2, err = ec.unmarshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx, tmp)
if err != nil {
return nil, err
}
}
args["limit"] = arg2
args["weight"] = arg2
var arg3 *int
if tmp, ok := rawArgs["limit"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("limit"))
arg3, err = ec.unmarshalOInt2ᚖint(ctx, tmp)
if err != nil {
return nil, err
}
}
args["limit"] = arg3
return args, nil
}
@ -1883,60 +1941,51 @@ func (ec *executionContext) field_Query_nodeMetrics_args(ctx context.Context, ra
}
}
args["cluster"] = arg0
var arg1 *string
if tmp, ok := rawArgs["partition"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("partition"))
arg1, err = ec.unmarshalOString2ᚖstring(ctx, tmp)
if err != nil {
return nil, err
}
}
args["partition"] = arg1
var arg2 []string
var arg1 []string
if tmp, ok := rawArgs["nodes"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodes"))
arg2, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp)
arg1, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp)
if err != nil {
return nil, err
}
}
args["nodes"] = arg2
var arg3 []schema.MetricScope
args["nodes"] = arg1
var arg2 []schema.MetricScope
if tmp, ok := rawArgs["scopes"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("scopes"))
arg3, err = ec.unmarshalOMetricScope2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐMetricScopeᚄ(ctx, tmp)
arg2, err = ec.unmarshalOMetricScope2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐMetricScopeᚄ(ctx, tmp)
if err != nil {
return nil, err
}
}
args["scopes"] = arg3
var arg4 []string
args["scopes"] = arg2
var arg3 []string
if tmp, ok := rawArgs["metrics"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("metrics"))
arg4, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp)
arg3, err = ec.unmarshalOString2ᚕstringᚄ(ctx, tmp)
if err != nil {
return nil, err
}
}
args["metrics"] = arg4
var arg5 time.Time
args["metrics"] = arg3
var arg4 time.Time
if tmp, ok := rawArgs["from"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("from"))
arg4, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp)
if err != nil {
return nil, err
}
}
args["from"] = arg4
var arg5 time.Time
if tmp, ok := rawArgs["to"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("to"))
arg5, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp)
if err != nil {
return nil, err
}
}
args["from"] = arg5
var arg6 time.Time
if tmp, ok := rawArgs["to"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("to"))
arg6, err = ec.unmarshalNTime2timeᚐTime(ctx, tmp)
if err != nil {
return nil, err
}
}
args["to"] = arg6
args["to"] = arg5
return args, nil
}
@ -4835,6 +4884,41 @@ func (ec *executionContext) _NodeMetrics_host(ctx context.Context, field graphql
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) _NodeMetrics_subCluster(ctx context.Context, field graphql.CollectedField, obj *model.NodeMetrics) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
fc := &graphql.FieldContext{
Object: "NodeMetrics",
Field: field,
Args: nil,
IsMethod: false,
IsResolver: false,
}
ctx = graphql.WithFieldContext(ctx, fc)
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.SubCluster, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(string)
fc.Result = res
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) _NodeMetrics_metrics(ctx context.Context, field graphql.CollectedField, obj *model.NodeMetrics) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
@ -4979,6 +5063,48 @@ func (ec *executionContext) _Query_user(ctx context.Context, field graphql.Colle
return ec.marshalOUser2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐUser(ctx, field.Selections, res)
}
func (ec *executionContext) _Query_allocatedNodes(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
fc := &graphql.FieldContext{
Object: "Query",
Field: field,
Args: nil,
IsMethod: true,
IsResolver: true,
}
ctx = graphql.WithFieldContext(ctx, fc)
rawArgs := field.ArgumentMap(ec.Variables)
args, err := ec.field_Query_allocatedNodes_args(ctx, rawArgs)
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
fc.Args = args
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().AllocatedNodes(rctx, args["cluster"].(string))
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.([]*model.Count)
fc.Result = res
return ec.marshalNCount2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐCountᚄ(ctx, field.Selections, res)
}
func (ec *executionContext) _Query_job(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
@ -5208,7 +5334,7 @@ func (ec *executionContext) _Query_jobsCount(ctx context.Context, field graphql.
fc.Args = args
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().JobsCount(rctx, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["limit"].(*int))
return ec.resolvers.Query().JobsCount(rctx, args["filter"].([]*model.JobFilter), args["groupBy"].(model.Aggregate), args["weight"].(*model.Weights), args["limit"].(*int))
})
if err != nil {
ec.Error(ctx, err)
@ -5292,7 +5418,7 @@ func (ec *executionContext) _Query_nodeMetrics(ctx context.Context, field graphq
fc.Args = args
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().NodeMetrics(rctx, args["cluster"].(string), args["partition"].(*string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time))
return ec.resolvers.Query().NodeMetrics(rctx, args["cluster"].(string), args["nodes"].([]string), args["scopes"].([]schema.MetricScope), args["metrics"].([]string), args["from"].(time.Time), args["to"].(time.Time))
})
if err != nil {
ec.Error(ctx, err)
@ -5820,6 +5946,41 @@ func (ec *executionContext) _SubCluster_nodes(ctx context.Context, field graphql
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) _SubCluster_numberOfNodes(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
fc := &graphql.FieldContext{
Object: "SubCluster",
Field: field,
Args: nil,
IsMethod: false,
IsResolver: false,
}
ctx = graphql.WithFieldContext(ctx, fc)
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.NumberOfNodes, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalNInt2int(ctx, field.Selections, res)
}
func (ec *executionContext) _SubCluster_processorType(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
@ -8782,6 +8943,11 @@ func (ec *executionContext) _NodeMetrics(ctx context.Context, sel ast.SelectionS
if out.Values[i] == graphql.Null {
invalids++
}
case "subCluster":
out.Values[i] = ec._NodeMetrics_subCluster(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
}
case "metrics":
out.Values[i] = ec._NodeMetrics_metrics(ctx, field, obj)
if out.Values[i] == graphql.Null {
@ -8852,6 +9018,20 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
res = ec._Query_user(ctx, field)
return res
})
case "allocatedNodes":
field := field
out.Concurrently(i, func() (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Query_allocatedNodes(ctx, field)
if res == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
return res
})
case "job":
field := field
out.Concurrently(i, func() (res graphql.Marshaler) {
@ -9100,6 +9280,11 @@ func (ec *executionContext) _SubCluster(ctx context.Context, sel ast.SelectionSe
if out.Values[i] == graphql.Null {
invalids++
}
case "numberOfNodes":
out.Values[i] = ec._SubCluster_numberOfNodes(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
}
case "processorType":
out.Values[i] = ec._SubCluster_processorType(ctx, field, obj)
if out.Values[i] == graphql.Null {
@ -11343,6 +11528,22 @@ func (ec *executionContext) marshalOUser2ᚖgithubᚗcomᚋClusterCockpitᚋcc
return ec._User(ctx, sel, v)
}
func (ec *executionContext) unmarshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx context.Context, v interface{}) (*model.Weights, error) {
if v == nil {
return nil, nil
}
var res = new(model.Weights)
err := res.UnmarshalGQL(v)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalOWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐWeights(ctx context.Context, sel ast.SelectionSet, v *model.Weights) graphql.Marshaler {
if v == nil {
return graphql.Null
}
return v
}
func (ec *executionContext) marshalO__EnumValue2ᚕgithubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐEnumValueᚄ(ctx context.Context, sel ast.SelectionSet, v []introspection.EnumValue) graphql.Marshaler {
if v == nil {
return graphql.Null

View File

@ -115,6 +115,7 @@ type MetricFootprints struct {
type NodeMetrics struct {
Host string `json:"host"`
SubCluster string `json:"subCluster"`
Metrics []*JobMetricWithName `json:"metrics"`
}
@ -138,6 +139,7 @@ type StringInput struct {
type SubCluster struct {
Name string `json:"name"`
Nodes string `json:"nodes"`
NumberOfNodes int `json:"numberOfNodes"`
ProcessorType string `json:"processorType"`
SocketsPerNode int `json:"socketsPerNode"`
CoresPerSocket int `json:"coresPerSocket"`
@ -256,3 +258,44 @@ func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error {
func (e SortDirectionEnum) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type Weights string
const (
WeightsNodeCount Weights = "NODE_COUNT"
WeightsNodeHours Weights = "NODE_HOURS"
)
var AllWeights = []Weights{
WeightsNodeCount,
WeightsNodeHours,
}
func (e Weights) IsValid() bool {
switch e {
case WeightsNodeCount, WeightsNodeHours:
return true
}
return false
}
func (e Weights) String() string {
return string(e)
}
func (e *Weights) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = Weights(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid Weights", str)
}
return nil
}
func (e Weights) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}

View File

@ -42,6 +42,7 @@ type Cluster {
type SubCluster {
name: String!
nodes: String!
numberOfNodes: Int!
processorType: String!
socketsPerNode: Int!
coresPerSocket: Int!
@ -135,9 +136,11 @@ type Footprints {
}
enum Aggregate { USER, PROJECT, CLUSTER }
enum Weights { NODE_COUNT, NODE_HOURS }
type NodeMetrics {
host: String!
subCluster: String!
metrics: [JobMetricWithName!]!
}
@ -157,6 +160,7 @@ type Query {
tags: [Tag!]! # List of all tags
user(username: String!): User
allocatedNodes(cluster: String!): [Count!]!
job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
@ -164,11 +168,11 @@ type Query {
jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]!
jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, limit: Int): [Count!]!
jobsCount(filter: [JobFilter]!, groupBy: Aggregate!, weight: Weights, limit: Int): [Count!]!
rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]!
nodeMetrics(cluster: String!, partition: String, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]!
nodeMetrics(cluster: String!, nodes: [String!], scopes: [MetricScope!], metrics: [String!], from: Time!, to: Time!): [NodeMetrics!]!
}
type Mutation {

View File

@ -110,6 +110,23 @@ func (r *queryResolver) User(ctx context.Context, username string) (*model.User,
return auth.FetchUser(ctx, r.DB, username)
}
func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) {
data, err := r.Repo.AllocatedNodes(cluster)
if err != nil {
return nil, err
}
counts := make([]*model.Count, 0, len(data))
for subcluster, hosts := range data {
counts = append(counts, &model.Count{
Name: subcluster,
Count: len(hosts),
})
}
return counts, nil
}
func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) {
numericId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
@ -185,8 +202,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
return r.jobsStatistics(ctx, filter, groupBy)
}
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, limit)
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
if err != nil {
return nil, err
}
@ -205,23 +222,19 @@ func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.Job
return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY)
}
func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partition *string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) {
func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) {
user := auth.GetUser(ctx)
if user != nil && !user.HasRole(auth.RoleAdmin) {
return nil, errors.New("you need to be an administrator for this query")
}
if partition == nil {
partition = new(string)
}
if metrics == nil {
for _, mc := range config.GetCluster(cluster).MetricConfig {
metrics = append(metrics, mc.Name)
}
}
data, err := metricdata.LoadNodeData(cluster, *partition, metrics, nodes, scopes, from, to, ctx)
data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
return nil, err
}
@ -232,6 +245,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti
Host: hostname,
Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)),
}
host.SubCluster, _ = config.GetSubClusterByNode(cluster, hostname)
for metric, scopedMetrics := range metrics {
for _, scopedMetric := range scopedMetrics {

View File

@ -79,7 +79,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
ccms.jwt = config.Token
ccms.client = http.Client{
Timeout: 5 * time.Second,
Timeout: 10 * time.Second,
}
if config.Renamings != nil {
@ -515,7 +515,7 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont
}
// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known!
func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
func (ccms *CCMetricStore) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
req := ApiQueryRequest{
Cluster: cluster,
From: from.Unix(),

View File

@ -24,7 +24,7 @@ type MetricDataRepository interface {
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
// Return a map of hosts to a map of metrics at the requested scopes for that node.
LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
@ -180,7 +180,7 @@ func LoadAverages(job *schema.Job, metrics []string, data [][]schema.Float, ctx
}
// Used for the node/system view. Returns a map of nodes to a map of metrics.
func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
func LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
repo, ok := metricDataRepos[cluster]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", cluster)
@ -192,7 +192,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s
}
}
data, err := repo.LoadNodeData(cluster, partition, metrics, nodes, scopes, from, to, ctx)
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
if len(data) != 0 {
log.Errorf("partial error: %s", err.Error())

View File

@ -27,6 +27,6 @@ func (tmdr *TestMetricDataRepository) LoadStats(job *schema.Job, metrics []strin
panic("TODO")
}
func (tmdr *TestMetricDataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
func (tmdr *TestMetricDataRepository) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
panic("TODO")
}

View File

@ -193,12 +193,25 @@ func (r *JobRepository) Stop(
}
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, limit *int) (map[string]int, error) {
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate")
}
q := sq.Select("job."+string(aggreg), "count(*) as count").From("job").GroupBy("job." + string(aggreg)).OrderBy("count DESC")
runner := (sq.BaseRunner)(r.stmtCache)
count := "count(*) as count"
if weight != nil {
switch *weight {
case model.WeightsNodeCount:
count = "sum(job.num_nodes) as count"
case model.WeightsNodeHours:
now := time.Now().Unix()
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
}
}
q := sq.Select("job."+string(aggreg), count).From("job").GroupBy("job." + string(aggreg)).OrderBy("count DESC")
q = SecurityCheck(ctx, q)
for _, f := range filters {
q = BuildWhereClause(f, q)
@ -208,7 +221,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
}
counts := map[string]int{}
rows, err := q.RunWith(r.DB).Query()
rows, err := q.RunWith(runner).Query()
if err != nil {
return nil, err
}
@ -318,3 +331,42 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
}
return partitions.([]string), nil
}
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Where("job.job_state = 'running'").
Where("job.cluster = ?", cluster).
RunWith(r.stmtCache).Query()
if err != nil {
return nil, err
}
var raw []byte
defer rows.Close()
for rows.Next() {
raw = raw[0:0]
var resources []*schema.Resource
var subcluster string
if err := rows.Scan(&raw, &subcluster); err != nil {
return nil, err
}
if err := json.Unmarshal(raw, &resources); err != nil {
return nil, err
}
hosts, ok := subclusters[subcluster]
if !ok {
hosts = make(map[string]int)
subclusters[subcluster] = hosts
}
for _, resource := range resources {
hosts[resource.Hostname] += 1
}
}
return subclusters, nil
}

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"net/http"
"net/url"
"strconv"
@ -39,6 +40,7 @@ var routes []Route = []Route{
{"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster <ID> - ClusterCockpit", false, setupClusterRoute},
{"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node <ID> - ClusterCockpit", false, setupNodeRoute},
{"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute},
{"/monitoring/status/{cluster}", "monitoring/status.tmpl", "Status of <ID> - ClusterCockpit", false, setupClusterRoute},
}
func setupHomeRoute(i InfoType, r *http.Request) InfoType {
@ -51,22 +53,21 @@ func setupHomeRoute(i InfoType, r *http.Request) InfoType {
runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
State: []schema.JobState{schema.JobStateRunning},
}}, nil)
}}, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
runningJobs = map[string]int{}
}
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil)
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
totalJobs = map[string]int{}
}
from := time.Now().Add(-24 * time.Hour)
recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
StartTime: &model.TimeRange{From: &from, To: nil},
Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration},
}}, nil)
}}, nil, nil)
if err != nil {
log.Errorf("failed to count jobs: %s", err.Error())
recentShortJobs = map[string]int{}
@ -118,6 +119,7 @@ func setupNodeRoute(i InfoType, r *http.Request) InfoType {
vars := mux.Vars(r)
i["cluster"] = vars["cluster"]
i["hostname"] = vars["hostname"]
i["id"] = fmt.Sprintf("%s (%s)", vars["cluster"], vars["hostname"])
from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to")
if from != "" || to != "" {
i["from"] = from

View File

@ -13,6 +13,7 @@ import (
"net/url"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"
@ -476,6 +477,9 @@ func main() {
api.OngoingArchivings.Wait()
}()
if os.Getenv("GOGC") == "" {
debug.SetGCPercent(25)
}
systemdNotifiy(true, "running")
wg.Wait()
log.Print("Gracefull shutdown completed!")

View File

@ -0,0 +1,14 @@
{{define "content"}}
<div id="svelte-app"></div>
{{end}}
{{define "stylesheets"}}
<link rel='stylesheet' href='/build/status.css'>
{{end}}
{{define "javascript"}}
<script>
const infos = {{ .Infos }};
const clusterCockpitConfig = {{ .Config }};
</script>
<script src='/build/status.js'></script>
{{end}}

View File

@ -48,6 +48,7 @@ func init() {
"monitoring/list.tmpl",
"monitoring/user.tmpl",
"monitoring/systems.tmpl",
"monitoring/status.tmpl",
"monitoring/node.tmpl",
"monitoring/analysis.tmpl",
}

View File

@ -1,17 +0,0 @@
[Unit]
Description=ClusterCockpit Web Server (Go edition)
Documentation=https://github.com/ClusterCockpit/cc-backend
Wants=network-online.target
After=network-online.target
After=mariadb.service mysql.service
[Service]
WorkingDirectory=/var/clustercockpit
Type=notify
NotifyAccess=all
Restart=on-failure
TimeoutStopSec=100
ExecStart=/var/clustercockpit/cc-backend --config ./config.json
[Install]
WantedBy=multi-user.target

View File

@ -1,9 +0,0 @@
{
"addr": "0.0.0.0:443",
"https-cert-file": "/etc/letsencrypt/live/<...>/fullchain.pem",
"https-key-file": "/etc/letsencrypt/live/<...>/privkey.pem",
"db-driver": "mysql",
"db": "clustercockpit:password@/cc_backend",
"user": "clustercockpit",
"group": "clustercockpit"
}