mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-15 12:27:30 +01:00
Make stats query selective. Add stats index. Add paging to user list.
Entire-Checkpoint: d42431eee30d
This commit is contained in:
@@ -105,9 +105,9 @@ func (r *JobRepository) buildCountQuery(
|
||||
var query sq.SelectBuilder
|
||||
|
||||
if col != "" {
|
||||
query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
|
||||
query = sq.Select(col, "COUNT(*)").From("job").GroupBy(col)
|
||||
} else {
|
||||
query = sq.Select("COUNT(job.id)").From("job")
|
||||
query = sq.Select("COUNT(*)").From("job")
|
||||
}
|
||||
|
||||
switch kind {
|
||||
@@ -124,87 +124,100 @@ func (r *JobRepository) buildCountQuery(
|
||||
return query
|
||||
}
|
||||
|
||||
// buildStatsQuery constructs a SQL query to compute comprehensive job statistics with optional grouping.
|
||||
// buildStatsQuery constructs a SQL query to compute job statistics with optional grouping.
|
||||
// Only requested columns are computed; unrequested columns select 0 as placeholder.
|
||||
//
|
||||
// Parameters:
|
||||
// - filter: Job filters to apply (cluster, user, time range, etc.)
|
||||
// - col: Column name to GROUP BY; empty string for overall statistics without grouping
|
||||
// - shortThreshold: Duration threshold in seconds for counting short-running jobs
|
||||
//
|
||||
// Returns a SelectBuilder that produces comprehensive statistics:
|
||||
// - totalJobs: Count of jobs
|
||||
// - totalUsers: Count of distinct users (always 0 when grouping by user)
|
||||
// - totalWalltime: Sum of job durations in hours
|
||||
// - totalNodes: Sum of nodes used across all jobs
|
||||
// - totalNodeHours: Sum of (duration × num_nodes) in hours
|
||||
// - totalCores: Sum of hardware threads used across all jobs
|
||||
// - totalCoreHours: Sum of (duration × num_hwthreads) in hours
|
||||
// - totalAccs: Sum of accelerators used across all jobs
|
||||
// - totalAccHours: Sum of (duration × num_acc) in hours
|
||||
// - runningJobs: Count of jobs with job_state = 'running'
|
||||
// - shortJobs: Count of jobs with duration < shortThreshold
|
||||
//
|
||||
// Special handling:
|
||||
// - Running jobs: Duration calculated as (now - start_time) instead of stored duration
|
||||
// - Grouped by user: Also joins hpc_user table to select display name
|
||||
// - Grouped by other dimensions: Selects empty string for name column (no join)
|
||||
// - All time values converted from seconds to hours (÷ 3600) and rounded
|
||||
// - reqFields: Set of requested field names; nil means compute all fields
|
||||
func (r *JobRepository) buildStatsQuery(
|
||||
filter []*model.JobFilter,
|
||||
col string,
|
||||
shortThreshold int,
|
||||
reqFields map[string]bool,
|
||||
) sq.SelectBuilder {
|
||||
now := time.Now().Unix()
|
||||
var query sq.SelectBuilder
|
||||
|
||||
// need reports whether a statistics field must be computed: true when reqFields is nil (all fields) or the field is listed in it.
|
||||
need := func(field string) bool {
|
||||
return reqFields == nil || reqFields[field]
|
||||
}
|
||||
durationExpr := fmt.Sprintf(`(CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END)`, now)
|
||||
|
||||
// Build column list
|
||||
columns := make([]string, 0, 14)
|
||||
|
||||
if col != "" {
|
||||
if col == "job.hpc_user" {
|
||||
query = sq.Select(
|
||||
col,
|
||||
"name",
|
||||
"COUNT(job.id) as totalJobs",
|
||||
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now),
|
||||
`CAST(SUM(job.num_nodes) as int) as totalNodes`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now),
|
||||
`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now),
|
||||
`CAST(SUM(job.num_acc) as int) as totalAccs`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now),
|
||||
`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
|
||||
fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
|
||||
).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
|
||||
} else {
|
||||
query = sq.Select(
|
||||
col,
|
||||
"'' as name",
|
||||
"COUNT(job.id) as totalJobs",
|
||||
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now),
|
||||
`CAST(SUM(job.num_nodes) as int) as totalNodes`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now),
|
||||
`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now),
|
||||
`CAST(SUM(job.num_acc) as int) as totalAccs`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now),
|
||||
`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
|
||||
fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
|
||||
).From("job").GroupBy(col)
|
||||
}
|
||||
columns = append(columns, col)
|
||||
}
|
||||
|
||||
columns = append(columns, "COUNT(*) as totalJobs")
|
||||
|
||||
if need("totalUsers") && col != "job.hpc_user" {
|
||||
columns = append(columns, "COUNT(DISTINCT job.hpc_user) AS totalUsers")
|
||||
} else {
|
||||
query = sq.Select(
|
||||
"COUNT(job.id) as totalJobs",
|
||||
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, now),
|
||||
`CAST(SUM(job.num_nodes) as int)`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, now),
|
||||
`CAST(SUM(job.num_hwthreads) as int)`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, now),
|
||||
`CAST(SUM(job.num_acc) as int)`,
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, now),
|
||||
`COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`,
|
||||
fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold),
|
||||
).From("job")
|
||||
columns = append(columns, "0 AS totalUsers")
|
||||
}
|
||||
|
||||
if need("totalWalltime") {
|
||||
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s) / 3600) as int) as totalWalltime`, durationExpr))
|
||||
} else {
|
||||
columns = append(columns, "0 as totalWalltime")
|
||||
}
|
||||
|
||||
if need("totalNodes") {
|
||||
columns = append(columns, `CAST(SUM(job.num_nodes) as int) as totalNodes`)
|
||||
} else {
|
||||
columns = append(columns, "0 as totalNodes")
|
||||
}
|
||||
|
||||
if need("totalNodeHours") {
|
||||
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_nodes) / 3600) as int) as totalNodeHours`, durationExpr))
|
||||
} else {
|
||||
columns = append(columns, "0 as totalNodeHours")
|
||||
}
|
||||
|
||||
if need("totalCores") {
|
||||
columns = append(columns, `CAST(SUM(job.num_hwthreads) as int) as totalCores`)
|
||||
} else {
|
||||
columns = append(columns, "0 as totalCores")
|
||||
}
|
||||
|
||||
if need("totalCoreHours") {
|
||||
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_hwthreads) / 3600) as int) as totalCoreHours`, durationExpr))
|
||||
} else {
|
||||
columns = append(columns, "0 as totalCoreHours")
|
||||
}
|
||||
|
||||
if need("totalAccs") {
|
||||
columns = append(columns, `CAST(SUM(job.num_acc) as int) as totalAccs`)
|
||||
} else {
|
||||
columns = append(columns, "0 as totalAccs")
|
||||
}
|
||||
|
||||
if need("totalAccHours") {
|
||||
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_acc) / 3600) as int) as totalAccHours`, durationExpr))
|
||||
} else {
|
||||
columns = append(columns, "0 as totalAccHours")
|
||||
}
|
||||
|
||||
if need("runningJobs") {
|
||||
columns = append(columns, `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`)
|
||||
} else {
|
||||
columns = append(columns, "0 as runningJobs")
|
||||
}
|
||||
|
||||
if need("shortJobs") {
|
||||
columns = append(columns, fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold))
|
||||
} else {
|
||||
columns = append(columns, "0 as shortJobs")
|
||||
}
|
||||
|
||||
query := sq.Select(columns...).From("job")
|
||||
if col != "" {
|
||||
query = query.GroupBy(col)
|
||||
}
|
||||
|
||||
for _, f := range filter {
|
||||
@@ -214,35 +227,20 @@ func (r *JobRepository) buildStatsQuery(
|
||||
return query
|
||||
}
|
||||
|
||||
// JobsStatsGrouped computes comprehensive job statistics grouped by a dimension (user, project, cluster, or subcluster).
|
||||
//
|
||||
// This is the primary method for generating aggregated statistics views in the UI, providing
|
||||
// metrics like total jobs, walltime, and resource usage broken down by the specified grouping.
|
||||
//
|
||||
// Parameters:
|
||||
// - ctx: Context for security checks and cancellation
|
||||
// - filter: Filters to apply (time range, cluster, job state, etc.)
|
||||
// - page: Optional pagination (ItemsPerPage: -1 disables pagination)
|
||||
// - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
|
||||
// - groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster)
|
||||
//
|
||||
// Returns a slice of JobsStatistics, one per group, with:
|
||||
// - ID: The group identifier (username, project name, cluster name, etc.)
|
||||
// - Name: Display name (for users, from hpc_user.name; empty for other groups)
|
||||
// - Statistics: totalJobs, totalUsers, totalWalltime, resource usage metrics
|
||||
//
|
||||
// Security: Respects user roles via SecurityCheck - users see only their own data unless admin/support.
|
||||
// Performance: Results are sorted in SQL and pagination applied before scanning rows.
|
||||
// JobsStatsGrouped computes job statistics grouped by a dimension (user, project, cluster, or subcluster).
|
||||
// Only columns listed in reqFields are computed (nil means all; totalJobs is always counted); others return 0. User display names are looked up
|
||||
// in a separate lightweight query to avoid JOIN overhead on the main aggregation.
|
||||
func (r *JobRepository) JobsStatsGrouped(
|
||||
ctx context.Context,
|
||||
filter []*model.JobFilter,
|
||||
page *model.PageRequest,
|
||||
sortBy *model.SortByAggregate,
|
||||
groupBy *model.Aggregate,
|
||||
reqFields map[string]bool,
|
||||
) ([]*model.JobsStatistics, error) {
|
||||
start := time.Now()
|
||||
col := groupBy2column[*groupBy]
|
||||
query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration)
|
||||
query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration, reqFields)
|
||||
|
||||
query, err := SecurityCheck(ctx, query)
|
||||
if err != nil {
|
||||
@@ -269,86 +267,28 @@ func (r *JobRepository) JobsStatsGrouped(
|
||||
|
||||
for rows.Next() {
|
||||
var id sql.NullString
|
||||
var name sql.NullString
|
||||
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
|
||||
if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
|
||||
if err := rows.Scan(&id, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
|
||||
cclog.Warnf("Error while scanning rows: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if id.Valid {
|
||||
var totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
|
||||
var personName string
|
||||
|
||||
if name.Valid {
|
||||
personName = name.String
|
||||
}
|
||||
|
||||
if jobs.Valid {
|
||||
totalJobs = int(jobs.Int64)
|
||||
}
|
||||
|
||||
if users.Valid {
|
||||
totalUsers = int(users.Int64)
|
||||
}
|
||||
|
||||
if walltime.Valid {
|
||||
totalWalltime = int(walltime.Int64)
|
||||
}
|
||||
|
||||
if nodes.Valid {
|
||||
totalNodes = int(nodes.Int64)
|
||||
}
|
||||
if cores.Valid {
|
||||
totalCores = int(cores.Int64)
|
||||
}
|
||||
if accs.Valid {
|
||||
totalAccs = int(accs.Int64)
|
||||
}
|
||||
|
||||
if nodeHours.Valid {
|
||||
totalNodeHours = int(nodeHours.Int64)
|
||||
}
|
||||
if coreHours.Valid {
|
||||
totalCoreHours = int(coreHours.Int64)
|
||||
}
|
||||
if accHours.Valid {
|
||||
totalAccHours = int(accHours.Int64)
|
||||
}
|
||||
|
||||
if col == "job.hpc_user" {
|
||||
stats = append(stats,
|
||||
&model.JobsStatistics{
|
||||
ID: id.String,
|
||||
Name: personName,
|
||||
TotalJobs: totalJobs,
|
||||
TotalWalltime: totalWalltime,
|
||||
TotalNodes: totalNodes,
|
||||
TotalNodeHours: totalNodeHours,
|
||||
TotalCores: totalCores,
|
||||
TotalCoreHours: totalCoreHours,
|
||||
TotalAccs: totalAccs,
|
||||
TotalAccHours: totalAccHours,
|
||||
RunningJobs: int(runningJobs.Int64),
|
||||
ShortJobs: int(shortJobs.Int64),
|
||||
})
|
||||
} else {
|
||||
stats = append(stats,
|
||||
&model.JobsStatistics{
|
||||
ID: id.String,
|
||||
TotalJobs: totalJobs,
|
||||
TotalUsers: totalUsers,
|
||||
TotalWalltime: totalWalltime,
|
||||
TotalNodes: totalNodes,
|
||||
TotalNodeHours: totalNodeHours,
|
||||
TotalCores: totalCores,
|
||||
TotalCoreHours: totalCoreHours,
|
||||
TotalAccs: totalAccs,
|
||||
TotalAccHours: totalAccHours,
|
||||
RunningJobs: int(runningJobs.Int64),
|
||||
ShortJobs: int(shortJobs.Int64),
|
||||
})
|
||||
}
|
||||
stats = append(stats,
|
||||
&model.JobsStatistics{
|
||||
ID: id.String,
|
||||
TotalJobs: int(jobs.Int64),
|
||||
TotalUsers: int(users.Int64),
|
||||
TotalWalltime: int(walltime.Int64),
|
||||
TotalNodes: int(nodes.Int64),
|
||||
TotalNodeHours: int(nodeHours.Int64),
|
||||
TotalCores: int(cores.Int64),
|
||||
TotalCoreHours: int(coreHours.Int64),
|
||||
TotalAccs: int(accs.Int64),
|
||||
TotalAccHours: int(accHours.Int64),
|
||||
RunningJobs: int(runningJobs.Int64),
|
||||
ShortJobs: int(shortJobs.Int64),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,6 +296,35 @@ func (r *JobRepository) JobsStatsGrouped(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Post-query name lookup for user grouping (avoids LEFT JOIN on aggregation query)
|
||||
if col == "job.hpc_user" && len(stats) > 0 {
|
||||
usernames := make([]any, len(stats))
|
||||
for i, s := range stats {
|
||||
usernames[i] = s.ID
|
||||
}
|
||||
|
||||
nameQuery := sq.Select("username", "name").From("hpc_user").Where(sq.Eq{"username": usernames})
|
||||
nameRows, err := nameQuery.RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Warnf("Error looking up user names: %s", err.Error())
|
||||
// Non-fatal: stats are still valid without display names
|
||||
} else {
|
||||
defer nameRows.Close()
|
||||
nameMap := make(map[string]string, len(stats))
|
||||
for nameRows.Next() {
|
||||
var username, name string
|
||||
if err := nameRows.Scan(&username, &name); err == nil {
|
||||
nameMap[username] = name
|
||||
}
|
||||
}
|
||||
for _, s := range stats {
|
||||
if name, ok := nameMap[s.ID]; ok {
|
||||
s.Name = name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cclog.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
|
||||
return stats, nil
|
||||
}
|
||||
@@ -378,9 +347,10 @@ func (r *JobRepository) JobsStatsGrouped(
|
||||
func (r *JobRepository) JobsStats(
|
||||
ctx context.Context,
|
||||
filter []*model.JobFilter,
|
||||
reqFields map[string]bool,
|
||||
) ([]*model.JobsStatistics, error) {
|
||||
start := time.Now()
|
||||
query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration)
|
||||
query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration, reqFields)
|
||||
query, err := SecurityCheck(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -674,7 +644,7 @@ func (r *JobRepository) AddHistograms(
|
||||
|
||||
var err error
|
||||
// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
|
||||
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
|
||||
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
|
||||
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while loading job statistics histogram: job duration")
|
||||
@@ -778,7 +748,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
|
||||
) ([]*model.HistoPoint, error) {
|
||||
start := time.Now()
|
||||
query, qerr := SecurityCheck(ctx,
|
||||
sq.Select(value, "COUNT(job.id) AS count").From("job"))
|
||||
sq.Select(value, "COUNT(*) AS count").From("job"))
|
||||
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
@@ -844,7 +814,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
|
||||
) ([]*model.HistoPoint, error) {
|
||||
start := time.Now()
|
||||
query, qerr := SecurityCheck(ctx,
|
||||
sq.Select(value, "COUNT(job.id) AS count").From("job"))
|
||||
sq.Select(value, "COUNT(*) AS count").From("job"))
|
||||
|
||||
if qerr != nil {
|
||||
return nil, qerr
|
||||
|
||||
Reference in New Issue
Block a user