2022-07-29 06:29:21 +02:00
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2022-02-06 09:48:31 +01:00
package repository
import (
2022-02-09 15:03:12 +01:00
"context"
"database/sql"
2022-02-22 09:25:41 +01:00
"encoding/json"
2022-02-09 15:03:12 +01:00
"errors"
2022-03-08 11:53:24 +01:00
"fmt"
2022-02-09 15:03:12 +01:00
"strconv"
2022-06-21 17:52:36 +02:00
"sync"
2022-02-22 09:25:41 +01:00
"time"
2022-02-09 15:03:12 +01:00
2023-02-13 13:53:24 +01:00
"github.com/99designs/gqlgen/graphql"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
2022-12-08 15:04:58 +01:00
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
2023-02-13 13:53:24 +01:00
"github.com/ClusterCockpit/cc-backend/pkg/archive"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/log"
2022-06-22 06:11:00 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2022-02-06 09:48:31 +01:00
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
)
2022-06-21 17:52:36 +02:00
// jobRepoOnce guards the one-time construction done in GetJobRepository.
var jobRepoOnce sync.Once

// jobRepoInstance is the package-wide repository instance that
// GetJobRepository creates on first use and returns afterwards.
var jobRepoInstance *JobRepository
2022-02-06 09:48:31 +01:00
type JobRepository struct {
DB * sqlx . DB
2022-02-22 09:25:41 +01:00
stmtCache * sq . StmtCache
2022-03-14 09:08:02 +01:00
cache * lrucache . Cache
2022-12-08 15:04:58 +01:00
archiveChannel chan * schema . Job
archivePending sync . WaitGroup
2022-02-06 09:48:31 +01:00
}
2022-09-05 17:46:38 +02:00
func GetJobRepository ( ) * JobRepository {
2022-06-21 17:52:36 +02:00
jobRepoOnce . Do ( func ( ) {
db := GetConnection ( )
jobRepoInstance = & JobRepository {
2023-02-13 13:53:24 +01:00
DB : db . DB ,
stmtCache : sq . NewStmtCache ( db . DB ) ,
cache : lrucache . New ( 1024 * 1024 ) ,
2022-12-08 15:04:58 +01:00
archiveChannel : make ( chan * schema . Job , 128 ) ,
2022-06-21 17:52:36 +02:00
}
2022-12-08 15:04:58 +01:00
// start archiving worker
go jobRepoInstance . archivingWorker ( )
2022-06-21 17:52:36 +02:00
} )
return jobRepoInstance
2022-02-19 10:28:29 +01:00
}
2022-02-22 09:25:41 +01:00
// jobColumns lists the columns selected for every job query. The order must
// match the destination order used by scanJob.
var jobColumns = []string{
	"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
	"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
	"job.duration", "job.walltime", "job.resources", // "job.meta_data",
}
func scanJob ( row interface { Scan ( ... interface { } ) error } ) ( * schema . Job , error ) {
job := & schema . Job { }
if err := row . Scan (
2022-03-14 10:18:56 +01:00
& job . ID , & job . JobID , & job . User , & job . Project , & job . Cluster , & job . SubCluster , & job . StartTimeUnix , & job . Partition , & job . ArrayJobId ,
2022-02-22 09:25:41 +01:00
& job . NumNodes , & job . NumHWThreads , & job . NumAcc , & job . Exclusive , & job . MonitoringStatus , & job . SMT , & job . State ,
2023-02-13 13:53:24 +01:00
& job . Duration , & job . Walltime , & job . RawResources /*&job.RawMetaData*/ ) ; err != nil {
log . Warn ( "Error while scanning rows" )
2022-02-22 09:25:41 +01:00
return nil , err
}
if err := json . Unmarshal ( job . RawResources , & job . Resources ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarhsaling raw resources json" )
2022-02-22 09:25:41 +01:00
return nil , err
}
2023-01-12 11:57:44 +01:00
// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
// return nil, err
// }
2023-01-11 16:25:02 +01:00
2022-02-22 09:25:41 +01:00
job . StartTime = time . Unix ( job . StartTimeUnix , 0 )
if job . Duration == 0 && job . State == schema . JobStateRunning {
job . Duration = int32 ( time . Since ( job . StartTime ) . Seconds ( ) )
}
job . RawResources = nil
return job , nil
}
2023-01-11 16:25:02 +01:00
// FetchJobName returns the "jobName" entry of the job's meta data.
//
// A cached meta data map is used if it contains a non-empty job name.
// Otherwise the meta data is read from the database, decoded into
// job.MetaData and cached for 24 hours. The result is nil if the stored
// meta data is empty, and a pointer to an empty string if the meta data has
// no job name.
func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
	cachekey := fmt.Sprintf("metadata:%d", job.ID)
	if hit := r.cache.Get(cachekey, nil); hit != nil {
		job.MetaData = hit.(map[string]string)
		if name := job.MetaData["jobName"]; name != "" {
			return &name, nil
		}
	}

	row := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
		RunWith(r.stmtCache).QueryRow()
	if err := row.Scan(&job.RawMetaData); err != nil {
		return nil, err
	}

	if len(job.RawMetaData) == 0 {
		return nil, nil
	}

	if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
		return nil, err
	}

	r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)

	name := job.MetaData["jobName"]
	if name == "" {
		return new(string), nil
	}
	return &name, nil
}
2022-03-08 11:53:24 +01:00
func ( r * JobRepository ) FetchMetadata ( job * schema . Job ) ( map [ string ] string , error ) {
2022-03-17 11:18:22 +01:00
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
if cached := r . cache . Get ( cachekey , nil ) ; cached != nil {
job . MetaData = cached . ( map [ string ] string )
return job . MetaData , nil
}
2022-03-08 11:53:24 +01:00
if err := sq . Select ( "job.meta_data" ) . From ( "job" ) . Where ( "job.id = ?" , job . ID ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & job . RawMetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning for job metadata" )
2022-03-08 11:53:24 +01:00
return nil , err
}
if len ( job . RawMetaData ) == 0 {
return nil , nil
}
if err := json . Unmarshal ( job . RawMetaData , & job . MetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarshaling raw metadata json" )
2022-03-08 11:53:24 +01:00
return nil , err
}
2022-03-17 11:18:22 +01:00
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
2022-03-08 11:53:24 +01:00
return job . MetaData , nil
}
2022-03-17 11:18:22 +01:00
func ( r * JobRepository ) UpdateMetadata ( job * schema . Job , key , val string ) ( err error ) {
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
r . cache . Del ( cachekey )
if job . MetaData == nil {
if _ , err = r . FetchMetadata ( job ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while fetching metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
}
if job . MetaData != nil {
cpy := make ( map [ string ] string , len ( job . MetaData ) + 1 )
for k , v := range job . MetaData {
cpy [ k ] = v
}
cpy [ key ] = val
job . MetaData = cpy
} else {
job . MetaData = map [ string ] string { key : val }
}
if job . RawMetaData , err = json . Marshal ( job . MetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while marshaling metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
if _ , err = sq . Update ( "job" ) . Set ( "meta_data" , job . RawMetaData ) . Where ( "job.id = ?" , job . ID ) . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while updating metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
return nil
}
2022-02-07 09:57:06 +01:00
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-02-07 09:57:06 +01:00
func ( r * JobRepository ) Find (
2022-02-15 17:13:16 +01:00
jobId * int64 ,
cluster * string ,
startTime * int64 ) ( * schema . Job , error ) {
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) . From ( "job" ) .
2022-03-08 10:33:56 +01:00
Where ( "job.job_id = ?" , * jobId )
2022-02-15 17:13:16 +01:00
if cluster != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.cluster = ?" , * cluster )
2022-02-15 17:13:16 +01:00
}
if startTime != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.start_time = ?" , * startTime )
2022-02-07 07:09:47 +01:00
}
2022-02-22 09:25:41 +01:00
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-11-09 11:49:36 +01:00
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func ( r * JobRepository ) FindAll (
jobId * int64 ,
cluster * string ,
startTime * int64 ) ( [ ] * schema . Job , error ) {
q := sq . Select ( jobColumns ... ) . From ( "job" ) .
Where ( "job.job_id = ?" , * jobId )
if cluster != nil {
q = q . Where ( "job.cluster = ?" , * cluster )
}
if startTime != nil {
q = q . Where ( "job.start_time = ?" , * startTime )
}
rows , err := q . RunWith ( r . stmtCache ) . Query ( )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-11-09 11:49:36 +01:00
return nil , err
}
jobs := make ( [ ] * schema . Job , 0 , 10 )
for rows . Next ( ) {
job , err := scanJob ( rows )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-11-09 11:49:36 +01:00
return nil , err
}
jobs = append ( jobs , job )
}
return jobs , nil
}
2022-02-07 09:57:06 +01:00
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-09-23 15:23:45 +02:00
func ( r * JobRepository ) FindById ( jobId int64 ) ( * schema . Job , error ) {
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) .
From ( "job" ) . Where ( "job.id = ?" , jobId )
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func ( r * JobRepository ) Start ( job * schema . JobMeta ) ( id int64 , err error ) {
2022-03-08 11:53:24 +01:00
job . RawResources , err = json . Marshal ( job . Resources )
if err != nil {
2023-01-19 16:59:14 +01:00
return - 1 , fmt . Errorf ( "REPOSITORY/JOB > encoding resources field failed: %w" , err )
2022-03-08 11:53:24 +01:00
}
job . RawMetaData , err = json . Marshal ( job . MetaData )
if err != nil {
2023-01-19 16:59:14 +01:00
return - 1 , fmt . Errorf ( "REPOSITORY/JOB > encoding metaData field failed: %w" , err )
2022-03-08 11:53:24 +01:00
}
2022-02-08 12:49:28 +01:00
res , err := r . DB . NamedExec ( ` INSERT INTO job (
2022-03-14 09:08:02 +01:00
job_id , user , project , cluster , subcluster , ` +" ` partition ` "+ ` , array_job_id , num_nodes , num_hwthreads , num_acc ,
exclusive , monitoring_status , smt , job_state , start_time , duration , walltime , resources , meta_data
2022-02-07 07:09:47 +01:00
) VALUES (
2022-03-14 09:08:02 +01:00
: job_id , : user , : project , : cluster , : subcluster , : partition , : array_job_id , : num_nodes , : num_hwthreads , : num_acc ,
: exclusive , : monitoring_status , : smt , : job_state , : start_time , : duration , : walltime , : resources , : meta_data
2022-02-07 07:09:47 +01:00
) ; ` , job )
2022-02-08 12:49:28 +01:00
if err != nil {
return - 1 , err
}
return res . LastInsertId ( )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
2022-02-07 07:09:47 +01:00
func ( r * JobRepository ) Stop (
jobId int64 ,
duration int32 ,
2022-02-15 14:25:39 +01:00
state schema . JobState ,
monitoringStatus int32 ) ( err error ) {
2022-02-07 07:09:47 +01:00
stmt := sq . Update ( "job" ) .
Set ( "job_state" , state ) .
Set ( "duration" , duration ) .
2022-02-15 14:25:39 +01:00
Set ( "monitoring_status" , monitoringStatus ) .
2022-02-07 07:09:47 +01:00
Where ( "job.id = ?" , jobId )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 11:33:59 +01:00
return
2022-02-15 11:10:49 +01:00
}
2022-11-11 15:26:27 +01:00
func ( r * JobRepository ) DeleteJobsBefore ( startTime int64 ) ( int , error ) {
var cnt int
2022-11-25 15:15:05 +01:00
qs := fmt . Sprintf ( "SELECT count(*) FROM job WHERE job.start_time < %d" , startTime )
err := r . DB . Get ( & cnt , qs ) //ignore error as it will also occur in delete statement
_ , err = r . DB . Exec ( ` DELETE FROM job WHERE job.start_time < ? ` , startTime )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Errorf ( " DeleteJobsBefore(%d): error %#v" , startTime , err )
2022-11-25 15:15:05 +01:00
} else {
2023-01-23 18:48:06 +01:00
log . Infof ( "DeleteJobsBefore(%d): Deleted %d jobs" , startTime , cnt )
2022-11-25 15:15:05 +01:00
}
2022-11-11 15:26:27 +01:00
return cnt , err
}
func ( r * JobRepository ) DeleteJobById ( id int64 ) error {
_ , err := r . DB . Exec ( ` DELETE FROM job WHERE job.id = ? ` , id )
2022-11-25 15:15:05 +01:00
if err != nil {
2023-01-31 18:28:44 +01:00
log . Errorf ( "DeleteJobById(%d): error %#v" , id , err )
2022-11-25 15:15:05 +01:00
} else {
2023-01-23 18:48:06 +01:00
log . Infof ( "DeleteJobById(%d): Success" , id )
2022-11-25 15:15:05 +01:00
}
2022-11-11 15:26:27 +01:00
return err
}
2022-02-24 11:54:36 +01:00
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
2022-03-25 10:20:33 +01:00
func ( r * JobRepository ) CountGroupedJobs ( ctx context . Context , aggreg model . Aggregate , filters [ ] * model . JobFilter , weight * model . Weights , limit * int ) ( map [ string ] int , error ) {
2022-02-19 10:28:29 +01:00
if ! aggreg . IsValid ( ) {
return nil , errors . New ( "invalid aggregate" )
}
2022-03-25 10:20:33 +01:00
runner := ( sq . BaseRunner ) ( r . stmtCache )
count := "count(*) as count"
if weight != nil {
switch * weight {
case model . WeightsNodeCount :
count = "sum(job.num_nodes) as count"
case model . WeightsNodeHours :
now := time . Now ( ) . Unix ( )
count = fmt . Sprintf ( ` sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count ` , now )
runner = r . DB
2023-01-24 12:02:29 +01:00
default :
2023-02-01 11:58:27 +01:00
log . Notef ( "CountGroupedJobs() Weight %v unknown." , * weight )
2022-03-25 10:20:33 +01:00
}
}
q := sq . Select ( "job." + string ( aggreg ) , count ) . From ( "job" ) . GroupBy ( "job." + string ( aggreg ) ) . OrderBy ( "count DESC" )
2022-02-19 10:28:29 +01:00
q = SecurityCheck ( ctx , q )
for _ , f := range filters {
q = BuildWhereClause ( f , q )
2022-02-16 12:29:54 +01:00
}
2022-02-19 10:28:29 +01:00
if limit != nil {
q = q . Limit ( uint64 ( * limit ) )
2022-02-16 12:29:54 +01:00
}
counts := map [ string ] int { }
2022-03-25 10:20:33 +01:00
rows , err := q . RunWith ( runner ) . Query ( )
2022-02-16 12:29:54 +01:00
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-02-16 12:29:54 +01:00
return nil , err
}
for rows . Next ( ) {
2022-02-19 10:28:29 +01:00
var group string
2022-02-16 12:29:54 +01:00
var count int
2022-02-19 10:28:29 +01:00
if err := rows . Scan ( & group , & count ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-02-16 12:29:54 +01:00
return nil , err
}
2022-02-19 10:28:29 +01:00
counts [ group ] = count
2022-02-16 12:29:54 +01:00
}
return counts , nil
}
2022-02-15 13:18:27 +01:00
func ( r * JobRepository ) UpdateMonitoringStatus ( job int64 , monitoringStatus int32 ) ( err error ) {
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , job )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 13:18:27 +01:00
return
}
2022-02-15 11:10:49 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
2022-12-08 15:04:58 +01:00
func ( r * JobRepository ) MarkArchived (
2022-02-15 11:10:49 +01:00
jobId int64 ,
monitoringStatus int32 ,
2022-02-15 13:18:27 +01:00
metricStats map [ string ] schema . JobStatistics ) error {
2022-02-15 11:10:49 +01:00
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , jobId )
2022-02-07 07:09:47 +01:00
for metric , stats := range metricStats {
switch metric {
case "flops_any" :
stmt = stmt . Set ( "flops_any_avg" , stats . Avg )
case "mem_used" :
stmt = stmt . Set ( "mem_used_max" , stats . Max )
case "mem_bw" :
stmt = stmt . Set ( "mem_bw_avg" , stats . Avg )
case "load" :
stmt = stmt . Set ( "load_avg" , stats . Avg )
case "net_bw" :
stmt = stmt . Set ( "net_bw_avg" , stats . Avg )
case "file_bw" :
stmt = stmt . Set ( "file_bw_avg" , stats . Avg )
2023-01-24 12:02:29 +01:00
default :
2023-02-01 11:58:27 +01:00
log . Notef ( "MarkArchived() Metric '%v' unknown" , metric )
2022-02-07 07:09:47 +01:00
}
}
2022-02-22 09:25:41 +01:00
if _ , err := stmt . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while marking job as archived" )
2022-02-15 13:18:27 +01:00
return err
2022-02-07 07:09:47 +01:00
}
2022-02-15 13:18:27 +01:00
return nil
2022-02-07 07:09:47 +01:00
}
2022-12-08 15:04:58 +01:00
// Archiving worker thread
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) archivingWorker ( ) {
2022-12-08 15:04:58 +01:00
for {
select {
2023-02-13 13:53:24 +01:00
case job , ok := <- r . archiveChannel :
2022-12-08 15:04:58 +01:00
if ! ok {
break
}
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _ , err := r . FetchMetadata ( job ) ; err != nil {
2023-01-23 18:48:06 +01:00
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
2022-12-08 15:04:58 +01:00
r . UpdateMonitoringStatus ( job . ID , schema . MonitoringStatusArchivingFailed )
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta , err := metricdata . ArchiveJob ( job , context . Background ( ) )
if err != nil {
2023-01-23 18:48:06 +01:00
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
2022-12-08 15:04:58 +01:00
r . UpdateMonitoringStatus ( job . ID , schema . MonitoringStatusArchivingFailed )
continue
}
// Update the jobs database entry one last time:
if err := r . MarkArchived ( job . ID , schema . MonitoringStatusArchivingSuccessful , jobMeta . Statistics ) ; err != nil {
2023-01-23 18:48:06 +01:00
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
2022-12-08 15:04:58 +01:00
continue
}
2023-01-23 18:48:06 +01:00
log . Printf ( "archiving job (dbid: %d) successful" , job . ID )
2022-12-08 15:04:58 +01:00
r . archivePending . Done ( )
}
}
}
// Trigger async archiving
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) TriggerArchiving ( job * schema . Job ) {
2022-12-08 15:04:58 +01:00
r . archivePending . Add ( 1 )
r . archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) WaitForArchiving ( ) {
2022-12-08 15:04:58 +01:00
// close channel and wait for worker to process remaining jobs
r . archivePending . Wait ( )
}
2023-01-12 11:26:01 +01:00
// Sentinel errors returned by the search functions of the repository.
var (
	// ErrNotFound reports that nothing matched the search term.
	ErrNotFound = errors.New("no such jobname, project or user")
	// ErrForbidden reports that the requesting user may not run the search.
	ErrForbidden = errors.New("not authorized")
)
2022-02-09 15:03:12 +01:00
2023-01-13 09:58:56 +01:00
// FindJobnameOrUserOrProject returns a jobName or a username or a projectId if a jobName or user or project matches the search term.
// If query is found to be an integer (= conversion to INT datatype succeeds), skip back to parent call
2022-02-09 15:03:12 +01:00
// If nothing matches the search, `ErrNotFound` is returned.
2022-12-13 09:53:37 +01:00
2023-01-12 11:26:01 +01:00
func ( r * JobRepository ) FindJobnameOrUserOrProject ( ctx context . Context , searchterm string ) ( metasnip string , username string , project string , err error ) {
2022-02-09 15:03:12 +01:00
user := auth . GetUser ( ctx )
2023-01-12 11:26:01 +01:00
if _ , err := strconv . Atoi ( searchterm ) ; err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
return "" , "" , "" , nil
} else { // has to have letters
if user == nil || user . HasRole ( auth . RoleAdmin ) || user . HasRole ( auth . RoleSupport ) {
err := sq . Select ( "job.user" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.user = ?" , searchterm ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & username )
if err != nil && err != sql . ErrNoRows {
return "" , "" , "" , err
} else if err == nil {
return "" , username , "" , nil
}
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
if user == nil || user . HasRole ( auth . RoleAdmin ) || user . HasRole ( auth . RoleSupport ) {
err := sq . Select ( "job.project" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.project = ?" , searchterm ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & project )
if err != nil && err != sql . ErrNoRows {
return "" , "" , "" , err
} else if err == nil {
return "" , "" , project , nil
}
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
// All Authorizations: If unlabeled query not username or projectId, try for jobname: Match Metadata, on hit, parent method redirects to jobName GQL query
err := sq . Select ( "job.cluster" ) . Distinct ( ) . From ( "job" ) .
2023-02-13 13:53:24 +01:00
Where ( "job.meta_data LIKE ?" , "%" + searchterm + "%" ) .
2023-01-12 11:26:01 +01:00
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & metasnip )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
2023-01-12 11:26:01 +01:00
return "" , "" , "" , err
2022-02-09 15:03:12 +01:00
} else if err == nil {
2023-01-12 11:26:01 +01:00
return metasnip [ 0 : 1 ] , "" , "" , nil
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
return "" , "" , "" , ErrNotFound
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
}
2022-02-09 15:03:12 +01:00
2023-01-12 11:26:01 +01:00
func ( r * JobRepository ) FindUser ( ctx context . Context , searchterm string ) ( username string , err error ) {
user := auth . GetUser ( ctx )
2022-08-23 13:33:25 +02:00
if user == nil || user . HasRole ( auth . RoleAdmin ) || user . HasRole ( auth . RoleSupport ) {
2022-02-09 15:03:12 +01:00
err := sq . Select ( "job.user" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.user = ?" , searchterm ) .
2022-02-22 09:25:41 +01:00
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & username )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
2023-01-12 11:26:01 +01:00
return "" , err
2022-02-09 15:03:12 +01:00
} else if err == nil {
2023-01-12 11:26:01 +01:00
return username , nil
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
return "" , ErrNotFound
} else {
2023-01-12 11:34:21 +01:00
log . Infof ( "Non-Admin User %s : Requested Query Username -> %s: Forbidden" , user . Name , username )
2023-01-12 11:26:01 +01:00
return "" , ErrForbidden
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
}
2022-02-09 15:03:12 +01:00
2023-01-12 11:26:01 +01:00
func ( r * JobRepository ) FindProject ( ctx context . Context , searchterm string ) ( project string , err error ) {
user := auth . GetUser ( ctx )
2022-12-14 10:02:22 +01:00
if user == nil || user . HasRole ( auth . RoleAdmin ) || user . HasRole ( auth . RoleSupport ) {
err := sq . Select ( "job.project" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.project = ?" , searchterm ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & project )
if err != nil && err != sql . ErrNoRows {
2023-01-12 11:26:01 +01:00
return "" , err
2022-12-14 10:02:22 +01:00
} else if err == nil {
2023-01-12 11:26:01 +01:00
return project , nil
2022-12-14 10:02:22 +01:00
}
2023-01-12 11:26:01 +01:00
return "" , ErrNotFound
2022-12-14 10:02:22 +01:00
2023-01-12 11:26:01 +01:00
} else {
2023-01-12 11:34:21 +01:00
log . Infof ( "Non-Admin User %s : Requested Query Project -> %s: Forbidden" , user . Name , project )
2023-01-12 11:26:01 +01:00
return "" , ErrForbidden
}
2022-02-09 15:03:12 +01:00
}
2022-03-14 09:08:02 +01:00
// Partitions returns the distinct partitions of all jobs of the cluster.
//
// The list is served from the cache under the key "partitions:<cluster>".
// On a miss it is read from the database and cached for one hour. A failed
// query is returned as error.
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
	var queryErr error

	// Loader invoked by the cache; it reports a failure through queryErr.
	load := func() (interface{}, time.Duration, int) {
		found := []string{}
		queryErr = r.DB.Select(&found, ` SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?; `, cluster)
		if queryErr != nil {
			return nil, 0, 1000
		}
		return found, 1 * time.Hour, 1
	}

	cached := r.cache.Get("partitions:"+cluster, load)
	if queryErr != nil {
		return nil, queryErr
	}

	return cached.([]string), nil
}
2022-03-24 10:32:08 +01:00
2022-03-24 16:08:47 +01:00
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func ( r * JobRepository ) AllocatedNodes ( cluster string ) ( map [ string ] map [ string ] int , error ) {
subclusters := make ( map [ string ] map [ string ] int )
rows , err := sq . Select ( "resources" , "subcluster" ) . From ( "job" ) .
2022-03-24 10:32:08 +01:00
Where ( "job.job_state = 'running'" ) .
Where ( "job.cluster = ?" , cluster ) .
RunWith ( r . stmtCache ) . Query ( )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-03-24 10:32:08 +01:00
return nil , err
}
var raw [ ] byte
defer rows . Close ( )
for rows . Next ( ) {
raw = raw [ 0 : 0 ]
var resources [ ] * schema . Resource
2022-03-24 16:08:47 +01:00
var subcluster string
if err := rows . Scan ( & raw , & subcluster ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-03-24 10:32:08 +01:00
return nil , err
}
if err := json . Unmarshal ( raw , & resources ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarshaling raw resources json" )
2022-03-24 10:32:08 +01:00
return nil , err
}
2022-03-24 16:08:47 +01:00
hosts , ok := subclusters [ subcluster ]
if ! ok {
hosts = make ( map [ string ] int )
subclusters [ subcluster ] = hosts
}
2022-03-24 10:32:08 +01:00
for _ , resource := range resources {
2022-03-24 16:08:47 +01:00
hosts [ resource . Hostname ] += 1
2022-03-24 10:32:08 +01:00
}
}
2022-03-24 16:08:47 +01:00
return subclusters , nil
2022-03-24 10:32:08 +01:00
}
2022-04-07 09:50:32 +02:00
func ( r * JobRepository ) StopJobsExceedingWalltimeBy ( seconds int ) error {
res , err := sq . Update ( "job" ) .
Set ( "monitoring_status" , schema . MonitoringStatusArchivingFailed ) .
Set ( "duration" , 0 ) .
Set ( "job_state" , schema . JobStateFailed ) .
Where ( "job.job_state = 'running'" ) .
2022-05-09 11:53:41 +02:00
Where ( "job.walltime > 0" ) .
2022-04-07 09:50:32 +02:00
Where ( fmt . Sprintf ( "(%d - job.start_time) > (job.walltime + %d)" , time . Now ( ) . Unix ( ) , seconds ) ) .
RunWith ( r . DB ) . Exec ( )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while stopping jobs exceeding walltime" )
2022-04-07 09:50:32 +02:00
return err
}
rowsAffected , err := res . RowsAffected ( )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while fetching affected rows after stopping due to exceeded walltime" )
2022-04-07 09:50:32 +02:00
return err
}
if rowsAffected > 0 {
2023-01-31 18:28:44 +01:00
log . Notef ( "%d jobs have been marked as failed due to running too long" , rowsAffected )
2022-04-07 09:50:32 +02:00
}
return nil
}
2023-02-13 13:53:24 +01:00
// ShortJobDuration is the job duration in seconds below which a job is
// counted as a short job in the job statistics.
// TODO: Move to config
const ShortJobDuration int = 5 * 60
// groupBy2column translates a GraphQL aggregate into the job table column
// that is grouped by.
// GraphQL validation should make sure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
	model.AggregateUser:    "job.user",
	model.AggregateProject: "job.project",
	model.AggregateCluster: "job.cluster",
}
// JobsStatistics is a helper for the jobsStatistics GraphQL query, placed
// here so that schema.resolvers.go is not too full.
//
// It returns job count, total walltime (hours), total core hours and the
// number of short jobs (duration below ShortJobDuration) for all jobs that
// pass the security check and the filters. If groupBy is nil, a single entry
// with an empty ID is returned; otherwise one entry per value of the grouped
// column. Totals are summed over all subclusters of all clusters in
// archive.Clusters. Histograms are only computed if the GraphQL request
// selects "histDuration" or "histNumNodes".
func (r *JobRepository) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
	// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
	stats := map[string]*model.JobsStatistics{}

	// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
	for _, cluster := range archive.Clusters {
		for _, subcluster := range cluster.SubClusters {
			corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)

			var query sq.SelectBuilder
			if groupBy == nil {
				query = sq.Select(
					"''",
					"COUNT(job.id)",
					"CAST(ROUND(SUM(job.duration) / 3600) as int)",
					corehoursCol,
				).From("job")
			} else {
				col := groupBy2column[*groupBy]
				query = sq.Select(
					col,
					"COUNT(job.id)",
					"CAST(ROUND(SUM(job.duration) / 3600) as int)",
					corehoursCol,
				).From("job").GroupBy(col)
			}

			query = query.
				Where("job.cluster = ?", cluster.Name).
				Where("job.subcluster = ?", subcluster.Name)

			query = SecurityCheck(ctx, query)
			for _, f := range filter {
				query = BuildWhereClause(f, query)
			}

			if err := r.collectJobsStatistics(query, stats); err != nil {
				return nil, err
			}
		}
	}

	if groupBy == nil {
		query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
		query = SecurityCheck(ctx, query)
		for _, f := range filter {
			query = BuildWhereClause(f, query)
		}

		// The entry is missing if no subcluster was queried above.
		total, ok := stats[""]
		if !ok {
			total = &model.JobsStatistics{}
			stats[""] = total
		}
		if err := query.RunWith(r.DB).QueryRow().Scan(&total.ShortJobs); err != nil {
			log.Warn("Error while scanning rows for short job stats")
			return nil, err
		}
	} else {
		col := groupBy2column[*groupBy]
		// Without GROUP BY the aggregate would collapse into a single row.
		query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration).GroupBy(col)
		query = SecurityCheck(ctx, query)
		for _, f := range filter {
			query = BuildWhereClause(f, query)
		}

		if err := r.collectShortJobs(query, stats); err != nil {
			return nil, err
		}
	}

	// Calculating the histogram data is expensive, so only do it if needed.
	// An explicit resolver can not be used because we need to know the filters.
	histogramsNeeded := false
	fields := graphql.CollectFieldsCtx(ctx, nil)
	for _, col := range fields {
		if col.Name == "histDuration" || col.Name == "histNumNodes" {
			histogramsNeeded = true
		}
	}

	res := make([]*model.JobsStatistics, 0, len(stats))
	for _, stat := range stats {
		res = append(res, stat)
		id, col := "", ""
		if groupBy != nil {
			id = stat.ID
			col = groupBy2column[*groupBy]
		}

		if histogramsNeeded {
			var err error
			value := fmt.Sprintf(` CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as int) as value `, time.Now().Unix())
			stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
			if err != nil {
				log.Warn("Error while loading job statistics histogram: running jobs")
				return nil, err
			}

			stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
			if err != nil {
				log.Warn("Error while loading job statistics histogram: num nodes")
				return nil, err
			}
		}
	}

	return res, nil
}

