2022-07-29 06:29:21 +02:00
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2021-10-26 10:24:43 +02:00
package graph
import (
"context"
"database/sql"
2021-12-17 15:49:22 +01:00
"errors"
2021-10-26 10:24:43 +02:00
"fmt"
"math"
2022-03-31 09:44:26 +02:00
"time"
2021-10-26 10:24:43 +02:00
"github.com/99designs/gqlgen/graphql"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
2022-09-05 17:46:38 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/archive"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2021-10-26 10:24:43 +02:00
sq "github.com/Masterminds/squirrel"
)
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map [ model . Aggregate ] string {
2021-12-17 15:49:22 +01:00
model . AggregateUser : "job.user" ,
model . AggregateProject : "job.project" ,
model . AggregateCluster : "job.cluster" ,
2021-10-26 10:24:43 +02:00
}
2022-03-08 10:26:51 +01:00
const (
	// ShortJobDuration is the threshold below which jobsStatistics counts a job
	// as a short job (`job.duration < ShortJobDuration`). job.duration appears
	// to be in seconds (the file converts it to hours with /3600), so this is
	// five minutes.
	ShortJobDuration int = 300
)
2022-03-02 10:48:52 +01:00
2021-10-26 10:24:43 +02:00
// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
func ( r * queryResolver ) jobsStatistics ( ctx context . Context , filter [ ] * model . JobFilter , groupBy * model . Aggregate ) ( [ ] * model . JobsStatistics , error ) {
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map [ string ] * model . JobsStatistics { }
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
2022-09-05 17:46:38 +02:00
for _ , cluster := range archive . Clusters {
2022-03-14 10:18:56 +01:00
for _ , subcluster := range cluster . SubClusters {
corehoursCol := fmt . Sprintf ( "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)" , subcluster . SocketsPerNode , subcluster . CoresPerSocket )
2023-01-27 18:36:58 +01:00
var rawQuery sq . SelectBuilder
2021-12-17 15:49:22 +01:00
if groupBy == nil {
2023-01-27 18:36:58 +01:00
rawQuery = sq . Select (
2021-12-17 15:49:22 +01:00
"''" ,
"COUNT(job.id)" ,
2022-02-03 15:59:45 +01:00
"CAST(ROUND(SUM(job.duration) / 3600) as int)" ,
2021-12-17 15:49:22 +01:00
corehoursCol ,
) . From ( "job" )
} else {
col := groupBy2column [ * groupBy ]
2023-01-27 18:36:58 +01:00
rawQuery = sq . Select (
2021-12-17 15:49:22 +01:00
col ,
"COUNT(job.id)" ,
2022-02-03 15:59:45 +01:00
"CAST(ROUND(SUM(job.duration) / 3600) as int)" ,
2021-12-17 15:49:22 +01:00
corehoursCol ,
) . From ( "job" ) . GroupBy ( col )
}
2021-10-26 10:24:43 +02:00
2023-01-27 18:36:58 +01:00
rawQuery = rawQuery .
2021-12-17 15:49:22 +01:00
Where ( "job.cluster = ?" , cluster . Name ) .
2022-03-14 10:18:56 +01:00
Where ( "job.subcluster = ?" , subcluster . Name )
2021-10-26 10:24:43 +02:00
2023-01-27 18:36:58 +01:00
query , qerr := repository . SecurityCheck ( ctx , rawQuery )
if qerr != nil {
return nil , qerr
}
2021-12-17 15:49:22 +01:00
for _ , f := range filter {
2022-02-17 09:04:57 +01:00
query = repository . BuildWhereClause ( f , query )
2021-12-17 15:49:22 +01:00
}
2021-10-26 10:24:43 +02:00
2021-12-17 15:49:22 +01:00
rows , err := query . RunWith ( r . DB ) . Query ( )
if err != nil {
2021-10-26 10:24:43 +02:00
return nil , err
}
2021-12-17 15:49:22 +01:00
for rows . Next ( ) {
var id sql . NullString
var jobs , walltime , corehours sql . NullInt64
if err := rows . Scan ( & id , & jobs , & walltime , & corehours ) ; err != nil {
return nil , err
}
if id . Valid {
if s , ok := stats [ id . String ] ; ok {
s . TotalJobs += int ( jobs . Int64 )
s . TotalWalltime += int ( walltime . Int64 )
s . TotalCoreHours += int ( corehours . Int64 )
} else {
stats [ id . String ] = & model . JobsStatistics {
ID : id . String ,
TotalJobs : int ( jobs . Int64 ) ,
TotalWalltime : int ( walltime . Int64 ) ,
TotalCoreHours : int ( corehours . Int64 ) ,
}
2021-10-26 10:24:43 +02:00
}
}
}
}
}
if groupBy == nil {
2023-01-27 18:36:58 +01:00
query , qerr := repository . SecurityCheck ( ctx , sq . Select ( "COUNT(job.id)" ) . From ( "job" ) . Where ( "job.duration < ?" , ShortJobDuration ) )
if qerr != nil {
return nil , qerr
}
2021-10-26 10:24:43 +02:00
for _ , f := range filter {
2022-02-17 09:04:57 +01:00
query = repository . BuildWhereClause ( f , query )
2021-10-26 10:24:43 +02:00
}
if err := query . RunWith ( r . DB ) . QueryRow ( ) . Scan ( & ( stats [ "" ] . ShortJobs ) ) ; err != nil {
return nil , err
}
} else {
col := groupBy2column [ * groupBy ]
2023-01-27 18:36:58 +01:00
query , qerr := repository . SecurityCheck ( ctx , sq . Select ( col , "COUNT(job.id)" ) . From ( "job" ) . Where ( "job.duration < ?" , ShortJobDuration ) )
if qerr != nil {
return nil , qerr
}
2021-11-26 10:35:07 +01:00
for _ , f := range filter {
2022-02-17 09:04:57 +01:00
query = repository . BuildWhereClause ( f , query )
2021-11-26 10:35:07 +01:00
}
rows , err := query . RunWith ( r . DB ) . Query ( )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
for rows . Next ( ) {
var id sql . NullString
var shortJobs sql . NullInt64
if err := rows . Scan ( & id , & shortJobs ) ; err != nil {
return nil , err
}
if id . Valid {
stats [ id . String ] . ShortJobs = int ( shortJobs . Int64 )
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver can not be used because we need to know the filters.
histogramsNeeded := false
fields := graphql . CollectFieldsCtx ( ctx , nil )
for _ , col := range fields {
2022-03-31 09:44:26 +02:00
if col . Name == "histDuration" || col . Name == "histNumNodes" {
2021-10-26 10:24:43 +02:00
histogramsNeeded = true
}
}
res := make ( [ ] * model . JobsStatistics , 0 , len ( stats ) )
for _ , stat := range stats {
res = append ( res , stat )
id , col := "" , ""
if groupBy != nil {
id = stat . ID
col = groupBy2column [ * groupBy ]
}
if histogramsNeeded {
var err error
2022-03-31 09:44:26 +02:00
value := fmt . Sprintf ( ` CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value ` , time . Now ( ) . Unix ( ) )
stat . HistDuration , err = r . jobsStatisticsHistogram ( ctx , value , filter , id , col )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
2021-12-08 10:12:19 +01:00
stat . HistNumNodes , err = r . jobsStatisticsHistogram ( ctx , "job.num_nodes as value" , filter , id , col )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
}
}
return res , nil
}
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
2021-12-08 10:12:19 +01:00
func ( r * queryResolver ) jobsStatisticsHistogram ( ctx context . Context , value string , filters [ ] * model . JobFilter , id , col string ) ( [ ] * model . HistoPoint , error ) {
2023-01-27 18:36:58 +01:00
query , qerr := repository . SecurityCheck ( ctx , sq . Select ( value , "COUNT(job.id) AS count" ) . From ( "job" ) )
if qerr != nil {
return nil , qerr
}
2021-10-26 10:24:43 +02:00
for _ , f := range filters {
2022-02-17 09:04:57 +01:00
query = repository . BuildWhereClause ( f , query )
2021-10-26 10:24:43 +02:00
}
if len ( id ) != 0 && len ( col ) != 0 {
query = query . Where ( col + " = ?" , id )
}
rows , err := query . GroupBy ( "value" ) . RunWith ( r . DB ) . Query ( )
if err != nil {
return nil , err
}
points := make ( [ ] * model . HistoPoint , 0 )
for rows . Next ( ) {
point := model . HistoPoint { }
if err := rows . Scan ( & point . Value , & point . Count ) ; err != nil {
return nil , err
}
points = append ( points , & point )
}
return points , nil
}
2022-02-17 09:04:57 +01:00
const (
	// MAX_JOBS_FOR_ANALYSIS is the largest number of jobs rooflineHeatmap and
	// jobsFootprints will process; if a filter matches more jobs than this,
	// those queries fail with a "too many jobs matched" error.
	MAX_JOBS_FOR_ANALYSIS = 500
)
2021-10-26 10:24:43 +02:00
// Helper function for the rooflineHeatmap GraphQL query placed here so that schema.resolvers.go is not too full.
2022-09-07 12:24:45 +02:00
func ( r * queryResolver ) rooflineHeatmap (
ctx context . Context ,
filter [ ] * model . JobFilter ,
rows int , cols int ,
2022-09-12 13:33:01 +02:00
minX float64 , minY float64 , maxX float64 , maxY float64 ) ( [ ] [ ] float64 , error ) {
2022-09-07 12:24:45 +02:00
2022-02-17 09:04:57 +01:00
jobs , err := r . Repo . QueryJobs ( ctx , filter , & model . PageRequest { Page : 1 , ItemsPerPage : MAX_JOBS_FOR_ANALYSIS + 1 } , nil )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
2022-02-17 09:04:57 +01:00
if len ( jobs ) > MAX_JOBS_FOR_ANALYSIS {
return nil , fmt . Errorf ( "too many jobs matched (max: %d)" , MAX_JOBS_FOR_ANALYSIS )
2021-10-26 10:24:43 +02:00
}
fcols , frows := float64 ( cols ) , float64 ( rows )
minX , minY , maxX , maxY = math . Log10 ( minX ) , math . Log10 ( minY ) , math . Log10 ( maxX ) , math . Log10 ( maxY )
2022-09-12 13:33:01 +02:00
tiles := make ( [ ] [ ] float64 , rows )
2021-10-26 10:24:43 +02:00
for i := range tiles {
2022-09-12 13:33:01 +02:00
tiles [ i ] = make ( [ ] float64 , cols )
2021-10-26 10:24:43 +02:00
}
for _ , job := range jobs {
2022-03-30 09:39:13 +02:00
if job . MonitoringStatus == schema . MonitoringStatusDisabled || job . MonitoringStatus == schema . MonitoringStatusArchivingFailed {
continue
}
2022-01-07 09:44:34 +01:00
jobdata , err := metricdata . LoadData ( job , [ ] string { "flops_any" , "mem_bw" } , [ ] schema . MetricScope { schema . MetricScopeNode } , ctx )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
2021-12-17 15:49:22 +01:00
flops_ , membw_ := jobdata [ "flops_any" ] , jobdata [ "mem_bw" ]
if flops_ == nil && membw_ == nil {
return nil , fmt . Errorf ( "'flops_any' or 'mem_bw' missing for job %d" , job . ID )
}
flops , ok1 := flops_ [ "node" ]
membw , ok2 := membw_ [ "node" ]
if ! ok1 || ! ok2 {
// TODO/FIXME:
return nil , errors . New ( "todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level" )
2021-10-26 10:24:43 +02:00
}
for n := 0 ; n < len ( flops . Series ) ; n ++ {
flopsSeries , membwSeries := flops . Series [ n ] , membw . Series [ n ]
for i := 0 ; i < len ( flopsSeries . Data ) ; i ++ {
if i >= len ( membwSeries . Data ) {
break
}
x , y := math . Log10 ( float64 ( flopsSeries . Data [ i ] / membwSeries . Data [ i ] ) ) , math . Log10 ( float64 ( flopsSeries . Data [ i ] ) )
if math . IsNaN ( x ) || math . IsNaN ( y ) || x < minX || x >= maxX || y < minY || y > maxY {
continue
}
x , y = math . Floor ( ( ( x - minX ) / ( maxX - minX ) ) * fcols ) , math . Floor ( ( ( y - minY ) / ( maxY - minY ) ) * frows )
if x < 0 || x >= fcols || y < 0 || y >= frows {
continue
}
tiles [ int ( y ) ] [ int ( x ) ] += 1
}
}
}
return tiles , nil
}
// Helper function for the jobsFootprints GraphQL query placed here so that schema.resolvers.go is not too full.
2022-03-16 16:11:28 +01:00
func ( r * queryResolver ) jobsFootprints ( ctx context . Context , filter [ ] * model . JobFilter , metrics [ ] string ) ( * model . Footprints , error ) {
2022-02-17 09:04:57 +01:00
jobs , err := r . Repo . QueryJobs ( ctx , filter , & model . PageRequest { Page : 1 , ItemsPerPage : MAX_JOBS_FOR_ANALYSIS + 1 } , nil )
2021-10-26 10:24:43 +02:00
if err != nil {
return nil , err
}
2022-02-17 09:04:57 +01:00
if len ( jobs ) > MAX_JOBS_FOR_ANALYSIS {
return nil , fmt . Errorf ( "too many jobs matched (max: %d)" , MAX_JOBS_FOR_ANALYSIS )
2021-10-26 10:24:43 +02:00
}
avgs := make ( [ ] [ ] schema . Float , len ( metrics ) )
for i := range avgs {
avgs [ i ] = make ( [ ] schema . Float , 0 , len ( jobs ) )
}
2022-03-16 16:11:28 +01:00
nodehours := make ( [ ] schema . Float , 0 , len ( jobs ) )
2021-10-26 10:24:43 +02:00
for _ , job := range jobs {
2022-03-30 09:39:13 +02:00
if job . MonitoringStatus == schema . MonitoringStatusDisabled || job . MonitoringStatus == schema . MonitoringStatusArchivingFailed {
continue
}
2021-10-26 10:24:43 +02:00
if err := metricdata . LoadAverages ( job , metrics , avgs , ctx ) ; err != nil {
return nil , err
}
2022-03-16 16:11:28 +01:00
nodehours = append ( nodehours , schema . Float ( float64 ( job . Duration ) / 60.0 * float64 ( job . NumNodes ) ) )
2021-10-26 10:24:43 +02:00
}
res := make ( [ ] * model . MetricFootprints , len ( avgs ) )
for i , arr := range avgs {
res [ i ] = & model . MetricFootprints {
2022-03-16 16:11:28 +01:00
Metric : metrics [ i ] ,
Data : arr ,
2021-10-26 10:24:43 +02:00
}
}
2022-03-16 16:11:28 +01:00
return & model . Footprints {
Nodehours : nodehours ,
Metrics : res ,
} , nil
2021-10-26 10:24:43 +02:00
}