From f5f36427a45d082cb92df72a79566ea8efbe75d0 Mon Sep 17 00:00:00 2001
From: Christoph Kluge <christoph.kluge@fau.de>
Date: Thu, 13 Mar 2025 17:33:55 +0100
Subject: [PATCH 1/5] split statsTable data from jobMetrics query, initial
 commit

- mainly backend changes
- statsTable changes only for prototyping
---
 api/schema.graphqls                         |   39 +-
 internal/graph/generated/generated.go       | 1326 +++++++++++++----
 internal/graph/model/models_gen.go          |   22 +-
 internal/graph/schema.resolvers.go          |   56 +-
 internal/metricDataDispatcher/dataLoader.go |   34 +-
 internal/metricdata/cc-metric-store.go      |   93 +-
 internal/metricdata/influxdb-v2.go          |   12 +
 internal/metricdata/metricdata.go           |    5 +-
 internal/metricdata/prometheus.go           |   14 +-
 internal/metricdata/utils.go                |   12 +-
 pkg/archive/archive.go                      |   20 +-
 pkg/archive/fsBackend.go                    |   46 +
 pkg/archive/json.go                         |   37 +
 pkg/schema/metrics.go                       |    7 +
 web/frontend/src/Job.root.svelte            |   44 +-
 web/frontend/src/job/Metric.svelte          |    5 -
 web/frontend/src/job/StatsTable.svelte      |   95 +-
 web/frontend/src/job/StatsTableEntry.svelte |   18 +-
 .../job/jobsummary/JobFootprintPolar.svelte |   12 +-
 19 files changed, 1471 insertions(+), 426 deletions(-)

diff --git a/api/schema.graphqls b/api/schema.graphqls
index 9385a6f..ed8843c 100644
--- a/api/schema.graphqls
+++ b/api/schema.graphqls
@@ -137,11 +137,6 @@ type JobMetricWithName {
   metric: JobMetric!
 }
 
-type JobMetricStatWithName {
-  name: String!
-  stats: MetricStatistics!
-}
-
 type JobMetric {
   unit: Unit
   timestep: Int!
@@ -156,6 +151,30 @@ type Series {
   data: [NullableFloat!]!
 }
 
+type StatsSeries {
+  mean: [NullableFloat!]!
+  median: [NullableFloat!]!
+  min: [NullableFloat!]!
+  max: [NullableFloat!]!
+}
+
+type JobStatsWithScope {
+  name: String!
+  scope: MetricScope!
+  stats: [ScopedStats!]!
+}
+
+type ScopedStats {
+  hostname: String!
+  id: String
+  data: MetricStatistics!
+}
+
+type JobStats {
+  name: String!
+  stats: MetricStatistics!
+}
+
 type Unit {
   base: String!
   prefix: String
@@ -167,13 +186,6 @@ type MetricStatistics {
   max: Float!
 }
 
-type StatsSeries {
-  mean: [NullableFloat!]!
-  median: [NullableFloat!]!
-  min: [NullableFloat!]!
-  max: [NullableFloat!]!
-}
-
 type MetricFootprints {
   metric: String!
   data: [NullableFloat!]!
@@ -247,7 +259,8 @@ type Query {
   job(id: ID!): Job
   jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
 
-  jobMetricStats(id: ID!, metrics: [String!]): [JobMetricStatWithName!]!
+  jobStats(id: ID!, metrics: [String!]): [JobStats!]!
+  scopedJobStats(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobStatsWithScope!]!
 
   jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints
 
   jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
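For reference, the two replacement queries can be exercised end-to-end with a minimal Go sketch like the one below. Everything beyond the field names is an assumption and not part of this patch: the GraphQL endpoint path (/query), an instance reachable without authentication (e.g. a local dev setup), the job id "1234", and the metric names.

// Smoke test for the new jobStats/scopedJobStats queries; a sketch only.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// MetricScope values are passed as plain strings here ("node", "core", ...),
// assuming the schema declares MetricScope as a string-backed scalar; worth
// verifying against the full schema.graphqls.
const query = `{
  jobStats(id: "1234", metrics: ["flops_any", "mem_bw"]) {
    name
    stats { avg min max }
  }
  scopedJobStats(id: "1234", metrics: ["flops_any"], scopes: ["node", "core"]) {
    name
    scope
    stats { hostname id data { avg min max } }
  }
}`

func main() {
	body, _ := json.Marshal(map[string]string{"query": query})
	resp, err := http.Post("http://localhost:8080/query", "application/json", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("%#v\n", out)
}

As the loader comments further down spell out, jobStats keeps serving the polar plot with one aggregate per metric, while scopedJobStats returns one entry per metric and scope for the stats table; each scoped entry carries an optional id so sub-node series (e.g. one per core) stay distinguishable.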
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index b4c6e19..e5c9ca2 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -156,11 +156,6 @@ type ComplexityRoot struct { Unit func(childComplexity int) int } - JobMetricStatWithName struct { - Name func(childComplexity int) int - Stats func(childComplexity int) int - } - JobMetricWithName struct { Metric func(childComplexity int) int Name func(childComplexity int) int @@ -175,6 +170,17 @@ type ComplexityRoot struct { Offset func(childComplexity int) int } + JobStats struct { + Name func(childComplexity int) int + Stats func(childComplexity int) int + } + + JobStatsWithScope struct { + Name func(childComplexity int) int + Scope func(childComplexity int) int + Stats func(childComplexity int) int + } + JobsStatistics struct { HistDuration func(childComplexity int) int HistMetrics func(childComplexity int) int @@ -268,14 +274,15 @@ type ComplexityRoot struct { Clusters func(childComplexity int) int GlobalMetrics func(childComplexity int) int Job func(childComplexity int, id string) int - JobMetricStats func(childComplexity int, id string, metrics []string) int JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope, resolution *int) int + JobStats func(childComplexity int, id string, metrics []string) int Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate, numDurationBins *string, numMetricBins *int) int NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int NodeMetricsList func(childComplexity int, cluster string, subCluster string, nodeFilter string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time, page *model.PageRequest, resolution *int) int RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int + ScopedJobStats func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int Tags func(childComplexity int) int User func(childComplexity int, username string) int } @@ -287,6 +294,12 @@ type ComplexityRoot struct { Hostname func(childComplexity int) int } + ScopedStats struct { + Data func(childComplexity int) int + Hostname func(childComplexity int) int + ID func(childComplexity int) int + } + Series struct { Data func(childComplexity int) int Hostname func(childComplexity int) int @@ -396,7 +409,8 @@ type QueryResolver interface { AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) Job(ctx context.Context, id string) (*schema.Job, error) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error) - JobMetricStats(ctx context.Context, id string, metrics []string) ([]*model.JobMetricStatWithName, error) + JobStats(ctx context.Context, id string, metrics []string) ([]*model.JobStats, error) + ScopedJobStats(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobStatsWithScope, error) JobsFootprints(ctx context.Context, filter 
[]*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate, numDurationBins *string, numMetricBins *int) ([]*model.JobsStatistics, error) @@ -861,20 +875,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobMetric.Unit(childComplexity), true - case "JobMetricStatWithName.name": - if e.complexity.JobMetricStatWithName.Name == nil { - break - } - - return e.complexity.JobMetricStatWithName.Name(childComplexity), true - - case "JobMetricStatWithName.stats": - if e.complexity.JobMetricStatWithName.Stats == nil { - break - } - - return e.complexity.JobMetricStatWithName.Stats(childComplexity), true - case "JobMetricWithName.metric": if e.complexity.JobMetricWithName.Metric == nil { break @@ -931,6 +931,41 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobResultList.Offset(childComplexity), true + case "JobStats.name": + if e.complexity.JobStats.Name == nil { + break + } + + return e.complexity.JobStats.Name(childComplexity), true + + case "JobStats.stats": + if e.complexity.JobStats.Stats == nil { + break + } + + return e.complexity.JobStats.Stats(childComplexity), true + + case "JobStatsWithScope.name": + if e.complexity.JobStatsWithScope.Name == nil { + break + } + + return e.complexity.JobStatsWithScope.Name(childComplexity), true + + case "JobStatsWithScope.scope": + if e.complexity.JobStatsWithScope.Scope == nil { + break + } + + return e.complexity.JobStatsWithScope.Scope(childComplexity), true + + case "JobStatsWithScope.stats": + if e.complexity.JobStatsWithScope.Stats == nil { + break + } + + return e.complexity.JobStatsWithScope.Stats(childComplexity), true + case "JobsStatistics.histDuration": if e.complexity.JobsStatistics.HistDuration == nil { break @@ -1400,18 +1435,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.Job(childComplexity, args["id"].(string)), true - case "Query.jobMetricStats": - if e.complexity.Query.JobMetricStats == nil { - break - } - - args, err := ec.field_Query_jobMetricStats_args(context.TODO(), rawArgs) - if err != nil { - return 0, false - } - - return e.complexity.Query.JobMetricStats(childComplexity, args["id"].(string), args["metrics"].([]string)), true - case "Query.jobMetrics": if e.complexity.Query.JobMetrics == nil { break @@ -1424,6 +1447,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope), args["resolution"].(*int)), true + case "Query.jobStats": + if e.complexity.Query.JobStats == nil { + break + } + + args, err := ec.field_Query_jobStats_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.JobStats(childComplexity, args["id"].(string), args["metrics"].([]string)), true + case "Query.jobs": if e.complexity.Query.Jobs == nil { break @@ -1496,6 +1531,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.RooflineHeatmap(childComplexity, args["filter"].([]*model.JobFilter), args["rows"].(int), 
args["cols"].(int), args["minX"].(float64), args["minY"].(float64), args["maxX"].(float64), args["maxY"].(float64)), true + case "Query.scopedJobStats": + if e.complexity.Query.ScopedJobStats == nil { + break + } + + args, err := ec.field_Query_scopedJobStats_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.ScopedJobStats(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope)), true + case "Query.tags": if e.complexity.Query.Tags == nil { break @@ -1543,6 +1590,27 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Resource.Hostname(childComplexity), true + case "ScopedStats.data": + if e.complexity.ScopedStats.Data == nil { + break + } + + return e.complexity.ScopedStats.Data(childComplexity), true + + case "ScopedStats.hostname": + if e.complexity.ScopedStats.Hostname == nil { + break + } + + return e.complexity.ScopedStats.Hostname(childComplexity), true + + case "ScopedStats.id": + if e.complexity.ScopedStats.ID == nil { + break + } + + return e.complexity.ScopedStats.ID(childComplexity), true + case "Series.data": if e.complexity.Series.Data == nil { break @@ -2131,11 +2199,6 @@ type JobMetricWithName { metric: JobMetric! } -type JobMetricStatWithName { - name: String! - stats: MetricStatistics! -} - type JobMetric { unit: Unit timestep: Int! @@ -2150,6 +2213,30 @@ type Series { data: [NullableFloat!]! } +type StatsSeries { + mean: [NullableFloat!]! + median: [NullableFloat!]! + min: [NullableFloat!]! + max: [NullableFloat!]! +} + +type JobStatsWithScope { + name: String! + scope: MetricScope! + stats: [ScopedStats!]! +} + +type ScopedStats { + hostname: String! + id: String + data: MetricStatistics! +} + +type JobStats { + name: String! + stats: MetricStatistics! +} + type Unit { base: String! prefix: String @@ -2161,13 +2248,6 @@ type MetricStatistics { max: Float! } -type StatsSeries { - mean: [NullableFloat!]! - median: [NullableFloat!]! - min: [NullableFloat!]! - max: [NullableFloat!]! -} - type MetricFootprints { metric: String! data: [NullableFloat!]! @@ -2241,7 +2321,8 @@ type Query { job(id: ID!): Job jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]! - jobMetricStats(id: ID!, metrics: [String!]): [JobMetricStatWithName!]! + jobStats(id: ID!, metrics: [String!]): [JobStats!]! + scopedJobStats(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobStatsWithScope!]! jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! 
@@ -2694,57 +2775,6 @@ func (ec *executionContext) field_Query_allocatedNodes_argsCluster( return zeroVal, nil } -func (ec *executionContext) field_Query_jobMetricStats_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { - var err error - args := map[string]any{} - arg0, err := ec.field_Query_jobMetricStats_argsID(ctx, rawArgs) - if err != nil { - return nil, err - } - args["id"] = arg0 - arg1, err := ec.field_Query_jobMetricStats_argsMetrics(ctx, rawArgs) - if err != nil { - return nil, err - } - args["metrics"] = arg1 - return args, nil -} -func (ec *executionContext) field_Query_jobMetricStats_argsID( - ctx context.Context, - rawArgs map[string]any, -) (string, error) { - if _, ok := rawArgs["id"]; !ok { - var zeroVal string - return zeroVal, nil - } - - ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) - if tmp, ok := rawArgs["id"]; ok { - return ec.unmarshalNID2string(ctx, tmp) - } - - var zeroVal string - return zeroVal, nil -} - -func (ec *executionContext) field_Query_jobMetricStats_argsMetrics( - ctx context.Context, - rawArgs map[string]any, -) ([]string, error) { - if _, ok := rawArgs["metrics"]; !ok { - var zeroVal []string - return zeroVal, nil - } - - ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("metrics")) - if tmp, ok := rawArgs["metrics"]; ok { - return ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) - } - - var zeroVal []string - return zeroVal, nil -} - func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -2842,6 +2872,57 @@ func (ec *executionContext) field_Query_jobMetrics_argsResolution( return zeroVal, nil } +func (ec *executionContext) field_Query_jobStats_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := ec.field_Query_jobStats_argsID(ctx, rawArgs) + if err != nil { + return nil, err + } + args["id"] = arg0 + arg1, err := ec.field_Query_jobStats_argsMetrics(ctx, rawArgs) + if err != nil { + return nil, err + } + args["metrics"] = arg1 + return args, nil +} +func (ec *executionContext) field_Query_jobStats_argsID( + ctx context.Context, + rawArgs map[string]any, +) (string, error) { + if _, ok := rawArgs["id"]; !ok { + var zeroVal string + return zeroVal, nil + } + + ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) + if tmp, ok := rawArgs["id"]; ok { + return ec.unmarshalNID2string(ctx, tmp) + } + + var zeroVal string + return zeroVal, nil +} + +func (ec *executionContext) field_Query_jobStats_argsMetrics( + ctx context.Context, + rawArgs map[string]any, +) ([]string, error) { + if _, ok := rawArgs["metrics"]; !ok { + var zeroVal []string + return zeroVal, nil + } + + ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("metrics")) + if tmp, ok := rawArgs["metrics"]; ok { + return ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) + } + + var zeroVal []string + return zeroVal, nil +} + func (ec *executionContext) field_Query_job_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -3682,6 +3763,80 @@ func (ec *executionContext) field_Query_rooflineHeatmap_argsMaxY( return zeroVal, nil } +func (ec *executionContext) field_Query_scopedJobStats_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := ec.field_Query_scopedJobStats_argsID(ctx, rawArgs) + if err != nil { + 
return nil, err + } + args["id"] = arg0 + arg1, err := ec.field_Query_scopedJobStats_argsMetrics(ctx, rawArgs) + if err != nil { + return nil, err + } + args["metrics"] = arg1 + arg2, err := ec.field_Query_scopedJobStats_argsScopes(ctx, rawArgs) + if err != nil { + return nil, err + } + args["scopes"] = arg2 + return args, nil +} +func (ec *executionContext) field_Query_scopedJobStats_argsID( + ctx context.Context, + rawArgs map[string]any, +) (string, error) { + if _, ok := rawArgs["id"]; !ok { + var zeroVal string + return zeroVal, nil + } + + ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) + if tmp, ok := rawArgs["id"]; ok { + return ec.unmarshalNID2string(ctx, tmp) + } + + var zeroVal string + return zeroVal, nil +} + +func (ec *executionContext) field_Query_scopedJobStats_argsMetrics( + ctx context.Context, + rawArgs map[string]any, +) ([]string, error) { + if _, ok := rawArgs["metrics"]; !ok { + var zeroVal []string + return zeroVal, nil + } + + ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("metrics")) + if tmp, ok := rawArgs["metrics"]; ok { + return ec.unmarshalOString2ᚕstringᚄ(ctx, tmp) + } + + var zeroVal []string + return zeroVal, nil +} + +func (ec *executionContext) field_Query_scopedJobStats_argsScopes( + ctx context.Context, + rawArgs map[string]any, +) ([]schema.MetricScope, error) { + if _, ok := rawArgs["scopes"]; !ok { + var zeroVal []schema.MetricScope + return zeroVal, nil + } + + ctx = graphql.WithPathContext(ctx, graphql.NewPathWithField("scopes")) + if tmp, ok := rawArgs["scopes"]; ok { + return ec.unmarshalOMetricScope2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricScopeᚄ(ctx, tmp) + } + + var zeroVal []schema.MetricScope + return zeroVal, nil +} + func (ec *executionContext) field_Query_user_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -6663,102 +6818,6 @@ func (ec *executionContext) fieldContext_JobMetric_statisticsSeries(_ context.Co return fc, nil } -func (ec *executionContext) _JobMetricStatWithName_name(ctx context.Context, field graphql.CollectedField, obj *model.JobMetricStatWithName) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_JobMetricStatWithName_name(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { - ctx = rctx // use context from middleware stack in children - return obj.Name, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_JobMetricStatWithName_name(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "JobMetricStatWithName", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") - }, - } - return fc, nil -} - -func (ec *executionContext) _JobMetricStatWithName_stats(ctx context.Context, field graphql.CollectedField, obj 
*model.JobMetricStatWithName) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_JobMetricStatWithName_stats(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { - ctx = rctx // use context from middleware stack in children - return obj.Stats, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(*schema.MetricStatistics) - fc.Result = res - return ec.marshalNMetricStatistics2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricStatistics(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_JobMetricStatWithName_stats(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "JobMetricStatWithName", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - switch field.Name { - case "avg": - return ec.fieldContext_MetricStatistics_avg(ctx, field) - case "min": - return ec.fieldContext_MetricStatistics_min(ctx, field) - case "max": - return ec.fieldContext_MetricStatistics_max(ctx, field) - } - return nil, fmt.Errorf("no field named %q was found under type MetricStatistics", field.Name) - }, - } - return fc, nil -} - func (ec *executionContext) _JobMetricWithName_name(ctx context.Context, field graphql.CollectedField, obj *model.JobMetricWithName) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobMetricWithName_name(ctx, field) if err != nil { @@ -7163,6 +7222,242 @@ func (ec *executionContext) fieldContext_JobResultList_hasNextPage(_ context.Con return fc, nil } +func (ec *executionContext) _JobStats_name(ctx context.Context, field graphql.CollectedField, obj *model.JobStats) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobStats_name(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Name, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobStats_name(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobStats", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _JobStats_stats(ctx context.Context, field graphql.CollectedField, obj *model.JobStats) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobStats_stats(ctx, field) + if err != nil { + 
return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Stats, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*schema.MetricStatistics) + fc.Result = res + return ec.marshalNMetricStatistics2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricStatistics(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobStats_stats(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobStats", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "avg": + return ec.fieldContext_MetricStatistics_avg(ctx, field) + case "min": + return ec.fieldContext_MetricStatistics_min(ctx, field) + case "max": + return ec.fieldContext_MetricStatistics_max(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type MetricStatistics", field.Name) + }, + } + return fc, nil +} + +func (ec *executionContext) _JobStatsWithScope_name(ctx context.Context, field graphql.CollectedField, obj *model.JobStatsWithScope) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobStatsWithScope_name(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Name, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobStatsWithScope_name(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobStatsWithScope", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _JobStatsWithScope_scope(ctx context.Context, field graphql.CollectedField, obj *model.JobStatsWithScope) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobStatsWithScope_scope(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Scope, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if 
!graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(schema.MetricScope) + fc.Result = res + return ec.marshalNMetricScope2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricScope(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobStatsWithScope_scope(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobStatsWithScope", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type MetricScope does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _JobStatsWithScope_stats(ctx context.Context, field graphql.CollectedField, obj *model.JobStatsWithScope) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobStatsWithScope_stats(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Stats, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]*model.ScopedStats) + fc.Result = res + return ec.marshalNScopedStats2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐScopedStatsᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobStatsWithScope_stats(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobStatsWithScope", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "hostname": + return ec.fieldContext_ScopedStats_hostname(ctx, field) + case "id": + return ec.fieldContext_ScopedStats_id(ctx, field) + case "data": + return ec.fieldContext_ScopedStats_data(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type ScopedStats", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_id(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobsStatistics_id(ctx, field) if err != nil { @@ -10296,8 +10591,8 @@ func (ec *executionContext) fieldContext_Query_jobMetrics(ctx context.Context, f return fc, nil } -func (ec *executionContext) _Query_jobMetricStats(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Query_jobMetricStats(ctx, field) +func (ec *executionContext) _Query_jobStats(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_jobStats(ctx, field) if err != nil { return graphql.Null } @@ -10310,7 +10605,7 @@ func (ec *executionContext) _Query_jobMetricStats(ctx context.Context, field gra }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().JobMetricStats(rctx, 
fc.Args["id"].(string), fc.Args["metrics"].([]string)) + return ec.resolvers.Query().JobStats(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string)) }) if err != nil { ec.Error(ctx, err) @@ -10322,12 +10617,12 @@ func (ec *executionContext) _Query_jobMetricStats(ctx context.Context, field gra } return graphql.Null } - res := resTmp.([]*model.JobMetricStatWithName) + res := resTmp.([]*model.JobStats) fc.Result = res - return ec.marshalNJobMetricStatWithName2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobMetricStatWithNameᚄ(ctx, field.Selections, res) + return ec.marshalNJobStats2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsᚄ(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Query_jobMetricStats(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Query_jobStats(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Query", Field: field, @@ -10336,11 +10631,11 @@ func (ec *executionContext) fieldContext_Query_jobMetricStats(ctx context.Contex Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { case "name": - return ec.fieldContext_JobMetricStatWithName_name(ctx, field) + return ec.fieldContext_JobStats_name(ctx, field) case "stats": - return ec.fieldContext_JobMetricStatWithName_stats(ctx, field) + return ec.fieldContext_JobStats_stats(ctx, field) } - return nil, fmt.Errorf("no field named %q was found under type JobMetricStatWithName", field.Name) + return nil, fmt.Errorf("no field named %q was found under type JobStats", field.Name) }, } defer func() { @@ -10350,7 +10645,70 @@ func (ec *executionContext) fieldContext_Query_jobMetricStats(ctx context.Contex } }() ctx = graphql.WithFieldContext(ctx, fc) - if fc.Args, err = ec.field_Query_jobMetricStats_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + if fc.Args, err = ec.field_Query_jobStats_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + +func (ec *executionContext) _Query_scopedJobStats(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_scopedJobStats(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().ScopedJobStats(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]*model.JobStatsWithScope) + fc.Result = res + return ec.marshalNJobStatsWithScope2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsWithScopeᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_scopedJobStats(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: 
func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "name": + return ec.fieldContext_JobStatsWithScope_name(ctx, field) + case "scope": + return ec.fieldContext_JobStatsWithScope_scope(ctx, field) + case "stats": + return ec.fieldContext_JobStatsWithScope_stats(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type JobStatsWithScope", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_scopedJobStats_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { ec.Error(ctx, err) return fc, err } @@ -11058,6 +11416,143 @@ func (ec *executionContext) fieldContext_Resource_configuration(_ context.Contex return fc, nil } +func (ec *executionContext) _ScopedStats_hostname(ctx context.Context, field graphql.CollectedField, obj *model.ScopedStats) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_ScopedStats_hostname(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Hostname, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_ScopedStats_hostname(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "ScopedStats", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _ScopedStats_id(ctx context.Context, field graphql.CollectedField, obj *model.ScopedStats) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_ScopedStats_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*string) + fc.Result = res + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_ScopedStats_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "ScopedStats", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _ScopedStats_data(ctx 
context.Context, field graphql.CollectedField, obj *model.ScopedStats) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_ScopedStats_data(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { + ctx = rctx // use context from middleware stack in children + return obj.Data, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*schema.MetricStatistics) + fc.Result = res + return ec.marshalNMetricStatistics2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐMetricStatistics(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_ScopedStats_data(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "ScopedStats", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "avg": + return ec.fieldContext_MetricStatistics_avg(ctx, field) + case "min": + return ec.fieldContext_MetricStatistics_min(ctx, field) + case "max": + return ec.fieldContext_MetricStatistics_max(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type MetricStatistics", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Series_hostname(ctx context.Context, field graphql.CollectedField, obj *schema.Series) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Series_hostname(ctx, field) if err != nil { @@ -16569,50 +17064,6 @@ func (ec *executionContext) _JobMetric(ctx context.Context, sel ast.SelectionSet return out } -var jobMetricStatWithNameImplementors = []string{"JobMetricStatWithName"} - -func (ec *executionContext) _JobMetricStatWithName(ctx context.Context, sel ast.SelectionSet, obj *model.JobMetricStatWithName) graphql.Marshaler { - fields := graphql.CollectFields(ec.OperationContext, sel, jobMetricStatWithNameImplementors) - - out := graphql.NewFieldSet(fields) - deferred := make(map[string]*graphql.FieldSet) - for i, field := range fields { - switch field.Name { - case "__typename": - out.Values[i] = graphql.MarshalString("JobMetricStatWithName") - case "name": - out.Values[i] = ec._JobMetricStatWithName_name(ctx, field, obj) - if out.Values[i] == graphql.Null { - out.Invalids++ - } - case "stats": - out.Values[i] = ec._JobMetricStatWithName_stats(ctx, field, obj) - if out.Values[i] == graphql.Null { - out.Invalids++ - } - default: - panic("unknown field " + strconv.Quote(field.Name)) - } - } - out.Dispatch(ctx) - if out.Invalids > 0 { - return graphql.Null - } - - atomic.AddInt32(&ec.deferred, int32(len(deferred))) - - for label, dfs := range deferred { - ec.processDeferredGroup(graphql.DeferredGroup{ - Label: label, - Path: graphql.GetPath(ctx), - FieldSet: dfs, - Context: ctx, - }) - } - - return out -} - var jobMetricWithNameImplementors = []string{"JobMetricWithName"} func (ec *executionContext) _JobMetricWithName(ctx context.Context, sel ast.SelectionSet, obj *model.JobMetricWithName) graphql.Marshaler { @@ -16709,6 +17160,99 @@ func (ec *executionContext) _JobResultList(ctx context.Context, sel ast.Selectio return out } +var 
jobStatsImplementors = []string{"JobStats"} + +func (ec *executionContext) _JobStats(ctx context.Context, sel ast.SelectionSet, obj *model.JobStats) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, jobStatsImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("JobStats") + case "name": + out.Values[i] = ec._JobStats_name(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "stats": + out.Values[i] = ec._JobStats_stats(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + +var jobStatsWithScopeImplementors = []string{"JobStatsWithScope"} + +func (ec *executionContext) _JobStatsWithScope(ctx context.Context, sel ast.SelectionSet, obj *model.JobStatsWithScope) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, jobStatsWithScopeImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("JobStatsWithScope") + case "name": + out.Values[i] = ec._JobStatsWithScope_name(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "scope": + out.Values[i] = ec._JobStatsWithScope_scope(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "stats": + out.Values[i] = ec._JobStatsWithScope_stats(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var jobsStatisticsImplementors = []string{"JobsStatistics"} func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.SelectionSet, obj *model.JobsStatistics) graphql.Marshaler { @@ -17513,7 +18057,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr } out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) - case "jobMetricStats": + case "jobStats": field := field innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { @@ -17522,7 +18066,29 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr ec.Error(ctx, ec.Recover(ctx, r)) } }() - res = ec._Query_jobMetricStats(ctx, field) + res = ec._Query_jobStats(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + + out.Concurrently(i, func(ctx context.Context) 
graphql.Marshaler { return rrm(innerCtx) }) + case "scopedJobStats": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_scopedJobStats(ctx, field) if res == graphql.Null { atomic.AddUint32(&fs.Invalids, 1) } @@ -17740,6 +18306,52 @@ func (ec *executionContext) _Resource(ctx context.Context, sel ast.SelectionSet, return out } +var scopedStatsImplementors = []string{"ScopedStats"} + +func (ec *executionContext) _ScopedStats(ctx context.Context, sel ast.SelectionSet, obj *model.ScopedStats) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, scopedStatsImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("ScopedStats") + case "hostname": + out.Values[i] = ec._ScopedStats_hostname(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "id": + out.Values[i] = ec._ScopedStats_id(ctx, field, obj) + case "data": + out.Values[i] = ec._ScopedStats_data(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var seriesImplementors = []string{"Series"} func (ec *executionContext) _Series(ctx context.Context, sel ast.SelectionSet, obj *schema.Series) graphql.Marshaler { @@ -19346,60 +19958,6 @@ func (ec *executionContext) marshalNJobMetric2ᚖgithubᚗcomᚋClusterCockpit return ec._JobMetric(ctx, sel, v) } -func (ec *executionContext) marshalNJobMetricStatWithName2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobMetricStatWithNameᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobMetricStatWithName) graphql.Marshaler { - ret := make(graphql.Array, len(v)) - var wg sync.WaitGroup - isLen1 := len(v) == 1 - if !isLen1 { - wg.Add(len(v)) - } - for i := range v { - i := i - fc := &graphql.FieldContext{ - Index: &i, - Result: &v[i], - } - ctx := graphql.WithFieldContext(ctx, fc) - f := func(i int) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = nil - } - }() - if !isLen1 { - defer wg.Done() - } - ret[i] = ec.marshalNJobMetricStatWithName2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobMetricStatWithName(ctx, sel, v[i]) - } - if isLen1 { - f(i) - } else { - go f(i) - } - - } - wg.Wait() - - for _, e := range ret { - if e == graphql.Null { - return graphql.Null - } - } - - return ret -} - -func (ec *executionContext) marshalNJobMetricStatWithName2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobMetricStatWithName(ctx context.Context, sel ast.SelectionSet, v *model.JobMetricStatWithName) graphql.Marshaler { - if v == nil { - if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { - ec.Errorf(ctx, "the requested element is null which the schema does not allow") - } - return graphql.Null - } - return ec._JobMetricStatWithName(ctx, sel, v) -} - func (ec *executionContext) 
marshalNJobMetricWithName2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobMetricWithNameᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobMetricWithName) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup @@ -19478,6 +20036,114 @@ func (ec *executionContext) marshalNJobState2githubᚗcomᚋClusterCockpitᚋcc return v } +func (ec *executionContext) marshalNJobStats2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobStats) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNJobStats2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStats(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNJobStats2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStats(ctx context.Context, sel ast.SelectionSet, v *model.JobStats) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._JobStats(ctx, sel, v) +} + +func (ec *executionContext) marshalNJobStatsWithScope2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsWithScopeᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobStatsWithScope) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNJobStatsWithScope2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsWithScope(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNJobStatsWithScope2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobStatsWithScope(ctx context.Context, sel ast.SelectionSet, v *model.JobStatsWithScope) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._JobStatsWithScope(ctx, sel, v) +} + func (ec *executionContext) marshalNJobsStatistics2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐJobsStatisticsᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.JobsStatistics) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup @@ -19891,6 +20557,60 @@ func (ec *executionContext) marshalNResource2ᚖgithubᚗcomᚋClusterCockpitᚋ return ec._Resource(ctx, sel, v) } +func (ec 
*executionContext) marshalNScopedStats2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐScopedStatsᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.ScopedStats) graphql.Marshaler {
+	ret := make(graphql.Array, len(v))
+	var wg sync.WaitGroup
+	isLen1 := len(v) == 1
+	if !isLen1 {
+		wg.Add(len(v))
+	}
+	for i := range v {
+		i := i
+		fc := &graphql.FieldContext{
+			Index:  &i,
+			Result: &v[i],
+		}
+		ctx := graphql.WithFieldContext(ctx, fc)
+		f := func(i int) {
+			defer func() {
+				if r := recover(); r != nil {
+					ec.Error(ctx, ec.Recover(ctx, r))
+					ret = nil
+				}
+			}()
+			if !isLen1 {
+				defer wg.Done()
+			}
+			ret[i] = ec.marshalNScopedStats2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐScopedStats(ctx, sel, v[i])
+		}
+		if isLen1 {
+			f(i)
+		} else {
+			go f(i)
+		}
+
+	}
+	wg.Wait()
+
+	for _, e := range ret {
+		if e == graphql.Null {
+			return graphql.Null
+		}
+	}
+
+	return ret
+}
+
+func (ec *executionContext) marshalNScopedStats2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐScopedStats(ctx context.Context, sel ast.SelectionSet, v *model.ScopedStats) graphql.Marshaler {
+	if v == nil {
+		if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
+			ec.Errorf(ctx, "the requested element is null which the schema does not allow")
+		}
+		return graphql.Null
+	}
+	return ec._ScopedStats(ctx, sel, v)
+}
+
 func (ec *executionContext) marshalNSeries2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐSeries(ctx context.Context, sel ast.SelectionSet, v schema.Series) graphql.Marshaler {
 	return ec._Series(ctx, sel, &v)
 }
diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go
index d83a318..43c4e37 100644
--- a/internal/graph/model/models_gen.go
+++ b/internal/graph/model/models_gen.go
@@ -81,11 +81,6 @@ type JobLinkResultList struct {
 	Count *int       `json:"count,omitempty"`
 }
 
-type JobMetricStatWithName struct {
-	Name  string                   `json:"name"`
-	Stats *schema.MetricStatistics `json:"stats"`
-}
-
 type JobMetricWithName struct {
 	Name   string             `json:"name"`
 	Scope  schema.MetricScope `json:"scope"`
@@ -100,6 +95,17 @@ type JobResultList struct {
 	HasNextPage *bool  `json:"hasNextPage,omitempty"`
 }
 
+type JobStats struct {
+	Name  string                   `json:"name"`
+	Stats *schema.MetricStatistics `json:"stats"`
+}
+
+type JobStatsWithScope struct {
+	Name  string             `json:"name"`
+	Scope schema.MetricScope `json:"scope"`
+	Stats []*ScopedStats     `json:"stats"`
+}
+
 type JobsStatistics struct {
 	ID   string `json:"id"`
 	Name string `json:"name"`
@@ -173,6 +179,12 @@ type PageRequest struct {
 	Page         int  `json:"page"`
 }
 
+type ScopedStats struct {
+	Hostname string                   `json:"hostname"`
+	ID       *string                  `json:"id,omitempty"`
+	Data     *schema.MetricStatistics `json:"data"`
+}
+
 type StringInput struct {
 	Eq  *string `json:"eq,omitempty"`
 	Neq *string `json:"neq,omitempty"`
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index ce1384b..1565c7e 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -301,24 +301,23 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
 	return res, err
 }
 
-// JobMetricStats is the resolver for the jobMetricStats field.
-func (r *queryResolver) JobMetricStats(ctx context.Context, id string, metrics []string) ([]*model.JobMetricStatWithName, error) {
-
+// JobStats is the resolver for the jobStats field.
+func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []string) ([]*model.JobStats, error) {
 	job, err := r.Query().Job(ctx, id)
 	if err != nil {
-		log.Warn("Error while querying job for metrics")
+		log.Warnf("Error while querying job %s for metrics", id)
 		return nil, err
 	}
 
-	data, err := metricDataDispatcher.LoadStatData(job, metrics, ctx)
+	data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
 	if err != nil {
-		log.Warn("Error while loading job stat data")
+		log.Warnf("Error while loading job stat data for job id %s", id)
 		return nil, err
 	}
 
-	res := []*model.JobMetricStatWithName{}
+	res := []*model.JobStats{}
 	for name, md := range data {
-		res = append(res, &model.JobMetricStatWithName{
+		res = append(res, &model.JobStats{
 			Name:  name,
 			Stats: &md,
 		})
@@ -327,6 +326,47 @@ func (r *queryResolver) JobMetricStats(ctx context.Context, id string, metrics [
 	return res, err
 }
 
+// ScopedJobStats is the resolver for the scopedJobStats field.
+func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobStatsWithScope, error) {
+	job, err := r.Query().Job(ctx, id)
+	if err != nil {
+		log.Warnf("Error while querying job %s for metrics", id)
+		return nil, err
+	}
+
+	data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
+	if err != nil {
+		log.Warnf("Error while loading scoped job stat data for job id %s", id)
+		return nil, err
+	}
+
+	res := make([]*model.JobStatsWithScope, 0)
+	for name, scoped := range data {
+		for scope, stats := range scoped {
+			// log.Debugf("HANDLE >>>>> %s @ %s -> First Array Value %#v", name, scope, *stats[0])
+
+			mdlStats := make([]*model.ScopedStats, 0)
+			for _, stat := range stats {
+				// log.Debugf("CONVERT >>>>> >>>>> %s -> %v -> %#v", stat.Hostname, stat.Id, stat.Data)
+				mdlStats = append(mdlStats, &model.ScopedStats{
+					Hostname: stat.Hostname,
+					ID:       stat.Id,
+					Data:     stat.Data,
+				})
+			}
+
+			// log.Debugf("APPEND >>>>> >>>>> %#v", mdlStats)
+			res = append(res, &model.JobStatsWithScope{
+				Name:  name,
+				Scope: scope,
+				Stats: mdlStats,
+			})
+		}
+	}
+
+	return res, nil
+}
+
 // JobsFootprints is the resolver for the jobsFootprints field.
 func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
 	// NOTE: Legacy Naming! This resolver is for normalized histograms in analysis view only - *Not* related to DB "footprint" column!
diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go
index f3f60b4..c6cecd8 100644
--- a/internal/metricDataDispatcher/dataLoader.go
+++ b/internal/metricDataDispatcher/dataLoader.go
@@ -224,8 +224,34 @@ func LoadAverages(
 	return nil
 }
 
-// Used for polar plots in frontend
-func LoadStatData(
+// Used for statsTable in frontend: Return scoped statistics by metric.
+func LoadScopedJobStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadScopedStatsFromArchive(job, metrics, scopes) + } + + repo, err := metricdata.GetMetricDataRepo(job.Cluster) + if err != nil { + return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) + } + + scopedStats, err := repo.LoadScopedStats(job, metrics, scopes, ctx) + if err != nil { + log.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) + return nil, err + } + + return scopedStats, nil +} + +// Used for polar plots in frontend: Aggregates statistics for all nodes to single values for job per metric. +func LoadJobStats( job *schema.Job, metrics []string, ctx context.Context, @@ -237,12 +263,12 @@ func LoadStatData( data := make(map[string]schema.MetricStatistics, len(metrics)) repo, err := metricdata.GetMetricDataRepo(job.Cluster) if err != nil { - return data, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) + return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) } stats, err := repo.LoadStats(job, metrics, ctx) if err != nil { - log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) + log.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) return data, err } diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index 2b92fbb..6635299 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -618,7 +618,98 @@ func (ccms *CCMetricStore) LoadStats( return stats, nil } -// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known! +// Scoped Stats: Basically Load Data without resolution and data query flag? 
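+// The request below sets WithStats: true and WithData: false, i.e. only the
+// per-series min/avg/max values are transferred, not the full time series.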
+func (ccms *CCMetricStore) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) + if err != nil { + log.Warn("Error while building queries") + return nil, err + } + + req := ApiQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime.Unix(), + To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), + Queries: queries, + WithStats: true, + WithData: false, + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + log.Error("Error while performing request") + return nil, err + } + + var errors []string + scopedJobStats := make(schema.ScopedJobStats) + + for i, row := range resBody.Results { + query := req.Queries[i] + metric := ccms.toLocalName(query.Metric) + scope := assignedScope[i] + + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { + // "schema.Float()" because regular float64 can not be JSONed when NaN. + res.Avg = schema.Float(0) + res.Min = schema.Float(0) + res.Max = schema.Float(0) + } + + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: query.Hostname, + Id: id, + Data: &schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + }) + } + + // So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty + if len(scopedJobStats[metric][scope]) == 0 { + delete(scopedJobStats[metric], scope) + if len(scopedJobStats[metric]) == 0 { + delete(scopedJobStats, metric) + } + } + } + + if len(errors) != 0 { + /* Returns list for "partial errors" */ + return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + return scopedJobStats, nil +} + +// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known! - Todo Outdated with NodeListData? 
func (ccms *CCMetricStore) LoadNodeData( cluster string, metrics, nodes []string, diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go index 79c2d4a..2a943b6 100644 --- a/internal/metricdata/influxdb-v2.go +++ b/internal/metricdata/influxdb-v2.go @@ -301,6 +301,18 @@ func (idb *InfluxDBv2DataRepository) LoadStats( return stats, nil } +func (idb *InfluxDBv2DataRepository) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.ScopedJobStats, error) { + + // TODO : Implement to be used in JobView Stats Table + log.Infof("LoadScopedStats unimplemented for InfluxDBv2DataRepository, Args: Job-ID %d, metrics %v, scopes %v", job.JobID, metrics, scopes) + + return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository") +} + func (idb *InfluxDBv2DataRepository) LoadNodeData( cluster string, metrics, nodes []string, diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 0fe94d1..f30d837 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -24,9 +24,12 @@ type MetricDataRepository interface { // Return the JobData for the given job, only with the requested metrics. LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) - // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now. + // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only. LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) + // Return a map of metrics to a map of scopes to the scoped metric statistics of the job. + LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error) + // Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node. 
LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index cd849ce..fe829c0 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -448,6 +448,18 @@ func (pdb *PrometheusDataRepository) LoadNodeData( return data, nil } +func (pdb *PrometheusDataRepository) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.ScopedJobStats, error) { + + // TODO : Implement to be used in Job-View StatsTable + log.Infof("LoadScopedStats unimplemented for PrometheusDataRepository, Args: job-id %v, metrics %v, scopes %v", job.JobID, metrics, scopes) + + return nil, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository") +} + func (pdb *PrometheusDataRepository) LoadNodeListData( cluster, subCluster, nodeFilter string, metrics []string, @@ -463,5 +475,5 @@ func (pdb *PrometheusDataRepository) LoadNodeListData( // TODO : Implement to be used in NodeList-View log.Infof("LoadNodeListData unimplemented for PrometheusDataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes) - return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for PrometheusDataRepository") + return nil, totalNodes, hasNextPage, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository") } diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go index 48dd237..aa7bde1 100644 --- a/internal/metricdata/utils.go +++ b/internal/metricdata/utils.go @@ -36,7 +36,17 @@ func (tmdr *TestMetricDataRepository) LoadData( func (tmdr *TestMetricDataRepository) LoadStats( job *schema.Job, - metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { + metrics []string, + ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { + + panic("TODO") +} + +func (tmdr *TestMetricDataRepository) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.ScopedJobStats, error) { panic("TODO") } diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 2eabb52..002fd5e 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -27,6 +27,8 @@ type ArchiveBackend interface { LoadJobData(job *schema.Job) (schema.JobData, error) + LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) + LoadClusterCfg(name string) (*schema.Cluster, error) StoreJobMeta(jobMeta *schema.JobMeta) error @@ -125,7 +127,7 @@ func LoadAveragesFromArchive( return nil } -// Helper to metricdataloader.LoadStatData(). +// Helper to metricdataloader.LoadJobStats(). func LoadStatsFromArchive( job *schema.Job, metrics []string, @@ -154,6 +156,22 @@ func LoadStatsFromArchive( return data, nil } +// Helper to metricdataloader.LoadScopedJobStats(). 
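+// Loads the job's archived data file and reduces each series to its
+// precomputed statistics via DecodeJobStats; raw time series are dropped.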
+func LoadScopedStatsFromArchive( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, +) (schema.ScopedJobStats, error) { + + data, err := ar.LoadJobStats(job) + if err != nil { + log.Warn("Error while loading job metadata from archiveBackend") + return nil, err + } + + return data, nil +} + func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { metaFile, err := ar.LoadJobMeta(job) if err != nil { diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 8a43748..711b1f5 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -115,6 +115,40 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { } } +func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) { + f, err := os.Open(filename) + + if err != nil { + log.Errorf("fsBackend LoadJobStats()- %v", err) + return nil, err + } + defer f.Close() + + if isCompressed { + r, err := gzip.NewReader(f) + if err != nil { + log.Errorf(" %v", err) + return nil, err + } + defer r.Close() + + if config.Keys.Validate { + if err := schema.Validate(schema.Data, r); err != nil { + return nil, fmt.Errorf("validate job data: %v", err) + } + } + + return DecodeJobStats(r, filename) + } else { + if config.Keys.Validate { + if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { + return nil, fmt.Errorf("validate job data: %v", err) + } + } + return DecodeJobStats(bufio.NewReader(f), filename) + } +} + func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { var config FsArchiveConfig @@ -389,6 +423,18 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { return loadJobData(filename, isCompressed) } +func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) { + var isCompressed bool = true + filename := getPath(job, fsa.path, "data.json.gz") + + if !util.CheckFileExists(filename) { + filename = getPath(job, fsa.path, "data.json") + isCompressed = false + } + + return loadJobStats(filename, isCompressed) +} + func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { filename := getPath(job, fsa.path, "meta.json") return loadJobMeta(filename) diff --git a/pkg/archive/json.go b/pkg/archive/json.go index 1219658..5201b74 100644 --- a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -32,6 +32,43 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) { return data.(schema.JobData), nil } +func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) { + jobData, err := DecodeJobData(r, k) + // Convert schema.JobData to schema.ScopedJobStats + if jobData != nil { + scopedJobStats := make(schema.ScopedJobStats) + for metric, metricData := range jobData { + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + for scope, jobMetric := range metricData { + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for _, series := range jobMetric.Series { + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: series.Hostname, + Id: series.Id, + Data: &series.Statistics, + }) + } + + // So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty + if len(scopedJobStats[metric][scope]) == 0 { + delete(scopedJobStats[metric], scope) + if len(scopedJobStats[metric]) == 0 { + delete(scopedJobStats, metric) + } + } 
+ } + } + return scopedJobStats, nil + } + return nil, err +} + func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) { var d schema.JobMeta if err := json.NewDecoder(r).Decode(&d); err != nil { diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go index ffac21b..fbb85e4 100644 --- a/pkg/schema/metrics.go +++ b/pkg/schema/metrics.go @@ -15,6 +15,7 @@ import ( ) type JobData map[string]map[MetricScope]*JobMetric +type ScopedJobStats map[string]map[MetricScope][]*ScopedStats type JobMetric struct { StatisticsSeries *StatsSeries `json:"statisticsSeries,omitempty"` @@ -30,6 +31,12 @@ type Series struct { Statistics MetricStatistics `json:"statistics"` } +type ScopedStats struct { + Hostname string `json:"hostname"` + Id *string `json:"id,omitempty"` + Data *MetricStatistics `json:"data"` +} + type MetricStatistics struct { Avg float64 `json:"avg"` Min float64 `json:"min"` diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 6980230..2fe5bc4 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -127,28 +127,17 @@ let job = $initq.data.job; if (!job) return; - const pendingMetrics = [ - ...( - ( - ccconfig[`job_view_selectedMetrics:${job.cluster}:${job.subCluster}`] || - ccconfig[`job_view_selectedMetrics:${job.cluster}`] - ) || - $initq.data.globalMetrics - .reduce((names, gm) => { - if (gm.availability.find((av) => av.cluster === job.cluster && av.subClusters.includes(job.subCluster))) { - names.push(gm.name); - } - return names; - }, []) - ), - ...( - ( - ccconfig[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] || - ccconfig[`job_view_nodestats_selectedMetrics:${job.cluster}`] - ) || - ccconfig[`job_view_nodestats_selectedMetrics`] - ), - ]; + const pendingMetrics = ( + ccconfig[`job_view_selectedMetrics:${job.cluster}:${job.subCluster}`] || + ccconfig[`job_view_selectedMetrics:${job.cluster}`] + ) || + $initq.data.globalMetrics + .reduce((names, gm) => { + if (gm.availability.find((av) => av.cluster === job.cluster && av.subClusters.includes(job.subCluster))) { + names.push(gm.name); + } + return names; + }, []) // Select default Scopes to load: Check before if any metric has accelerator scope by default const accScopeDefault = [...pendingMetrics].some(function (m) { @@ -343,7 +332,6 @@ {#if item.data} <Metric bind:this={plots[item.metric]} - on:more-loaded={({ detail }) => statsTable.moreLoaded(detail)} job={$initq.data.job} metricName={item.metric} metricUnit={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.unit} @@ -404,15 +392,7 @@ class="overflow-x-auto" active={!somethingMissing} > - {#if $jobMetrics?.data?.jobMetrics} - {#key $jobMetrics.data.jobMetrics} - <StatsTable - bind:this={statsTable} - job={$initq.data.job} - jobMetrics={$jobMetrics.data.jobMetrics} - /> - {/key} - {/if} + <StatsTable job={$initq.data.job}/> </TabPane> <TabPane tabId="job-script" tab="Job Script"> <div class="pre-wrapper"> diff --git a/web/frontend/src/job/Metric.svelte b/web/frontend/src/job/Metric.svelte index bcfa4fd..b68ef47 100644 --- a/web/frontend/src/job/Metric.svelte +++ b/web/frontend/src/job/Metric.svelte @@ -150,11 +150,6 @@ // On additional scope request if (selectedScope == "load-all") { - // Push scope to statsTable (Needs to be in this case, else newly selected 'Metric.svelte' renders cause statsTable race condition) - const statsTableData = $metricData.data.singleUpdate.filter((x) => x.scope !== "node") - if (statsTableData.length > 0) { - dispatch("more-loaded", statsTableData); 
- } // Set selected scope to min of returned scopes selectedScope = minScope(scopes) nodeOnly = (selectedScope == "node") // "node" still only scope after load-all diff --git a/web/frontend/src/job/StatsTable.svelte b/web/frontend/src/job/StatsTable.svelte index c8f12f2..159d24b 100644 --- a/web/frontend/src/job/StatsTable.svelte +++ b/web/frontend/src/job/StatsTable.svelte @@ -3,13 +3,14 @@ Properties: - `job Object`: The job object - - `jobMetrics [Object]`: The jobs metricdata - - Exported: - - `moreLoaded`: Adds additional scopes requested from Metric.svelte in Job-View --> <script> + import { + queryStore, + gql, + getContextClient + } from "@urql/svelte"; import { getContext } from "svelte"; import { Button, @@ -26,11 +27,6 @@ import MetricSelection from "../generic/select/MetricSelection.svelte"; export let job; - export let jobMetrics; - - const sortedJobMetrics = [...new Set(jobMetrics.map((m) => m.name))].sort() - const scopesForMetric = (metric) => - jobMetrics.filter((jm) => jm.name == metric).map((jm) => jm.scope); let hosts = job.resources.map((r) => r.hostname).sort(), selectedScopes = {}, @@ -42,29 +38,63 @@ getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}`] ) || getContext("cc-config")["job_view_nodestats_selectedMetrics"]; - for (let metric of sortedJobMetrics) { - // Not Exclusive or Multi-Node: get maxScope directly (mostly: node) - // -> Else: Load smallest available granularity as default as per availability - const availableScopes = scopesForMetric(metric); - if (job.exclusive != 1 || job.numNodes == 1) { - if (availableScopes.includes("accelerator")) { - selectedScopes[metric] = "accelerator"; - } else if (availableScopes.includes("core")) { - selectedScopes[metric] = "core"; - } else if (availableScopes.includes("socket")) { - selectedScopes[metric] = "socket"; - } else { - selectedScopes[metric] = "node"; + const client = getContextClient(); + const query = gql` + query ($dbid: ID!, $selectedMetrics: [String!]!, $selectedScopes: [MetricScope!]!) 
{ + scopedJobStats(id: $dbid, metrics: $selectedMetrics, scopes: $selectedScopes) { + name + scope + stats { + hostname + id + data { + min + avg + max + } + } } - } else { - selectedScopes[metric] = maxScope(availableScopes); } + `; - sorting[metric] = { - min: { dir: "up", active: false }, - avg: { dir: "up", active: false }, - max: { dir: "up", active: false }, - }; + $: scopedStats = queryStore({ + client: client, + query: query, + variables: { dbid: job.id, selectedMetrics, selectedScopes: ["node"] }, + }); + + $: console.log(">>>> RESULT:", $scopedStats?.data?.scopedJobStats) + + $: jobMetrics = $scopedStats?.data?.scopedJobStats || []; + + const scopesForMetric = (metric) => + jobMetrics.filter((jm) => jm.name == metric).map((jm) => jm.scope); + + $: if ($scopedStats?.data) { + for (let metric of selectedMetrics) { + // Not Exclusive or Multi-Node: get maxScope directly (mostly: node) + // -> Else: Load smallest available granularity as default as per availability + const availableScopes = scopesForMetric(metric); + if (job.exclusive != 1 || job.numNodes == 1) { + if (availableScopes.includes("accelerator")) { + selectedScopes[metric] = "accelerator"; + } else if (availableScopes.includes("core")) { + selectedScopes[metric] = "core"; + } else if (availableScopes.includes("socket")) { + selectedScopes[metric] = "socket"; + } else { + selectedScopes[metric] = "node"; + } + } else { + selectedScopes[metric] = maxScope(availableScopes); + } + + sorting[metric] = { + min: { dir: "up", active: false }, + avg: { dir: "up", active: false }, + max: { dir: "up", active: false }, + }; + } } function sortBy(metric, stat) { @@ -90,13 +120,6 @@ }); } - export function moreLoaded(moreJobMetrics) { - moreJobMetrics.forEach(function (newMetric) { - if (!jobMetrics.some((m) => m.scope == newMetric.scope)) { - jobMetrics = [...jobMetrics, newMetric] - } - }); - }; </script> <Row> diff --git a/web/frontend/src/job/StatsTableEntry.svelte b/web/frontend/src/job/StatsTableEntry.svelte index 9504a63..dc2f628 100644 --- a/web/frontend/src/job/StatsTableEntry.svelte +++ b/web/frontend/src/job/StatsTableEntry.svelte @@ -37,8 +37,8 @@ return s.dir != "up" ? a[field] - b[field] : b[field] - a[field]; } else { return s.dir != "up" - ? a.statistics[field] - b.statistics[field] - : b.statistics[field] - a.statistics[field]; + ? a.data[field] - b.data[field] + : b.data[field] - a.data[field]; } }); } @@ -52,7 +52,7 @@ $: series = jobMetrics .find((jm) => jm.name == metric && jm.scope == scope) - ?.metric.series.filter((s) => s.hostname == host && s.statistics != null) + ?.stats.filter((s) => s.hostname == host && s.data != null) ?.sort(compareNumbers); </script> @@ -60,13 +60,13 @@ <td colspan={scope == "node" ? 3 : 4}><i>No data</i></td> {:else if series.length == 1 && scope == "node"} <td> - {series[0].statistics.min} + {series[0].data.min} </td> <td> - {series[0].statistics.avg} + {series[0].data.avg} </td> <td> - {series[0].statistics.max} + {series[0].data.max} </td> {:else} <td colspan="4"> @@ -86,9 +86,9 @@ {#each series as s, i} <tr> <th>{s.id ?? 
i}</th> - <td>{s.statistics.min}</td> - <td>{s.statistics.avg}</td> - <td>{s.statistics.max}</td> + <td>{s.data.min}</td> + <td>{s.data.avg}</td> + <td>{s.data.max}</td> </tr> {/each} </table> diff --git a/web/frontend/src/job/jobsummary/JobFootprintPolar.svelte b/web/frontend/src/job/jobsummary/JobFootprintPolar.svelte index cf90408..fe6693b 100644 --- a/web/frontend/src/job/jobsummary/JobFootprintPolar.svelte +++ b/web/frontend/src/job/jobsummary/JobFootprintPolar.svelte @@ -40,14 +40,14 @@ const client = getContextClient(); const polarQuery = gql` query ($dbid: ID!, $selectedMetrics: [String!]!) { - jobMetricStats(id: $dbid, metrics: $selectedMetrics) { + jobStats(id: $dbid, metrics: $selectedMetrics) { name stats { - min - avg - max - } + min + avg + max } + } } `; @@ -66,7 +66,7 @@ {:else} <Polar {polarMetrics} - polarData={$polarData.data.jobMetricStats} + polarData={$polarData.data.jobStats} /> {/if} </CardBody> \ No newline at end of file From 8da2fc30c39d2ce4b3c7c2b532b480851f431d89 Mon Sep 17 00:00:00 2001 From: Christoph Kluge <christoph.kluge@fau.de> Date: Fri, 14 Mar 2025 16:36:31 +0100 Subject: [PATCH 2/5] split statsTable data from jobMetrics query, frontend refactor --- internal/graph/schema.resolvers.go | 3 - web/frontend/src/Job.root.svelte | 18 +- web/frontend/src/job/StatsTab.svelte | 145 +++++++++++++ web/frontend/src/job/StatsTable.svelte | 201 ------------------ .../src/job/statstab/StatsTable.svelte | 139 ++++++++++++ .../job/{ => statstab}/StatsTableEntry.svelte | 50 ++--- 6 files changed, 314 insertions(+), 242 deletions(-) create mode 100644 web/frontend/src/job/StatsTab.svelte delete mode 100644 web/frontend/src/job/StatsTable.svelte create mode 100644 web/frontend/src/job/statstab/StatsTable.svelte rename web/frontend/src/job/{ => statstab}/StatsTableEntry.svelte (67%) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 1565c7e..a470807 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -343,11 +343,9 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [ res := make([]*model.JobStatsWithScope, 0) for name, scoped := range data { for scope, stats := range scoped { - // log.Debugf("HANDLE >>>>> %s @ %s -> First Array Value %#v", name, scope, *stats[0]) mdlStats := make([]*model.ScopedStats, 0) for _, stat := range stats { - // log.Debugf("CONVERT >>>>> >>>>> %s -> %v -> %#v", stat.Hostname, stat.Id, stat.Data) mdlStats = append(mdlStats, &model.ScopedStats{ Hostname: stat.Hostname, ID: stat.Id, @@ -355,7 +353,6 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [ }) } - // log.Debugf("APPEND >>>>> >>>>> %#v", mdlStats) res = append(res, &model.JobStatsWithScope{ Name: name, Scope: scope, diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 2fe5bc4..c2748e6 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -40,7 +40,7 @@ import JobRoofline from "./job/JobRoofline.svelte"; import EnergySummary from "./job/EnergySummary.svelte"; import PlotGrid from "./generic/PlotGrid.svelte"; - import StatsTable from "./job/StatsTable.svelte"; + import StatsTab from "./job/StatsTab.svelte"; export let dbid; export let username; @@ -53,10 +53,8 @@ let isMetricsSelectionOpen = false, selectedMetrics = [], - selectedScopes = []; - - let plots = {}, - statsTable + selectedScopes = [], + plots = {}; let availableMetrics = new Set(), missingMetrics = [], @@ -386,14 +384,8 @@ 
</div>
      </TabPane>
      {/if}
-      <TabPane
-        tabId="stats"
-        tab="Statistics Table"
-        class="overflow-x-auto"
-        active={!somethingMissing}
-      >
-        <StatsTable job={$initq.data.job}/>
-      </TabPane>
+      <!-- Includes <TabPane> Statistics Table with Independent GQL Query -->
+      <StatsTab job={$initq.data.job} clusters={$initq.data.clusters} tabActive={!somethingMissing}/>
       <TabPane tabId="job-script" tab="Job Script">
         <div class="pre-wrapper">
           {#if $initq.data.job.metaData?.jobScript}
diff --git a/web/frontend/src/job/StatsTab.svelte b/web/frontend/src/job/StatsTab.svelte
new file mode 100644
index 0000000..b7647b5
--- /dev/null
+++ b/web/frontend/src/job/StatsTab.svelte
@@ -0,0 +1,145 @@
+<!--
+    @component Job-View subcomponent; Wraps the statsTable in a TabPane and contains GQL query for scoped statsData
+
+    Properties:
+    - `job Object`: The job object
+    - `clusters Object`: The clusters object
+    - `tabActive bool`: Boolean if StatsTab Tab is Active on Creation
+ -->
+
+<script>
+  import {
+    queryStore,
+    gql,
+    getContextClient
+  } from "@urql/svelte";
+  import { getContext } from "svelte";
+  import {
+    Card,
+    Button,
+    Row,
+    Col,
+    TabPane,
+    Spinner,
+    Icon
+  } from "@sveltestrap/sveltestrap";
+  import MetricSelection from "../generic/select/MetricSelection.svelte";
+  import StatsTable from "./statstab/StatsTable.svelte";
+
+  export let job;
+  export let clusters;
+  export let tabActive;
+
+  let loadScopes = false;
+  let selectedScopes = [];
+  let selectedMetrics = [];
+  let availableMetrics = new Set(); // For Info Only, filled by MetricSelection Component
+  let isMetricSelectionOpen = false;
+
+  const client = getContextClient();
+  const query = gql`
+    query ($dbid: ID!, $selectedMetrics: [String!]!, $selectedScopes: [MetricScope!]!) {
+      scopedJobStats(id: $dbid, metrics: $selectedMetrics, scopes: $selectedScopes) {
+        name
+        scope
+        stats {
+          hostname
+          id
+          data {
+            min
+            avg
+            max
+          }
+        }
+      }
+    }
+  `;
+
+  $: scopedStats = queryStore({
+    client: client,
+    query: query,
+    variables: { dbid: job.id, selectedMetrics, selectedScopes },
+  });
+
+  $: if (loadScopes) {
+    selectedScopes = ["node", "socket", "core", "hwthread", "accelerator"];
+  }
+
+  // Resolve default metrics and scopes on init -> executed only once
+  getContext("on-init")(() => {
+    if (!job) return;
+
+    const pendingMetrics = (
+      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
+      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}`]
+    ) || getContext("cc-config")["job_view_nodestats_selectedMetrics"];
+
+    // Select default Scopes to load: Check before if any metric has accelerator scope by default
+    const accScopeDefault = [...pendingMetrics].some(function (m) {
+      const cluster = clusters.find((c) => c.name == job.cluster);
+      const subCluster = cluster.subClusters.find((sc) => sc.name == job.subCluster);
+      return subCluster.metricConfig.find((smc) => smc.name == m)?.scope === "accelerator";
+    });
+
+    const pendingScopes = ["node"]
+    if (job.numNodes === 1) {
+      pendingScopes.push("socket")
+      pendingScopes.push("core")
+      pendingScopes.push("hwthread")
+      if (accScopeDefault) { pendingScopes.push("accelerator") }
+    }
+
+    selectedMetrics = [...pendingMetrics];
+    selectedScopes = [...pendingScopes];
+  });
+
+</script>
+
+<TabPane tabId="stats" tab="Statistics Table" class="overflow-x-auto" active={tabActive}>
+  <Row>
+    <Col class="m-2">
+      <Button outline on:click={() => (isMetricSelectionOpen = true)} class="px-2" color="primary" style="margin-right:0.5rem">
+        Select Metrics 
(Selected {selectedMetrics.length} of {availableMetrics.size} available) + </Button> + {#if job.numNodes > 1} + <Button class="px-2 ml-auto" color="success" outline on:click={() => (loadScopes = !loadScopes)} disabled={loadScopes}> + {#if !loadScopes} + <Icon name="plus-square-fill" style="margin-right:0.25rem"/> Add More Scopes + {:else} + <Icon name="check-square-fill" style="margin-right:0.25rem"/> OK: Scopes Added + {/if} + </Button> + {/if} + </Col> + </Row> + <hr class="mb-1 mt-1"/> + <!-- ROW1: Status--> + {#if $scopedStats.fetching} + <Row> + <Col class="m-3" style="text-align: center;"> + <Spinner secondary/> + </Col> + </Row> + {:else if $scopedStats.error} + <Row> + <Col class="m-2"> + <Card body color="danger">{$scopedStats.error.message}</Card> + </Col> + </Row> + {:else} + <StatsTable + hosts={job.resources.map((r) => r.hostname).sort()} + data={$scopedStats?.data?.scopedJobStats} + {selectedMetrics} + /> + {/if} +</TabPane> + +<MetricSelection + cluster={job.cluster} + subCluster={job.subCluster} + configName="job_view_nodestats_selectedMetrics" + bind:allMetrics={availableMetrics} + bind:metrics={selectedMetrics} + bind:isOpen={isMetricSelectionOpen} +/> diff --git a/web/frontend/src/job/StatsTable.svelte b/web/frontend/src/job/StatsTable.svelte deleted file mode 100644 index 159d24b..0000000 --- a/web/frontend/src/job/StatsTable.svelte +++ /dev/null @@ -1,201 +0,0 @@ -<!-- - @component Job-View subcomponent; display table of metric data statistics with selectable scopes - - Properties: - - `job Object`: The job object - --> - -<script> - import { - queryStore, - gql, - getContextClient - } from "@urql/svelte"; - import { getContext } from "svelte"; - import { - Button, - Table, - Input, - InputGroup, - InputGroupText, - Icon, - Row, - Col - } from "@sveltestrap/sveltestrap"; - import { maxScope } from "../generic/utils.js"; - import StatsTableEntry from "./StatsTableEntry.svelte"; - import MetricSelection from "../generic/select/MetricSelection.svelte"; - - export let job; - - let hosts = job.resources.map((r) => r.hostname).sort(), - selectedScopes = {}, - sorting = {}, - isMetricSelectionOpen = false, - availableMetrics = new Set(), - selectedMetrics = ( - getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] || - getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}`] - ) || getContext("cc-config")["job_view_nodestats_selectedMetrics"]; - - const client = getContextClient(); - const query = gql` - query ($dbid: ID!, $selectedMetrics: [String!]!, $selectedScopes: [MetricScope!]!) 
{ - scopedJobStats(id: $dbid, metrics: $selectedMetrics, scopes: $selectedScopes) { - name - scope - stats { - hostname - id - data { - min - avg - max - } - } - } - } - `; - - $: scopedStats = queryStore({ - client: client, - query: query, - variables: { dbid: job.id, selectedMetrics, selectedScopes: ["node"] }, - }); - - $: console.log(">>>> RESULT:", $scopedStats?.data?.scopedJobStats) - - $: jobMetrics = $scopedStats?.data?.scopedJobStats || []; - - const scopesForMetric = (metric) => - jobMetrics.filter((jm) => jm.name == metric).map((jm) => jm.scope); - - $: if ($scopedStats?.data) { - for (let metric of selectedMetrics) { - // Not Exclusive or Multi-Node: get maxScope directly (mostly: node) - // -> Else: Load smallest available granularity as default as per availability - const availableScopes = scopesForMetric(metric); - if (job.exclusive != 1 || job.numNodes == 1) { - if (availableScopes.includes("accelerator")) { - selectedScopes[metric] = "accelerator"; - } else if (availableScopes.includes("core")) { - selectedScopes[metric] = "core"; - } else if (availableScopes.includes("socket")) { - selectedScopes[metric] = "socket"; - } else { - selectedScopes[metric] = "node"; - } - } else { - selectedScopes[metric] = maxScope(availableScopes); - } - - sorting[metric] = { - min: { dir: "up", active: false }, - avg: { dir: "up", active: false }, - max: { dir: "up", active: false }, - }; - } - } - - function sortBy(metric, stat) { - let s = sorting[metric][stat]; - if (s.active) { - s.dir = s.dir == "up" ? "down" : "up"; - } else { - for (let metric in sorting) - for (let stat in sorting[metric]) sorting[metric][stat].active = false; - s.active = true; - } - - let series = jobMetrics.find( - (jm) => jm.name == metric && jm.scope == "node", - )?.metric.series; - sorting = { ...sorting }; - hosts = hosts.sort((h1, h2) => { - let s1 = series.find((s) => s.hostname == h1)?.statistics; - let s2 = series.find((s) => s.hostname == h2)?.statistics; - if (s1 == null || s2 == null) return -1; - - return s.dir != "up" ? s1[stat] - s2[stat] : s2[stat] - s1[stat]; - }); - } - -</script> - -<Row> - <Col class="m-2"> - <Button outline on:click={() => (isMetricSelectionOpen = true)} class="w-auto px-2" color="primary"> - Select Metrics (Selected {selectedMetrics.length} of {availableMetrics.size} available) - </Button> - </Col> -</Row> -<hr class="mb-1 mt-1"/> -<Table class="mb-0"> - <thead> - <!-- Header Row 1: Selectors --> - <tr> - <th/> - {#each selectedMetrics as metric} - <!-- To Match Row-2 Header Field Count--> - <th colspan={selectedScopes[metric] == "node" ? 3 : 4}> - <InputGroup> - <InputGroupText> - {metric} - </InputGroupText> - <Input type="select" bind:value={selectedScopes[metric]}> - {#each scopesForMetric(metric, jobMetrics) as scope} - <option value={scope}>{scope}</option> - {/each} - </Input> - </InputGroup> - </th> - {/each} - </tr> - <!-- Header Row 2: Fields --> - <tr> - <th>Node</th> - {#each selectedMetrics as metric} - {#if selectedScopes[metric] != "node"} - <th>Id</th> - {/if} - {#each ["min", "avg", "max"] as stat} - <th on:click={() => sortBy(metric, stat)}> - {stat} - {#if selectedScopes[metric] == "node"} - <Icon - name="caret-{sorting[metric][stat].dir}{sorting[metric][stat] - .active - ? 
'-fill' - : ''}" - /> - {/if} - </th> - {/each} - {/each} - </tr> - </thead> - <tbody> - {#each hosts as host (host)} - <tr> - <th scope="col">{host}</th> - {#each selectedMetrics as metric (metric)} - <StatsTableEntry - {host} - {metric} - scope={selectedScopes[metric]} - {jobMetrics} - /> - {/each} - </tr> - {/each} - </tbody> -</Table> - -<MetricSelection - cluster={job.cluster} - subCluster={job.subCluster} - configName="job_view_nodestats_selectedMetrics" - bind:allMetrics={availableMetrics} - bind:metrics={selectedMetrics} - bind:isOpen={isMetricSelectionOpen} -/> diff --git a/web/frontend/src/job/statstab/StatsTable.svelte b/web/frontend/src/job/statstab/StatsTable.svelte new file mode 100644 index 0000000..4adb4bd --- /dev/null +++ b/web/frontend/src/job/statstab/StatsTable.svelte @@ -0,0 +1,139 @@ +<!--: + @component Job-View subcomponent; display table of metric data statistics with selectable scopes + + Properties: + - `job Object`: The job object + - `clusters Object`: The clusters object + - `hosts [String]`: The list of hostnames of this job + --> + +<script> + import { + Table, + Input, + InputGroup, + InputGroupText, + Icon, + } from "@sveltestrap/sveltestrap"; + import StatsTableEntry from "./StatsTableEntry.svelte"; + + export let data = []; + export let selectedMetrics = []; + export let hosts = []; + + let sorting = {}; + let availableScopes = {}; + let selectedScopes = {}; + + const scopesForMetric = (metric) => + data?.filter((jm) => jm.name == metric)?.map((jm) => jm.scope) || []; + const setScopeForMetric = (metric, scope) => + selectedScopes[metric] = scope + + $: if (data && selectedMetrics) { + for (let metric of selectedMetrics) { + availableScopes[metric] = scopesForMetric(metric); + // Set Initial Selection, but do not use selectedScopes: Skips reactivity + if (availableScopes[metric].includes("accelerator")) { + setScopeForMetric(metric, "accelerator"); + } else if (availableScopes[metric].includes("core")) { + setScopeForMetric(metric, "core"); + } else if (availableScopes[metric].includes("socket")) { + setScopeForMetric(metric, "socket"); + } else { + setScopeForMetric(metric, "node"); + } + + sorting[metric] = { + min: { dir: "up", active: false }, + avg: { dir: "up", active: false }, + max: { dir: "up", active: false }, + }; + } + } + + function sortBy(metric, stat) { + let s = sorting[metric][stat]; + if (s.active) { + s.dir = s.dir == "up" ? "down" : "up"; + } else { + for (let metric in sorting) + for (let stat in sorting[metric]) sorting[metric][stat].active = false; + s.active = true; + } + + let stats = data.find( + (d) => d.name == metric && d.scope == "node", + )?.stats || []; + sorting = { ...sorting }; + hosts = hosts.sort((h1, h2) => { + let s1 = stats.find((s) => s.hostname == h1)?.data; + let s2 = stats.find((s) => s.hostname == h2)?.data; + if (s1 == null || s2 == null) return -1; + + return s.dir != "up" ? s1[stat] - s2[stat] : s2[stat] - s1[stat]; + }); + } + +</script> + +<Table class="mb-0"> + <thead> + <!-- Header Row 1: Selectors --> + <tr> + <th/> + {#each selectedMetrics as metric} + <!-- To Match Row-2 Header Field Count--> + <th colspan={selectedScopes[metric] == "node" ? 
3 : 4}> + <InputGroup> + <InputGroupText> + {metric} + </InputGroupText> + <Input type="select" bind:value={selectedScopes[metric]} disabled={availableScopes[metric].length === 1}> + {#each (availableScopes[metric] || []) as scope} + <option value={scope}>{scope}</option> + {/each} + </Input> + </InputGroup> + </th> + {/each} + </tr> + <!-- Header Row 2: Fields --> + <tr> + <th>Node</th> + {#each selectedMetrics as metric} + {#if selectedScopes[metric] != "node"} + <th>Id</th> + {/if} + {#each ["min", "avg", "max"] as stat} + <th on:click={() => sortBy(metric, stat)}> + {stat} + {#if selectedScopes[metric] == "node"} + <Icon + name="caret-{sorting[metric][stat].dir}{sorting[metric][stat] + .active + ? '-fill' + : ''}" + /> + {/if} + </th> + {/each} + {/each} + </tr> + </thead> + <tbody> + {#each hosts as host (host)} + <tr> + <th scope="col">{host}</th> + {#each selectedMetrics as metric (metric)} + <StatsTableEntry + {data} + {host} + {metric} + scope={selectedScopes[metric]} + /> + {/each} + </tr> + {/each} + </tbody> +</Table> \ No newline at end of file diff --git a/web/frontend/src/job/StatsTableEntry.svelte b/web/frontend/src/job/statstab/StatsTableEntry.svelte similarity index 67% rename from web/frontend/src/job/StatsTableEntry.svelte rename to web/frontend/src/job/statstab/StatsTableEntry.svelte index dc2f628..b39eacb 100644 --- a/web/frontend/src/job/StatsTableEntry.svelte +++ b/web/frontend/src/job/statstab/StatsTableEntry.svelte @@ -1,11 +1,11 @@ <!-- - @component Job-View subcomponent; Single Statistics entry component fpr statstable + @component Job-View subcomponent; Single Statistics entry component for statstable Properties: - `host String`: The hostname (== node) - `metric String`: The metric name - `scope String`: The selected scope - - `jobMetrics [Object]`: The jobs metricdata + - `data [Object]`: The jobs statsdata --> <script> @@ -14,27 +14,34 @@ export let host; export let metric; export let scope; - export let jobMetrics; + export let data; + + let entrySorting = { + id: { dir: "down", active: true }, + min: { dir: "up", active: false }, + avg: { dir: "up", active: false }, + max: { dir: "up", active: false }, + }; function compareNumbers(a, b) { return a.id - b.id; } function sortByField(field) { - let s = sorting[field]; + let s = entrySorting[field]; if (s.active) { s.dir = s.dir == "up" ? "down" : "up"; } else { - for (let field in sorting) sorting[field].active = false; + for (let field in entrySorting) entrySorting[field].active = false; s.active = true; } - sorting = { ...sorting }; - series = series.sort((a, b) => { + entrySorting = { ...entrySorting }; + stats = stats.sort((a, b) => { if (a == null || b == null) return -1; if (field === "id") { - return s.dir != "up" ? a[field] - b[field] : b[field] - a[field]; + return s.dir != "up" ? a[field].localeCompare(b[field]) : b[field].localeCompare(a[field]) } else { return s.dir != "up" ? a.data[field] - b.data[field] @@ -43,30 +50,23 @@ }); } - let sorting = { - id: { dir: "down", active: true }, - min: { dir: "up", active: false }, - avg: { dir: "up", active: false }, - max: { dir: "up", active: false }, - }; - - $: series = jobMetrics - .find((jm) => jm.name == metric && jm.scope == scope) + $: stats = data + ?.find((d) => d.name == metric && d.scope == scope) ?.stats.filter((s) => s.hostname == host && s.data != null) - ?.sort(compareNumbers); + ?.sort(compareNumbers) || []; </script> -{#if series == null || series.length == 0} +{#if stats == null || stats.length == 0} <td colspan={scope == "node" ? 
3 : 4}><i>No data</i></td> -{:else if series.length == 1 && scope == "node"} +{:else if stats.length == 1 && scope == "node"} <td> - {series[0].data.min} + {stats[0].data.min} </td> <td> - {series[0].data.avg} + {stats[0].data.avg} </td> <td> - {series[0].data.max} + {stats[0].data.max} </td> {:else} <td colspan="4"> @@ -76,14 +76,14 @@ <th on:click={() => sortByField(field)}> Sort <Icon - name="caret-{sorting[field].dir}{sorting[field].active + name="caret-{entrySorting[field].dir}{entrySorting[field].active ? '-fill' : ''}" /> </th> {/each} </tr> - {#each series as s, i} + {#each stats as s, i} <tr> <th>{s.id ?? i}</th> <td>{s.data.min}</td> From 0144ad43f57c6e3c903a7a9f7152cd8617920e59 Mon Sep 17 00:00:00 2001 From: Christoph Kluge <christoph.kluge@fau.de> Date: Mon, 17 Mar 2025 11:03:51 +0100 Subject: [PATCH 3/5] Implement NodeListData and ScopedStats for Prometheus Backend --- internal/metricdata/prometheus.go | 167 ++++++++++++++++++++++++++++-- 1 file changed, 161 insertions(+), 6 deletions(-) diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index fe829c0..d16501e 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -448,18 +448,51 @@ func (pdb *PrometheusDataRepository) LoadNodeData( return data, nil } +// Implemented by NHR@FAU; Used in Job-View StatsTable func (pdb *PrometheusDataRepository) LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error) { - // TODO : Implement to be used in Job-View StatsTable - log.Infof("LoadScopedStats unimplemented for PrometheusDataRepository, Args: job-id %v, metrics %v, scopes %v", job.JobID, metrics, scopes) + // Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable + scopedJobStats := make(schema.ScopedJobStats) + data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) + if err != nil { + log.Warn("Error while loading job for scopedJobStats") + return nil, err + } - return nil, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository") + for metric, metricData := range data { + for _, scope := range scopes { + if scope != schema.MetricScopeNode { + logOnce.Do(func() { + log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) + }) + continue + } + + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for _, series := range metricData[scope].Series { + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: series.Hostname, + Data: &series.Statistics, + }) + } + } + } + + return scopedJobStats, nil } +// Implemented by NHR@FAU; Used in NodeList-View func (pdb *PrometheusDataRepository) LoadNodeListData( cluster, subCluster, nodeFilter string, metrics []string, @@ -470,10 +503,132 @@ func (pdb *PrometheusDataRepository) LoadNodeListData( ctx context.Context, ) (map[string]schema.JobData, int, bool, error) { + // Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList + + // 0) Init additional vars var totalNodes int = 0 var hasNextPage bool = false - // TODO : Implement to be used in NodeList-View - log.Infof("LoadNodeListData unimplemented for PrometheusDataRepository, 
Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes) - return nil, totalNodes, hasNextPage, errors.New("METRICDATA/PROMETHEUS > unimplemented for PrometheusDataRepository") + // 1) Get list of all nodes + var nodes []string + if subCluster != "" { + scNodes := archive.NodeLists[cluster][subCluster] + nodes = scNodes.PrintList() + } else { + subClusterNodeLists := archive.NodeLists[cluster] + for _, nodeList := range subClusterNodeLists { + nodes = append(nodes, nodeList.PrintList()...) + } + } + + // 2) Filter nodes + if nodeFilter != "" { + filteredNodes := []string{} + for _, node := range nodes { + if strings.Contains(node, nodeFilter) { + filteredNodes = append(filteredNodes, node) + } + } + nodes = filteredNodes + } + + // 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ... + totalNodes = len(nodes) + sort.Strings(nodes) + + // 3) Apply paging + if len(nodes) > page.ItemsPerPage { + start := (page.Page - 1) * page.ItemsPerPage + end := start + page.ItemsPerPage + if end > len(nodes) { + end = len(nodes) + hasNextPage = false + } else { + hasNextPage = true + } + nodes = nodes[start:end] + } + + // 4) Fetch Data, based on pdb.LoadNodeData() + + t0 := time.Now() + // Map of hosts of jobData + data := make(map[string]schema.JobData) + + // query db for each metric + // TODO: scopes seems to be always empty + if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) { + scopes = append(scopes, schema.MetricScopeNode) + } + + for _, scope := range scopes { + if scope != schema.MetricScopeNode { + logOnce.Do(func() { + log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) + }) + continue + } + + for _, metric := range metrics { + metricConfig := archive.GetMetricConfig(cluster, metric) + if metricConfig == nil { + log.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster) + return nil, totalNodes, hasNextPage, errors.New("Prometheus config error") + } + query, err := pdb.FormatQuery(metric, scope, nodes, cluster) + if err != nil { + log.Warn("Error while formatting prometheus query") + return nil, totalNodes, hasNextPage, err + } + + // ranged query over all nodes + r := promv1.Range{ + Start: from, + End: to, + Step: time.Duration(metricConfig.Timestep * 1e9), + } + result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r) + if err != nil { + log.Errorf("Prometheus query error in LoadNodeData: %v\n", err) + return nil, totalNodes, hasNextPage, errors.New("Prometheus query error") + } + if len(warnings) > 0 { + log.Warnf("Warnings: %v\n", warnings) + } + + step := int64(metricConfig.Timestep) + steps := int64(to.Sub(from).Seconds()) / step + + // iter rows of host, metric, values + for _, row := range result.(promm.Matrix) { + hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix) + + hostdata, ok := data[hostname] + if !ok { + hostdata = make(schema.JobData) + data[hostname] = hostdata + } + + metricdata, ok := hostdata[metric] + if !ok { + metricdata = make(map[schema.MetricScope]*schema.JobMetric) + data[hostname][metric] = metricdata + } + + // output per host, metric and scope + scopeData, ok := metricdata[scope] + if !ok { + scopeData = &schema.JobMetric{ + Unit: metricConfig.Unit, + Timestep: metricConfig.Timestep, + Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)}, + } + data[hostname][metric][scope] = scopeData + } + } + } + } + t1 := time.Since(t0) + 
log.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1)
+	return data, totalNodes, hasNextPage, nil
 }

From 93040d46296eb16f4a9c02bef220e33f3abefdd0 Mon Sep 17 00:00:00 2001
From: Christoph Kluge <christoph.kluge@fau.de>
Date: Mon, 17 Mar 2025 15:25:33 +0100
Subject: [PATCH 4/5] Implement LoadNodeData, LoadNodeListData, LoadScopedStats
 for influxDB2 backend

- Untested
- Only Node Scope

---
 internal/metricdata/influxdb-v2.go | 250 +++++++++++++++++++++++++++--
 1 file changed, 240 insertions(+), 10 deletions(-)

diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go
index 2a943b6..c53dad3 100644
--- a/internal/metricdata/influxdb-v2.go
+++ b/internal/metricdata/influxdb-v2.go
@@ -10,6 +10,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
+	"sort"
 	"strings"
 	"time"

@@ -64,6 +66,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
 	ctx context.Context,
 	resolution int) (schema.JobData, error) {

+	log.Infof("InfluxDB 2 Backend: Resolution Scaling not Implemented, will return default timestep. Requested Resolution %d", resolution)
+
 	measurementsConds := make([]string, 0, len(metrics))
 	for _, m := range metrics {
 		measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
@@ -86,7 +90,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
 		query := ""
 		switch scope {
 		case "node":
-			// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
+			// Get Finest Granularity, Group By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows <-- Resolution could be added here?
 			// log.Info("Scope 'node' requested. ")
 			query = fmt.Sprintf(`
 				from(bucket: "%s")
@@ -116,6 +120,12 @@ func (idb *InfluxDBv2DataRepository) LoadData(
 			// 	idb.bucket,
 			// 	idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
 			// 	measurementsCond, hostsCond)
+		case "hwthread":
+			log.Info(" Scope 'hwthread' requested, but not yet supported: Will return 'node' scope only. ")
+			continue
+		case "accelerator":
+			log.Info(" Scope 'accelerator' requested, but not yet supported: Will return 'node' scope only. 
") + continue default: log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope) continue @@ -173,6 +183,11 @@ func (idb *InfluxDBv2DataRepository) LoadData( } case "socket": continue + case "accelerator": + continue + case "hwthread": + // See below @ core + continue case "core": continue // Include Series.Id in hostSeries @@ -301,18 +316,53 @@ func (idb *InfluxDBv2DataRepository) LoadStats( return stats, nil } +// Used in Job-View StatsTable +// UNTESTED func (idb *InfluxDBv2DataRepository) LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error) { - // TODO : Implement to be used in JobView Stats Table - log.Infof("LoadScopedStats unimplemented for InfluxDBv2DataRepository, Args: Job-ID %d, metrics %v, scopes %v", job.JobID, metrics, scopes) + // Assumption: idb.loadData() only returns series node-scope - use node scope for statsTable + scopedJobStats := make(schema.ScopedJobStats) + data, err := idb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) + if err != nil { + log.Warn("Error while loading job for scopedJobStats") + return nil, err + } - return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository") + for metric, metricData := range data { + for _, scope := range scopes { + if scope != schema.MetricScopeNode { + logOnce.Do(func() { + log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) + }) + continue + } + + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for _, series := range metricData[scope].Series { + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: series.Hostname, + Data: &series.Statistics, + }) + } + } + } + + return scopedJobStats, nil } +// Used in Systems-View @ Node-Overview +// UNTESTED func (idb *InfluxDBv2DataRepository) LoadNodeData( cluster string, metrics, nodes []string, @@ -320,12 +370,123 @@ func (idb *InfluxDBv2DataRepository) LoadNodeData( from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) { - // TODO : Implement to be used in Analysis- und System/Node-View - log.Infof("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes) + // Note: scopes[] Array will be ignored, only return node scope - return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository") + // CONVERT ARGS TO INFLUX + measurementsConds := make([]string, 0) + for _, m := range metrics { + measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m)) + } + measurementsCond := strings.Join(measurementsConds, " or ") + + hostsConds := make([]string, 0) + if nodes == nil { + var allNodes []string + subClusterNodeLists := archive.NodeLists[cluster] + for _, nodeList := range subClusterNodeLists { + allNodes = append(nodes, nodeList.PrintList()...) 
+		}
+		for _, node := range allNodes {
+			nodes = append(nodes, node)
+			hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
+		}
+	} else {
+		for _, node := range nodes {
+			hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
+		}
+	}
+	hostsCond := strings.Join(hostsConds, " or ")
+
+	// BUILD AND PERFORM QUERY
+	query := fmt.Sprintf(`
+		from(bucket: "%s")
+		|> range(start: %s, stop: %s)
+		|> filter(fn: (r) => (%s) and (%s) )
+		|> drop(columns: ["_start", "_stop"])
+		|> group(columns: ["hostname", "_measurement"])
+		|> aggregateWindow(every: 60s, fn: mean)
+		|> drop(columns: ["_time"])`,
+		idb.bucket,
+		idb.formatTime(from), idb.formatTime(to),
+		measurementsCond, hostsCond)
+
+	rows, err := idb.queryClient.Query(ctx, query)
+	if err != nil {
+		log.Error("Error while performing query")
+		return nil, err
+	}
+
+	// HANDLE QUERY RETURN
+	// Collect Float Arrays for Node@Metric -> No Scope Handling!
+	influxData := make(map[string]map[string][]schema.Float)
+	for rows.Next() {
+		row := rows.Record()
+		host, field := row.ValueByKey("hostname").(string), row.Measurement()
+
+		influxHostData, ok := influxData[host]
+		if !ok {
+			influxHostData = make(map[string][]schema.Float)
+			influxData[host] = influxHostData
+		}
+
+		influxFieldData, ok := influxData[host][field]
+		if !ok {
+			influxFieldData = make([]schema.Float, 0)
+			influxData[host][field] = influxFieldData
+		}
+
+		val, ok := row.Value().(float64)
+		if ok {
+			influxData[host][field] = append(influxData[host][field], schema.Float(val))
+		} else {
+			influxData[host][field] = append(influxData[host][field], schema.Float(0))
+		}
+	}
+
+	// BUILD FUNCTION RETURN
+	data := make(map[string]map[string][]*schema.JobMetric)
+	for node, metricData := range influxData {
+
+		nodeData, ok := data[node]
+		if !ok {
+			nodeData = make(map[string][]*schema.JobMetric)
+			data[node] = nodeData
+		}
+
+		for metric, floatArray := range metricData {
+			// Init min/max to extremes so the first value sets them; a 0.0 init would
+			// pin min (or max) of all-positive (or all-negative) data at 0.0
+			avg, min, max := 0.0, math.MaxFloat64, -math.MaxFloat64
+			for _, val := range floatArray {
+				avg += float64(val)
+				min = math.Min(min, float64(val))
+				max = math.Max(max, float64(val))
+			}
+
+			stats := schema.MetricStatistics{
+				Avg: (math.Round((avg/float64(len(floatArray)))*100) / 100),
+				Min: (math.Round(min*100) / 100),
+				Max: (math.Round(max*100) / 100),
+			}
+
+			mc := archive.GetMetricConfig(cluster, metric)
+			nodeData[metric] = append(nodeData[metric], &schema.JobMetric{
+				Unit:     mc.Unit,
+				Timestep: mc.Timestep,
+				Series: []schema.Series{
+					{
+						Hostname:   node,
+						Statistics: stats,
+						Data:       floatArray,
+					},
+				},
+			})
+		}
+	}
+
+	return data, nil
 }

+// Used in Systems-View @ Node-List
+// UNTESTED
 func (idb *InfluxDBv2DataRepository) LoadNodeListData(
 	cluster, subCluster, nodeFilter string,
 	metrics []string,
@@ -336,10 +497,79 @@ func (idb *InfluxDBv2DataRepository) LoadNodeListData(
 	ctx context.Context,
 ) (map[string]schema.JobData, int, bool, error) {

+	// Assumption: idb.loadData() only returns series node-scope - use node scope for NodeList
+
+	// 0) Init additional vars
 	var totalNodes int = 0
 	var hasNextPage bool = false
-	// TODO : Implement to be used in NodeList-View
-	log.Infof("LoadNodeListData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)

-	return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
+	// 1) Get list of all nodes
+	var nodes []string
+	if subCluster != "" {
+		scNodes := archive.NodeLists[cluster][subCluster]
+		nodes = 
scNodes.PrintList() + } else { + subClusterNodeLists := archive.NodeLists[cluster] + for _, nodeList := range subClusterNodeLists { + nodes = append(nodes, nodeList.PrintList()...) + } + } + + // 2) Filter nodes + if nodeFilter != "" { + filteredNodes := []string{} + for _, node := range nodes { + if strings.Contains(node, nodeFilter) { + filteredNodes = append(filteredNodes, node) + } + } + nodes = filteredNodes + } + + // 2.1) Count total nodes and sort them; note: the sort order is not preserved in the returned maps + totalNodes = len(nodes) + sort.Strings(nodes) + + // 3) Apply paging + if len(nodes) > page.ItemsPerPage { + start := (page.Page - 1) * page.ItemsPerPage + if start > len(nodes) { + start = len(nodes) + } + end := start + page.ItemsPerPage + if end >= len(nodes) { + end = len(nodes) + hasNextPage = false + } else { + hasNextPage = true + } + nodes = nodes[start:end] + } + + // 4) Fetch And Convert Data, use idb.LoadNodeData() for query + + rawNodeData, err := idb.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + if err != nil { + log.Errorf("Error while loading influx nodeData for nodeListData: %s", err.Error()) + return nil, totalNodes, hasNextPage, err + } + + data := make(map[string]schema.JobData) + for node, nodeData := range rawNodeData { + // Init Nested Map Data Structures If Not Found + hostData, ok := data[node] + if !ok { + hostData = make(schema.JobData) + data[node] = hostData + } + + for metric, nodeMetricData := range nodeData { + metricData, ok := hostData[metric] + if !ok { + metricData = make(map[schema.MetricScope]*schema.JobMetric) + data[node][metric] = metricData + } + + data[node][metric][schema.MetricScopeNode] = nodeMetricData[0] // Only Node Scope Returned from loadNodeData + } + } + + return data, totalNodes, hasNextPage, nil }
From 9ed64e0388eb948bd8ad3b1dc07f14feb771a367 Mon Sep 17 00:00:00 2001 From: Christoph Kluge <christoph.kluge@fau.de> Date: Mon, 17 Mar 2025 17:39:17 +0100 Subject: [PATCH 5/5] Review logging, comment cleanup --- internal/graph/schema.resolvers.go | 12 ++--- internal/metricdata/cc-metric-store.go | 48 ++++++++----------- pkg/archive/archive.go | 14 +++--- .../src/job/statstab/StatsTable.svelte | 4 +- 4 files changed, 35 insertions(+), 43 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index a470807..b5966c7 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -301,17 +301,17 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str return res, err } -// JobMetricStats is the resolver for the jobStats field. +// JobStats is the resolver for the jobStats field. func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []string) ([]*model.JobStats, error) { job, err := r.Query().Job(ctx, id) if err != nil { - log.Warnf("Error while querying job %s for metrics", id) + log.Warnf("Error while querying job %s for metadata", id) return nil, err } data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx) if err != nil { - log.Warnf("Error while loading job stat data for job id %s", id) + log.Warnf("Error while loading jobStats data for job id %s", id) return nil, err } @@ -326,17 +326,17 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin return res, err } -// JobStats is the resolver for the scopedJobStats field. +// ScopedJobStats is the resolver for the scopedJobStats field.
func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobStatsWithScope, error) { job, err := r.Query().Job(ctx, id) if err != nil { - log.Warnf("Error while querying job %s for metrics", id) + log.Warnf("Error while querying job %s for metadata", id) return nil, err } data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx) if err != nil { - log.Warnf("Error while loading scoped job stat data for job id %s", id) + log.Warnf("Error while loading scopedJobStats data for job id %s", id) return nil, err } diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index 6635299..9516e2b 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -129,13 +129,13 @@ func (ccms *CCMetricStore) doRequest( ) (*ApiQueryResponse, error) { buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(body); err != nil { - log.Warn("Error while encoding request body") + log.Errorf("Error while encoding request body: %s", err.Error()) return nil, err } req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) if err != nil { - log.Warn("Error while building request body") + log.Errorf("Error while building request: %s", err.Error()) return nil, err } if ccms.jwt != "" { @@ -151,7 +151,7 @@ func (ccms *CCMetricStore) doRequest( res, err := ccms.client.Do(req) if err != nil { - log.Error("Error while performing request") + log.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -161,7 +161,7 @@ func (ccms *CCMetricStore) doRequest( var resBody ApiQueryResponse if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { - log.Warn("Error while decoding result body") + log.Errorf("Error while decoding result body: %s", err.Error()) return nil, err } @@ -177,7 +177,7 @@ func (ccms *CCMetricStore) LoadData( ) (schema.JobData, error) { queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) if err != nil { - log.Warn("Error while building queries") + log.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } @@ -192,7 +192,7 @@ func (ccms *CCMetricStore) LoadData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error("Error while performing request") + log.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -557,16 +557,9 @@ func (ccms *CCMetricStore) LoadStats( ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - // metricConfigs := archive.GetCluster(job.Cluster).MetricConfig - // resolution := 9000 - - // for _, mc := range metricConfigs { - // resolution = min(resolution, mc.Timestep) - // } - queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope here for analysis view accelerator normalization?
if err != nil { - log.Warn("Error while building query") + log.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err } @@ -581,7 +574,7 @@ func (ccms *CCMetricStore) LoadStats( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error("Error while performing request") + log.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -591,9 +584,8 @@ func (ccms *CCMetricStore) LoadStats( metric := ccms.toLocalName(query.Metric) data := res[0] if data.Error != nil { - log.Infof("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) + log.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) continue - // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) } metricdata, ok := stats[metric] @@ -603,9 +595,8 @@ func (ccms *CCMetricStore) LoadStats( } if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() { - log.Infof("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) + log.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) continue - // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") } metricdata[query.Hostname] = schema.MetricStatistics{ @@ -618,7 +609,7 @@ func (ccms *CCMetricStore) LoadStats( return stats, nil } -// Scoped Stats: Basically Load Data without resolution and data query flag? +// Used for Job-View Statistics Table func (ccms *CCMetricStore) LoadScopedStats( job *schema.Job, metrics []string, @@ -627,7 +618,7 @@ func (ccms *CCMetricStore) LoadScopedStats( ) (schema.ScopedJobStats, error) { queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) if err != nil { - log.Warn("Error while building queries") + log.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } @@ -642,7 +633,7 @@ func (ccms *CCMetricStore) LoadScopedStats( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error("Error while performing request") + log.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -709,7 +700,7 @@ func (ccms *CCMetricStore) LoadScopedStats( return scopedJobStats, nil } -// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known! - Todo Outdated with NodeListData? 
+// Used for Systems-View Node-Overview func (ccms *CCMetricStore) LoadNodeData( cluster string, metrics, nodes []string, @@ -743,7 +734,7 @@ func (ccms *CCMetricStore) LoadNodeData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error(fmt.Sprintf("Error while performing request %#v\n", err)) + log.Errorf("Error while performing request: %s", err.Error()) return nil, err } @@ -801,6 +792,7 @@ func (ccms *CCMetricStore) LoadNodeData( return data, nil } +// Used for Systems-View Node-List func (ccms *CCMetricStore) LoadNodeListData( cluster, subCluster, nodeFilter string, metrics []string, @@ -859,7 +851,7 @@ func (ccms *CCMetricStore) LoadNodeListData( queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) if err != nil { - log.Warn("Error while building queries") + log.Errorf("Error while building node queries for Cluster %s, SubCluster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) return nil, totalNodes, hasNextPage, err } @@ -874,7 +866,7 @@ func (ccms *CCMetricStore) LoadNodeListData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error(fmt.Sprintf("Error while performing request %#v\n", err)) + log.Errorf("Error while performing request: %s", err.Error()) return nil, totalNodes, hasNextPage, err } @@ -979,7 +971,7 @@ func (ccms *CCMetricStore) buildNodeQueries( if subCluster != "" { subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) if scterr != nil { - // TODO: Log + log.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) return nil, nil, scterr } } @@ -989,7 +981,7 @@ func (ccms *CCMetricStore) buildNodeQueries( mc := archive.GetMetricConfig(cluster, metric) if mc == nil { // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) - log.Infof("metric '%s' is not specified for cluster '%s'", metric, cluster) + log.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) continue } diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 002fd5e..cd457eb 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -89,7 +89,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error { var version uint64 version, err = ar.Init(rawConfig) if err != nil { - log.Error("Error while initializing archiveBackend") + log.Errorf("Error while initializing archiveBackend: %s", err.Error()) return } log.Infof("Load archive version %d", version) @@ -112,7 +112,7 @@ func LoadAveragesFromArchive( ) error { metaFile, err := ar.LoadJobMeta(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error()) return err } @@ -135,7 +135,7 @@ func LoadStatsFromArchive( data := make(map[string]schema.MetricStatistics, len(metrics)) metaFile, err := ar.LoadJobMeta(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error()) return data, err } @@ -165,7 +165,7 @@ func LoadScopedStatsFromArchive( data, err := ar.LoadJobStats(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job stats from archiveBackend: %s", err.Error()) return nil, err } @@ -175,7 +175,7 @@ func GetStatistics(job *schema.Job)
(map[string]schema.JobStatistics, error) { metaFile, err := ar.LoadJobMeta(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error()) return nil, err } @@ -191,7 +191,7 @@ func UpdateMetadata(job *schema.Job, metadata map[string]string) error { jobMeta, err := ar.LoadJobMeta(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error()) return err } @@ -211,7 +211,7 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error { jobMeta, err := ar.LoadJobMeta(job) if err != nil { - log.Warn("Error while loading job metadata from archiveBackend") + log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error()) return err } diff --git a/web/frontend/src/job/statstab/StatsTable.svelte b/web/frontend/src/job/statstab/StatsTable.svelte index 4adb4bd..2ed2f28 100644 --- a/web/frontend/src/job/statstab/StatsTable.svelte +++ b/web/frontend/src/job/statstab/StatsTable.svelte @@ -2,8 +2,8 @@ @component Job-View subcomponent; display table of metric data statistics with selectable scopes Properties: - - `job Object`: The job object - - `clusters Object`: The clusters object + - `data Object`: The scoped job statistics data to display + - `selectedMetrics [String]`: The metrics selected for display - `hosts [String]`: The list of hostnames of this job -->
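The result shape shared by both LoadScopedStats implementations above is metric -> scope -> per-host statistics. The following is a minimal, self-contained consumer sketch; the type declarations are local stand-ins that mirror only the fields the InfluxDB implementation actually sets (Hostname, Data), not the full pkg/schema definitions.

package main

import "fmt"

// Local stand-ins for the pkg/schema types used by LoadScopedStats.
type MetricStatistics struct{ Avg, Min, Max float64 }

type MetricScope string

type ScopedStats struct {
	Hostname string
	Data     *MetricStatistics
}

// ScopedJobStats: metric name -> scope -> per-host statistics.
type ScopedJobStats map[string]map[MetricScope][]*ScopedStats

func main() {
	// Example data shaped like a LoadScopedStats result (values invented).
	stats := ScopedJobStats{
		"flops_any": {
			"node": {
				{Hostname: "node001", Data: &MetricStatistics{Avg: 42.1, Min: 10.0, Max: 90.5}},
				{Hostname: "node002", Data: &MetricStatistics{Avg: 38.7, Min: 8.2, Max: 88.0}},
			},
		},
	}

	for metric, byScope := range stats {
		for scope, series := range byScope {
			for _, s := range series {
				fmt.Printf("%s@%s %s: min %.2f avg %.2f max %.2f\n",
					metric, scope, s.Hostname, s.Data.Min, s.Data.Avg, s.Data.Max)
			}
		}
	}
}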
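The per-metric reduction in the InfluxDB LoadNodeData is a single pass accumulating sum, min, and max. Seeding min with +MaxFloat64 and max with -MaxFloat64 (as fixed above) guarantees the first sample always applies; a 0.0 seed would pin min to zero for all-positive series. A standalone sketch of the same pattern:

package main

import (
	"fmt"
	"math"
)

// minMaxAvg mirrors the reduction in LoadNodeData: one pass over the
// samples, min/max seeded so that the first value always wins.
func minMaxAvg(vals []float64) (min, max, avg float64) {
	if len(vals) == 0 {
		return 0, 0, 0 // guard against division by zero on empty input
	}
	min, max = math.MaxFloat64, -math.MaxFloat64
	sum := 0.0
	for _, v := range vals {
		sum += v
		min = math.Min(min, v)
		max = math.Max(max, v)
	}
	return min, max, sum / float64(len(vals))
}

func main() {
	min, max, avg := minMaxAvg([]float64{42.0, 13.5, 77.25})
	fmt.Printf("min %.2f max %.2f avg %.2f\n", min, max, avg) // min 13.50 max 77.25 avg 44.25
}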
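The paging used by both LoadNodeListData variants can be checked in isolation. pageNodes below is a hypothetical helper, not part of the patch; it reproduces the slice-window logic with the off-by-one fix applied above: hasNextPage must be false once the window reaches the end of the list, including an exactly full last page.

package main

import "fmt"

// pageNodes applies slice-window paging over a sorted node list.
// page is 1-based; hasNextPage reports whether more nodes follow.
func pageNodes(nodes []string, page, perPage int) ([]string, bool) {
	if len(nodes) <= perPage {
		return nodes, false
	}
	start := (page - 1) * perPage
	if start > len(nodes) {
		start = len(nodes) // clamp out-of-range pages to an empty window
	}
	end := start + perPage
	hasNextPage := true
	if end >= len(nodes) {
		end = len(nodes)
		hasNextPage = false // covers the exactly-full last page
	}
	return nodes[start:end], hasNextPage
}

func main() {
	nodes := []string{"n01", "n02", "n03", "n04"}
	for page := 1; page <= 2; page++ {
		window, more := pageNodes(nodes, page, 2)
		fmt.Println(page, window, more) // page 2 ends the list: more == false
	}
}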