// collectJobsStatistics runs one per-subcluster statistics query and adds
// its rows (id, jobs, walltime, core hours) to stats. Rows with a NULL id
// are skipped.
func (r *JobRepository) collectJobsStatistics(query sq.SelectBuilder, stats map[string]*model.JobsStatistics) error {
	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id sql.NullString
		var jobs, walltime, corehours sql.NullInt64
		if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
			log.Warn("Error while scanning rows")
			return err
		}
		if !id.Valid {
			continue
		}

		if s, ok := stats[id.String]; ok {
			s.TotalJobs += int(jobs.Int64)
			s.TotalWalltime += int(walltime.Int64)
			s.TotalCoreHours += int(corehours.Int64)
		} else {
			stats[id.String] = &model.JobsStatistics{
				ID:             id.String,
				TotalJobs:      int(jobs.Int64),
				TotalWalltime:  int(walltime.Int64),
				TotalCoreHours: int(corehours.Int64),
			}
		}
	}
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return err
	}
	return nil
}

// collectShortJobs runs the grouped short-jobs query and stores the counts
// in the matching entries of stats. Groups without an entry in stats are
// skipped.
func (r *JobRepository) collectShortJobs(query sq.SelectBuilder, stats map[string]*model.JobsStatistics) error {
	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying jobs for short jobs")
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id sql.NullString
		var shortJobs sql.NullInt64
		if err := rows.Scan(&id, &shortJobs); err != nil {
			log.Warn("Error while scanning rows for short jobs")
			return err
		}
		if !id.Valid {
			continue
		}
		if s, ok := stats[id.String]; ok {
			s.ShortJobs = int(shortJobs.Int64)
		}
	}
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows for short jobs")
		return err
	}
	return nil
}
// jobsStatisticsHistogram returns the number of jobs per distinct value of
// the selected expression.
//
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
// The security check and the filters restrict the jobs considered.
func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
	query := sq.Select(value, "COUNT(job.id) AS count").From("job")
	query = SecurityCheck(ctx, query)
	for _, f := range filters {
		query = BuildWhereClause(f, query)
	}

	if len(id) != 0 && len(col) != 0 {
		query = query.Where(col+" = ?", id)
	}

	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
	if err != nil {
		log.Error("Error while running query")
		return nil, err
	}
	// Release the result set on every return path.
	defer rows.Close()

	points := make([]*model.HistoPoint, 0)
	for rows.Next() {
		point := model.HistoPoint{}
		if err := rows.Scan(&point.Value, &point.Count); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		points = append(points, &point)
	}
	// Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	return points, nil
}