Finished backend SQL query and GQL resolver

Christoph Kluge
2023-12-05 11:59:01 +01:00
parent 9bc36152d9
commit b5b355c16c
5 changed files with 445 additions and 107 deletions


@@ -521,65 +521,89 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
metric string,
filters []*model.JobFilter) (*model.MetricHistoPoints, error) {
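	// Builds and runs a binned histogram query for one metric over the job table and
	// returns the result as MetricHistoPoints (metric name plus per-bin min, max, and count).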
// "job.load_avg as value"
// switch m {
// case "cpu_load":
var dbMetric string
switch metric {
case "cpu_load":
dbMetric = "load_avg"
case "flops_any":
dbMetric = "flops_any_avg"
case "mem_bw":
dbMetric = "mem_bw_avg"
default:
return nil, fmt.Errorf("%s not implemented", metric)
}
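	// The mapped names are the per-job average columns stored on the job table,
	// so the histogram is computed directly in SQL without touching metric data.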
	// Get specific Peak or largest Peak
	var metricConfig *schema.MetricConfig
	var peak float64 = 0.0
	for _, f := range filters {
		if f.Cluster != nil {
			metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
			peak = metricConfig.Peak
			log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
		} else {
			for _, c := range archive.Clusters {
				for _, m := range c.MetricConfig {
					if m.Name == metric {
						if m.Peak > peak {
							peak = m.Peak
						}
					}
				}
			}
		}
	}

	if peak == 0.0 {
		for _, c := range archive.Clusters {
			for _, m := range c.MetricConfig {
				if m.Name == metric {
					if m.Peak > peak {
						peak = m.Peak
					}
				}
			}
		}
	}
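	// peak is only used below to cap the histogram range: values at or above the
	// configured peak are treated as outliers and excluded from both the min/max
	// subquery and the main query.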
	// Make bins, see https://jereze.com/code/sql-histogram/
	// Differences from the linked example:
	// - CAST(X AS INTEGER) instead of floor(X), also used for the Min, Max selection
	// - renamed to "bin" for simplicity and to match the model struct
	// - ditched the rename from "job" to "data", as it conflicts with the security check afterwards
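	// The resulting expression below maps a value x in [min, max] to
	//   bin = CAST((x - min) / (max - min) * 10 AS INTEGER) + 1
	// i.e. ten equal-width bins numbered 1..10; the CASE nudges x = max down by a
	// factor of 0.999999999 so the maximum lands in bin 10 instead of opening an 11th bin.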
start := time.Now()
	// query, qerr := SecurityCheck(ctx,
	// 	sq.Select(value, "COUNT(job.id) AS count").From("job"))
prepQuery := sq.Select(
fmt.Sprintf(`CAST(min(job.%s) as INTEGER) as min`, dbMetric),
fmt.Sprintf(`CAST(max(job.%s) as INTEGER) as max`, dbMetric),
fmt.Sprintf(`count(job.%s) as count`, dbMetric),
fmt.Sprintf(`CAST((case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * 10 as INTEGER) +1 as bin`, dbMetric, dbMetric))
prepQuery = prepQuery.From("job")
prepQuery = prepQuery.CrossJoin(fmt.Sprintf(`(select max(%s) as max, min(%s) as min from job where %s is not null and %s < %f) as value`, dbMetric, dbMetric, dbMetric, dbMetric, peak))
prepQuery = prepQuery.Where(fmt.Sprintf(`job.%s is not null and job.%s < %f`, dbMetric, dbMetric, peak))
query, qerr := SecurityCheck(ctx, prepQuery)
if qerr != nil {
return nil, qerr
}
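	// SecurityCheck restricts the query to jobs the requesting user is allowed to see
	// (role-dependent); the requested job filters are added on top below.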
for _, f := range filters {
if f.Cluster != nil {
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
}
query = BuildWhereClause(f, query)
}
	// rows, err := query.GroupBy("value").RunWith(r.DB).Query()
// Finalize query with Grouping and Ordering
query = query.GroupBy("bin").OrderBy("bin")
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
log.Errorf("Error while running query: %s", err)
return nil, err
}
	// points := make([]*model.HistoPoint, 0)
points := make([]*model.MetricHistoPoint, 0)
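	// Note: the Scan order below must match the column order selected in prepQuery: min, max, count, bin.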
for rows.Next() {
		// point := model.HistoPoint{}
		// if err := rows.Scan(&point.Value, &point.Count); err != nil {
point := model.MetricHistoPoint{}
if err := rows.Scan(&point.Min, &point.Max, &point.Count, &point.Bin); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
points = append(points, &point)
}
result := model.MetricHistoPoints{Metric: metric, Data: points}
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
	// return points, nil
return &result, nil
}
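
For reference, here is a minimal standalone sketch of the binned query built above. It assumes only the squirrel builder (imported as sq, as in the repository) and uses hypothetical hard-coded inputs (dbMetric "load_avg", peak 128.0) in place of the metric-config lookup and the security/filter clauses; it only prints the generated SQL so the binning logic can be inspected outside the resolver:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// Hypothetical, hard-coded inputs; in cc-backend they come from the metric config.
	dbMetric := "load_avg"
	peak := 128.0

	// Same structure as prepQuery above: min, max, count plus the computed bin column.
	q := sq.Select(
		fmt.Sprintf(`CAST(min(job.%s) as INTEGER) as min`, dbMetric),
		fmt.Sprintf(`CAST(max(job.%s) as INTEGER) as max`, dbMetric),
		fmt.Sprintf(`count(job.%s) as count`, dbMetric),
		fmt.Sprintf(`CAST((case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * 10 as INTEGER) +1 as bin`, dbMetric, dbMetric),
	).
		From("job").
		CrossJoin(fmt.Sprintf(`(select max(%s) as max, min(%s) as min from job where %s is not null and %s < %f) as value`, dbMetric, dbMetric, dbMetric, dbMetric, peak)).
		Where(fmt.Sprintf(`job.%s is not null and job.%s < %f`, dbMetric, dbMetric, peak)).
		GroupBy("bin").
		OrderBy("bin")

	// Inspect the generated SQL; in the resolver this query is executed via RunWith(r.DB).Query().
	sqlStr, args, err := q.ToSql()
	if err != nil {
		panic(err)
	}
	fmt.Println(sqlStr)
	fmt.Println(args)
}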