fix: Reduce complexity for groupBy stats queries

Entire-Checkpoint: fc899a70a751
This commit is contained in:
2026-03-11 15:14:59 +01:00
parent 6e0fe62566
commit af78f06ced
3 changed files with 106 additions and 56 deletions

View File

@@ -652,8 +652,10 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 	defaultDurationBins := "1h"
 	defaultMetricBins := 10
-	if requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
-		requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
+	fetchedMainStats := requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
+		requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours")
+	if fetchedMainStats {
 		if groupBy == nil {
 			stats, err = r.Repo.JobsStats(ctx, filter)
 		} else {
@@ -664,6 +666,9 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 		stats = append(stats, &model.JobsStatistics{})
 	}
+	// runningJobs and shortJobs are already inlined in JobsStats/JobsStatsGrouped.
+	// Only run separate count queries if main stats were not fetched.
+	if !fetchedMainStats {
 	if groupBy != nil {
 		if requireField(ctx, "shortJobs") {
 			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
@@ -679,6 +684,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 			stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
 		}
 	}
+	}
 	if err != nil {
 		return nil, err

View File

@@ -129,6 +129,7 @@ func (r *JobRepository) buildCountQuery(
 // Parameters:
 // - filter: Job filters to apply (cluster, user, time range, etc.)
 // - col: Column name to GROUP BY; empty string for overall statistics without grouping
+// - shortThreshold: Duration threshold in seconds for counting short-running jobs
 //
 // Returns a SelectBuilder that produces comprehensive statistics:
 // - totalJobs: Count of jobs
@@ -140,42 +141,69 @@ func (r *JobRepository) buildCountQuery(
 // - totalCoreHours: Sum of (duration × num_hwthreads) in hours
 // - totalAccs: Sum of accelerators used across all jobs
 // - totalAccHours: Sum of (duration × num_acc) in hours
+// - runningJobs: Count of jobs with job_state = 'running'
+// - shortJobs: Count of jobs with duration < shortThreshold
 //
 // Special handling:
 // - Running jobs: Duration calculated as (now - start_time) instead of stored duration
-// - Grouped queries: Also select grouping column and user's display name from hpc_user table
+// - Grouped by user: Also joins hpc_user table to select display name
+// - Grouped by other dimensions: Selects empty string for name column (no join)
 // - All time values converted from seconds to hours (÷ 3600) and rounded
 func (r *JobRepository) buildStatsQuery(
 	filter []*model.JobFilter,
 	col string,
+	shortThreshold int,
 ) sq.SelectBuilder {
+	now := time.Now().Unix()
 	var query sq.SelectBuilder
 	if col != "" {
-		query = sq.Select(
-			col,
-			"name",
-			"COUNT(job.id) as totalJobs",
-			"COUNT(DISTINCT job.hpc_user) AS totalUsers",
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, time.Now().Unix()),
-			`CAST(SUM(job.num_nodes) as int) as totalNodes`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, time.Now().Unix()),
-			`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, time.Now().Unix()),
-			`CAST(SUM(job.num_acc) as int) as totalAccs`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, time.Now().Unix()),
-		).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
+		if col == "job.hpc_user" {
+			query = sq.Select(
+				col,
+				"name",
+				"COUNT(job.id) as totalJobs",
+				"COUNT(DISTINCT job.hpc_user) AS totalUsers",
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now),
+				`CAST(SUM(job.num_nodes) as int) as totalNodes`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now),
+				`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now),
+				`CAST(SUM(job.num_acc) as int) as totalAccs`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now),
+				`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
+				fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
+			).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
+		} else {
+			query = sq.Select(
+				col,
+				"'' as name",
+				"COUNT(job.id) as totalJobs",
+				"COUNT(DISTINCT job.hpc_user) AS totalUsers",
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now),
+				`CAST(SUM(job.num_nodes) as int) as totalNodes`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now),
+				`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now),
+				`CAST(SUM(job.num_acc) as int) as totalAccs`,
+				fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now),
+				`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
+				fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
+			).From("job").GroupBy(col)
+		}
 	} else {
-		query = sq.Select(
-			"COUNT(job.id) as totalJobs",
-			"COUNT(DISTINCT job.hpc_user) AS totalUsers",
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_nodes) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_hwthreads) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_acc) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, time.Now().Unix()),
-		).From("job")
+		query = sq.Select(
+			"COUNT(job.id) as totalJobs",
+			"COUNT(DISTINCT job.hpc_user) AS totalUsers",
+			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, now),
+			`CAST(SUM(job.num_nodes) as int)`,
+			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, now),
+			`CAST(SUM(job.num_hwthreads) as int)`,
+			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, now),
+			`CAST(SUM(job.num_acc) as int)`,
+			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, now),
+			`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
+			fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
+		).From("job")
 	}
