Convert histogram query to json keys

This commit is contained in:
Jan Eitzinger 2024-07-12 13:42:12 +02:00
parent c61ffce0e9
commit 0458675608
Signed by: moebiusband
GPG Key ID: 2574BA29B90D6DD5

View File

@ -41,8 +41,8 @@ var sortBy2column = map[model.SortByAggregate]string{
func (r *JobRepository) buildCountQuery( func (r *JobRepository) buildCountQuery(
filter []*model.JobFilter, filter []*model.JobFilter,
kind string, kind string,
col string) sq.SelectBuilder { col string,
) sq.SelectBuilder {
var query sq.SelectBuilder var query sq.SelectBuilder
if col != "" { if col != "" {
@ -69,8 +69,8 @@ func (r *JobRepository) buildCountQuery(
func (r *JobRepository) buildStatsQuery( func (r *JobRepository) buildStatsQuery(
filter []*model.JobFilter, filter []*model.JobFilter,
col string) sq.SelectBuilder { col string,
) sq.SelectBuilder {
var query sq.SelectBuilder var query sq.SelectBuilder
castType := r.getCastType() castType := r.getCastType()
@ -87,7 +87,6 @@ func (r *JobRepository) buildStatsQuery(
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType), fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType), fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
).From("job").GroupBy(col) ).From("job").GroupBy(col)
} else { } else {
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours // Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)", query = sq.Select("COUNT(job.id)",
@ -138,8 +137,8 @@ func (r *JobRepository) JobsStatsGrouped(
filter []*model.JobFilter, filter []*model.JobFilter,
page *model.PageRequest, page *model.PageRequest,
sortBy *model.SortByAggregate, sortBy *model.SortByAggregate,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
col := groupBy2column[*groupBy] col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col) query := r.buildStatsQuery(filter, col)
@ -218,7 +217,8 @@ func (r *JobRepository) JobsStatsGrouped(
TotalCores: totalCores, TotalCores: totalCores,
TotalCoreHours: totalCoreHours, TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs, TotalAccs: totalAccs,
TotalAccHours: totalAccHours}) TotalAccHours: totalAccHours,
})
} else { } else {
stats = append(stats, stats = append(stats,
&model.JobsStatistics{ &model.JobsStatistics{
@ -230,7 +230,8 @@ func (r *JobRepository) JobsStatsGrouped(
TotalCores: totalCores, TotalCores: totalCores,
TotalCoreHours: totalCoreHours, TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs, TotalAccs: totalAccs,
TotalAccHours: totalAccHours}) TotalAccHours: totalAccHours,
})
} }
} }
} }
@ -241,8 +242,8 @@ func (r *JobRepository) JobsStatsGrouped(
func (r *JobRepository) JobsStats( func (r *JobRepository) JobsStats(
ctx context.Context, ctx context.Context,
filter []*model.JobFilter) ([]*model.JobsStatistics, error) { filter []*model.JobFilter,
) ([]*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
query := r.buildStatsQuery(filter, "") query := r.buildStatsQuery(filter, "")
query, err := SecurityCheck(ctx, query) query, err := SecurityCheck(ctx, query)
@ -277,7 +278,8 @@ func (r *JobRepository) JobsStats(
TotalWalltime: int(walltime.Int64), TotalWalltime: int(walltime.Int64),
TotalNodeHours: totalNodeHours, TotalNodeHours: totalNodeHours,
TotalCoreHours: totalCoreHours, TotalCoreHours: totalCoreHours,
TotalAccHours: totalAccHours}) TotalAccHours: totalAccHours,
})
} }
log.Debugf("Timer JobStats %s", time.Since(start)) log.Debugf("Timer JobStats %s", time.Since(start))
@ -299,8 +301,8 @@ func LoadJobStat(job *schema.JobMeta, metric string) float64 {
func (r *JobRepository) JobCountGrouped( func (r *JobRepository) JobCountGrouped(
ctx context.Context, ctx context.Context,
filter []*model.JobFilter, filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
col := groupBy2column[*groupBy] col := groupBy2column[*groupBy]
query := r.buildCountQuery(filter, "", col) query := r.buildCountQuery(filter, "", col)
@ -327,7 +329,8 @@ func (r *JobRepository) JobCountGrouped(
stats = append(stats, stats = append(stats,
&model.JobsStatistics{ &model.JobsStatistics{
ID: id.String, ID: id.String,
TotalJobs: int(cnt.Int64)}) TotalJobs: int(cnt.Int64),
})
} }
} }
@ -340,8 +343,8 @@ func (r *JobRepository) AddJobCountGrouped(
filter []*model.JobFilter, filter []*model.JobFilter,
groupBy *model.Aggregate, groupBy *model.Aggregate,
stats []*model.JobsStatistics, stats []*model.JobsStatistics,
kind string) ([]*model.JobsStatistics, error) { kind string,
) ([]*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
col := groupBy2column[*groupBy] col := groupBy2column[*groupBy]
query := r.buildCountQuery(filter, kind, col) query := r.buildCountQuery(filter, kind, col)
@ -388,8 +391,8 @@ func (r *JobRepository) AddJobCount(
ctx context.Context, ctx context.Context,
filter []*model.JobFilter, filter []*model.JobFilter,
stats []*model.JobsStatistics, stats []*model.JobsStatistics,
kind string) ([]*model.JobsStatistics, error) { kind string,
) ([]*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
query := r.buildCountQuery(filter, kind, "") query := r.buildCountQuery(filter, kind, "")
query, err := SecurityCheck(ctx, query) query, err := SecurityCheck(ctx, query)
@ -432,7 +435,8 @@ func (r *JobRepository) AddJobCount(
func (r *JobRepository) AddHistograms( func (r *JobRepository) AddHistograms(
ctx context.Context, ctx context.Context,
filter []*model.JobFilter, filter []*model.JobFilter,
stat *model.JobsStatistics) (*model.JobsStatistics, error) { stat *model.JobsStatistics,
) (*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
castType := r.getCastType() castType := r.getCastType()
@ -471,7 +475,8 @@ func (r *JobRepository) AddMetricHistograms(
ctx context.Context, ctx context.Context,
filter []*model.JobFilter, filter []*model.JobFilter,
metrics []string, metrics []string,
stat *model.JobsStatistics) (*model.JobsStatistics, error) { stat *model.JobsStatistics,
) (*model.JobsStatistics, error) {
start := time.Now() start := time.Now()
// Running Jobs Only: First query jobdata from sqlite, then query data and make bins // Running Jobs Only: First query jobdata from sqlite, then query data and make bins
@ -503,8 +508,8 @@ func (r *JobRepository) AddMetricHistograms(
func (r *JobRepository) jobsStatisticsHistogram( func (r *JobRepository) jobsStatisticsHistogram(
ctx context.Context, ctx context.Context,
value string, value string,
filters []*model.JobFilter) ([]*model.HistoPoint, error) { filters []*model.JobFilter,
) ([]*model.HistoPoint, error) {
start := time.Now() start := time.Now()
query, qerr := SecurityCheck(ctx, query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(job.id) AS count").From("job")) sq.Select(value, "COUNT(job.id) AS count").From("job"))
@ -540,26 +545,8 @@ func (r *JobRepository) jobsStatisticsHistogram(
func (r *JobRepository) jobsMetricStatisticsHistogram( func (r *JobRepository) jobsMetricStatisticsHistogram(
ctx context.Context, ctx context.Context,
metric string, metric string,
filters []*model.JobFilter) (*model.MetricHistoPoints, error) { filters []*model.JobFilter,
) (*model.MetricHistoPoints, error) {
var dbMetric string
switch metric {
case "cpu_load":
dbMetric = "load_avg"
case "flops_any":
dbMetric = "flops_any_avg"
case "mem_bw":
dbMetric = "mem_bw_avg"
case "mem_used":
dbMetric = "mem_used_max"
case "net_bw":
dbMetric = "net_bw_avg"
case "file_bw":
dbMetric = "file_bw_avg"
default:
return nil, fmt.Errorf("%s not implemented", metric)
}
// Get specific Peak or largest Peak // Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig var metricConfig *schema.MetricConfig
var peak float64 = 0.0 var peak float64 = 0.0
@ -593,14 +580,15 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
// Make bins, see https://jereze.com/code/sql-histogram/ // Make bins, see https://jereze.com/code/sql-histogram/
start := time.Now() start := time.Now()
jm := fmt.Sprintf(`json_extract(footprint, "$.%s")`, metric)
crossJoinQuery := sq.Select( crossJoinQuery := sq.Select(
fmt.Sprintf(`max(%s) as max`, dbMetric), fmt.Sprintf(`max(%s) as max`, jm),
fmt.Sprintf(`min(%s) as min`, dbMetric), fmt.Sprintf(`min(%s) as min`, jm),
).From("job").Where( ).From("job").Where(
fmt.Sprintf(`%s is not null`, dbMetric), fmt.Sprintf(`%s is not null`, jm),
).Where( ).Where(
fmt.Sprintf(`%s <= %f`, dbMetric, peak), fmt.Sprintf(`%s <= %f`, jm, peak),
) )
crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery) crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery)
@ -619,16 +607,18 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
} }
bins := 10 bins := 10
binQuery := fmt.Sprintf(`CAST( (case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * %d as INTEGER )`, dbMetric, dbMetric, bins) binQuery := fmt.Sprintf(`CAST( (case when %s = value.max
then value.max*0.999999999 else %s end - value.min) / (value.max -
value.min) * %d as INTEGER )`, metric, metric, bins)
mainQuery := sq.Select( mainQuery := sq.Select(
fmt.Sprintf(`%s + 1 as bin`, binQuery), fmt.Sprintf(`%s + 1 as bin`, binQuery),
fmt.Sprintf(`count(job.%s) as count`, dbMetric), fmt.Sprintf(`count(%s) as count`, metric),
fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery), fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery), fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery),
).From("job").CrossJoin( ).From("job").CrossJoin(
fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs..., fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs...,
).Where(fmt.Sprintf(`job.%s is not null and job.%s <= %f`, dbMetric, dbMetric, peak)) ).Where(fmt.Sprintf(`%s is not null and %s <= %f`, metric, metric, peak))
mainQuery, qerr := SecurityCheck(ctx, mainQuery) mainQuery, qerr := SecurityCheck(ctx, mainQuery)
@ -669,8 +659,8 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
func (r *JobRepository) runningJobsMetricStatisticsHistogram( func (r *JobRepository) runningJobsMetricStatisticsHistogram(
ctx context.Context, ctx context.Context,
metrics []string, metrics []string,
filters []*model.JobFilter) []*model.MetricHistoPoints { filters []*model.JobFilter,
) []*model.MetricHistoPoints {
// Get Jobs // Get Jobs
jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil) jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
if err != nil { if err != nil {