2022-02-06 09:48:31 +01:00
package repository
import (
2022-02-09 15:03:12 +01:00
"context"
"database/sql"
2022-02-22 09:25:41 +01:00
"encoding/json"
2022-02-09 15:03:12 +01:00
"errors"
2022-03-08 11:53:24 +01:00
"fmt"
2022-02-09 15:03:12 +01:00
"strconv"
2022-06-21 17:52:36 +02:00
"sync"
2022-02-22 09:25:41 +01:00
"time"
2022-02-09 15:03:12 +01:00
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
2022-06-22 06:11:00 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2022-02-06 09:48:31 +01:00
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
)
2022-06-21 17:52:36 +02:00
var (
jobRepoOnce sync . Once
jobRepoInstance * JobRepository
)
2022-02-06 09:48:31 +01:00
type JobRepository struct {
DB * sqlx . DB
2022-02-22 09:25:41 +01:00
stmtCache * sq . StmtCache
2022-03-14 09:08:02 +01:00
cache * lrucache . Cache
2022-02-06 09:48:31 +01:00
}
2022-06-21 17:52:36 +02:00
func GetRepository ( ) * JobRepository {
jobRepoOnce . Do ( func ( ) {
db := GetConnection ( )
jobRepoInstance = & JobRepository {
DB : db . DB ,
stmtCache : sq . NewStmtCache ( db . DB ) ,
cache : lrucache . New ( 1024 * 1024 ) ,
}
} )
return jobRepoInstance
2022-02-19 10:28:29 +01:00
}
2022-02-22 09:25:41 +01:00
var jobColumns [ ] string = [ ] string {
2022-03-14 10:18:56 +01:00
"job.id" , "job.job_id" , "job.user" , "job.project" , "job.cluster" , "job.subcluster" , "job.start_time" , "job.partition" , "job.array_job_id" ,
2022-02-22 09:25:41 +01:00
"job.num_nodes" , "job.num_hwthreads" , "job.num_acc" , "job.exclusive" , "job.monitoring_status" , "job.smt" , "job.job_state" ,
2022-03-14 10:18:56 +01:00
"job.duration" , "job.walltime" , "job.resources" , // "job.meta_data",
2022-02-22 09:25:41 +01:00
}
func scanJob ( row interface { Scan ( ... interface { } ) error } ) ( * schema . Job , error ) {
job := & schema . Job { }
if err := row . Scan (
2022-03-14 10:18:56 +01:00
& job . ID , & job . JobID , & job . User , & job . Project , & job . Cluster , & job . SubCluster , & job . StartTimeUnix , & job . Partition , & job . ArrayJobId ,
2022-02-22 09:25:41 +01:00
& job . NumNodes , & job . NumHWThreads , & job . NumAcc , & job . Exclusive , & job . MonitoringStatus , & job . SMT , & job . State ,
2022-03-14 10:18:56 +01:00
& job . Duration , & job . Walltime , & job . RawResources /*&job.MetaData*/ ) ; err != nil {
2022-02-22 09:25:41 +01:00
return nil , err
}
if err := json . Unmarshal ( job . RawResources , & job . Resources ) ; err != nil {
return nil , err
}
job . StartTime = time . Unix ( job . StartTimeUnix , 0 )
if job . Duration == 0 && job . State == schema . JobStateRunning {
job . Duration = int32 ( time . Since ( job . StartTime ) . Seconds ( ) )
}
job . RawResources = nil
return job , nil
}
2022-03-08 11:53:24 +01:00
func ( r * JobRepository ) FetchMetadata ( job * schema . Job ) ( map [ string ] string , error ) {
2022-03-17 11:18:22 +01:00
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
if cached := r . cache . Get ( cachekey , nil ) ; cached != nil {
job . MetaData = cached . ( map [ string ] string )
return job . MetaData , nil
}
2022-03-08 11:53:24 +01:00
if err := sq . Select ( "job.meta_data" ) . From ( "job" ) . Where ( "job.id = ?" , job . ID ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & job . RawMetaData ) ; err != nil {
return nil , err
}
if len ( job . RawMetaData ) == 0 {
return nil , nil
}
if err := json . Unmarshal ( job . RawMetaData , & job . MetaData ) ; err != nil {
return nil , err
}
2022-03-17 11:18:22 +01:00
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
2022-03-08 11:53:24 +01:00
return job . MetaData , nil
}
2022-03-17 11:18:22 +01:00
func ( r * JobRepository ) UpdateMetadata ( job * schema . Job , key , val string ) ( err error ) {
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
r . cache . Del ( cachekey )
if job . MetaData == nil {
if _ , err = r . FetchMetadata ( job ) ; err != nil {
return err
}
}
if job . MetaData != nil {
cpy := make ( map [ string ] string , len ( job . MetaData ) + 1 )
for k , v := range job . MetaData {
cpy [ k ] = v
}
cpy [ key ] = val
job . MetaData = cpy
} else {
job . MetaData = map [ string ] string { key : val }
}
if job . RawMetaData , err = json . Marshal ( job . MetaData ) ; err != nil {
return err
}
if _ , err = sq . Update ( "job" ) . Set ( "meta_data" , job . RawMetaData ) . Where ( "job.id = ?" , job . ID ) . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
return err
}
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
return nil
}
2022-02-07 09:57:06 +01:00
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-02-07 09:57:06 +01:00
func ( r * JobRepository ) Find (
2022-02-15 17:13:16 +01:00
jobId * int64 ,
cluster * string ,
startTime * int64 ) ( * schema . Job , error ) {
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) . From ( "job" ) .
2022-03-08 10:33:56 +01:00
Where ( "job.job_id = ?" , * jobId )
2022-02-15 17:13:16 +01:00
if cluster != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.cluster = ?" , * cluster )
2022-02-15 17:13:16 +01:00
}
if startTime != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.start_time = ?" , * startTime )
2022-02-07 07:09:47 +01:00
}
2022-02-22 09:25:41 +01:00
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-02-07 09:57:06 +01:00
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-02-07 09:57:06 +01:00
func ( r * JobRepository ) FindById (
jobId int64 ) ( * schema . Job , error ) {
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) .
From ( "job" ) . Where ( "job.id = ?" , jobId )
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func ( r * JobRepository ) Start ( job * schema . JobMeta ) ( id int64 , err error ) {
2022-03-08 11:53:24 +01:00
job . RawResources , err = json . Marshal ( job . Resources )
if err != nil {
return - 1 , fmt . Errorf ( "encoding resources field failed: %w" , err )
}
job . RawMetaData , err = json . Marshal ( job . MetaData )
if err != nil {
return - 1 , fmt . Errorf ( "encoding metaData field failed: %w" , err )
}
2022-02-08 12:49:28 +01:00
res , err := r . DB . NamedExec ( ` INSERT INTO job (
2022-03-14 09:08:02 +01:00
job_id , user , project , cluster , subcluster , ` +" ` partition ` "+ ` , array_job_id , num_nodes , num_hwthreads , num_acc ,
exclusive , monitoring_status , smt , job_state , start_time , duration , walltime , resources , meta_data
2022-02-07 07:09:47 +01:00
) VALUES (
2022-03-14 09:08:02 +01:00
: job_id , : user , : project , : cluster , : subcluster , : partition , : array_job_id , : num_nodes , : num_hwthreads , : num_acc ,
: exclusive , : monitoring_status , : smt , : job_state , : start_time , : duration , : walltime , : resources , : meta_data
2022-02-07 07:09:47 +01:00
) ; ` , job )
2022-02-08 12:49:28 +01:00
if err != nil {
return - 1 , err
}
return res . LastInsertId ( )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
2022-02-07 07:09:47 +01:00
func ( r * JobRepository ) Stop (
jobId int64 ,
duration int32 ,
2022-02-15 14:25:39 +01:00
state schema . JobState ,
monitoringStatus int32 ) ( err error ) {
2022-02-07 07:09:47 +01:00
stmt := sq . Update ( "job" ) .
Set ( "job_state" , state ) .
Set ( "duration" , duration ) .
2022-02-15 14:25:39 +01:00
Set ( "monitoring_status" , monitoringStatus ) .
2022-02-07 07:09:47 +01:00
Where ( "job.id = ?" , jobId )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 11:33:59 +01:00
return
2022-02-15 11:10:49 +01:00
}
2022-02-24 11:54:36 +01:00
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
2022-03-25 10:20:33 +01:00
func ( r * JobRepository ) CountGroupedJobs ( ctx context . Context , aggreg model . Aggregate , filters [ ] * model . JobFilter , weight * model . Weights , limit * int ) ( map [ string ] int , error ) {
2022-02-19 10:28:29 +01:00
if ! aggreg . IsValid ( ) {
return nil , errors . New ( "invalid aggregate" )
}
2022-03-25 10:20:33 +01:00
runner := ( sq . BaseRunner ) ( r . stmtCache )
count := "count(*) as count"
if weight != nil {
switch * weight {
case model . WeightsNodeCount :
count = "sum(job.num_nodes) as count"
case model . WeightsNodeHours :
now := time . Now ( ) . Unix ( )
count = fmt . Sprintf ( ` sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count ` , now )
runner = r . DB
}
}
q := sq . Select ( "job." + string ( aggreg ) , count ) . From ( "job" ) . GroupBy ( "job." + string ( aggreg ) ) . OrderBy ( "count DESC" )
2022-02-19 10:28:29 +01:00
q = SecurityCheck ( ctx , q )
for _ , f := range filters {
q = BuildWhereClause ( f , q )
2022-02-16 12:29:54 +01:00
}
2022-02-19 10:28:29 +01:00
if limit != nil {
q = q . Limit ( uint64 ( * limit ) )
2022-02-16 12:29:54 +01:00
}
counts := map [ string ] int { }
2022-03-25 10:20:33 +01:00
rows , err := q . RunWith ( runner ) . Query ( )
2022-02-16 12:29:54 +01:00
if err != nil {
return nil , err
}
for rows . Next ( ) {
2022-02-19 10:28:29 +01:00
var group string
2022-02-16 12:29:54 +01:00
var count int
2022-02-19 10:28:29 +01:00
if err := rows . Scan ( & group , & count ) ; err != nil {
2022-02-16 12:29:54 +01:00
return nil , err
}
2022-02-19 10:28:29 +01:00
counts [ group ] = count
2022-02-16 12:29:54 +01:00
}
return counts , nil
}
2022-02-15 13:18:27 +01:00
func ( r * JobRepository ) UpdateMonitoringStatus ( job int64 , monitoringStatus int32 ) ( err error ) {
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , job )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 13:18:27 +01:00
return
}
2022-02-15 11:10:49 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
func ( r * JobRepository ) Archive (
jobId int64 ,
monitoringStatus int32 ,
2022-02-15 13:18:27 +01:00
metricStats map [ string ] schema . JobStatistics ) error {
2022-02-15 11:10:49 +01:00
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , jobId )
2022-02-07 07:09:47 +01:00
for metric , stats := range metricStats {
switch metric {
case "flops_any" :
stmt = stmt . Set ( "flops_any_avg" , stats . Avg )
case "mem_used" :
stmt = stmt . Set ( "mem_used_max" , stats . Max )
case "mem_bw" :
stmt = stmt . Set ( "mem_bw_avg" , stats . Avg )
case "load" :
stmt = stmt . Set ( "load_avg" , stats . Avg )
case "net_bw" :
stmt = stmt . Set ( "net_bw_avg" , stats . Avg )
case "file_bw" :
stmt = stmt . Set ( "file_bw_avg" , stats . Avg )
}
}
2022-02-22 09:25:41 +01:00
if _ , err := stmt . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
2022-02-15 13:18:27 +01:00
return err
2022-02-07 07:09:47 +01:00
}
2022-02-15 13:18:27 +01:00
return nil
2022-02-07 07:09:47 +01:00
}
2022-02-09 15:03:12 +01:00
var ErrNotFound = errors . New ( "no such job or user" )
// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
// As 0 is a valid job id, check if username is "" instead in order to check what machted.
// If nothing matches the search, `ErrNotFound` is returned.
func ( r * JobRepository ) FindJobOrUser ( ctx context . Context , searchterm string ) ( job int64 , username string , err error ) {
user := auth . GetUser ( ctx )
if id , err := strconv . Atoi ( searchterm ) ; err == nil {
qb := sq . Select ( "job.id" ) . From ( "job" ) . Where ( "job.job_id = ?" , id )
if user != nil && ! user . HasRole ( auth . RoleAdmin ) {
qb = qb . Where ( "job.user = ?" , user . Username )
}
2022-02-22 09:25:41 +01:00
err := qb . RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & job )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
return 0 , "" , err
} else if err == nil {
return job , "" , nil
}
}
if user == nil || user . HasRole ( auth . RoleAdmin ) {
err := sq . Select ( "job.user" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.user = ?" , searchterm ) .
2022-02-22 09:25:41 +01:00
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & username )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
return 0 , "" , err
} else if err == nil {
return 0 , username , nil
}
}
return 0 , "" , ErrNotFound
}
2022-03-14 09:08:02 +01:00
func ( r * JobRepository ) Partitions ( cluster string ) ( [ ] string , error ) {
var err error
partitions := r . cache . Get ( "partitions:" + cluster , func ( ) ( interface { } , time . Duration , int ) {
parts := [ ] string { }
if err = r . DB . Select ( & parts , ` SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?; ` , cluster ) ; err != nil {
return nil , 0 , 1000
}
return parts , 1 * time . Hour , 1
} )
if err != nil {
return nil , err
}
return partitions . ( [ ] string ) , nil
}
2022-03-24 10:32:08 +01:00
2022-03-24 16:08:47 +01:00
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func ( r * JobRepository ) AllocatedNodes ( cluster string ) ( map [ string ] map [ string ] int , error ) {
subclusters := make ( map [ string ] map [ string ] int )
rows , err := sq . Select ( "resources" , "subcluster" ) . From ( "job" ) .
2022-03-24 10:32:08 +01:00
Where ( "job.job_state = 'running'" ) .
Where ( "job.cluster = ?" , cluster ) .
RunWith ( r . stmtCache ) . Query ( )
if err != nil {
return nil , err
}
var raw [ ] byte
defer rows . Close ( )
for rows . Next ( ) {
raw = raw [ 0 : 0 ]
var resources [ ] * schema . Resource
2022-03-24 16:08:47 +01:00
var subcluster string
if err := rows . Scan ( & raw , & subcluster ) ; err != nil {
2022-03-24 10:32:08 +01:00
return nil , err
}
if err := json . Unmarshal ( raw , & resources ) ; err != nil {
return nil , err
}
2022-03-24 16:08:47 +01:00
hosts , ok := subclusters [ subcluster ]
if ! ok {
hosts = make ( map [ string ] int )
subclusters [ subcluster ] = hosts
}
2022-03-24 10:32:08 +01:00
for _ , resource := range resources {
2022-03-24 16:08:47 +01:00
hosts [ resource . Hostname ] += 1
2022-03-24 10:32:08 +01:00
}
}
2022-03-24 16:08:47 +01:00
return subclusters , nil
2022-03-24 10:32:08 +01:00
}
2022-04-07 09:50:32 +02:00
func ( r * JobRepository ) StopJobsExceedingWalltimeBy ( seconds int ) error {
res , err := sq . Update ( "job" ) .
Set ( "monitoring_status" , schema . MonitoringStatusArchivingFailed ) .
Set ( "duration" , 0 ) .
Set ( "job_state" , schema . JobStateFailed ) .
Where ( "job.job_state = 'running'" ) .
2022-05-09 11:53:41 +02:00
Where ( "job.walltime > 0" ) .
2022-04-07 09:50:32 +02:00
Where ( fmt . Sprintf ( "(%d - job.start_time) > (job.walltime + %d)" , time . Now ( ) . Unix ( ) , seconds ) ) .
RunWith ( r . DB ) . Exec ( )
if err != nil {
return err
}
rowsAffected , err := res . RowsAffected ( )
if err != nil {
return err
}
if rowsAffected > 0 {
log . Warnf ( "%d jobs have been marked as failed due to running too long" , rowsAffected )
}
return nil
}