Merge pull request #323 from ClusterCockpit/add_histogram_bin_select

Add histogram bin select
This commit is contained in:
Jan Eitzinger
2025-01-28 13:38:15 +01:00
committed by GitHub
9 changed files with 1391 additions and 375 deletions

View File

@@ -8,7 +8,6 @@ import (
"context"
"database/sql"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -447,15 +446,40 @@ func (r *JobRepository) AddHistograms(
ctx context.Context,
filter []*model.JobFilter,
stat *model.JobsStatistics,
durationBins *string,
) (*model.JobsStatistics, error) {
start := time.Now()
var targetBinCount int
var targetBinSize int
switch {
case *durationBins == "1m": // 1 Minute Bins + Max 60 Bins -> Max 60 Minutes
targetBinCount = 60
targetBinSize = 60
case *durationBins == "10m": // 10 Minute Bins + Max 72 Bins -> Max 12 Hours
targetBinCount = 72
targetBinSize = 600
case *durationBins == "1h": // 1 Hour Bins + Max 48 Bins -> Max 48 Hours
targetBinCount = 48
targetBinSize = 3600
case *durationBins == "6h": // 6 Hour Bins + Max 12 Bins -> Max 3 Days
targetBinCount = 12
targetBinSize = 21600
case *durationBins == "12h": // 12 hour Bins + Max 14 Bins -> Max 7 Days
targetBinCount = 14
targetBinSize = 43200
default: // 24h
targetBinCount = 24
targetBinSize = 3600
}
castType := r.getCastType()
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter)
// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as %s) as value`, time.Now().Unix(), targetBinSize, castType)
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
log.Warn("Error while loading job statistics histogram: job duration")
return nil, err
}
@@ -487,6 +511,7 @@ func (r *JobRepository) AddMetricHistograms(
filter []*model.JobFilter,
metrics []string,
stat *model.JobsStatistics,
targetBinCount *int,
) (*model.JobsStatistics, error) {
start := time.Now()
@@ -494,7 +519,7 @@ func (r *JobRepository) AddMetricHistograms(
for _, f := range filter {
if f.State != nil {
if len(f.State) == 1 && f.State[0] == "running" {
stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter)
stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter, targetBinCount)
log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
return stat, nil
}
@@ -503,7 +528,7 @@ func (r *JobRepository) AddMetricHistograms(
// All other cases: Query and make bins in sqlite directly
for _, m := range metrics {
metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter)
metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter, targetBinCount)
if err != nil {
log.Warnf("Error while loading job metric statistics histogram: %s", m)
continue
@@ -540,6 +565,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
}
points := make([]*model.HistoPoint, 0)
// is it possible to introduce zero values here? requires info about bincount
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
@@ -553,10 +579,66 @@ func (r *JobRepository) jobsStatisticsHistogram(
return points, nil
}
func (r *JobRepository) jobsDurationStatisticsHistogram(
ctx context.Context,
value string,
filters []*model.JobFilter,
binSizeSeconds int,
targetBinCount *int,
) ([]*model.HistoPoint, error) {
start := time.Now()
query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(job.id) AS count").From("job"))
if qerr != nil {
return nil, qerr
}
// Setup Array
points := make([]*model.HistoPoint, 0)
for i := 1; i <= *targetBinCount; i++ {
point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
points = append(points, &point)
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
// Fill Array at matching $Value
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
for _, e := range points {
if e.Value == (point.Value * binSizeSeconds) {
// Note:
// Matching on unmodified integer value (and multiplying point.Value by binSizeSeconds after match)
// causes frontend to loop into highest targetBinCount, due to zoom condition instantly being fullfilled (cause unknown)
e.Count = point.Count
break
}
}
}
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
func (r *JobRepository) jobsMetricStatisticsHistogram(
ctx context.Context,
metric string,
filters []*model.JobFilter,
bins *int,
) (*model.MetricHistoPoints, error) {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
@@ -624,16 +706,15 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
return nil, sqlerr
}
bins := 10
binQuery := fmt.Sprintf(`CAST( (case when %s = value.max
then value.max*0.999999999 else %s end - value.min) / (value.max -
value.min) * %d as INTEGER )`, jm, jm, bins)
value.min) * %v as INTEGER )`, jm, jm, *bins)
mainQuery := sq.Select(
fmt.Sprintf(`%s + 1 as bin`, binQuery),
fmt.Sprintf(`count(%s) as count`, jm),
fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%v )) as INTEGER ) as min`, *bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%v + 1 )) as INTEGER ) as max`, *bins, binQuery),
).From("job").CrossJoin(
fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs...,
).Where(fmt.Sprintf(`%s is not null and %s <= %f`, jm, jm, peak))
@@ -657,7 +738,15 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
return nil, err
}
// Setup Array
points := make([]*model.MetricHistoPoint, 0)
for i := 1; i <= *bins; i++ {
binMax := ((int(peak) / *bins) * i)
binMin := ((int(peak) / *bins) * (i - 1))
point := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax}
points = append(points, &point)
}
for rows.Next() {
point := model.MetricHistoPoint{}
if err := rows.Scan(&point.Bin, &point.Count, &point.Min, &point.Max); err != nil {
@@ -665,7 +754,20 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
return nil, err // Totally bricks cc-backend if returned and if all metrics requested?
}
points = append(points, &point)
for _, e := range points {
if e.Bin != nil && point.Bin != nil {
if *e.Bin == *point.Bin {
e.Count = point.Count
if point.Min != nil {
e.Min = point.Min
}
if point.Max != nil {
e.Max = point.Max
}
break
}
}
}
}
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}
@@ -678,7 +780,9 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
ctx context.Context,
metrics []string,
filters []*model.JobFilter,
bins *int,
) []*model.MetricHistoPoints {
// Get Jobs
jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
if err != nil {
@@ -720,7 +824,6 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
}
}
@@ -740,28 +843,24 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
}
// Make and fill bins
bins := 10.0
peakBin := peak / bins
peakBin := int(peak) / *bins
points := make([]*model.MetricHistoPoint, 0)
for b := 0; b < 10; b++ {
for b := 0; b < *bins; b++ {
count := 0
bindex := b + 1
bmin := math.Round(peakBin * float64(b))
bmax := math.Round(peakBin * (float64(b) + 1.0))
bmin := peakBin * b
bmax := peakBin * (b + 1)
// Iterate AVG values for indexed metric and count for bins
for _, val := range avgs[idx] {
if float64(val) >= bmin && float64(val) < bmax {
if int(val) >= bmin && int(val) < bmax {
count += 1
}
}
bminint := int(bmin)
bmaxint := int(bmax)
// Append Bin to Metric Result Array
point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bminint, Max: &bmaxint}
point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bmin, Max: &bmax}
points = append(points, &point)
}