Add sorting and paging to JobStatsGrouped

This commit is contained in:
2023-08-25 13:14:34 +02:00
parent 13d99a6ae0
commit d7117f3d49
6 changed files with 158 additions and 74 deletions

View File

@@ -23,6 +23,13 @@ var groupBy2column = map[model.Aggregate]string{
model.AggregateCluster: "job.cluster",
}
var sortBy2column = map[model.SortByAggregate]string{
model.SortByAggregateWalltime: "totalWalltime",
model.SortByAggregateNodehours: "totalNodeHours",
model.SortByAggregateCorehours: "totalCoreHours",
model.SortByAggregateAcchours: "totalAccHours",
}
func (r *JobRepository) buildCountQuery(
filter []*model.JobFilter,
kind string,
@@ -62,11 +69,12 @@ func (r *JobRepository) buildStatsQuery(
if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
query = sq.Select(col, "COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s) as totalWalltime", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s) as totalNodeHours", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s) as totalCoreHours", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s) as totalAccHours", castType),
).From("job").GroupBy(col)
} else {
// Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
query = sq.Select("COUNT(job.id)",
@@ -112,16 +120,28 @@ func (r *JobRepository) getCastType() string {
func (r *JobRepository) JobsStatsGrouped(
ctx context.Context,
filter []*model.JobFilter,
page *model.PageRequest,
sortBy *model.SortByAggregate,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col)
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
if sortBy != nil {
sortBy := sortBy2column[*sortBy]
query = query.OrderBy(fmt.Sprintf("%s DESC", sortBy))
}
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
@@ -174,17 +194,10 @@ func (r *JobRepository) JobsStatsGrouped(
return stats, nil
}
func (r *JobRepository) JobsStats(
ctx context.Context,
func (r *JobRepository) jobsStats(
query sq.SelectBuilder,
filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
start := time.Now()
query := r.buildStatsQuery(filter, "")
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
row := query.RunWith(r.DB).QueryRow()
stats := make([]*model.JobsStatistics, 0, 1)
@@ -211,10 +224,31 @@ func (r *JobRepository) JobsStats(
TotalAccHours: totalAccHours})
}
log.Debugf("Timer JobStats %s", time.Since(start))
return stats, nil
}
func (r *JobRepository) testJobsStats(
filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
query := r.buildStatsQuery(filter, "")
return r.jobsStats(query, filter)
}
func (r *JobRepository) JobsStats(
ctx context.Context,
filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
start := time.Now()
query := r.buildStatsQuery(filter, "")
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
log.Debugf("Timer JobStats %s", time.Since(start))
return r.jobsStats(query, filter)
}
func (r *JobRepository) JobCountGrouped(
ctx context.Context,
filter []*model.JobFilter,