diff --git a/api/schema.graphqls b/api/schema.graphqls index 7f427af..eb3e270 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -167,7 +167,7 @@ type TimeWeights { } enum Aggregate { USER, PROJECT, CLUSTER } -enum Weights { NODE_COUNT, NODE_HOURS, CORE_COUNT, CORE_HOURS } +enum SortByAggregate { WALLTIME, NODEHOURS, COREHOURS, ACCHOURS } type NodeMetrics { host: String! @@ -198,7 +198,7 @@ type Query { jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! - jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! + jobsStatistics(filter: [JobFilter!], page: PageRequest, sortBy: SortByAggregate, groupBy: Aggregate): [JobsStatistics!]! rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 7544148..355b25a 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -203,7 +203,7 @@ type ComplexityRoot struct { JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int - JobsStatistics func(childComplexity int, filter []*model.JobFilter, groupBy *model.Aggregate) int + JobsStatistics func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) int NodeMetrics func(childComplexity int, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) int RooflineHeatmap func(childComplexity int, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) int Tags func(childComplexity int) int @@ -317,7 +317,7 @@ type QueryResolver interface { JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) - JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) + JobsStatistics(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) NodeMetrics(ctx context.Context, cluster string, nodes []string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time) ([]*model.NodeMetrics, error) } @@ -1072,7 +1072,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.JobsStatistics(childComplexity, args["filter"].([]*model.JobFilter), args["groupBy"].(*model.Aggregate)), true + return e.complexity.Query.JobsStatistics(childComplexity, args["filter"].([]*model.JobFilter), args["page"].(*model.PageRequest), args["sortBy"].(*model.SortByAggregate), args["groupBy"].(*model.Aggregate)), true case "Query.nodeMetrics": if e.complexity.Query.NodeMetrics == nil { @@ -1727,7 +1727,7 @@ type TimeWeights { } enum Aggregate { USER, PROJECT, CLUSTER } -enum Weights { NODE_COUNT, NODE_HOURS, CORE_COUNT, CORE_HOURS } +enum SortByAggregate { WALLTIME, NODEHOURS, COREHOURS, ACCHOURS } type NodeMetrics { host: String! @@ -1758,7 +1758,7 @@ type Query { jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! - jobsStatistics(filter: [JobFilter!], groupBy: Aggregate): [JobsStatistics!]! + jobsStatistics(filter: [JobFilter!], page: PageRequest, sortBy: SortByAggregate, groupBy: Aggregate): [JobsStatistics!]! rooflineHeatmap(filter: [JobFilter!]!, rows: Int!, cols: Int!, minX: Float!, minY: Float!, maxX: Float!, maxY: Float!): [[Float!]!]! @@ -2097,15 +2097,33 @@ func (ec *executionContext) field_Query_jobsStatistics_args(ctx context.Context, } } args["filter"] = arg0 - var arg1 *model.Aggregate - if tmp, ok := rawArgs["groupBy"]; ok { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("groupBy")) - arg1, err = ec.unmarshalOAggregate2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐAggregate(ctx, tmp) + var arg1 *model.PageRequest + if tmp, ok := rawArgs["page"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("page")) + arg1, err = ec.unmarshalOPageRequest2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐPageRequest(ctx, tmp) if err != nil { return nil, err } } - args["groupBy"] = arg1 + args["page"] = arg1 + var arg2 *model.SortByAggregate + if tmp, ok := rawArgs["sortBy"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sortBy")) + arg2, err = ec.unmarshalOSortByAggregate2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐSortByAggregate(ctx, tmp) + if err != nil { + return nil, err + } + } + args["sortBy"] = arg2 + var arg3 *model.Aggregate + if tmp, ok := rawArgs["groupBy"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("groupBy")) + arg3, err = ec.unmarshalOAggregate2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐAggregate(ctx, tmp) + if err != nil { + return nil, err + } + } + args["groupBy"] = arg3 return args, nil } @@ -7079,7 +7097,7 @@ func (ec *executionContext) _Query_jobsStatistics(ctx context.Context, field gra }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().JobsStatistics(rctx, fc.Args["filter"].([]*model.JobFilter), fc.Args["groupBy"].(*model.Aggregate)) + return ec.resolvers.Query().JobsStatistics(rctx, fc.Args["filter"].([]*model.JobFilter), fc.Args["page"].(*model.PageRequest), fc.Args["sortBy"].(*model.SortByAggregate), fc.Args["groupBy"].(*model.Aggregate)) }) if err != nil { ec.Error(ctx, err) @@ -16140,6 +16158,22 @@ func (ec *executionContext) marshalOSeries2ᚕgithubᚗcomᚋClusterCockpitᚋcc return ret } +func (ec *executionContext) unmarshalOSortByAggregate2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐSortByAggregate(ctx context.Context, v interface{}) (*model.SortByAggregate, error) { + if v == nil { + return nil, nil + } + var res = new(model.SortByAggregate) + err := res.UnmarshalGQL(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalOSortByAggregate2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐSortByAggregate(ctx context.Context, sel ast.SelectionSet, v *model.SortByAggregate) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return v +} + func (ec *executionContext) marshalOStatsSeries2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋpkgᚋschemaᚐStatsSeries(ctx context.Context, sel ast.SelectionSet, v *schema.StatsSeries) graphql.Marshaler { if v == nil { return graphql.Null diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index faffae7..609e6a4 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -188,6 +188,51 @@ func (e Aggregate) MarshalGQL(w io.Writer) { fmt.Fprint(w, strconv.Quote(e.String())) } +type SortByAggregate string + +const ( + SortByAggregateWalltime SortByAggregate = "WALLTIME" + SortByAggregateNodehours SortByAggregate = "NODEHOURS" + SortByAggregateCorehours SortByAggregate = "COREHOURS" + SortByAggregateAcchours SortByAggregate = "ACCHOURS" +) + +var AllSortByAggregate = []SortByAggregate{ + SortByAggregateWalltime, + SortByAggregateNodehours, + SortByAggregateCorehours, + SortByAggregateAcchours, +} + +func (e SortByAggregate) IsValid() bool { + switch e { + case SortByAggregateWalltime, SortByAggregateNodehours, SortByAggregateCorehours, SortByAggregateAcchours: + return true + } + return false +} + +func (e SortByAggregate) String() string { + return string(e) +} + +func (e *SortByAggregate) UnmarshalGQL(v interface{}) error { + str, ok := v.(string) + if !ok { + return fmt.Errorf("enums must be strings") + } + + *e = SortByAggregate(str) + if !e.IsValid() { + return fmt.Errorf("%s is not a valid SortByAggregate", str) + } + return nil +} + +func (e SortByAggregate) MarshalGQL(w io.Writer) { + fmt.Fprint(w, strconv.Quote(e.String())) +} + type SortDirectionEnum string const ( @@ -228,48 +273,3 @@ func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error { func (e SortDirectionEnum) MarshalGQL(w io.Writer) { fmt.Fprint(w, strconv.Quote(e.String())) } - -type Weights string - -const ( - WeightsNodeCount Weights = "NODE_COUNT" - WeightsNodeHours Weights = "NODE_HOURS" - WeightsCoreCount Weights = "CORE_COUNT" - WeightsCoreHours Weights = "CORE_HOURS" -) - -var AllWeights = []Weights{ - WeightsNodeCount, - WeightsNodeHours, - WeightsCoreCount, - WeightsCoreHours, -} - -func (e Weights) IsValid() bool { - switch e { - case WeightsNodeCount, WeightsNodeHours, WeightsCoreCount, WeightsCoreHours: - return true - } - return false -} - -func (e Weights) String() string { - return string(e) -} - -func (e *Weights) UnmarshalGQL(v interface{}) error { - str, ok := v.(string) - if !ok { - return fmt.Errorf("enums must be strings") - } - - *e = Weights(str) - if !e.IsValid() { - return fmt.Errorf("%s is not a valid Weights", str) - } - return nil -} - -func (e Weights) MarshalGQL(w io.Writer) { - fmt.Fprint(w, strconv.Quote(e.String())) -} diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index f2a3c65..83aec04 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -244,7 +244,7 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag } // JobsStatistics is the resolver for the jobsStatistics field. -func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { +func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { var err error var stats []*model.JobsStatistics @@ -252,7 +252,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF if groupBy == nil { stats, err = r.Repo.JobsStats(ctx, filter) } else { - stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy) + stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy) } } else { stats = make([]*model.JobsStatistics, 0, 1) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index ee48ee1..18d495b 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -23,6 +23,13 @@ var groupBy2column = map[model.Aggregate]string{ model.AggregateCluster: "job.cluster", } +var sortBy2column = map[model.SortByAggregate]string{ + model.SortByAggregateWalltime: "totalWalltime", + model.SortByAggregateNodehours: "totalNodeHours", + model.SortByAggregateCorehours: "totalCoreHours", + model.SortByAggregateAcchours: "totalAccHours", +} + func (r *JobRepository) buildCountQuery( filter []*model.JobFilter, kind string, @@ -62,11 +69,12 @@ func (r *JobRepository) buildStatsQuery( if col != "" { // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours query = sq.Select(col, "COUNT(job.id)", - fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), - fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), - fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), - fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s) as totalWalltime", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s) as totalNodeHours", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s) as totalCoreHours", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s) as totalAccHours", castType), ).From("job").GroupBy(col) + } else { // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours query = sq.Select("COUNT(job.id)", @@ -112,16 +120,28 @@ func (r *JobRepository) getCastType() string { func (r *JobRepository) JobsStatsGrouped( ctx context.Context, filter []*model.JobFilter, + page *model.PageRequest, + sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { start := time.Now() col := groupBy2column[*groupBy] query := r.buildStatsQuery(filter, col) + query, err := SecurityCheck(ctx, query) if err != nil { return nil, err } + if sortBy != nil { + sortBy := sortBy2column[*sortBy] + query = query.OrderBy(fmt.Sprintf("%s DESC", sortBy)) + } + if page != nil && page.ItemsPerPage != -1 { + limit := uint64(page.ItemsPerPage) + query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit) + } + rows, err := query.RunWith(r.DB).Query() if err != nil { log.Warn("Error while querying DB for job statistics") @@ -174,17 +194,10 @@ func (r *JobRepository) JobsStatsGrouped( return stats, nil } -func (r *JobRepository) JobsStats( - ctx context.Context, +func (r *JobRepository) jobsStats( + query sq.SelectBuilder, filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - start := time.Now() - query := r.buildStatsQuery(filter, "") - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - row := query.RunWith(r.DB).QueryRow() stats := make([]*model.JobsStatistics, 0, 1) @@ -211,10 +224,31 @@ func (r *JobRepository) JobsStats( TotalAccHours: totalAccHours}) } - log.Debugf("Timer JobStats %s", time.Since(start)) return stats, nil } +func (r *JobRepository) testJobsStats( + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + query := r.buildStatsQuery(filter, "") + return r.jobsStats(query, filter) +} + +func (r *JobRepository) JobsStats( + ctx context.Context, + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + start := time.Now() + query := r.buildStatsQuery(filter, "") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + + log.Debugf("Timer JobStats %s", time.Since(start)) + return r.jobsStats(query, filter) +} + func (r *JobRepository) JobCountGrouped( ctx context.Context, filter []*model.JobFilter, diff --git a/internal/repository/stats_test.go b/internal/repository/stats_test.go index b1a815e..2672b3f 100644 --- a/internal/repository/stats_test.go +++ b/internal/repository/stats_test.go @@ -7,6 +7,8 @@ package repository import ( "fmt" "testing" + + "github.com/ClusterCockpit/cc-backend/internal/graph/model" ) func TestBuildJobStatsQuery(t *testing.T) { @@ -19,3 +21,17 @@ func TestBuildJobStatsQuery(t *testing.T) { fmt.Printf("SQL: %s\n", sql) } + +func TestJobStats(t *testing.T) { + r := setup(t) + + filter := &model.JobFilter{} + var err error + var stats []*model.JobsStatistics + stats, err = r.testJobsStats([]*model.JobFilter{filter}) + noErr(t, err) + + if stats[0].TotalJobs != 98 { + t.Fatalf("Want 98, Got %d", stats[0].TotalJobs) + } +}