diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 4c398ee3..571c532f 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -652,8 +652,10 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF defaultDurationBins := "1h" defaultMetricBins := 10 - if requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") || - requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") { + fetchedMainStats := requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") || + requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") + + if fetchedMainStats { if groupBy == nil { stats, err = r.Repo.JobsStats(ctx, filter) } else { @@ -664,19 +666,23 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF stats = append(stats, &model.JobsStatistics{}) } - if groupBy != nil { - if requireField(ctx, "shortJobs") { - stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short") - } - if requireField(ctx, "runningJobs") { - stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running") - } - } else { - if requireField(ctx, "shortJobs") { - stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short") - } - if requireField(ctx, "runningJobs") { - stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running") + // runningJobs and shortJobs are already inlined in JobsStats/JobsStatsGrouped. + // Only run separate count queries if main stats were not fetched. + if !fetchedMainStats { + if groupBy != nil { + if requireField(ctx, "shortJobs") { + stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short") + } + if requireField(ctx, "runningJobs") { + stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running") + } + } else { + if requireField(ctx, "shortJobs") { + stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short") + } + if requireField(ctx, "runningJobs") { + stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running") + } } } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 16ee3549..13eb75ca 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -129,6 +129,7 @@ func (r *JobRepository) buildCountQuery( // Parameters: // - filter: Job filters to apply (cluster, user, time range, etc.) // - col: Column name to GROUP BY; empty string for overall statistics without grouping +// - shortThreshold: Duration threshold in seconds for counting short-running jobs // // Returns a SelectBuilder that produces comprehensive statistics: // - totalJobs: Count of jobs @@ -140,42 +141,69 @@ func (r *JobRepository) buildCountQuery( // - totalCoreHours: Sum of (duration × num_hwthreads) in hours // - totalAccs: Sum of accelerators used across all jobs // - totalAccHours: Sum of (duration × num_acc) in hours +// - runningJobs: Count of jobs with job_state = 'running' +// - shortJobs: Count of jobs with duration < shortThreshold // // Special handling: // - Running jobs: Duration calculated as (now - start_time) instead of stored duration -// - Grouped queries: Also select grouping column and user's display name from hpc_user table +// - Grouped by user: Also joins hpc_user table to select display name +// - Grouped by other dimensions: Selects empty string for name column (no join) // - All time values converted from seconds to hours (÷ 3600) and rounded func (r *JobRepository) buildStatsQuery( filter []*model.JobFilter, col string, + shortThreshold int, ) sq.SelectBuilder { + now := time.Now().Unix() var query sq.SelectBuilder if col != "" { - query = sq.Select( - col, - "name", - "COUNT(job.id) as totalJobs", - "COUNT(DISTINCT job.hpc_user) AS totalUsers", - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, time.Now().Unix()), - `CAST(SUM(job.num_nodes) as int) as totalNodes`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, time.Now().Unix()), - `CAST(SUM(job.num_hwthreads) as int) as totalCores`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, time.Now().Unix()), - `CAST(SUM(job.num_acc) as int) as totalAccs`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, time.Now().Unix()), - ).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col) + if col == "job.hpc_user" { + query = sq.Select( + col, + "name", + "COUNT(job.id) as totalJobs", + "COUNT(DISTINCT job.hpc_user) AS totalUsers", + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now), + `CAST(SUM(job.num_nodes) as int) as totalNodes`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now), + `CAST(SUM(job.num_hwthreads) as int) as totalCores`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now), + `CAST(SUM(job.num_acc) as int) as totalAccs`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now), + `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`, + fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold), + ).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col) + } else { + query = sq.Select( + col, + "'' as name", + "COUNT(job.id) as totalJobs", + "COUNT(DISTINCT job.hpc_user) AS totalUsers", + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, now), + `CAST(SUM(job.num_nodes) as int) as totalNodes`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, now), + `CAST(SUM(job.num_hwthreads) as int) as totalCores`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, now), + `CAST(SUM(job.num_acc) as int) as totalAccs`, + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, now), + `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`, + fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold), + ).From("job").GroupBy(col) + } } else { query = sq.Select( "COUNT(job.id) as totalJobs", "COUNT(DISTINCT job.hpc_user) AS totalUsers", - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, time.Now().Unix()), + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, now), `CAST(SUM(job.num_nodes) as int)`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, time.Now().Unix()), + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, now), `CAST(SUM(job.num_hwthreads) as int)`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, time.Now().Unix()), + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, now), `CAST(SUM(job.num_acc) as int)`, - fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, time.Now().Unix()), + fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, now), + `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`, + fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold), ).From("job") } @@ -214,7 +242,7 @@ func (r *JobRepository) JobsStatsGrouped( ) ([]*model.JobsStatistics, error) { start := time.Now() col := groupBy2column[*groupBy] - query := r.buildStatsQuery(filter, col) + query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration) query, err := SecurityCheck(ctx, query) if err != nil { @@ -242,8 +270,8 @@ func (r *JobRepository) JobsStatsGrouped( for rows.Next() { var id sql.NullString var name sql.NullString - var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 - if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { + var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64 + if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil { cclog.Warnf("Error while scanning rows: %s", err.Error()) return nil, err } @@ -289,7 +317,6 @@ func (r *JobRepository) JobsStatsGrouped( } if col == "job.hpc_user" { - // name := r.getUserName(ctx, id.String) stats = append(stats, &model.JobsStatistics{ ID: id.String, @@ -302,6 +329,8 @@ func (r *JobRepository) JobsStatsGrouped( TotalCoreHours: totalCoreHours, TotalAccs: totalAccs, TotalAccHours: totalAccHours, + RunningJobs: int(runningJobs.Int64), + ShortJobs: int(shortJobs.Int64), }) } else { stats = append(stats, @@ -316,6 +345,8 @@ func (r *JobRepository) JobsStatsGrouped( TotalCoreHours: totalCoreHours, TotalAccs: totalAccs, TotalAccHours: totalAccHours, + RunningJobs: int(runningJobs.Int64), + ShortJobs: int(shortJobs.Int64), }) } } @@ -349,7 +380,7 @@ func (r *JobRepository) JobsStats( filter []*model.JobFilter, ) ([]*model.JobsStatistics, error) { start := time.Now() - query := r.buildStatsQuery(filter, "") + query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -358,8 +389,8 @@ func (r *JobRepository) JobsStats( row := query.RunWith(r.DB).QueryRowContext(ctx) stats := make([]*model.JobsStatistics, 0, 1) - var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 - if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { + var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64 + if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil { cclog.Warn("Error while scanning rows") return nil, err } @@ -384,6 +415,8 @@ func (r *JobRepository) JobsStats( TotalNodeHours: totalNodeHours, TotalCoreHours: totalCoreHours, TotalAccHours: totalAccHours, + RunningJobs: int(runningJobs.Int64), + ShortJobs: int(shortJobs.Int64), }) } @@ -819,12 +852,18 @@ func (r *JobRepository) jobsDurationStatisticsHistogram( // Each bin represents a duration range: bin N = [N*binSizeSeconds, (N+1)*binSizeSeconds) // Example: binSizeSeconds=3600 (1 hour), bin 1 = 0-1h, bin 2 = 1-2h, etc. - points := make([]*model.HistoPoint, 0) + points := make([]*model.HistoPoint, 0, *targetBinCount) for i := 1; i <= *targetBinCount; i++ { point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0} points = append(points, &point) } + // Build a map from bin value (seconds) to slice index for O(1) lookup. + binMap := make(map[int]int, len(points)) + for i, p := range points { + binMap[p.Value] = i + } + for _, f := range filters { query = BuildWhereClause(f, query) } @@ -836,8 +875,8 @@ func (r *JobRepository) jobsDurationStatisticsHistogram( } defer rows.Close() - // Match query results to pre-initialized bins. - // point.Value from query is the bin number; multiply by binSizeSeconds to match bin.Value. + // Match query results to pre-initialized bins using map lookup. + // point.Value from query is the bin number; multiply by binSizeSeconds to get bin key. for rows.Next() { point := model.HistoPoint{} if err := rows.Scan(&point.Value, &point.Count); err != nil { @@ -845,11 +884,8 @@ func (r *JobRepository) jobsDurationStatisticsHistogram( return nil, err } - for _, e := range points { - if e.Value == (point.Value * binSizeSeconds) { - e.Count = point.Count - break - } + if idx, ok := binMap[point.Value*binSizeSeconds]; ok { + points[idx].Count = point.Count } } @@ -968,16 +1004,25 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( // Pre-initialize bins with calculated min/max ranges. // Example: peak=1000, bins=10 -> bin 1=[0,100), bin 2=[100,200), ..., bin 10=[900,1000] - points := make([]*model.MetricHistoPoint, 0) + points := make([]*model.MetricHistoPoint, 0, *bins) binStep := int(peak) / *bins for i := 1; i <= *bins; i++ { binMin := (binStep * (i - 1)) binMax := (binStep * i) - epoint := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax} + idx := i + epoint := model.MetricHistoPoint{Bin: &idx, Count: 0, Min: &binMin, Max: &binMax} points = append(points, &epoint) } - // Match query results to pre-initialized bins. + // Build a map from bin number to slice index for O(1) lookup. + binMap := make(map[int]int, len(points)) + for i, p := range points { + if p.Bin != nil { + binMap[*p.Bin] = i + } + } + + // Match query results to pre-initialized bins using map lookup. for rows.Next() { rpoint := model.MetricHistoPoint{} if err := rows.Scan(&rpoint.Bin, &rpoint.Count); err != nil { @@ -985,10 +1030,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( return nil, err } - for _, e := range points { - if e.Bin != nil && rpoint.Bin != nil && *e.Bin == *rpoint.Bin { - e.Count = rpoint.Count - break + if rpoint.Bin != nil { + if idx, ok := binMap[*rpoint.Bin]; ok { + points[idx].Count = rpoint.Count } } } diff --git a/internal/repository/stats_test.go b/internal/repository/stats_test.go index a6c2da17..b5638f31 100644 --- a/internal/repository/stats_test.go +++ b/internal/repository/stats_test.go @@ -14,7 +14,7 @@ import ( func TestBuildJobStatsQuery(t *testing.T) { r := setup(t) - q := r.buildStatsQuery(nil, "USER") + q := r.buildStatsQuery(nil, "USER", 300) sql, _, err := q.ToSql() noErr(t, err)