@@ -214,7 +242,7 @@ func (r *JobRepository) JobsStatsGrouped(
 ) ([]*model.JobsStatistics, error) {
 	start := time.Now()
 	col := groupBy2column[*groupBy]
-	query := r.buildStatsQuery(filter, col)
+	query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
@@ -242,8 +270,8 @@ func (r *JobRepository) JobsStatsGrouped(
 	for rows.Next() {
 		var id sql.NullString
 		var name sql.NullString
-		var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
-		if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
+		var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
+		if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
 			cclog.Warnf("Error while scanning rows: %s", err.Error())
 			return nil, err
 		}
@@ -289,7 +317,6 @@ func (r *JobRepository) JobsStatsGrouped(
 		}
 
 		if col == "job.hpc_user" {
-			// name := r.getUserName(ctx, id.String)
 			stats = append(stats,
 				&model.JobsStatistics{
 					ID: id.String,
@@ -302,6 +329,8 @@ func (r *JobRepository) JobsStatsGrouped(
 					TotalCoreHours: totalCoreHours,
 					TotalAccs:      totalAccs,
 					TotalAccHours:  totalAccHours,
+					RunningJobs:    int(runningJobs.Int64),
+					ShortJobs:      int(shortJobs.Int64),
 				})
 		} else {
 			stats = append(stats,
@@ -316,6 +345,8 @@ func (r *JobRepository) JobsStatsGrouped(
 					TotalCoreHours: totalCoreHours,
 					TotalAccs:      totalAccs,
 					TotalAccHours:  totalAccHours,
+					RunningJobs:    int(runningJobs.Int64),
+					ShortJobs:      int(shortJobs.Int64),
 				})
 		}
 	}
@@ -349,7 +380,7 @@ func (r *JobRepository) JobsStats(
 	filter []*model.JobFilter,
 ) ([]*model.JobsStatistics, error) {
 	start := time.Now()
-	query := r.buildStatsQuery(filter, "")
+	query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
@@ -358,8 +389,8 @@ func (r *JobRepository) JobsStats(
 	row := query.RunWith(r.DB).QueryRowContext(ctx)
 	stats := make([]*model.JobsStatistics, 0, 1)
-	var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
-	if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
+	var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
+	if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
 		cclog.Warn("Error while scanning rows")
 		return nil, err
 	}
@@ -384,6 +415,8 @@ func (r *JobRepository) JobsStats(
 			TotalNodeHours: totalNodeHours,
 			TotalCoreHours: totalCoreHours,
 			TotalAccHours:  totalAccHours,
+			RunningJobs:    int(runningJobs.Int64),
+			ShortJobs:      int(shortJobs.Int64),
 		})
 	}
@@ -819,12 +852,18 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 	// Each bin represents a duration range: bin N = [N*binSizeSeconds, (N+1)*binSizeSeconds)
 	// Example: binSizeSeconds=3600 (1 hour), bin 1 = 0-1h, bin 2 = 1-2h, etc.
-	points := make([]*model.HistoPoint, 0)
+	points := make([]*model.HistoPoint, 0, *targetBinCount)
 	for i := 1; i <= *targetBinCount; i++ {
 		point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
 		points = append(points, &point)
 	}
+	// Build a map from bin value (seconds) to slice index for O(1) lookup.
+	binMap := make(map[int]int, len(points))
+	for i, p := range points {
+		binMap[p.Value] = i
+	}
 	for _, f := range filters {
 		query = BuildWhereClause(f, query)
 	}
@@ -836,8 +875,8 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 	}
 	defer rows.Close()
-	// Match query results to pre-initialized bins.
-	// point.Value from query is the bin number; multiply by binSizeSeconds to match bin.Value.
+	// Match query results to pre-initialized bins using map lookup.
+	// point.Value from query is the bin number; multiply by binSizeSeconds to get bin key.
 	for rows.Next() {
 		point := model.HistoPoint{}
 		if err := rows.Scan(&point.Value, &point.Count); err != nil {
@@ -845,11 +884,8 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 			return nil, err
 		}
-		for _, e := range points {
-			if e.Value == (point.Value * binSizeSeconds) {
-				e.Count = point.Count
-				break
-			}
-		}
+		if idx, ok := binMap[point.Value*binSizeSeconds]; ok {
+			points[idx].Count = point.Count
+		}
 	}
@@ -968,16 +1004,25 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 	// Pre-initialize bins with calculated min/max ranges.
 	// Example: peak=1000, bins=10 -> bin 1=[0,100), bin 2=[100,200), ..., bin 10=[900,1000]
-	points := make([]*model.MetricHistoPoint, 0)
+	points := make([]*model.MetricHistoPoint, 0, *bins)
 	binStep := int(peak) / *bins
 	for i := 1; i <= *bins; i++ {
 		binMin := (binStep * (i - 1))
 		binMax := (binStep * i)
-		epoint := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax}
+		idx := i
+		epoint := model.MetricHistoPoint{Bin: &idx, Count: 0, Min: &binMin, Max: &binMax}
 		points = append(points, &epoint)
 	}
-	// Match query results to pre-initialized bins.
+	// Build a map from bin number to slice index for O(1) lookup.
+	binMap := make(map[int]int, len(points))
+	for i, p := range points {
+		if p.Bin != nil {
+			binMap[*p.Bin] = i
+		}
+	}
+	// Match query results to pre-initialized bins using map lookup.
 	for rows.Next() {
 		rpoint := model.MetricHistoPoint{}
 		if err := rows.Scan(&rpoint.Bin, &rpoint.Count); err != nil {
@@ -985,10 +1030,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 			return nil, err
 		}
-		for _, e := range points {
-			if e.Bin != nil && rpoint.Bin != nil && *e.Bin == *rpoint.Bin {
-				e.Count = rpoint.Count
-				break
-			}
-		}
+		if rpoint.Bin != nil {
+			if idx, ok := binMap[*rpoint.Bin]; ok {
+				points[idx].Count = rpoint.Count
+			}
+		}
 	}

View File

@@ -14,7 +14,7 @@ import (
 func TestBuildJobStatsQuery(t *testing.T) {
 	r := setup(t)
-	q := r.buildStatsQuery(nil, "USER")
+	q := r.buildStatsQuery(nil, "USER", 300)
 	sql, _, err := q.ToSql()
 	noErr(t, err)