From fb86ebdbcc1c018e9de5a9d90c6fba0fdf88abca Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 8 Jun 2023 06:18:19 +0200 Subject: [PATCH] Renaming --- internal/graph/schema.resolvers.go | 4 +- internal/repository/stats.go | 124 +++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 8 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 94d8727..c881a78 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -280,8 +280,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF var err error var stats []*model.JobsStatistics - if requireField(ctx, "TotalJobs") { - if requireField(ctx, "TotalCoreHours") { + if requireField(ctx, "totalJobs") { + if requireField(ctx, "totalCoreHours") { if groupBy == nil { stats, err = r.Repo.JobsStatsPlain(ctx, filter) } else { diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 9ec646f..2197a79 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -24,7 +25,36 @@ var groupBy2column = map[model.Aggregate]string{ model.AggregateCluster: "job.cluster", } -func (r *JobRepository) buildJobsStatsQuery( +func (r *JobRepository) buildCountQuery( + filter []*model.JobFilter, + kind string, + col string) sq.SelectBuilder { + + var query sq.SelectBuilder + + if col != "" { + // Scan columns: id, cnt + query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col) + } else { + // Scan columns: cnt + query = sq.Select("COUNT(job.id)").From("job") + } + + switch kind { + case "running": + query = query.Where("job.job_state = ?", "running") + case "short": + query = query.Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) + } + + for _, f := range filter { + query = BuildWhereClause(f, query) + } + + return query +} + +func (r *JobRepository) buildStatsQuery( filter []*model.JobFilter, col string) sq.SelectBuilder { @@ -83,7 +113,7 @@ func (r *JobRepository) JobsStatsNoCoreH( start := time.Now() col := groupBy2column[*groupBy] - query := r.buildJobsStatsQuery(filter, col) + query := r.buildStatsQuery(filter, col) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -134,7 +164,7 @@ func (r *JobRepository) JobsStatsPlainNoCoreH( filter []*model.JobFilter) ([]*model.JobsStatistics, error) { start := time.Now() - query := r.buildJobsStatsQuery(filter, "") + query := r.buildStatsQuery(filter, "") query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -149,10 +179,21 @@ func (r *JobRepository) JobsStatsPlainNoCoreH( } if jobs.Valid { + query := r.buildCountQuery(filter, "short", "") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + var cnt sql.NullInt64 + if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } stats = append(stats, &model.JobsStatistics{ TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64)}) + TotalWalltime: int(walltime.Int64), + ShortJobs: int(cnt.Int64)}) } log.Infof("Timer JobStatistics %s", time.Since(start)) @@ -165,7 +206,7 @@ func (r *JobRepository) JobsStatsPlain( filter []*model.JobFilter) ([]*model.JobsStatistics, error) { start := time.Now() - query := r.buildJobsStatsQuery(filter, "") + query := r.buildStatsQuery(filter, "") query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -218,7 +259,7 @@ func (r *JobRepository) JobsStats( stats := map[string]*model.JobsStatistics{} col := groupBy2column[*groupBy] - query := r.buildJobsStatsQuery(filter, col) + query := r.buildStatsQuery(filter, col) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -286,6 +327,77 @@ func (r *JobRepository) JobsStats( return res, nil } +type jobCountResult struct { + id string + shortJobs int + totalJobs int + runningJobs int +} + +func (r *JobRepository) JobCounts( + ctx context.Context, + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + counts := make(map[string]jobCountResult) + start := time.Now() + query := r.buildCountQuery(filter, "short", "cluster") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var cnt sql.NullInt64 + if err := rows.Scan(&id, &cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + if id.Valid { + counts[id.String] = jobCountResult{id: id.String, shortJobs: int(cnt.Int64)} + } + } + + query = r.buildCountQuery(filter, "running", "cluster") + query, err = SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + rows, err = query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var cnt sql.NullInt64 + if err := rows.Scan(&id, &cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + if id.Valid { + counts[id.String].runningJobs = int(cnt.Int64) + } + } + + stats := make([]*model.JobsStatistics, 0, 20) + if id.Valid { + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(jobs.Int64), + RunningJobs: int(walltime.Int64)}) + } + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + func (r *JobRepository) AddHistograms( ctx context.Context, filter []*model.JobFilter,