From 5ba6f0ed3aecb82ba3beb5dcd7b05006b0b139e4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 9 Jun 2023 09:09:41 +0200 Subject: [PATCH] Refactor and adapt to new API --- api/schema.graphqls | 7 +- internal/graph/generated/generated.go | 128 ++++++++++++++- internal/graph/model/models_gen.go | 2 + internal/graph/schema.resolvers.go | 64 ++------ internal/graph/{stats.go => util.go} | 13 ++ internal/repository/stats.go | 223 ++++++-------------------- internal/routerConfig/routes.go | 42 +---- pkg/schema/job.go | 17 +- web/templates/home.tmpl | 21 ++- 9 files changed, 239 insertions(+), 278 deletions(-) rename internal/graph/{stats.go => util.go} (95%) diff --git a/api/schema.graphqls b/api/schema.graphqls index b7c16ce..71a5373 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -286,10 +286,13 @@ type HistoPoint { type JobsStatistics { id: ID! # If `groupBy` was used, ID of the user/project/cluster name: String! # if User-Statistics: Given Name of Account (ID) Owner - totalJobs: Int! # Number of jobs that matched - shortJobs: Int! # Number of jobs with a duration of less than 2 minutes + totalJobs: Int! # Number of jobs + runningJobs: Int! # Number of running jobs + shortJobs: Int! # Number of jobs with a duration of less than duration totalWalltime: Int! # Sum of the duration of all matched jobs in hours + totalNodeHours: Int! # Sum of the node hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs + totalAccHours: Int! # Sum of the gpu hours of all matched jobs histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes } diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 907526e..0ef1fcb 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -143,9 +143,11 @@ type ComplexityRoot struct { HistNumNodes func(childComplexity int) int ID func(childComplexity int) int Name func(childComplexity int) int + RunningJobs func(childComplexity int) int ShortJobs func(childComplexity int) int TotalCoreHours func(childComplexity int) int TotalJobs func(childComplexity int) int + TotalNodeHours func(childComplexity int) int TotalWalltime func(childComplexity int) int } @@ -731,6 +733,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobsStatistics.Name(childComplexity), true + case "JobsStatistics.runningJobs": + if e.complexity.JobsStatistics.RunningJobs == nil { + break + } + + return e.complexity.JobsStatistics.RunningJobs(childComplexity), true + case "JobsStatistics.shortJobs": if e.complexity.JobsStatistics.ShortJobs == nil { break @@ -752,6 +761,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobsStatistics.TotalJobs(childComplexity), true + case "JobsStatistics.totalNodeHours": + if e.complexity.JobsStatistics.TotalNodeHours == nil { + break + } + + return e.complexity.JobsStatistics.TotalNodeHours(childComplexity), true + case "JobsStatistics.totalWalltime": if e.complexity.JobsStatistics.TotalWalltime == nil { break @@ -1764,9 +1780,11 @@ type HistoPoint { type JobsStatistics { id: ID! # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster name: String! # if User-Statistics: Given Name of Account (ID) Owner - totalJobs: Int! # Number of jobs that matched - shortJobs: Int! 
# Number of jobs with a duration of less than 2 minutes + totalJobs: Int! # Number of jobs + runningJobs: Int! # Number of running jobs + shortJobs: Int! # Number of jobs with a duration of less than duration totalWalltime: Int! # Sum of the duration of all matched jobs in hours + totalNodeHours: Int! # Sum of the node hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes @@ -4884,6 +4902,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalJobs(ctx context.Co return fc, nil } +func (ec *executionContext) _JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobsStatistics_runningJobs(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.RunningJobs, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobsStatistics", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field) if err != nil { @@ -4972,6 +5034,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalWalltime(ctx contex return fc, nil } +func (ec *executionContext) _JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.TotalNodeHours, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField) (fc 
*graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobsStatistics", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) if err != nil { @@ -6867,10 +6973,14 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex return ec.fieldContext_JobsStatistics_name(ctx, field) case "totalJobs": return ec.fieldContext_JobsStatistics_totalJobs(ctx, field) + case "runningJobs": + return ec.fieldContext_JobsStatistics_runningJobs(ctx, field) case "shortJobs": return ec.fieldContext_JobsStatistics_shortJobs(ctx, field) case "totalWalltime": return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field) + case "totalNodeHours": + return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field) case "totalCoreHours": return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) case "histDuration": @@ -12062,6 +12172,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "runningJobs": + + out.Values[i] = ec._JobsStatistics_runningJobs(ctx, field, obj) + if out.Values[i] == graphql.Null { invalids++ } @@ -12076,6 +12193,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "totalNodeHours": + + out.Values[i] = ec._JobsStatistics_totalNodeHours(ctx, field, obj) + if out.Values[i] == graphql.Null { invalids++ } diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 552204e..4538098 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -90,8 +90,10 @@ type JobsStatistics struct { ID string `json:"id"` Name string `json:"name"` TotalJobs int `json:"totalJobs"` + RunningJobs int `json:"runningJobs"` ShortJobs int `json:"shortJobs"` TotalWalltime int `json:"totalWalltime"` + TotalNodeHours int `json:"totalNodeHours"` TotalCoreHours int `json:"totalCoreHours"` HistDuration []*HistoPoint `json:"histDuration"` HistNumNodes []*HistoPoint `json:"histNumNodes"` diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 91deb6d..de81bf7 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -11,7 +11,6 @@ import ( "strconv" "time" - "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" @@ -234,20 +233,12 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str } // JobsFootprints is the resolver for the jobsFootprints field. 
-func (r *queryResolver) JobsFootprints(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	metrics []string) (*model.Footprints, error) {
-
+func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
 	return r.jobsFootprints(ctx, filter, metrics)
 }
 
 // Jobs is the resolver for the jobs field.
-func (r *queryResolver) Jobs(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	page *model.PageRequest,
-	order *model.OrderByInput) (*model.JobResultList, error) {
+func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) {
 	if page == nil {
 		page = &model.PageRequest{
 			ItemsPerPage: 50,
@@ -276,23 +267,26 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 	var stats []*model.JobsStatistics
 
 	if requireField(ctx, "totalJobs") {
-		if requireField(ctx, "totalCoreHours") {
-			if groupBy == nil {
-				stats, err = r.Repo.JobsStatsPlain(ctx, filter)
-			} else {
-				stats, err = r.Repo.JobsStats(ctx, filter, groupBy)
-			}
+		if groupBy == nil {
+			stats, err = r.Repo.JobsStats(ctx, filter)
 		} else {
-			if groupBy == nil {
-				stats, err = r.Repo.JobsStatsPlainNoCoreH(ctx, filter)
-			} else {
-				stats, err = r.Repo.JobsStatsNoCoreH(ctx, filter, groupBy)
-			}
+			stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy)
 		}
 	} else {
 		stats = make([]*model.JobsStatistics, 0, 1)
 	}
 
+	if groupBy != nil {
+		if requireField(ctx, "shortJobs") {
+			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
+		}
+		if requireField(ctx, "runningJobs") {
+			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
+		}
+	} else if requireField(ctx, "shortJobs") || requireField(ctx, "runningJobs") {
+		return nil, errors.New("Job counts only implemented with groupBy argument")
+	}
+
 	if err != nil {
 		return nil, err
 	}
@@ -312,13 +306,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 }
 
 // JobsCount is the resolver for the jobsCount field.
-func (r *queryResolver) JobsCount(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	groupBy model.Aggregate,
-	weight *model.Weights,
-	limit *int) ([]*model.Count, error) {
-
+func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
 	counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
 	if err != nil {
 		log.Warn("Error while counting grouped jobs")
@@ -412,21 +400,3 @@ type jobResolver struct{ *Resolver }
 type mutationResolver struct{ *Resolver }
 type queryResolver struct{ *Resolver }
 type subClusterResolver struct{ *Resolver }
-
-// !!! WARNING !!!
-// The code below was going to be deleted when updating resolvers. It has been copied here so you have
-// one last chance to move it out of harms way if you want. There are two reasons this happens:
-//  - When renaming or deleting a resolver the old code will be put in here. You can safely delete
-//    it when you're done.
-//  - You have helper methods in this file. Move them out to keep these resolver files clean.
-func requireField(ctx context.Context, name string) bool { - fields := graphql.CollectAllFields(ctx) - - for _, f := range fields { - if f == name { - return true - } - } - - return false -} diff --git a/internal/graph/stats.go b/internal/graph/util.go similarity index 95% rename from internal/graph/stats.go rename to internal/graph/util.go index 46aac11..c9423e1 100644 --- a/internal/graph/stats.go +++ b/internal/graph/util.go @@ -10,6 +10,7 @@ import ( "fmt" "math" + "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -132,3 +133,15 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF Metrics: res, }, nil } + +func requireField(ctx context.Context, name string) bool { + fields := graphql.CollectAllFields(ctx) + + for _, f := range fields { + if f == name { + return true + } + } + + return false +} diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 2197a79..a54c587 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -13,7 +13,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" sq "github.com/Masterminds/squirrel" ) @@ -62,14 +61,18 @@ func (r *JobRepository) buildStatsQuery( castType := r.getCastType() if col != "" { - // Scan columns: id, totalJobs, totalWalltime + // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours query = sq.Select(col, "COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), ).From("job").GroupBy(col) } else { - // Scan columns: totalJobs, totalWalltime + // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours query = sq.Select("COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), ).From("job") } @@ -105,8 +108,7 @@ func (r *JobRepository) getCastType() string { return castType } -// with groupBy and without coreHours -func (r *JobRepository) JobsStatsNoCoreH( +func (r *JobRepository) JobsStatsGrouped( ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { @@ -129,8 +131,8 @@ func (r *JobRepository) JobsStatsNoCoreH( for rows.Next() { var id sql.NullString - var jobs, walltime sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime); err != nil { + var jobs, walltime, nodeHours, coreHours sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours); err != nil { log.Warn("Error while scanning rows") return nil, err } @@ -141,7 +143,7 @@ func (r *JobRepository) JobsStatsNoCoreH( stats = append(stats, &model.JobsStatistics{ ID: id.String, - Name: &name, + Name: name, TotalJobs: int(jobs.Int64), TotalWalltime: int(walltime.Int64)}) } else { @@ -158,8 +160,7 @@ func (r *JobRepository) JobsStatsNoCoreH( return stats, nil } -// without groupBy and without coreHours -func (r 
*JobRepository) JobsStatsPlainNoCoreH( +func (r *JobRepository) JobsStats( ctx context.Context, filter []*model.JobFilter) ([]*model.JobsStatistics, error) { @@ -172,175 +173,32 @@ func (r *JobRepository) JobsStatsPlainNoCoreH( row := query.RunWith(r.DB).QueryRow() stats := make([]*model.JobsStatistics, 0, 1) - var jobs, walltime sql.NullInt64 - if err := row.Scan(&jobs, &walltime); err != nil { + + var jobs, walltime, nodeHours, coreHours sql.NullInt64 + if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if jobs.Valid { - query := r.buildCountQuery(filter, "short", "") - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - var cnt sql.NullInt64 - if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } stats = append(stats, &model.JobsStatistics{ TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - ShortJobs: int(cnt.Int64)}) + TotalWalltime: int(walltime.Int64)}) } log.Infof("Timer JobStatistics %s", time.Since(start)) return stats, nil } -// without groupBy and with coreHours -func (r *JobRepository) JobsStatsPlain( - ctx context.Context, - filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - - start := time.Now() - query := r.buildStatsQuery(filter, "") - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - - castType := r.getCastType() - var totalJobs, totalWalltime, totalCoreHours int64 - - for _, cluster := range archive.Clusters { - for _, subcluster := range cluster.SubClusters { - - scQuery := query.Column(fmt.Sprintf( - "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", - subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) - scQuery = scQuery.Where("job.cluster = ?", cluster.Name). - Where("job.subcluster = ?", subcluster.Name) - - row := scQuery.RunWith(r.DB).QueryRow() - var jobs, walltime, corehours sql.NullInt64 - if err := row.Scan(&jobs, &walltime, &corehours); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - if jobs.Valid { - totalJobs += jobs.Int64 - totalWalltime += walltime.Int64 - totalCoreHours += corehours.Int64 - } - } - } - stats := make([]*model.JobsStatistics, 0, 1) - stats = append(stats, - &model.JobsStatistics{ - TotalJobs: int(totalJobs), - TotalWalltime: int(totalWalltime), - TotalCoreHours: int(totalCoreHours)}) - - log.Infof("Timer JobStatistics %s", time.Since(start)) - return stats, nil -} - -// with groupBy and with coreHours -func (r *JobRepository) JobsStats( +func (r *JobRepository) JobCountGrouped( ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { start := time.Now() - - stats := map[string]*model.JobsStatistics{} col := groupBy2column[*groupBy] - query := r.buildStatsQuery(filter, col) - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - - castType := r.getCastType() - - for _, cluster := range archive.Clusters { - for _, subcluster := range cluster.SubClusters { - - scQuery := query.Column(fmt.Sprintf( - "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", - subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) - - scQuery = scQuery.Where("job.cluster = ?", cluster.Name). 
- Where("job.subcluster = ?", subcluster.Name) - - rows, err := scQuery.RunWith(r.DB).Query() - if err != nil { - log.Warn("Error while querying DB for job statistics") - return nil, err - } - - for rows.Next() { - var id sql.NullString - var jobs, walltime, corehours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - if s, ok := stats[id.String]; ok { - s.TotalJobs += int(jobs.Int64) - s.TotalWalltime += int(walltime.Int64) - s.TotalCoreHours += int(corehours.Int64) - } else { - if col == "job.user" { - name := r.getUserName(ctx, id.String) - stats[id.String] = &model.JobsStatistics{ - ID: id.String, - Name: &name, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - TotalCoreHours: int(corehours.Int64), - } - } else { - stats[id.String] = &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - TotalCoreHours: int(corehours.Int64), - } - } - } - } - } - } - - res := make([]*model.JobsStatistics, 0, len(stats)) - for _, stat := range stats { - res = append(res, stat) - } - - log.Infof("Timer JobStatistics %s", time.Since(start)) - return res, nil -} - -type jobCountResult struct { - id string - shortJobs int - totalJobs int - runningJobs int -} - -func (r *JobRepository) JobCounts( - ctx context.Context, - filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - - counts := make(map[string]jobCountResult) - start := time.Now() - query := r.buildCountQuery(filter, "short", "cluster") + query := r.buildCountQuery(filter, "", col) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -351,6 +209,8 @@ func (r *JobRepository) JobCounts( return nil, err } + stats := make([]*model.JobsStatistics, 0, 100) + for rows.Next() { var id sql.NullString var cnt sql.NullInt64 @@ -359,21 +219,39 @@ func (r *JobRepository) JobCounts( return nil, err } if id.Valid { - counts[id.String] = jobCountResult{id: id.String, shortJobs: int(cnt.Int64)} + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(cnt.Int64)}) } } - query = r.buildCountQuery(filter, "running", "cluster") - query, err = SecurityCheck(ctx, query) + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + +func (r *JobRepository) AddJobCountGrouped( + ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate, + stats []*model.JobsStatistics, + kind string) ([]*model.JobsStatistics, error) { + + start := time.Now() + col := groupBy2column[*groupBy] + query := r.buildCountQuery(filter, kind, col) + query, err := SecurityCheck(ctx, query) if err != nil { return nil, err } - rows, err = query.RunWith(r.DB).Query() + rows, err := query.RunWith(r.DB).Query() if err != nil { log.Warn("Error while querying DB for job statistics") return nil, err } + counts := make(map[string]int) + for rows.Next() { var id sql.NullString var cnt sql.NullInt64 @@ -382,18 +260,21 @@ func (r *JobRepository) JobCounts( return nil, err } if id.Valid { - counts[id.String].runningJobs = int(cnt.Int64) + counts[id.String] = int(cnt.Int64) } } - stats := make([]*model.JobsStatistics, 0, 20) - if id.Valid { - stats = append(stats, - &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - RunningJobs: int(walltime.Int64)}) + switch kind { + case "running": + for _, s := range stats { + s.RunningJobs = counts[s.ID] + } + case "short": + for _, s := range stats { + s.ShortJobs = counts[s.ID] + } } + 
log.Infof("Timer JobStatistics %s", time.Since(start)) return stats, nil } diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 87d2c0e..2aa3f05 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -14,12 +14,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/repository" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/web" "github.com/gorilla/mux" ) @@ -50,47 +47,20 @@ var routes []Route = []Route{ } func setupHomeRoute(i InfoType, r *http.Request) InfoType { - type cluster struct { - Name string - RunningJobs int - TotalJobs int - RecentShortJobs int - } jobRepo := repository.GetJobRepository() + groupBy := model.AggregateCluster - runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - State: []schema.JobState{schema.JobStateRunning}, - }}, nil, nil) + stats, err := jobRepo.JobCountGrouped(r.Context(), nil, &groupBy) if err != nil { log.Warnf("failed to count jobs: %s", err.Error()) - runningJobs = map[string]int{} - } - totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil) - if err != nil { - log.Warnf("failed to count jobs: %s", err.Error()) - totalJobs = map[string]int{} - } - from := time.Now().Add(-24 * time.Hour) - recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - StartTime: &schema.TimeRange{From: &from, To: nil}, - Duration: &schema.IntRange{From: 0, To: config.Keys.ShortRunningJobsDuration}, - }}, nil, nil) - if err != nil { - log.Warnf("failed to count jobs: %s", err.Error()) - recentShortJobs = map[string]int{} } - clusters := make([]cluster, 0) - for _, c := range archive.Clusters { - clusters = append(clusters, cluster{ - Name: c.Name, - RunningJobs: runningJobs[c.Name], - TotalJobs: totalJobs[c.Name], - RecentShortJobs: recentShortJobs[c.Name], - }) + stats, err = jobRepo.AddJobCountGrouped(r.Context(), nil, &groupBy, stats, "running") + if err != nil { + log.Warnf("failed to count running jobs: %s", err.Error()) } - i["clusters"] = clusters + i["clusters"] = stats return i } diff --git a/pkg/schema/job.go b/pkg/schema/job.go index 3f75551..d967dd0 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -17,14 +17,15 @@ import ( type BaseJob struct { // The unique identifier of a job - JobID int64 `json:"jobId" db:"job_id" example:"123000"` - User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user - Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project - Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster - SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster - Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted - ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job - NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min 
> 0) + JobID int64 `json:"jobId" db:"job_id" example:"123000"` + User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user + Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project + Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster + SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster + Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted + ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job + NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) + // NumCores int32 `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user diff --git a/web/templates/home.tmpl b/web/templates/home.tmpl index 92c80f9..ff49e1e 100644 --- a/web/templates/home.tmpl +++ b/web/templates/home.tmpl @@ -6,9 +6,8 @@ Name - Running Jobs (short ones not listed) + Running Jobs Total Jobs - Short Jobs in past 24h {{if .User.HasRole .Roles.admin}} System View Analysis View @@ -19,21 +18,19 @@ {{if .User.HasRole .Roles.admin}} {{range .Infos.clusters}} - {{.Name}} - {{.RunningJobs}} jobs - {{.TotalJobs}} jobs - {{.RecentShortJobs}} - System View - Analysis View + {{.ID}} + {{.RunningJobs}} jobs + {{.TotalJobs}} jobs + System View + Analysis View {{end}} {{else}} {{range .Infos.clusters}} - {{.Name}} - {{.RunningJobs}} jobs - {{.TotalJobs}} jobs - {{.RecentShortJobs}} + {{.ID}} + {{.RunningJobs}} jobs + {{.TotalJobs}} jobs {{end}} {{end}}
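
Notes on the refactored statistics API

The repository layer now splits statistics into composable pieces: JobsStats and JobsStatsGrouped fetch the aggregate columns in a single query, while AddJobCountGrouped layers a per-group count of "short" or "running" jobs on top of an existing result set. A minimal sketch of the intended call pattern, mirroring the resolver and setupHomeRoute above (empty filter assumed, error handling abbreviated):

```go
package main

import (
	"context"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
)

// clusterStats returns one JobsStatistics entry per cluster (ID = cluster
// name), enriched with the short- and running-job counts.
func clusterStats(ctx context.Context) ([]*model.JobsStatistics, error) {
	repo := repository.GetJobRepository()
	groupBy := model.AggregateCluster

	// Base aggregates, one row per cluster.
	stats, err := repo.JobsStatsGrouped(ctx, nil, &groupBy)
	if err != nil {
		return nil, err
	}

	// Each AddJobCountGrouped call issues one COUNT query and fills the
	// field selected by its kind argument ("short" -> ShortJobs,
	// "running" -> RunningJobs).
	if stats, err = repo.AddJobCountGrouped(ctx, nil, &groupBy, stats, "short"); err != nil {
		return nil, err
	}
	return repo.AddJobCountGrouped(ctx, nil, &groupBy, stats, "running")
}
```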
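Core hours are now computed in SQL as job.duration * job.num_hwthreads, which removes the previous per-subcluster loop over archive.Clusters (hence the dropped pkg/archive import in internal/repository/stats.go). Note the semantic shift: core hours rely on the stored num_hwthreads instead of being estimated as num_nodes * SocketsPerNode * CoresPerSocket from the subcluster topology. A sketch of the statement the new buildStatsQuery assembles for a grouped query; the cast type string below is a placeholder, the real value comes from getCastType and depends on the configured SQL driver:

```go
package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// Placeholder cast type; the repository derives the real one from the
	// configured driver via getCastType.
	castType := "int"

	// Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours
	query := sq.Select("job.cluster",
		"COUNT(job.id)",
		fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
	).From("job").GroupBy("job.cluster")

	sql, _, _ := query.ToSql()
	fmt.Println(sql)
	// SELECT job.cluster, COUNT(job.id), CAST(ROUND(SUM(job.duration) / 3600) as int), ...
	// FROM job GROUP BY job.cluster
}
```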
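On the resolver side, requireField (moved from stats.go to internal/graph/util.go) inspects the collected selection set, so the extra COUNT queries only run when a client actually selects shortJobs or runningJobs. A hypothetical client query exercising the new fields, shown here as a Go constant; the CLUSTER enum value is assumed to correspond to model.AggregateCluster:

```go
package main

import "fmt"

// A client query selecting the new per-cluster statistics fields. Fields
// left out of the selection cost nothing, because requireField gates the
// corresponding repository calls.
const jobsStatisticsQuery = `
query {
  jobsStatistics(groupBy: CLUSTER) {
    id
    totalJobs
    runningJobs
    shortJobs
    totalWalltime
    totalNodeHours
    totalCoreHours
  }
}`

func main() { fmt.Println(jobsStatisticsQuery) }
```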