2024-04-11 23:04:30 +02:00
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
2023-06-06 10:27:55 +02:00
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"database/sql"
"fmt"
2023-12-12 16:46:03 +01:00
"math"
2023-06-06 10:27:55 +02:00
"time"
2023-06-08 06:18:19 +02:00
"github.com/ClusterCockpit/cc-backend/internal/config"
2023-06-06 10:27:55 +02:00
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
2024-08-28 10:03:04 +02:00
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
2023-12-01 13:22:01 +01:00
"github.com/ClusterCockpit/cc-backend/pkg/archive"
2023-06-06 10:27:55 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/log"
2023-12-01 13:22:01 +01:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2023-06-06 10:27:55 +02:00
sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map [ model . Aggregate ] string {
2024-11-21 15:02:30 +01:00
model . AggregateUser : "job.hpc_user" ,
2023-06-06 10:27:55 +02:00
model . AggregateProject : "job.project" ,
model . AggregateCluster : "job.cluster" ,
}
2023-08-25 13:14:34 +02:00
var sortBy2column = map [ model . SortByAggregate ] string {
2023-08-29 17:38:17 +02:00
model . SortByAggregateTotaljobs : "totalJobs" ,
model . SortByAggregateTotalwalltime : "totalWalltime" ,
model . SortByAggregateTotalnodes : "totalNodes" ,
model . SortByAggregateTotalnodehours : "totalNodeHours" ,
model . SortByAggregateTotalcores : "totalCores" ,
model . SortByAggregateTotalcorehours : "totalCoreHours" ,
model . SortByAggregateTotalaccs : "totalAccs" ,
model . SortByAggregateTotalacchours : "totalAccHours" ,
2023-08-25 13:14:34 +02:00
}
2023-06-08 06:18:19 +02:00
func ( r * JobRepository ) buildCountQuery (
filter [ ] * model . JobFilter ,
kind string ,
2024-07-12 13:42:12 +02:00
col string ,
) sq . SelectBuilder {
2023-06-08 06:18:19 +02:00
var query sq . SelectBuilder
if col != "" {
// Scan columns: id, cnt
query = sq . Select ( col , "COUNT(job.id)" ) . From ( "job" ) . GroupBy ( col )
} else {
// Scan columns: cnt
query = sq . Select ( "COUNT(job.id)" ) . From ( "job" )
}
switch kind {
case "running" :
query = query . Where ( "job.job_state = ?" , "running" )
case "short" :
query = query . Where ( "job.duration < ?" , config . Keys . ShortRunningJobsDuration )
}
for _ , f := range filter {
query = BuildWhereClause ( f , query )
}
return query
}
func ( r * JobRepository ) buildStatsQuery (
2023-06-06 10:27:55 +02:00
filter [ ] * model . JobFilter ,
2024-07-12 13:42:12 +02:00
col string ,
) sq . SelectBuilder {
2023-06-07 11:58:58 +02:00
var query sq . SelectBuilder
castType := r . getCastType ( )
2023-09-01 10:23:14 +02:00
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
2023-06-07 11:58:58 +02:00
if col != "" {
2024-09-02 17:54:25 +02:00
// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq . Select ( col , "COUNT(job.id) as totalJobs" , "name" ,
2023-09-01 10:23:14 +02:00
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_nodes) as %s) as totalNodes ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_hwthreads) as %s) as totalCores ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_acc) as %s) as totalAccs ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours ` , time . Now ( ) . Unix ( ) , castType ) ,
2024-11-21 15:02:30 +01:00
) . From ( "job" ) . Join ( "hpc_user ON hpc_user.username = job.hpc_user" ) . GroupBy ( col )
2023-06-07 11:58:58 +02:00
} else {
2024-09-02 17:54:25 +02:00
// Scan columns: totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
2024-09-02 18:45:33 +02:00
query = sq . Select ( "COUNT(job.id)" ,
2023-09-01 10:23:14 +02:00
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_nodes) as %s) ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_hwthreads) as %s) ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) ` , time . Now ( ) . Unix ( ) , castType ) ,
fmt . Sprintf ( ` CAST(SUM(job.num_acc) as %s) ` , castType ) ,
fmt . Sprintf ( ` CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) ` , time . Now ( ) . Unix ( ) , castType ) ,
2024-09-02 18:45:33 +02:00
) . From ( "job" )
2023-06-07 11:58:58 +02:00
}
for _ , f := range filter {
query = BuildWhereClause ( f , query )
}
return query
}
2024-09-02 17:54:25 +02:00
// func (r *JobRepository) getUserName(ctx context.Context, id string) string {
// user := GetUserFromContext(ctx)
2024-11-21 15:02:30 +01:00
// name, _ := r.FindColumnValue(user, id, "hpc_user", "name", "username", false)
2024-09-02 17:54:25 +02:00
// if name != "" {
// return name
// } else {
// return "-"
// }
// }
2023-06-07 11:58:58 +02:00
func ( r * JobRepository ) getCastType ( ) string {
2023-06-06 10:27:55 +02:00
var castType string
switch r . driver {
case "sqlite3" :
castType = "int"
case "mysql" :
castType = "unsigned"
2023-06-07 11:58:58 +02:00
default :
castType = ""
2023-06-06 10:27:55 +02:00
}
2023-06-07 11:58:58 +02:00
return castType
}
2023-06-09 09:09:41 +02:00
func ( r * JobRepository ) JobsStatsGrouped (
2023-06-07 11:58:58 +02:00
ctx context . Context ,
filter [ ] * model . JobFilter ,
2023-08-25 13:14:34 +02:00
page * model . PageRequest ,
sortBy * model . SortByAggregate ,
2024-07-12 13:42:12 +02:00
groupBy * model . Aggregate ,
) ( [ ] * model . JobsStatistics , error ) {
2023-06-07 11:58:58 +02:00
start := time . Now ( )
col := groupBy2column [ * groupBy ]
2023-06-08 06:18:19 +02:00
query := r . buildStatsQuery ( filter , col )
2023-08-25 13:14:34 +02:00
2023-06-07 11:58:58 +02:00
query , err := SecurityCheck ( ctx , query )
if err != nil {
return nil , err
}
2023-08-25 13:14:34 +02:00
if sortBy != nil {
sortBy := sortBy2column [ * sortBy ]
query = query . OrderBy ( fmt . Sprintf ( "%s DESC" , sortBy ) )
}
if page != nil && page . ItemsPerPage != - 1 {
limit := uint64 ( page . ItemsPerPage )
query = query . Offset ( ( uint64 ( page . Page ) - 1 ) * limit ) . Limit ( limit )
}
2023-06-07 11:58:58 +02:00
rows , err := query . RunWith ( r . DB ) . Query ( )
if err != nil {
log . Warn ( "Error while querying DB for job statistics" )
return nil , err
}
stats := make ( [ ] * model . JobsStatistics , 0 , 100 )
for rows . Next ( ) {
var id sql . NullString
2024-09-02 17:54:25 +02:00
var name sql . NullString
2023-08-25 17:38:25 +02:00
var jobs , walltime , nodes , nodeHours , cores , coreHours , accs , accHours sql . NullInt64
2024-09-02 17:54:25 +02:00
if err := rows . Scan ( & id , & jobs , & name , & walltime , & nodes , & nodeHours , & cores , & coreHours , & accs , & accHours ) ; err != nil {
2023-06-07 11:58:58 +02:00
log . Warn ( "Error while scanning rows" )
return nil , err
}
if id . Valid {
2023-08-29 17:38:17 +02:00
var totalJobs , totalWalltime , totalNodes , totalNodeHours , totalCores , totalCoreHours , totalAccs , totalAccHours int
2024-09-02 17:54:25 +02:00
var personName string
if name . Valid {
personName = name . String
}
2023-08-25 17:38:25 +02:00
2023-08-29 14:01:01 +02:00
if jobs . Valid {
totalJobs = int ( jobs . Int64 )
}
2023-08-29 17:38:17 +02:00
if walltime . Valid {
totalWalltime = int ( walltime . Int64 )
}
2023-08-29 14:01:01 +02:00
if nodes . Valid {
totalNodes = int ( nodes . Int64 )
}
2023-08-25 17:38:25 +02:00
if cores . Valid {
totalCores = int ( cores . Int64 )
}
if accs . Valid {
totalAccs = int ( accs . Int64 )
}
2023-06-09 11:29:07 +02:00
2023-08-29 14:01:01 +02:00
if nodeHours . Valid {
totalNodeHours = int ( nodeHours . Int64 )
}
2023-06-09 11:29:07 +02:00
if coreHours . Valid {
totalCoreHours = int ( coreHours . Int64 )
}
if accHours . Valid {
totalAccHours = int ( accHours . Int64 )
}
2024-11-21 15:02:30 +01:00
if col == "job.hpc_user" {
2024-09-02 17:54:25 +02:00
// name := r.getUserName(ctx, id.String)
2023-06-07 11:58:58 +02:00
stats = append ( stats ,
& model . JobsStatistics {
2023-06-09 11:29:07 +02:00
ID : id . String ,
2024-09-02 17:54:25 +02:00
Name : personName ,
2023-08-29 14:01:01 +02:00
TotalJobs : totalJobs ,
2023-08-29 17:38:17 +02:00
TotalWalltime : totalWalltime ,
2023-08-29 14:01:01 +02:00
TotalNodes : totalNodes ,
TotalNodeHours : totalNodeHours ,
2023-08-25 17:38:25 +02:00
TotalCores : totalCores ,
2023-06-09 11:29:07 +02:00
TotalCoreHours : totalCoreHours ,
2023-08-25 17:38:25 +02:00
TotalAccs : totalAccs ,
2024-07-12 13:42:12 +02:00
TotalAccHours : totalAccHours ,
} )
2023-06-06 10:27:55 +02:00
} else {
2023-06-07 11:58:58 +02:00
stats = append ( stats ,
& model . JobsStatistics {
2023-06-09 11:29:07 +02:00
ID : id . String ,
TotalJobs : int ( jobs . Int64 ) ,
TotalWalltime : int ( walltime . Int64 ) ,
2023-08-29 14:01:01 +02:00
TotalNodes : totalNodes ,
TotalNodeHours : totalNodeHours ,
2023-08-25 17:38:25 +02:00
TotalCores : totalCores ,
2023-06-09 11:29:07 +02:00
TotalCoreHours : totalCoreHours ,
2023-08-25 17:38:25 +02:00
TotalAccs : totalAccs ,
2024-07-12 13:42:12 +02:00
TotalAccHours : totalAccHours ,
} )
2023-06-06 10:27:55 +02:00
}
2023-06-07 11:58:58 +02:00
}
}
2023-06-06 10:27:55 +02:00
2023-06-20 15:47:38 +02:00
log . Debugf ( "Timer JobsStatsGrouped %s" , time . Since ( start ) )
2023-06-07 11:58:58 +02:00
return stats , nil
}
2023-08-30 15:04:50 +02:00
func ( r * JobRepository ) JobsStats (
ctx context . Context ,
2024-07-12 13:42:12 +02:00
filter [ ] * model . JobFilter ,
) ( [ ] * model . JobsStatistics , error ) {
2023-08-30 15:04:50 +02:00
start := time . Now ( )
query := r . buildStatsQuery ( filter , "" )
query , err := SecurityCheck ( ctx , query )
if err != nil {
return nil , err
}
2023-06-07 11:58:58 +02:00
row := query . RunWith ( r . DB ) . QueryRow ( )
stats := make ( [ ] * model . JobsStatistics , 0 , 1 )
2023-06-09 09:09:41 +02:00
2023-08-28 10:00:20 +02:00
var jobs , walltime , nodes , nodeHours , cores , coreHours , accs , accHours sql . NullInt64
if err := row . Scan ( & jobs , & walltime , & nodes , & nodeHours , & cores , & coreHours , & accs , & accHours ) ; err != nil {
2023-06-07 11:58:58 +02:00
log . Warn ( "Error while scanning rows" )
return nil , err
}
if jobs . Valid {
2023-08-29 14:01:01 +02:00
var totalNodeHours , totalCoreHours , totalAccHours int
2023-06-09 11:29:07 +02:00
2023-08-29 14:01:01 +02:00
if nodeHours . Valid {
totalNodeHours = int ( nodeHours . Int64 )
}
2023-06-09 11:29:07 +02:00
if coreHours . Valid {
totalCoreHours = int ( coreHours . Int64 )
}
if accHours . Valid {
totalAccHours = int ( accHours . Int64 )
}
2023-06-07 11:58:58 +02:00
stats = append ( stats ,
& model . JobsStatistics {
2023-06-09 11:29:07 +02:00
TotalJobs : int ( jobs . Int64 ) ,
TotalWalltime : int ( walltime . Int64 ) ,
2023-08-29 14:01:01 +02:00
TotalNodeHours : totalNodeHours ,
2023-06-09 11:29:07 +02:00
TotalCoreHours : totalCoreHours ,
2024-07-12 13:42:12 +02:00
TotalAccHours : totalAccHours ,
} )
2023-06-07 11:58:58 +02:00
}
2023-08-25 13:14:34 +02:00
log . Debugf ( "Timer JobStats %s" , time . Since ( start ) )
2023-08-30 15:04:50 +02:00
return stats , nil
2023-08-25 13:14:34 +02:00
}
2024-08-29 07:26:49 +02:00
func LoadJobStat ( job * schema . JobMeta , metric string , statType string ) float64 {
2024-07-09 09:50:32 +02:00
if stats , ok := job . Statistics [ metric ] ; ok {
2024-08-29 07:26:49 +02:00
switch statType {
case "avg" :
2024-07-09 09:50:32 +02:00
return stats . Avg
2024-08-29 07:26:49 +02:00
case "max" :
return stats . Max
case "min" :
return stats . Min
default :
log . Errorf ( "Unknown stat type %s" , statType )
2024-07-09 09:50:32 +02:00
}
}
return 0.0
}
2023-06-09 09:09:41 +02:00
func ( r * JobRepository ) JobCountGrouped (
2023-06-07 11:58:58 +02:00
ctx context . Context ,
filter [ ] * model . JobFilter ,
2024-07-12 13:42:12 +02:00
groupBy * model . Aggregate ,
) ( [ ] * model . JobsStatistics , error ) {
2023-06-07 11:58:58 +02:00
start := time . Now ( )
col := groupBy2column [ * groupBy ]
2023-06-09 09:09:41 +02:00
query := r . buildCountQuery ( filter , "" , col )
2023-06-08 06:18:19 +02:00
query , err := SecurityCheck ( ctx , query )
if err != nil {
return nil , err
}
rows , err := query . RunWith ( r . DB ) . Query ( )
if err != nil {
log . Warn ( "Error while querying DB for job statistics" )
return nil , err
}
2023-06-09 09:09:41 +02:00
stats := make ( [ ] * model . JobsStatistics , 0 , 100 )
2023-06-08 06:18:19 +02:00
for rows . Next ( ) {
var id sql . NullString
var cnt sql . NullInt64
if err := rows . Scan ( & id , & cnt ) ; err != nil {
log . Warn ( "Error while scanning rows" )
return nil , err
}
if id . Valid {
2023-06-09 09:09:41 +02:00
stats = append ( stats ,
& model . JobsStatistics {
ID : id . String ,
2024-07-12 13:42:12 +02:00
TotalJobs : int ( cnt . Int64 ) ,
} )
2023-06-08 06:18:19 +02:00
}
}
2023-06-20 15:47:38 +02:00
log . Debugf ( "Timer JobCountGrouped %s" , time . Since ( start ) )
2023-06-09 09:09:41 +02:00
return stats , nil
}
func ( r * JobRepository ) AddJobCountGrouped (
ctx context . Context ,
filter [ ] * model . JobFilter ,
groupBy * model . Aggregate ,
stats [ ] * model . JobsStatistics ,
2024-07-12 13:42:12 +02:00
kind string ,
) ( [ ] * model . JobsStatistics , error ) {
2023-06-09 09:09:41 +02:00
start := time . Now ( )
col := groupBy2column [ * groupBy ]
query := r . buildCountQuery ( filter , kind , col )
query , err := SecurityCheck ( ctx , query )
2023-06-08 06:18:19 +02:00
if err != nil {
return nil , err
}
2023-06-09 09:09:41 +02:00
rows , err := query . RunWith ( r . DB ) . Query ( )
2023-06-08 06:18:19 +02:00
if err != nil {
log . Warn ( "Error while querying DB for job statistics" )
return nil , err
}
2023-06-09 09:09:41 +02:00
counts := make ( map [ string ] int )
2023-06-08 06:18:19 +02:00
for rows . Next ( ) {
var id sql . NullString
var cnt sql . NullInt64
if err := rows . Scan ( & id , & cnt ) ; err != nil {
log . Warn ( "Error while scanning rows" )
return nil , err
}
if id . Valid {
2023-06-09 09:09:41 +02:00
counts [ id . String ] = int ( cnt . Int64 )
2023-06-08 06:18:19 +02:00
}
}
2023-06-09 09:09:41 +02:00
switch kind {
case "running" :
for _ , s := range stats {
s . RunningJobs = counts [ s . ID ]
}
case "short" :
for _ , s := range stats {
s . ShortJobs = counts [ s . ID ]
}
2023-06-08 06:18:19 +02:00
}
2023-06-09 09:09:41 +02:00
2023-06-20 15:47:38 +02:00
log . Debugf ( "Timer AddJobCountGrouped %s" , time . Since ( start ) )
2023-06-08 06:18:19 +02:00
return stats , nil
}
2023-06-09 13:15:25 +02:00
func ( r * JobRepository ) AddJobCount (
ctx context . Context ,
filter [ ] * model . JobFilter ,
stats [ ] * model . JobsStatistics ,
2024-07-12 13:42:12 +02:00
kind string ,
) ( [ ] * model . JobsStatistics , error ) {
2023-06-09 13:15:25 +02:00
start := time . Now ( )
query := r . buildCountQuery ( filter , kind , "" )
query , err := SecurityCheck ( ctx , query )
if err != nil {
return nil , err
}
rows , err := query . RunWith ( r . DB ) . Query ( )
if err != nil {
log . Warn ( "Error while querying DB for job statistics" )
return nil , err
}
2023-08-24 14:26:23 +02:00
var count int
2023-06-09 13:15:25 +02:00
for rows . Next ( ) {
var cnt sql . NullInt64
if err := rows . Scan ( & cnt ) ; err != nil {
log . Warn ( "Error while scanning rows" )
return nil , err
}
2023-08-24 14:26:23 +02:00
count = int ( cnt . Int64 )
2023-06-09 13:15:25 +02:00
}
switch kind {
case "running" :
for _ , s := range stats {
2023-08-24 14:26:23 +02:00
s . RunningJobs = count
2023-06-09 13:15:25 +02:00
}
case "short" :
for _ , s := range stats {
2023-08-24 14:26:23 +02:00
s . ShortJobs = count
2023-06-09 13:15:25 +02:00
}
}
2023-08-24 14:26:23 +02:00
log . Debugf ( "Timer AddJobCount %s" , time . Since ( start ) )
2023-06-09 13:15:25 +02:00
return stats , nil
}
2023-06-07 11:58:58 +02:00
func ( r * JobRepository ) AddHistograms (
ctx context . Context ,
filter [ ] * model . JobFilter ,
2024-07-12 13:42:12 +02:00
stat * model . JobsStatistics ,
) ( * model . JobsStatistics , error ) {
2023-06-09 15:02:22 +02:00
start := time . Now ( )
2023-06-07 11:58:58 +02:00
castType := r . getCastType ( )
var err error
value := fmt . Sprintf ( ` CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value ` , time . Now ( ) . Unix ( ) , castType )
stat . HistDuration , err = r . jobsStatisticsHistogram ( ctx , value , filter )
if err != nil {
log . Warn ( "Error while loading job statistics histogram: running jobs" )
return nil , err
}
stat . HistNumNodes , err = r . jobsStatisticsHistogram ( ctx , "job.num_nodes as value" , filter )
if err != nil {
log . Warn ( "Error while loading job statistics histogram: num nodes" )
2023-08-29 14:02:23 +02:00
return nil , err
}
stat . HistNumCores , err = r . jobsStatisticsHistogram ( ctx , "job.num_hwthreads as value" , filter )
if err != nil {
log . Warn ( "Error while loading job statistics histogram: num hwthreads" )
return nil , err
}
stat . HistNumAccs , err = r . jobsStatisticsHistogram ( ctx , "job.num_acc as value" , filter )
if err != nil {
log . Warn ( "Error while loading job statistics histogram: num acc" )
2023-06-07 11:58:58 +02:00
return nil , err
}
2023-06-20 15:47:38 +02:00
log . Debugf ( "Timer AddHistograms %s" , time . Since ( start ) )
2023-06-07 11:58:58 +02:00
return stat , nil
}
2023-12-01 13:22:01 +01:00
// Requires thresholds for metric from config for cluster? Of all clusters and use largest? split to 10 + 1 for artifacts?
func ( r * JobRepository ) AddMetricHistograms (
ctx context . Context ,
filter [ ] * model . JobFilter ,
metrics [ ] string ,
2024-07-12 13:42:12 +02:00
stat * model . JobsStatistics ,
) ( * model . JobsStatistics , error ) {
2023-12-01 13:22:01 +01:00
start := time . Now ( )
2023-12-12 16:46:03 +01:00
// Running Jobs Only: First query jobdata from sqlite, then query data and make bins
for _ , f := range filter {
if f . State != nil {
if len ( f . State ) == 1 && f . State [ 0 ] == "running" {
stat . HistMetrics = r . runningJobsMetricStatisticsHistogram ( ctx , metrics , filter )
log . Debugf ( "Timer AddMetricHistograms %s" , time . Since ( start ) )
return stat , nil
}
}
}
// All other cases: Query and make bins in sqlite directly
2023-12-08 12:03:04 +01:00
for _ , m := range metrics {
metricHisto , err := r . jobsMetricStatisticsHistogram ( ctx , m , filter )
2023-12-01 13:22:01 +01:00
if err != nil {
log . Warnf ( "Error while loading job metric statistics histogram: %s" , m )
continue
}
stat . HistMetrics = append ( stat . HistMetrics , metricHisto )
}
log . Debugf ( "Timer AddMetricHistograms %s" , time . Since ( start ) )
return stat , nil
}
2023-06-07 11:58:58 +02:00
// `value` must be the column grouped by, but renamed to "value"
func ( r * JobRepository ) jobsStatisticsHistogram (
ctx context . Context ,
value string ,
2024-07-12 13:42:12 +02:00
filters [ ] * model . JobFilter ,
) ( [ ] * model . HistoPoint , error ) {
2023-06-06 10:27:55 +02:00
start := time . Now ( )
2023-06-07 11:58:58 +02:00
query , qerr := SecurityCheck ( ctx ,
sq . Select ( value , "COUNT(job.id) AS count" ) . From ( "job" ) )
2023-06-06 10:27:55 +02:00
if qerr != nil {
return nil , qerr
}
for _ , f := range filters {
query = BuildWhereClause ( f , query )
}
rows , err := query . GroupBy ( "value" ) . RunWith ( r . DB ) . Query ( )
if err != nil {
log . Error ( "Error while running query" )
return nil , err
}
points := make ( [ ] * model . HistoPoint , 0 )
for rows . Next ( ) {
point := model . HistoPoint { }
if err := rows . Scan ( & point . Value , & point . Count ) ; err != nil {
log . Warn ( "Error while scanning rows" )
return nil , err
}
points = append ( points , & point )
}
2023-06-20 15:47:38 +02:00
log . Debugf ( "Timer jobsStatisticsHistogram %s" , time . Since ( start ) )
2023-06-06 10:27:55 +02:00
return points , nil
}
2023-12-01 13:22:01 +01:00
func ( r * JobRepository ) jobsMetricStatisticsHistogram (
ctx context . Context ,
metric string ,
2024-07-12 13:42:12 +02:00
filters [ ] * model . JobFilter ,
) ( * model . MetricHistoPoints , error ) {
2023-12-01 13:22:01 +01:00
// Get specific Peak or largest Peak
var metricConfig * schema . MetricConfig
var peak float64 = 0.0
2023-12-05 15:30:40 +01:00
var unit string = ""
2024-07-22 15:41:33 +02:00
var footprintStat string = ""
2023-12-05 11:59:01 +01:00
2023-12-01 13:22:01 +01:00
for _ , f := range filters {
if f . Cluster != nil {
metricConfig = archive . GetMetricConfig ( * f . Cluster . Eq , metric )
peak = metricConfig . Peak
2023-12-05 15:30:40 +01:00
unit = metricConfig . Unit . Prefix + metricConfig . Unit . Base
2024-07-22 15:41:33 +02:00
footprintStat = metricConfig . Footprint
2023-12-05 11:59:01 +01:00
log . Debugf ( "Cluster %s filter found with peak %f for %s" , * f . Cluster . Eq , peak , metric )
}
}
if peak == 0.0 {
for _ , c := range archive . Clusters {
for _ , m := range c . MetricConfig {
if m . Name == metric {
if m . Peak > peak {
peak = m . Peak
2023-12-01 13:22:01 +01:00
}
2023-12-05 15:30:40 +01:00
if unit == "" {
unit = m . Unit . Prefix + m . Unit . Base
}
2024-07-22 15:41:33 +02:00
if footprintStat == "" {
footprintStat = m . Footprint
}
2023-12-01 13:22:01 +01:00
}
}
}
}
2024-07-22 15:41:33 +02:00
// log.Debugf("Metric %s, Peak %f, Unit %s, Aggregation %s", metric, peak, unit, aggreg)
2023-12-05 11:59:01 +01:00
// Make bins, see https://jereze.com/code/sql-histogram/
2023-12-08 12:03:04 +01:00
2023-12-01 13:22:01 +01:00
start := time . Now ( )
2024-07-22 15:41:33 +02:00
jm := fmt . Sprintf ( ` json_extract(footprint, "$.%s") ` , ( metric + "_" + footprintStat ) )
2023-12-08 12:03:04 +01:00
crossJoinQuery := sq . Select (
2024-07-12 13:42:12 +02:00
fmt . Sprintf ( ` max(%s) as max ` , jm ) ,
fmt . Sprintf ( ` min(%s) as min ` , jm ) ,
2023-12-08 12:03:04 +01:00
) . From ( "job" ) . Where (
2024-07-22 15:41:33 +02:00
"JSON_VALID(footprint)" ,
) . Where (
2024-07-12 13:42:12 +02:00
fmt . Sprintf ( ` %s is not null ` , jm ) ,
2023-12-08 12:03:04 +01:00
) . Where (
2024-07-12 13:42:12 +02:00
fmt . Sprintf ( ` %s <= %f ` , jm , peak ) ,
2023-12-08 12:03:04 +01:00
)
crossJoinQuery , cjqerr := SecurityCheck ( ctx , crossJoinQuery )
2023-12-11 13:55:56 +01:00
2023-12-08 12:03:04 +01:00
if cjqerr != nil {
return nil , cjqerr
}
2023-12-11 13:55:56 +01:00
for _ , f := range filters {
crossJoinQuery = BuildWhereClause ( f , crossJoinQuery )
}
2023-12-12 15:07:23 +01:00
crossJoinQuerySql , crossJoinQueryArgs , sqlerr := crossJoinQuery . ToSql ( )
2023-12-08 12:03:04 +01:00
if sqlerr != nil {
return nil , sqlerr
}
bins := 10
2024-07-12 13:42:12 +02:00
binQuery := fmt . Sprintf ( ` CAST ( ( case when % s = value . max
then value . max * 0.999999999 else % s end - value . min ) / ( value . max -
2024-07-12 14:08:48 +02:00
value . min ) * % d as INTEGER ) ` , jm , jm , bins )
2023-12-08 12:03:04 +01:00
mainQuery := sq . Select (
fmt . Sprintf ( ` %s + 1 as bin ` , binQuery ) ,
2024-07-12 14:08:48 +02:00
fmt . Sprintf ( ` count(%s) as count ` , jm ) ,
2023-12-08 12:03:04 +01:00
fmt . Sprintf ( ` CAST(((value.max / %d) * (%s )) as INTEGER ) as min ` , bins , binQuery ) ,
fmt . Sprintf ( ` CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max ` , bins , binQuery ) ,
) . From ( "job" ) . CrossJoin (
2023-12-12 15:07:23 +01:00
fmt . Sprintf ( ` (%s) as value ` , crossJoinQuerySql ) , crossJoinQueryArgs ... ,
2024-07-12 14:08:48 +02:00
) . Where ( fmt . Sprintf ( ` %s is not null and %s <= %f ` , jm , jm , peak ) )
2023-12-05 11:59:01 +01:00
2023-12-08 12:03:04 +01:00
mainQuery , qerr := SecurityCheck ( ctx , mainQuery )
2023-12-01 13:22:01 +01:00
if qerr != nil {
return nil , qerr
}
for _ , f := range filters {
2023-12-08 12:03:04 +01:00
mainQuery = BuildWhereClause ( f , mainQuery )
2023-12-01 13:22:01 +01:00
}
2023-12-05 11:59:01 +01:00
// Finalize query with Grouping and Ordering
2023-12-08 12:03:04 +01:00
mainQuery = mainQuery . GroupBy ( "bin" ) . OrderBy ( "bin" )
2023-12-05 11:59:01 +01:00
2023-12-08 12:03:04 +01:00
rows , err := mainQuery . RunWith ( r . DB ) . Query ( )
2023-12-01 13:22:01 +01:00
if err != nil {
2023-12-08 12:03:04 +01:00
log . Errorf ( "Error while running mainQuery: %s" , err )
2023-12-01 13:22:01 +01:00
return nil , err
}
2023-12-05 11:59:01 +01:00
points := make ( [ ] * model . MetricHistoPoint , 0 )
2023-12-01 13:22:01 +01:00
for rows . Next ( ) {
2023-12-05 11:59:01 +01:00
point := model . MetricHistoPoint { }
2023-12-08 12:03:04 +01:00
if err := rows . Scan ( & point . Bin , & point . Count , & point . Min , & point . Max ) ; err != nil {
2024-07-12 14:08:48 +02:00
log . Warnf ( "Error while scanning rows for %s" , jm )
2023-12-08 12:03:04 +01:00
return nil , err // Totally bricks cc-backend if returned and if all metrics requested?
2023-12-01 13:22:01 +01:00
}
points = append ( points , & point )
}
2023-12-05 11:59:01 +01:00
2024-07-22 15:41:33 +02:00
result := model . MetricHistoPoints { Metric : metric , Unit : unit , Stat : & footprintStat , Data : points }
2023-12-05 11:59:01 +01:00
2023-12-01 13:22:01 +01:00
log . Debugf ( "Timer jobsStatisticsHistogram %s" , time . Since ( start ) )
2023-12-05 11:59:01 +01:00
return & result , nil
2023-12-01 13:22:01 +01:00
}
2023-12-12 16:46:03 +01:00
func ( r * JobRepository ) runningJobsMetricStatisticsHistogram (
ctx context . Context ,
metrics [ ] string ,
2024-07-12 13:42:12 +02:00
filters [ ] * model . JobFilter ,
) [ ] * model . MetricHistoPoints {
2023-12-12 16:46:03 +01:00
// Get Jobs
jobs , err := r . QueryJobs ( ctx , filters , & model . PageRequest { Page : 1 , ItemsPerPage : 500 + 1 } , nil )
if err != nil {
log . Errorf ( "Error while querying jobs for footprint: %s" , err )
return nil
}
if len ( jobs ) > 500 {
log . Errorf ( "too many jobs matched (max: %d)" , 500 )
return nil
}
// Get AVGs from metric repo
avgs := make ( [ ] [ ] schema . Float , len ( metrics ) )
for i := range avgs {
avgs [ i ] = make ( [ ] schema . Float , 0 , len ( jobs ) )
}
for _ , job := range jobs {
if job . MonitoringStatus == schema . MonitoringStatusDisabled || job . MonitoringStatus == schema . MonitoringStatusArchivingFailed {
continue
}
2024-08-28 10:03:04 +02:00
if err := metricDataDispatcher . LoadAverages ( job , metrics , avgs , ctx ) ; err != nil {
2023-12-12 16:46:03 +01:00
log . Errorf ( "Error while loading averages for histogram: %s" , err )
return nil
}
}
// Iterate metrics to fill endresult
data := make ( [ ] * model . MetricHistoPoints , 0 )
for idx , metric := range metrics {
// Get specific Peak or largest Peak
var metricConfig * schema . MetricConfig
var peak float64 = 0.0
var unit string = ""
for _ , f := range filters {
if f . Cluster != nil {
metricConfig = archive . GetMetricConfig ( * f . Cluster . Eq , metric )
peak = metricConfig . Peak
unit = metricConfig . Unit . Prefix + metricConfig . Unit . Base
log . Debugf ( "Cluster %s filter found with peak %f for %s" , * f . Cluster . Eq , peak , metric )
}
}
if peak == 0.0 {
for _ , c := range archive . Clusters {
for _ , m := range c . MetricConfig {
if m . Name == metric {
if m . Peak > peak {
peak = m . Peak
}
if unit == "" {
unit = m . Unit . Prefix + m . Unit . Base
}
}
}
}
}
// Make and fill bins
bins := 10.0
peakBin := peak / bins
points := make ( [ ] * model . MetricHistoPoint , 0 )
for b := 0 ; b < 10 ; b ++ {
count := 0
bindex := b + 1
bmin := math . Round ( peakBin * float64 ( b ) )
bmax := math . Round ( peakBin * ( float64 ( b ) + 1.0 ) )
// Iterate AVG values for indexed metric and count for bins
for _ , val := range avgs [ idx ] {
if float64 ( val ) >= bmin && float64 ( val ) < bmax {
count += 1
}
}
bminint := int ( bmin )
bmaxint := int ( bmax )
// Append Bin to Metric Result Array
point := model . MetricHistoPoint { Bin : & bindex , Count : count , Min : & bminint , Max : & bmaxint }
points = append ( points , & point )
}
// Append Metric Result Array to final results array
result := model . MetricHistoPoints { Metric : metric , Unit : unit , Data : points }
data = append ( data , & result )
}
return data
}