diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 6865e18..0775db5 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -41,8 +41,8 @@ var sortBy2column = map[model.SortByAggregate]string{ func (r *JobRepository) buildCountQuery( filter []*model.JobFilter, kind string, - col string) sq.SelectBuilder { - + col string, +) sq.SelectBuilder { var query sq.SelectBuilder if col != "" { @@ -69,8 +69,8 @@ func (r *JobRepository) buildCountQuery( func (r *JobRepository) buildStatsQuery( filter []*model.JobFilter, - col string) sq.SelectBuilder { - + col string, +) sq.SelectBuilder { var query sq.SelectBuilder castType := r.getCastType() @@ -87,7 +87,6 @@ func (r *JobRepository) buildStatsQuery( fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType), fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType), ).From("job").GroupBy(col) - } else { // Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours query = sq.Select("COUNT(job.id)", @@ -138,8 +137,8 @@ func (r *JobRepository) JobsStatsGrouped( filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, - groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { - + groupBy *model.Aggregate, +) ([]*model.JobsStatistics, error) { start := time.Now() col := groupBy2column[*groupBy] query := r.buildStatsQuery(filter, col) @@ -218,7 +217,8 @@ func (r *JobRepository) JobsStatsGrouped( TotalCores: totalCores, TotalCoreHours: totalCoreHours, TotalAccs: totalAccs, - TotalAccHours: totalAccHours}) + TotalAccHours: totalAccHours, + }) } else { stats = append(stats, &model.JobsStatistics{ @@ -230,7 +230,8 @@ func (r *JobRepository) JobsStatsGrouped( TotalCores: totalCores, TotalCoreHours: totalCoreHours, TotalAccs: totalAccs, - TotalAccHours: totalAccHours}) + TotalAccHours: totalAccHours, + }) } } } @@ -241,8 +242,8 @@ func (r *JobRepository) JobsStatsGrouped( func (r *JobRepository) JobsStats( ctx context.Context, - filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - + filter []*model.JobFilter, +) ([]*model.JobsStatistics, error) { start := time.Now() query := r.buildStatsQuery(filter, "") query, err := SecurityCheck(ctx, query) @@ -277,7 +278,8 @@ func (r *JobRepository) JobsStats( TotalWalltime: int(walltime.Int64), TotalNodeHours: totalNodeHours, TotalCoreHours: totalCoreHours, - TotalAccHours: totalAccHours}) + TotalAccHours: totalAccHours, + }) } log.Debugf("Timer JobStats %s", time.Since(start)) @@ -299,8 +301,8 @@ func LoadJobStat(job *schema.JobMeta, metric string) float64 { func (r *JobRepository) JobCountGrouped( ctx context.Context, filter []*model.JobFilter, - groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { - + groupBy *model.Aggregate, +) ([]*model.JobsStatistics, error) { start := time.Now() col := groupBy2column[*groupBy] query := r.buildCountQuery(filter, "", col) @@ -327,7 +329,8 @@ func (r *JobRepository) JobCountGrouped( stats = append(stats, &model.JobsStatistics{ ID: id.String, - TotalJobs: int(cnt.Int64)}) + TotalJobs: int(cnt.Int64), + }) } } @@ -340,8 +343,8 @@ func (r *JobRepository) AddJobCountGrouped( filter []*model.JobFilter, groupBy *model.Aggregate, stats []*model.JobsStatistics, - kind string) ([]*model.JobsStatistics, error) { - + kind string, +) ([]*model.JobsStatistics, error) { start := time.Now() col := groupBy2column[*groupBy] query := r.buildCountQuery(filter, kind, col) @@ -388,8 +391,8 @@ func (r *JobRepository) AddJobCount( ctx context.Context, filter []*model.JobFilter, stats []*model.JobsStatistics, - kind string) ([]*model.JobsStatistics, error) { - + kind string, +) ([]*model.JobsStatistics, error) { start := time.Now() query := r.buildCountQuery(filter, kind, "") query, err := SecurityCheck(ctx, query) @@ -432,7 +435,8 @@ func (r *JobRepository) AddJobCount( func (r *JobRepository) AddHistograms( ctx context.Context, filter []*model.JobFilter, - stat *model.JobsStatistics) (*model.JobsStatistics, error) { + stat *model.JobsStatistics, +) (*model.JobsStatistics, error) { start := time.Now() castType := r.getCastType() @@ -471,7 +475,8 @@ func (r *JobRepository) AddMetricHistograms( ctx context.Context, filter []*model.JobFilter, metrics []string, - stat *model.JobsStatistics) (*model.JobsStatistics, error) { + stat *model.JobsStatistics, +) (*model.JobsStatistics, error) { start := time.Now() // Running Jobs Only: First query jobdata from sqlite, then query data and make bins @@ -503,8 +508,8 @@ func (r *JobRepository) AddMetricHistograms( func (r *JobRepository) jobsStatisticsHistogram( ctx context.Context, value string, - filters []*model.JobFilter) ([]*model.HistoPoint, error) { - + filters []*model.JobFilter, +) ([]*model.HistoPoint, error) { start := time.Now() query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job")) @@ -540,26 +545,8 @@ func (r *JobRepository) jobsStatisticsHistogram( func (r *JobRepository) jobsMetricStatisticsHistogram( ctx context.Context, metric string, - filters []*model.JobFilter) (*model.MetricHistoPoints, error) { - - var dbMetric string - switch metric { - case "cpu_load": - dbMetric = "load_avg" - case "flops_any": - dbMetric = "flops_any_avg" - case "mem_bw": - dbMetric = "mem_bw_avg" - case "mem_used": - dbMetric = "mem_used_max" - case "net_bw": - dbMetric = "net_bw_avg" - case "file_bw": - dbMetric = "file_bw_avg" - default: - return nil, fmt.Errorf("%s not implemented", metric) - } - + filters []*model.JobFilter, +) (*model.MetricHistoPoints, error) { // Get specific Peak or largest Peak var metricConfig *schema.MetricConfig var peak float64 = 0.0 @@ -593,14 +580,15 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( // Make bins, see https://jereze.com/code/sql-histogram/ start := time.Now() + jm := fmt.Sprintf(`json_extract(footprint, "$.%s")`, metric) crossJoinQuery := sq.Select( - fmt.Sprintf(`max(%s) as max`, dbMetric), - fmt.Sprintf(`min(%s) as min`, dbMetric), + fmt.Sprintf(`max(%s) as max`, jm), + fmt.Sprintf(`min(%s) as min`, jm), ).From("job").Where( - fmt.Sprintf(`%s is not null`, dbMetric), + fmt.Sprintf(`%s is not null`, jm), ).Where( - fmt.Sprintf(`%s <= %f`, dbMetric, peak), + fmt.Sprintf(`%s <= %f`, jm, peak), ) crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery) @@ -619,16 +607,18 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( } bins := 10 - binQuery := fmt.Sprintf(`CAST( (case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * %d as INTEGER )`, dbMetric, dbMetric, bins) + binQuery := fmt.Sprintf(`CAST( (case when %s = value.max + then value.max*0.999999999 else %s end - value.min) / (value.max - + value.min) * %d as INTEGER )`, metric, metric, bins) mainQuery := sq.Select( fmt.Sprintf(`%s + 1 as bin`, binQuery), - fmt.Sprintf(`count(job.%s) as count`, dbMetric), + fmt.Sprintf(`count(%s) as count`, metric), fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery), fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery), ).From("job").CrossJoin( fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs..., - ).Where(fmt.Sprintf(`job.%s is not null and job.%s <= %f`, dbMetric, dbMetric, peak)) + ).Where(fmt.Sprintf(`%s is not null and %s <= %f`, metric, metric, peak)) mainQuery, qerr := SecurityCheck(ctx, mainQuery) @@ -669,8 +659,8 @@ func (r *JobRepository) jobsMetricStatisticsHistogram( func (r *JobRepository) runningJobsMetricStatisticsHistogram( ctx context.Context, metrics []string, - filters []*model.JobFilter) []*model.MetricHistoPoints { - + filters []*model.JobFilter, +) []*model.MetricHistoPoints { // Get Jobs jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil) if err != nil {