// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"

	sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should make sure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
	model.AggregateUser:    "job.hpc_user",
	model.AggregateProject: "job.project",
	model.AggregateCluster: "job.cluster",
}

var sortBy2column = map[model.SortByAggregate]string{
	model.SortByAggregateTotaljobs:      "totalJobs",
	model.SortByAggregateTotalwalltime:  "totalWalltime",
	model.SortByAggregateTotalnodes:     "totalNodes",
	model.SortByAggregateTotalnodehours: "totalNodeHours",
	model.SortByAggregateTotalcores:     "totalCores",
	model.SortByAggregateTotalcorehours: "totalCoreHours",
	model.SortByAggregateTotalaccs:      "totalAccs",
	model.SortByAggregateTotalacchours:  "totalAccHours",
}
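
// buildCountQuery returns a query that counts the jobs matching the given
// filters. If col is non-empty, counts are grouped by that column (scan
// columns: id, cnt); kind may additionally restrict the count to "running"
// or "short" jobs.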
func (r *JobRepository) buildCountQuery(
	filter []*model.JobFilter,
	kind string,
	col string,
) sq.SelectBuilder {
	var query sq.SelectBuilder

	if col != "" {
		// Scan columns: id, cnt
		query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
	} else {
		// Scan columns: cnt
		query = sq.Select("COUNT(job.id)").From("job")
	}

	switch kind {
	case "running":
		query = query.Where("job.job_state = ?", "running")
	case "short":
		query = query.Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
	}

	for _, f := range filter {
		query = BuildWhereClause(f, query)
	}

	return query
}
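
// buildStatsQuery returns a query aggregating job count, walltime, and
// node/core/accelerator counts and hours over all jobs matching the filters.
// For running jobs the duration is computed from job.start_time and the
// current time. If col is non-empty, results are grouped by that column.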
func (r *JobRepository) buildStatsQuery(
	filter []*model.JobFilter,
	col string,
) sq.SelectBuilder {
	var query sq.SelectBuilder
	castType := r.getCastType()

	// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)

	if col != "" {
		// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
		query = sq.Select(col, "COUNT(job.id) as totalJobs", "name",
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s) as totalCores`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
		).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
	} else {
		// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
		query = sq.Select("COUNT(job.id)",
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s)`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s)`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s)`, time.Now().Unix(), castType),
			fmt.Sprintf(`CAST(SUM(job.num_acc) as %s)`, castType),
			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s)`, time.Now().Unix(), castType),
		).From("job")
	}

	for _, f := range filter {
		query = BuildWhereClause(f, query)
	}

	return query
}

// func (r *JobRepository) getUserName(ctx context.Context, id string) string {
// 	user := GetUserFromContext(ctx)
// 	name, _ := r.FindColumnValue(user, id, "hpc_user", "name", "username", false)
// 	if name != "" {
// 		return name
// 	} else {
// 		return "-"
// 	}
// }
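
// getCastType returns the SQL type name used for integer casts, depending on
// the configured database driver ("int" for sqlite3, "unsigned" for mysql).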
func (r *JobRepository) getCastType() string {
	var castType string

	switch r.driver {
	case "sqlite3":
		castType = "int"
	case "mysql":
		castType = "unsigned"
	default:
		castType = ""
	}

	return castType
}
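
// JobsStatsGrouped returns aggregated job statistics grouped by user,
// project, or cluster, optionally sorted by a totals column and paginated.
// For user groups the display name is resolved via the join on hpc_user.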
func (r *JobRepository) JobsStatsGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	page *model.PageRequest,
	sortBy *model.SortByAggregate,
	groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildStatsQuery(filter, col)

	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	if sortBy != nil {
		sortBy := sortBy2column[*sortBy]
		query = query.OrderBy(fmt.Sprintf("%s DESC", sortBy))
	}
	if page != nil && page.ItemsPerPage != -1 {
		limit := uint64(page.ItemsPerPage)
		query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}

	stats := make([]*model.JobsStatistics, 0, 100)

	for rows.Next() {
		var id sql.NullString
		var name sql.NullString
		var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
		if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		if id.Valid {
			var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
			var personName string

			if name.Valid {
				personName = name.String
			}

			if jobs.Valid {
				totalJobs = int(jobs.Int64)
			}

			if walltime.Valid {
				totalWalltime = int(walltime.Int64)
			}

			if nodes.Valid {
				totalNodes = int(nodes.Int64)
			}

			if cores.Valid {
				totalCores = int(cores.Int64)
			}

			if accs.Valid {
				totalAccs = int(accs.Int64)
			}

			if nodeHours.Valid {
				totalNodeHours = int(nodeHours.Int64)
			}

			if coreHours.Valid {
				totalCoreHours = int(coreHours.Int64)
			}

			if accHours.Valid {
				totalAccHours = int(accHours.Int64)
			}

			if col == "job.hpc_user" {
				// name := r.getUserName(ctx, id.String)
				stats = append(stats,
					&model.JobsStatistics{
						ID:             id.String,
						Name:           personName,
						TotalJobs:      totalJobs,
						TotalWalltime:  totalWalltime,
						TotalNodes:     totalNodes,
						TotalNodeHours: totalNodeHours,
						TotalCores:     totalCores,
						TotalCoreHours: totalCoreHours,
						TotalAccs:      totalAccs,
						TotalAccHours:  totalAccHours,
					})
			} else {
				stats = append(stats,
					&model.JobsStatistics{
						ID:             id.String,
						TotalJobs:      int(jobs.Int64),
						TotalWalltime:  int(walltime.Int64),
						TotalNodes:     totalNodes,
						TotalNodeHours: totalNodeHours,
						TotalCores:     totalCores,
						TotalCoreHours: totalCoreHours,
						TotalAccs:      totalAccs,
						TotalAccHours:  totalAccHours,
					})
			}
		}
	}

	log.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
	return stats, nil
}
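
// JobsStats returns the aggregate statistics over all jobs matching the
// given filters as a single-element slice.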
func (r *JobRepository) JobsStats(
	ctx context.Context,
	filter []*model.JobFilter,
) ([]*model.JobsStatistics, error) {
	start := time.Now()
	query := r.buildStatsQuery(filter, "")
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	row := query.RunWith(r.DB).QueryRow()
	stats := make([]*model.JobsStatistics, 0, 1)

	var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
	if err := row.Scan(&jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
		log.Warn("Error while scanning rows")
		return nil, err
	}

	if jobs.Valid {
		var totalNodeHours, totalCoreHours, totalAccHours int

		if nodeHours.Valid {
			totalNodeHours = int(nodeHours.Int64)
		}
		if coreHours.Valid {
			totalCoreHours = int(coreHours.Int64)
		}
		if accHours.Valid {
			totalAccHours = int(accHours.Int64)
		}
		stats = append(stats,
			&model.JobsStatistics{
				TotalJobs:      int(jobs.Int64),
				TotalWalltime:  int(walltime.Int64),
				TotalNodeHours: totalNodeHours,
				TotalCoreHours: totalCoreHours,
				TotalAccHours:  totalAccHours,
			})
	}

	log.Debugf("Timer JobStats %s", time.Since(start))
	return stats, nil
}
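
// LoadJobStat returns the requested statistic ("avg", "max", or "min") of
// the given metric from a job's archived statistics, or 0.0 if the metric is
// missing or statType is unknown. For example (illustrative metric name):
//
//	memUsed := LoadJobStat(jobMeta, "mem_used", "max")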
func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 {
	if stats, ok := job.Statistics[metric]; ok {
		switch statType {
		case "avg":
			return stats.Avg
		case "max":
			return stats.Max
		case "min":
			return stats.Min
		default:
			log.Errorf("Unknown stat type %s", statType)
		}
	}

	return 0.0
}
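
// JobCountGrouped counts the jobs matching the given filters, grouped by
// the given aggregate (user, project, or cluster).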
func (r *JobRepository) JobCountGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildCountQuery(filter, "", col)
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}
	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}

	stats := make([]*model.JobsStatistics, 0, 100)

	for rows.Next() {
		var id sql.NullString
		var cnt sql.NullInt64
		if err := rows.Scan(&id, &cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}
		if id.Valid {
			stats = append(stats,
				&model.JobsStatistics{
					ID:        id.String,
					TotalJobs: int(cnt.Int64),
				})
		}
	}

	log.Debugf("Timer JobCountGrouped %s", time.Since(start))
	return stats, nil
}
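
// AddJobCountGrouped counts running or short jobs (depending on kind) per
// group and fills the RunningJobs or ShortJobs field of the matching
// entries in stats.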
func (r *JobRepository) AddJobCountGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate,
	stats []*model.JobsStatistics,
	kind string,
) ([]*model.JobsStatistics, error) {
	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildCountQuery(filter, kind, col)
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}
	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}

	counts := make(map[string]int)

	for rows.Next() {
		var id sql.NullString
		var cnt sql.NullInt64
		if err := rows.Scan(&id, &cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}
		if id.Valid {
			counts[id.String] = int(cnt.Int64)
		}
	}

	switch kind {
	case "running":
		for _, s := range stats {
			s.RunningJobs = counts[s.ID]
		}
	case "short":
		for _, s := range stats {
			s.ShortJobs = counts[s.ID]
		}
	}

	log.Debugf("Timer AddJobCountGrouped %s", time.Since(start))
	return stats, nil
}
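
// AddJobCount counts all running or short jobs (depending on kind) matching
// the filters and writes the total into every entry of stats.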
func (r *JobRepository) AddJobCount(
	ctx context.Context,
	filter []*model.JobFilter,
	stats []*model.JobsStatistics,
	kind string,
) ([]*model.JobsStatistics, error) {
	start := time.Now()
	query := r.buildCountQuery(filter, kind, "")
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}
	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}

	var count int

	for rows.Next() {
		var cnt sql.NullInt64
		if err := rows.Scan(&cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		count = int(cnt.Int64)
	}

	switch kind {
	case "running":
		for _, s := range stats {
			s.RunningJobs = count
		}
	case "short":
		for _, s := range stats {
			s.ShortJobs = count
		}
	}

	log.Debugf("Timer AddJobCount %s", time.Since(start))
	return stats, nil
}
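
// AddHistograms adds duration, node count, core count, and accelerator
// count histograms to stat. The duration histogram is binned into
// targetBinCount bins over a fixed 25-hour (90000 s) display range.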
func (r *JobRepository) AddHistograms(
	ctx context.Context,
	filter []*model.JobFilter,
	stat *model.JobsStatistics,
	targetBinCount *int,
) (*model.JobsStatistics, error) {
	start := time.Now()

	// targetBinCount: bin count requested by the frontend
	// -> Min bins:    25 -> min resolution: by hour
	// -> In between:  50 -> resolution by half hour
	//                100 -> resolution by quarter hour
	//                150 -> resolution by 10 minutes
	//                300 -> resolution by 5 minutes
	//                750 -> resolution by 2 minutes
	// -> Max bins:  1500 -> max resolution: by minute
	binSizeSeconds := (90000 / *targetBinCount)
	// Important note: fixed to a 25h max display range -> too site-specific! Configurable or extend?
	// -> Start view with "classic" by-hour histogram; zoom is mostly required for "small" runtimes.

	castType := r.getCastType()
	var err error

	// Return x-values always as seconds; they are formatted into minutes and hours in the frontend.
	value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as %s) as value`, time.Now().Unix(), binSizeSeconds, castType)
	stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, binSizeSeconds, targetBinCount)
	if err != nil {
		log.Warn("Error while loading job statistics histogram: job duration")
		return nil, err
	}

	stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter)
	if err != nil {
		log.Warn("Error while loading job statistics histogram: num nodes")
		return nil, err
	}

	stat.HistNumCores, err = r.jobsStatisticsHistogram(ctx, "job.num_hwthreads as value", filter)
	if err != nil {
		log.Warn("Error while loading job statistics histogram: num hwthreads")
		return nil, err
	}

	stat.HistNumAccs, err = r.jobsStatisticsHistogram(ctx, "job.num_acc as value", filter)
	if err != nil {
		log.Warn("Error while loading job statistics histogram: num acc")
		return nil, err
	}

	log.Debugf("Timer AddHistograms %s", time.Since(start))
	return stat, nil
}
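
// AddMetricHistograms adds one footprint histogram per requested metric to
// stat. If the filters match running jobs only, bins are computed in memory
// from live metric averages; otherwise they are computed directly in SQL.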
// Requires thresholds for metric from config for cluster? Of all clusters and use largest? Split to 10 + 1 for artifacts?
func (r *JobRepository) AddMetricHistograms(
	ctx context.Context,
	filter []*model.JobFilter,
	metrics []string,
	stat *model.JobsStatistics,
	targetBinCount *int,
) (*model.JobsStatistics, error) {
	start := time.Now()

	// Running jobs only: first query job data from sqlite, then query data and make bins
	for _, f := range filter {
		if f.State != nil {
			if len(f.State) == 1 && f.State[0] == "running" {
				stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter, targetBinCount)
				log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
				return stat, nil
			}
		}
	}

	// All other cases: query and make bins in sqlite directly
	for _, m := range metrics {
		metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter, targetBinCount)
		if err != nil {
			log.Warnf("Error while loading job metric statistics histogram: %s", m)
			continue
		}
		stat.HistMetrics = append(stat.HistMetrics, metricHisto)
	}

	log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
	return stat, nil
}

// `value` must be the column grouped by, but renamed to "value"
func (r *JobRepository) jobsStatisticsHistogram(
	ctx context.Context,
	value string,
	filters []*model.JobFilter,
) ([]*model.HistoPoint, error) {
	start := time.Now()
	query, qerr := SecurityCheck(ctx,
		sq.Select(value, "COUNT(job.id) AS count").From("job"))
	if qerr != nil {
		return nil, qerr
	}

	for _, f := range filters {
		query = BuildWhereClause(f, query)
	}

	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
	if err != nil {
		log.Error("Error while running query")
		return nil, err
	}

	points := make([]*model.HistoPoint, 0)
	// is it possible to introduce zero values here? requires info about bincount
	for rows.Next() {
		point := model.HistoPoint{}
		if err := rows.Scan(&point.Value, &point.Count); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		points = append(points, &point)
	}

	log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
	return points, nil
}
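
// jobsDurationStatisticsHistogram bins job durations into targetBinCount
// bins of binSizeSeconds each. All bins are pre-initialized with zero
// counts so the frontend always receives a complete histogram.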
func (r *JobRepository) jobsDurationStatisticsHistogram(
	ctx context.Context,
	value string,
	filters []*model.JobFilter,
	binSizeSeconds int,
	targetBinCount *int,
) ([]*model.HistoPoint, error) {
	start := time.Now()
	query, qerr := SecurityCheck(ctx,
		sq.Select(value, "COUNT(job.id) AS count").From("job"))
	if qerr != nil {
		return nil, qerr
	}

	// Setup array
	points := make([]*model.HistoPoint, 0)
	for i := 1; i <= *targetBinCount; i++ {
		point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
		points = append(points, &point)
	}

	for _, f := range filters {
		query = BuildWhereClause(f, query)
	}

	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
	if err != nil {
		log.Error("Error while running query")
		return nil, err
	}

	// Fill array at matching $Value
	for rows.Next() {
		point := model.HistoPoint{}
		if err := rows.Scan(&point.Value, &point.Count); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		for _, e := range points {
			if e.Value == (point.Value * binSizeSeconds) {
				// Note:
				// Matching on the unmodified integer value (and multiplying point.Value by binSizeSeconds after the match)
				// causes the frontend to loop into the highest targetBinCount, due to the zoom condition instantly being fulfilled (cause unknown).
				e.Count = point.Count
				break
			}
		}
	}

	log.Debugf("Timer jobsDurationStatisticsHistogram %s", time.Since(start))
	return points, nil
}
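
// jobsMetricStatisticsHistogram bins a metric's footprint values into the
// requested number of bins directly in SQL, scaled between the observed
// minimum and maximum up to the metric's configured peak
// (see https://jereze.com/code/sql-histogram/).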
func (r *JobRepository) jobsMetricStatisticsHistogram(
	ctx context.Context,
	metric string,
	filters []*model.JobFilter,
	bins *int,
) (*model.MetricHistoPoints, error) {
	// Get specific peak or largest peak
	var metricConfig *schema.MetricConfig
	var peak float64
	var unit string
	var footprintStat string

	for _, f := range filters {
		if f.Cluster != nil {
			metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
			peak = metricConfig.Peak
			unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
			footprintStat = metricConfig.Footprint
			log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
		}
	}

	if peak == 0.0 {
		for _, c := range archive.Clusters {
			for _, m := range c.MetricConfig {
				if m.Name == metric {
					if m.Peak > peak {
						peak = m.Peak
					}
					if unit == "" {
						unit = m.Unit.Prefix + m.Unit.Base
					}
					if footprintStat == "" {
						footprintStat = m.Footprint
					}
				}
			}
		}
	}

	// log.Debugf("Metric %s, Peak %f, Unit %s, Aggregation %s", metric, peak, unit, aggreg)
	// Make bins, see https://jereze.com/code/sql-histogram/

	start := time.Now()

	jm := fmt.Sprintf(`json_extract(footprint, "$.%s")`, (metric + "_" + footprintStat))

	crossJoinQuery := sq.Select(
		fmt.Sprintf(`max(%s) as max`, jm),
		fmt.Sprintf(`min(%s) as min`, jm),
	).From("job").Where(
		"JSON_VALID(footprint)",
	).Where(
		fmt.Sprintf(`%s is not null`, jm),
	).Where(
		fmt.Sprintf(`%s <= %f`, jm, peak),
	)

	crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery)
	if cjqerr != nil {
		return nil, cjqerr
	}

	for _, f := range filters {
		crossJoinQuery = BuildWhereClause(f, crossJoinQuery)
	}

	crossJoinQuerySql, crossJoinQueryArgs, sqlerr := crossJoinQuery.ToSql()
	if sqlerr != nil {
		return nil, sqlerr
	}

	binQuery := fmt.Sprintf(`CAST((case when %s = value.max
		then value.max*0.999999999 else %s end - value.min) / (value.max -
		value.min) * %v as INTEGER)`, jm, jm, *bins)

	mainQuery := sq.Select(
		fmt.Sprintf(`%s + 1 as bin`, binQuery),
		fmt.Sprintf(`count(%s) as count`, jm),
		fmt.Sprintf(`CAST(((value.max / %d) * (%v)) as INTEGER) as min`, *bins, binQuery),
		fmt.Sprintf(`CAST(((value.max / %d) * (%v + 1)) as INTEGER) as max`, *bins, binQuery),
	).From("job").CrossJoin(
		fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs...,
	).Where(fmt.Sprintf(`%s is not null and %s <= %f`, jm, jm, peak))

	mainQuery, qerr := SecurityCheck(ctx, mainQuery)
	if qerr != nil {
		return nil, qerr
	}

	for _, f := range filters {
		mainQuery = BuildWhereClause(f, mainQuery)
	}

	// Finalize query with grouping and ordering
	mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")

	rows, err := mainQuery.RunWith(r.DB).Query()
	if err != nil {
		log.Errorf("Error while running mainQuery: %s", err)
		return nil, err
	}

	// Setup array: pre-fill all bins with zero counts
	points := make([]*model.MetricHistoPoint, 0)
	for i := 1; i <= *bins; i++ {
		bin := i // copy the loop variable so every point keeps its own bin index
		binMax := (int(peak) / *bins) * i
		binMin := (int(peak) / *bins) * (i - 1)
		point := model.MetricHistoPoint{Bin: &bin, Count: 0, Min: &binMin, Max: &binMax}
		points = append(points, &point)
	}

	for rows.Next() {
		point := model.MetricHistoPoint{}
		if err := rows.Scan(&point.Bin, &point.Count, &point.Min, &point.Max); err != nil {
			log.Warnf("Error while scanning rows for %s", jm)
			return nil, err // Totally bricks cc-backend if returned and if all metrics requested?
		}

		for _, e := range points {
			if e.Bin != nil && point.Bin != nil {
				if *e.Bin == *point.Bin {
					e.Count = point.Count
					if point.Min != nil {
						e.Min = point.Min
					}
					if point.Max != nil {
						e.Max = point.Max
					}
					break
				}
			}
		}
	}

	result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}

	log.Debugf("Timer jobsMetricStatisticsHistogram %s", time.Since(start))
	return &result, nil
}
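
// runningJobsMetricStatisticsHistogram computes footprint histograms for
// running jobs by loading metric averages via metricDataDispatcher and
// binning them in memory. It refuses to handle more than 500 matching jobs.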
func (r *JobRepository) runningJobsMetricStatisticsHistogram(
	ctx context.Context,
	metrics []string,
	filters []*model.JobFilter,
	bins *int,
) []*model.MetricHistoPoints {
	// Get jobs
	jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
	if err != nil {
		log.Errorf("Error while querying jobs for footprint: %s", err)
		return nil
	}
	if len(jobs) > 500 {
		log.Errorf("too many jobs matched (max: %d)", 500)
		return nil
	}

	// Get AVGs from metric repo
	avgs := make([][]schema.Float, len(metrics))
	for i := range avgs {
		avgs[i] = make([]schema.Float, 0, len(jobs))
	}

	for _, job := range jobs {
		if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
			continue
		}

		if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
			log.Errorf("Error while loading averages for histogram: %s", err)
			return nil
		}
	}

	// Iterate metrics to fill the end result
	data := make([]*model.MetricHistoPoints, 0)
	for idx, metric := range metrics {
		// Get specific peak or largest peak
		var metricConfig *schema.MetricConfig
		var peak float64
		var unit string

		for _, f := range filters {
			if f.Cluster != nil {
				metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
				peak = metricConfig.Peak
				unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
			}
		}

		if peak == 0.0 {
			for _, c := range archive.Clusters {
				for _, m := range c.MetricConfig {
					if m.Name == metric {
						if m.Peak > peak {
							peak = m.Peak
						}
						if unit == "" {
							unit = m.Unit.Prefix + m.Unit.Base
						}
					}
				}
			}
		}

		// Make and fill bins
		peakBin := int(peak) / *bins

		points := make([]*model.MetricHistoPoint, 0)
		for b := 0; b < *bins; b++ {
			count := 0
			bindex := b + 1
			bmin := peakBin * b
			bmax := peakBin * (b + 1)

			// Iterate AVG values for the indexed metric and count into bins
			for _, val := range avgs[idx] {
				if int(val) >= bmin && int(val) < bmax {
					count += 1
				}
			}

			// Append bin to metric result array
			point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bmin, Max: &bmax}
			points = append(points, &point)
		}

		// Append metric result array to final results array
		result := model.MetricHistoPoints{Metric: metric, Unit: unit, Data: points}
		data = append(data, &result)
	}

	return data
}