2022-07-29 06:29:21 +02:00
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2022-02-06 09:48:31 +01:00
package repository
import (
2022-02-09 15:03:12 +01:00
"context"
"database/sql"
2022-02-22 09:25:41 +01:00
"encoding/json"
2022-02-09 15:03:12 +01:00
"errors"
2022-03-08 11:53:24 +01:00
"fmt"
2022-02-09 15:03:12 +01:00
"strconv"
2022-06-21 17:52:36 +02:00
"sync"
2022-02-22 09:25:41 +01:00
"time"
2022-02-09 15:03:12 +01:00
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
2022-12-08 15:04:58 +01:00
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/log"
2022-06-22 06:11:00 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
2022-06-21 17:52:36 +02:00
"github.com/ClusterCockpit/cc-backend/pkg/schema"
2022-02-06 09:48:31 +01:00
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
)
2022-06-21 17:52:36 +02:00
var (
jobRepoOnce sync . Once
jobRepoInstance * JobRepository
)
2022-02-06 09:48:31 +01:00
type JobRepository struct {
2023-02-21 15:22:05 +01:00
DB * sqlx . DB
driver string
2022-02-22 09:25:41 +01:00
stmtCache * sq . StmtCache
2022-03-14 09:08:02 +01:00
cache * lrucache . Cache
2022-12-08 15:04:58 +01:00
archiveChannel chan * schema . Job
archivePending sync . WaitGroup
2022-02-06 09:48:31 +01:00
}
2022-09-05 17:46:38 +02:00
func GetJobRepository ( ) * JobRepository {
2022-06-21 17:52:36 +02:00
jobRepoOnce . Do ( func ( ) {
db := GetConnection ( )
jobRepoInstance = & JobRepository {
2023-02-21 17:17:41 +01:00
DB : db . DB ,
driver : db . Driver ,
2023-02-21 16:21:47 +01:00
2023-02-13 13:53:24 +01:00
stmtCache : sq . NewStmtCache ( db . DB ) ,
cache : lrucache . New ( 1024 * 1024 ) ,
2022-12-08 15:04:58 +01:00
archiveChannel : make ( chan * schema . Job , 128 ) ,
2022-06-21 17:52:36 +02:00
}
2022-12-08 15:04:58 +01:00
// start archiving worker
go jobRepoInstance . archivingWorker ( )
2022-06-21 17:52:36 +02:00
} )
return jobRepoInstance
2022-02-19 10:28:29 +01:00
}
2022-02-22 09:25:41 +01:00
var jobColumns [ ] string = [ ] string {
2022-03-14 10:18:56 +01:00
"job.id" , "job.job_id" , "job.user" , "job.project" , "job.cluster" , "job.subcluster" , "job.start_time" , "job.partition" , "job.array_job_id" ,
2022-02-22 09:25:41 +01:00
"job.num_nodes" , "job.num_hwthreads" , "job.num_acc" , "job.exclusive" , "job.monitoring_status" , "job.smt" , "job.job_state" ,
2022-03-14 10:18:56 +01:00
"job.duration" , "job.walltime" , "job.resources" , // "job.meta_data",
2022-02-22 09:25:41 +01:00
}
func scanJob ( row interface { Scan ( ... interface { } ) error } ) ( * schema . Job , error ) {
job := & schema . Job { }
if err := row . Scan (
2022-03-14 10:18:56 +01:00
& job . ID , & job . JobID , & job . User , & job . Project , & job . Cluster , & job . SubCluster , & job . StartTimeUnix , & job . Partition , & job . ArrayJobId ,
2022-02-22 09:25:41 +01:00
& job . NumNodes , & job . NumHWThreads , & job . NumAcc , & job . Exclusive , & job . MonitoringStatus , & job . SMT , & job . State ,
2023-02-13 13:53:24 +01:00
& job . Duration , & job . Walltime , & job . RawResources /*&job.RawMetaData*/ ) ; err != nil {
2023-06-01 17:48:43 +02:00
log . Warnf ( "Error while scanning rows (Job): %v" , err )
2022-02-22 09:25:41 +01:00
return nil , err
}
if err := json . Unmarshal ( job . RawResources , & job . Resources ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarhsaling raw resources json" )
2022-02-22 09:25:41 +01:00
return nil , err
}
2023-01-12 11:57:44 +01:00
// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
// return nil, err
// }
2023-01-11 16:25:02 +01:00
2022-02-22 09:25:41 +01:00
job . StartTime = time . Unix ( job . StartTimeUnix , 0 )
if job . Duration == 0 && job . State == schema . JobStateRunning {
job . Duration = int32 ( time . Since ( job . StartTime ) . Seconds ( ) )
}
job . RawResources = nil
return job , nil
}
2023-05-11 09:40:13 +02:00
func ( r * JobRepository ) Optimize ( ) error {
var err error
switch r . driver {
case "sqlite3" :
if _ , err = r . DB . Exec ( ` VACUUM ` ) ; err != nil {
return err
}
case "mysql" :
log . Info ( "Optimize currently not supported for mysql driver" )
}
return nil
}
2023-05-04 15:34:36 +02:00
func ( r * JobRepository ) Flush ( ) error {
var err error
switch r . driver {
case "sqlite3" :
2023-05-04 16:03:04 +02:00
if _ , err = r . DB . Exec ( ` DELETE FROM jobtag ` ) ; err != nil {
return err
}
if _ , err = r . DB . Exec ( ` DELETE FROM tag ` ) ; err != nil {
return err
}
if _ , err = r . DB . Exec ( ` DELETE FROM job ` ) ; err != nil {
return err
}
2023-05-04 15:34:36 +02:00
case "mysql" :
2023-06-07 16:49:08 +02:00
if _ , err = r . DB . Exec ( ` SET FOREIGN_KEY_CHECKS = 0 ` ) ; err != nil {
return err
}
2023-05-04 16:03:04 +02:00
if _ , err = r . DB . Exec ( ` TRUNCATE TABLE jobtag ` ) ; err != nil {
return err
}
if _ , err = r . DB . Exec ( ` TRUNCATE TABLE tag ` ) ; err != nil {
return err
}
if _ , err = r . DB . Exec ( ` TRUNCATE TABLE job ` ) ; err != nil {
return err
}
2023-06-07 16:49:08 +02:00
if _ , err = r . DB . Exec ( ` SET FOREIGN_KEY_CHECKS = 1 ` ) ; err != nil {
return err
}
2023-05-04 15:34:36 +02:00
}
2023-05-04 16:03:04 +02:00
return nil
2023-05-04 15:34:36 +02:00
}
2023-04-28 12:34:40 +02:00
func scanJobLink ( row interface { Scan ( ... interface { } ) error } ) ( * model . JobLink , error ) {
jobLink := & model . JobLink { }
if err := row . Scan (
& jobLink . ID , & jobLink . JobID ) ; err != nil {
log . Warn ( "Error while scanning rows (jobLink)" )
return nil , err
}
return jobLink , nil
}
2022-03-08 11:53:24 +01:00
func ( r * JobRepository ) FetchMetadata ( job * schema . Job ) ( map [ string ] string , error ) {
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-03-17 11:18:22 +01:00
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
if cached := r . cache . Get ( cachekey , nil ) ; cached != nil {
job . MetaData = cached . ( map [ string ] string )
return job . MetaData , nil
}
2022-03-08 11:53:24 +01:00
if err := sq . Select ( "job.meta_data" ) . From ( "job" ) . Where ( "job.id = ?" , job . ID ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & job . RawMetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning for job metadata" )
2022-03-08 11:53:24 +01:00
return nil , err
}
if len ( job . RawMetaData ) == 0 {
return nil , nil
}
if err := json . Unmarshal ( job . RawMetaData , & job . MetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarshaling raw metadata json" )
2022-03-08 11:53:24 +01:00
return nil , err
}
2022-03-17 11:18:22 +01:00
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer FetchMetadata %s" , time . Since ( start ) )
2022-03-08 11:53:24 +01:00
return job . MetaData , nil
}
2022-03-17 11:18:22 +01:00
func ( r * JobRepository ) UpdateMetadata ( job * schema . Job , key , val string ) ( err error ) {
cachekey := fmt . Sprintf ( "metadata:%d" , job . ID )
r . cache . Del ( cachekey )
if job . MetaData == nil {
if _ , err = r . FetchMetadata ( job ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while fetching metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
}
if job . MetaData != nil {
cpy := make ( map [ string ] string , len ( job . MetaData ) + 1 )
for k , v := range job . MetaData {
cpy [ k ] = v
}
cpy [ key ] = val
job . MetaData = cpy
} else {
job . MetaData = map [ string ] string { key : val }
}
if job . RawMetaData , err = json . Marshal ( job . MetaData ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while marshaling metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
if _ , err = sq . Update ( "job" ) . Set ( "meta_data" , job . RawMetaData ) . Where ( "job.id = ?" , job . ID ) . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warnf ( "Error while updating metadata for job, DB ID '%v'" , job . ID )
2022-03-17 11:18:22 +01:00
return err
}
r . cache . Put ( cachekey , job . MetaData , len ( job . RawMetaData ) , 24 * time . Hour )
return nil
}
2022-02-07 09:57:06 +01:00
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-02-07 09:57:06 +01:00
func ( r * JobRepository ) Find (
2022-02-15 17:13:16 +01:00
jobId * int64 ,
cluster * string ,
startTime * int64 ) ( * schema . Job , error ) {
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) . From ( "job" ) .
2022-03-08 10:33:56 +01:00
Where ( "job.job_id = ?" , * jobId )
2022-02-15 17:13:16 +01:00
if cluster != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.cluster = ?" , * cluster )
2022-02-15 17:13:16 +01:00
}
if startTime != nil {
2022-02-22 09:25:41 +01:00
q = q . Where ( "job.start_time = ?" , * startTime )
2022-02-07 07:09:47 +01:00
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer Find %s" , time . Since ( start ) )
2022-02-22 09:25:41 +01:00
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-11-09 11:49:36 +01:00
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func ( r * JobRepository ) FindAll (
jobId * int64 ,
cluster * string ,
startTime * int64 ) ( [ ] * schema . Job , error ) {
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-11-09 11:49:36 +01:00
q := sq . Select ( jobColumns ... ) . From ( "job" ) .
Where ( "job.job_id = ?" , * jobId )
if cluster != nil {
q = q . Where ( "job.cluster = ?" , * cluster )
}
if startTime != nil {
q = q . Where ( "job.start_time = ?" , * startTime )
}
rows , err := q . RunWith ( r . stmtCache ) . Query ( )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-11-09 11:49:36 +01:00
return nil , err
}
jobs := make ( [ ] * schema . Job , 0 , 10 )
for rows . Next ( ) {
job , err := scanJob ( rows )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-11-09 11:49:36 +01:00
return nil , err
}
jobs = append ( jobs , job )
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer FindAll %s" , time . Since ( start ) )
2022-11-09 11:49:36 +01:00
return jobs , nil
}
2022-02-07 09:57:06 +01:00
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
2022-02-07 14:56:46 +01:00
// To check if no job was found test err == sql.ErrNoRows
2022-09-23 15:23:45 +02:00
func ( r * JobRepository ) FindById ( jobId int64 ) ( * schema . Job , error ) {
2022-02-22 09:25:41 +01:00
q := sq . Select ( jobColumns ... ) .
From ( "job" ) . Where ( "job.id = ?" , jobId )
return scanJob ( q . RunWith ( r . stmtCache ) . QueryRow ( ) )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func ( r * JobRepository ) Start ( job * schema . JobMeta ) ( id int64 , err error ) {
2022-03-08 11:53:24 +01:00
job . RawResources , err = json . Marshal ( job . Resources )
if err != nil {
2023-01-19 16:59:14 +01:00
return - 1 , fmt . Errorf ( "REPOSITORY/JOB > encoding resources field failed: %w" , err )
2022-03-08 11:53:24 +01:00
}
job . RawMetaData , err = json . Marshal ( job . MetaData )
if err != nil {
2023-01-19 16:59:14 +01:00
return - 1 , fmt . Errorf ( "REPOSITORY/JOB > encoding metaData field failed: %w" , err )
2022-03-08 11:53:24 +01:00
}
2022-02-08 12:49:28 +01:00
res , err := r . DB . NamedExec ( ` INSERT INTO job (
2022-03-14 09:08:02 +01:00
job_id , user , project , cluster , subcluster , ` +" ` partition ` "+ ` , array_job_id , num_nodes , num_hwthreads , num_acc ,
exclusive , monitoring_status , smt , job_state , start_time , duration , walltime , resources , meta_data
2022-02-07 07:09:47 +01:00
) VALUES (
2022-03-14 09:08:02 +01:00
: job_id , : user , : project , : cluster , : subcluster , : partition , : array_job_id , : num_nodes , : num_hwthreads , : num_acc ,
: exclusive , : monitoring_status , : smt , : job_state , : start_time , : duration , : walltime , : resources , : meta_data
2022-02-07 07:09:47 +01:00
) ; ` , job )
2022-02-08 12:49:28 +01:00
if err != nil {
return - 1 , err
}
return res . LastInsertId ( )
2022-02-07 07:09:47 +01:00
}
2022-02-08 12:49:28 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
2022-02-07 07:09:47 +01:00
func ( r * JobRepository ) Stop (
jobId int64 ,
duration int32 ,
2022-02-15 14:25:39 +01:00
state schema . JobState ,
monitoringStatus int32 ) ( err error ) {
2022-02-07 07:09:47 +01:00
stmt := sq . Update ( "job" ) .
Set ( "job_state" , state ) .
Set ( "duration" , duration ) .
2022-02-15 14:25:39 +01:00
Set ( "monitoring_status" , monitoringStatus ) .
2022-02-07 07:09:47 +01:00
Where ( "job.id = ?" , jobId )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 11:33:59 +01:00
return
2022-02-15 11:10:49 +01:00
}
2022-11-11 15:26:27 +01:00
func ( r * JobRepository ) DeleteJobsBefore ( startTime int64 ) ( int , error ) {
var cnt int
2022-11-25 15:15:05 +01:00
qs := fmt . Sprintf ( "SELECT count(*) FROM job WHERE job.start_time < %d" , startTime )
err := r . DB . Get ( & cnt , qs ) //ignore error as it will also occur in delete statement
_ , err = r . DB . Exec ( ` DELETE FROM job WHERE job.start_time < ? ` , startTime )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Errorf ( " DeleteJobsBefore(%d): error %#v" , startTime , err )
2022-11-25 15:15:05 +01:00
} else {
log . Infof ( "DeleteJobsBefore(%d): Deleted %d jobs" , startTime , cnt )
}
2022-11-11 15:26:27 +01:00
return cnt , err
}
func ( r * JobRepository ) DeleteJobById ( id int64 ) error {
_ , err := r . DB . Exec ( ` DELETE FROM job WHERE job.id = ? ` , id )
2022-11-25 15:15:05 +01:00
if err != nil {
2023-01-31 18:28:44 +01:00
log . Errorf ( "DeleteJobById(%d): error %#v" , id , err )
2022-11-25 15:15:05 +01:00
} else {
log . Infof ( "DeleteJobById(%d): Success" , id )
}
2022-11-11 15:26:27 +01:00
return err
}
2022-02-24 11:54:36 +01:00
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
2022-10-04 10:12:35 +02:00
func ( r * JobRepository ) CountGroupedJobs (
ctx context . Context ,
aggreg model . Aggregate ,
filters [ ] * model . JobFilter ,
weight * model . Weights ,
limit * int ) ( map [ string ] int , error ) {
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-02-19 10:28:29 +01:00
if ! aggreg . IsValid ( ) {
return nil , errors . New ( "invalid aggregate" )
}
2022-03-25 10:20:33 +01:00
runner := ( sq . BaseRunner ) ( r . stmtCache )
count := "count(*) as count"
if weight != nil {
switch * weight {
case model . WeightsNodeCount :
count = "sum(job.num_nodes) as count"
case model . WeightsNodeHours :
now := time . Now ( ) . Unix ( )
count = fmt . Sprintf ( ` sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count ` , now )
runner = r . DB
2023-01-24 12:02:29 +01:00
default :
2023-02-15 11:50:51 +01:00
log . Infof ( "CountGroupedJobs() Weight %v unknown." , * weight )
2022-03-25 10:20:33 +01:00
}
}
2023-02-21 17:17:41 +01:00
q , qerr := SecurityCheck ( ctx , sq . Select ( "job." + string ( aggreg ) , count ) . From ( "job" ) . GroupBy ( "job." + string ( aggreg ) ) . OrderBy ( "count DESC" ) )
2023-01-27 18:36:58 +01:00
if qerr != nil {
return nil , qerr
}
2022-02-19 10:28:29 +01:00
for _ , f := range filters {
q = BuildWhereClause ( f , q )
2022-02-16 12:29:54 +01:00
}
2022-02-19 10:28:29 +01:00
if limit != nil {
q = q . Limit ( uint64 ( * limit ) )
2022-02-16 12:29:54 +01:00
}
counts := map [ string ] int { }
2022-03-25 10:20:33 +01:00
rows , err := q . RunWith ( runner ) . Query ( )
2022-02-16 12:29:54 +01:00
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-02-16 12:29:54 +01:00
return nil , err
}
for rows . Next ( ) {
2022-02-19 10:28:29 +01:00
var group string
2022-02-16 12:29:54 +01:00
var count int
2022-02-19 10:28:29 +01:00
if err := rows . Scan ( & group , & count ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-02-16 12:29:54 +01:00
return nil , err
}
2022-02-19 10:28:29 +01:00
counts [ group ] = count
2022-02-16 12:29:54 +01:00
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer CountGroupedJobs %s" , time . Since ( start ) )
2022-02-16 12:29:54 +01:00
return counts , nil
}
2022-02-15 13:18:27 +01:00
func ( r * JobRepository ) UpdateMonitoringStatus ( job int64 , monitoringStatus int32 ) ( err error ) {
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , job )
2022-02-22 09:25:41 +01:00
_ , err = stmt . RunWith ( r . stmtCache ) . Exec ( )
2022-02-15 13:18:27 +01:00
return
}
2022-02-15 11:10:49 +01:00
// Stop updates the job with the database id jobId using the provided arguments.
2022-12-08 15:04:58 +01:00
func ( r * JobRepository ) MarkArchived (
2022-02-15 11:10:49 +01:00
jobId int64 ,
monitoringStatus int32 ,
2022-02-15 13:18:27 +01:00
metricStats map [ string ] schema . JobStatistics ) error {
2022-02-15 11:10:49 +01:00
stmt := sq . Update ( "job" ) .
Set ( "monitoring_status" , monitoringStatus ) .
Where ( "job.id = ?" , jobId )
2022-02-07 07:09:47 +01:00
for metric , stats := range metricStats {
switch metric {
case "flops_any" :
stmt = stmt . Set ( "flops_any_avg" , stats . Avg )
case "mem_used" :
stmt = stmt . Set ( "mem_used_max" , stats . Max )
case "mem_bw" :
stmt = stmt . Set ( "mem_bw_avg" , stats . Avg )
case "load" :
2023-06-15 11:07:48 +02:00
case "cpu_load" :
2022-02-07 07:09:47 +01:00
stmt = stmt . Set ( "load_avg" , stats . Avg )
case "net_bw" :
stmt = stmt . Set ( "net_bw_avg" , stats . Avg )
case "file_bw" :
stmt = stmt . Set ( "file_bw_avg" , stats . Avg )
2023-01-24 12:02:29 +01:00
default :
2023-02-15 11:50:51 +01:00
log . Infof ( "MarkArchived() Metric '%v' unknown" , metric )
2022-02-07 07:09:47 +01:00
}
}
2022-02-22 09:25:41 +01:00
if _ , err := stmt . RunWith ( r . stmtCache ) . Exec ( ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while marking job as archived" )
2022-02-15 13:18:27 +01:00
return err
2022-02-07 07:09:47 +01:00
}
2022-02-15 13:18:27 +01:00
return nil
2022-02-07 07:09:47 +01:00
}
2022-12-08 15:04:58 +01:00
// Archiving worker thread
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) archivingWorker ( ) {
2022-12-08 15:04:58 +01:00
for {
select {
2023-02-13 13:53:24 +01:00
case job , ok := <- r . archiveChannel :
2022-12-08 15:04:58 +01:00
if ! ok {
break
}
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _ , err := r . FetchMetadata ( job ) ; err != nil {
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
r . UpdateMonitoringStatus ( job . ID , schema . MonitoringStatusArchivingFailed )
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta , err := metricdata . ArchiveJob ( job , context . Background ( ) )
if err != nil {
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
r . UpdateMonitoringStatus ( job . ID , schema . MonitoringStatusArchivingFailed )
continue
}
// Update the jobs database entry one last time:
if err := r . MarkArchived ( job . ID , schema . MonitoringStatusArchivingSuccessful , jobMeta . Statistics ) ; err != nil {
log . Errorf ( "archiving job (dbid: %d) failed: %s" , job . ID , err . Error ( ) )
continue
}
log . Printf ( "archiving job (dbid: %d) successful" , job . ID )
r . archivePending . Done ( )
}
}
}
2022-02-09 15:03:12 +01:00
2022-12-08 15:04:58 +01:00
// Trigger async archiving
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) TriggerArchiving ( job * schema . Job ) {
2022-12-08 15:04:58 +01:00
r . archivePending . Add ( 1 )
r . archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
2023-02-13 13:53:24 +01:00
func ( r * JobRepository ) WaitForArchiving ( ) {
2022-12-08 15:04:58 +01:00
// close channel and wait for worker to process remaining jobs
r . archivePending . Wait ( )
}
2022-10-04 10:12:35 +02:00
2023-01-12 11:26:01 +01:00
var ErrNotFound = errors . New ( "no such jobname, project or user" )
var ErrForbidden = errors . New ( "not authorized" )
2022-02-09 15:03:12 +01:00
2023-01-13 09:58:56 +01:00
// FindJobnameOrUserOrProject returns a jobName or a username or a projectId if a jobName or user or project matches the search term.
// If query is found to be an integer (= conversion to INT datatype succeeds), skip back to parent call
2022-02-09 15:03:12 +01:00
// If nothing matches the search, `ErrNotFound` is returned.
2022-12-13 09:53:37 +01:00
2023-02-22 16:30:01 +01:00
func ( r * JobRepository ) FindUserOrProjectOrJobname ( ctx context . Context , searchterm string ) ( username string , project string , metasnip string , err error ) {
2023-01-12 11:26:01 +01:00
if _ , err := strconv . Atoi ( searchterm ) ; err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
return "" , "" , "" , nil
2023-02-22 16:30:01 +01:00
} else { // Has to have letters and logged-in user for other guesses
user := auth . GetUser ( ctx )
if user != nil {
// Find username in jobs (match)
uresult , _ := r . FindColumnValue ( user , searchterm , "job" , "user" , "user" , false )
if uresult != "" {
return uresult , "" , "" , nil
2023-01-12 11:26:01 +01:00
}
2023-02-22 16:30:01 +01:00
// Find username by name (like)
nresult , _ := r . FindColumnValue ( user , searchterm , "user" , "username" , "name" , true )
if nresult != "" {
return nresult , "" , "" , nil
2023-02-17 10:45:27 +01:00
}
2023-02-22 16:30:01 +01:00
// Find projectId in jobs (match)
presult , _ := r . FindColumnValue ( user , searchterm , "job" , "project" , "project" , false )
if presult != "" {
return "" , presult , "" , nil
}
// Still no return (or not authorized for above): Try JobName
// Match Metadata, on hit, parent method redirects to jobName GQL query
err := sq . Select ( "job.cluster" ) . Distinct ( ) . From ( "job" ) .
Where ( "job.meta_data LIKE ?" , "%" + searchterm + "%" ) .
RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & metasnip )
2023-01-12 11:26:01 +01:00
if err != nil && err != sql . ErrNoRows {
return "" , "" , "" , err
} else if err == nil {
2023-02-22 16:30:01 +01:00
return "" , "" , metasnip [ 0 : 1 ] , nil
2023-01-12 11:26:01 +01:00
}
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
return "" , "" , "" , ErrNotFound
2022-02-09 15:03:12 +01:00
}
2023-01-12 11:26:01 +01:00
}
2022-02-09 15:03:12 +01:00
2023-02-22 16:30:01 +01:00
func ( r * JobRepository ) FindColumnValue ( user * auth . User , searchterm string , table string , selectColumn string , whereColumn string , isLike bool ) ( result string , err error ) {
compareStr := " = ?"
query := searchterm
2023-05-04 16:27:30 +02:00
if isLike {
2023-02-22 16:30:01 +01:00
compareStr = " LIKE ?"
query = "%" + searchterm + "%"
}
2023-03-06 11:44:38 +01:00
if user . HasAnyRole ( [ ] auth . Role { auth . RoleAdmin , auth . RoleSupport , auth . RoleManager } ) {
2023-06-07 14:13:59 +02:00
theQuery := sq . Select ( table + "." + selectColumn ) . Distinct ( ) . From ( table ) .
Where ( table + "." + whereColumn + compareStr , query )
// theSql, args, theErr := theQuery.ToSql()
// if theErr != nil {
// log.Warn("Error while converting query to sql")
// return "", err
// }
// log.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args)
err := theQuery . RunWith ( r . stmtCache ) . QueryRow ( ) . Scan ( & result )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
2023-02-17 10:45:27 +01:00
return "" , err
2022-02-09 15:03:12 +01:00
} else if err == nil {
2023-02-22 16:30:01 +01:00
return result , nil
2022-02-09 15:03:12 +01:00
}
2023-02-17 10:45:27 +01:00
return "" , ErrNotFound
} else {
2023-02-22 16:30:01 +01:00
log . Infof ( "Non-Admin User %s : Requested Query '%s' on table '%s' : Forbidden" , user . Name , query , table )
2023-02-17 10:45:27 +01:00
return "" , ErrForbidden
2022-02-09 15:03:12 +01:00
}
2023-02-17 10:45:27 +01:00
}
2022-02-09 15:03:12 +01:00
2023-02-22 16:30:01 +01:00
func ( r * JobRepository ) FindColumnValues ( user * auth . User , query string , table string , selectColumn string , whereColumn string ) ( results [ ] string , err error ) {
2023-02-17 10:45:27 +01:00
emptyResult := make ( [ ] string , 0 )
2023-03-06 11:44:38 +01:00
if user . HasAnyRole ( [ ] auth . Role { auth . RoleAdmin , auth . RoleSupport , auth . RoleManager } ) {
2023-02-22 16:30:01 +01:00
rows , err := sq . Select ( table + "." + selectColumn ) . Distinct ( ) . From ( table ) .
Where ( table + "." + whereColumn + " LIKE ?" , fmt . Sprint ( "%" , query , "%" ) ) .
2023-02-17 10:45:27 +01:00
RunWith ( r . stmtCache ) . Query ( )
2022-02-09 15:03:12 +01:00
if err != nil && err != sql . ErrNoRows {
2023-02-17 10:45:27 +01:00
return emptyResult , err
2022-02-09 15:03:12 +01:00
} else if err == nil {
2023-02-17 10:45:27 +01:00
for rows . Next ( ) {
2023-02-22 16:30:01 +01:00
var result string
err := rows . Scan ( & result )
2023-02-17 10:45:27 +01:00
if err != nil {
rows . Close ( )
log . Warnf ( "Error while scanning rows: %v" , err )
return emptyResult , err
}
2023-02-22 16:30:01 +01:00
results = append ( results , result )
2023-02-17 10:45:27 +01:00
}
2023-02-22 16:30:01 +01:00
return results , nil
2022-02-09 15:03:12 +01:00
}
2023-02-17 10:45:27 +01:00
return emptyResult , ErrNotFound
2022-02-09 15:03:12 +01:00
2023-02-17 10:45:27 +01:00
} else {
2023-02-22 16:30:01 +01:00
log . Infof ( "Non-Admin User %s : Requested Query '%s' on table '%s' : Forbidden" , user . Name , query , table )
2023-02-17 10:45:27 +01:00
return emptyResult , ErrForbidden
2022-02-09 15:03:12 +01:00
}
}
2022-03-14 09:08:02 +01:00
func ( r * JobRepository ) Partitions ( cluster string ) ( [ ] string , error ) {
var err error
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-03-14 09:08:02 +01:00
partitions := r . cache . Get ( "partitions:" + cluster , func ( ) ( interface { } , time . Duration , int ) {
parts := [ ] string { }
if err = r . DB . Select ( & parts , ` SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?; ` , cluster ) ; err != nil {
return nil , 0 , 1000
}
return parts , 1 * time . Hour , 1
} )
if err != nil {
return nil , err
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer Partitions %s" , time . Since ( start ) )
2022-03-14 09:08:02 +01:00
return partitions . ( [ ] string ) , nil
}
2022-03-24 10:32:08 +01:00
2022-03-24 16:08:47 +01:00
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func ( r * JobRepository ) AllocatedNodes ( cluster string ) ( map [ string ] map [ string ] int , error ) {
2022-10-04 10:12:35 +02:00
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-03-24 16:08:47 +01:00
subclusters := make ( map [ string ] map [ string ] int )
rows , err := sq . Select ( "resources" , "subcluster" ) . From ( "job" ) .
2022-03-24 10:32:08 +01:00
Where ( "job.job_state = 'running'" ) .
Where ( "job.cluster = ?" , cluster ) .
RunWith ( r . stmtCache ) . Query ( )
if err != nil {
2023-01-31 18:28:44 +01:00
log . Error ( "Error while running query" )
2022-03-24 10:32:08 +01:00
return nil , err
}
var raw [ ] byte
defer rows . Close ( )
for rows . Next ( ) {
raw = raw [ 0 : 0 ]
var resources [ ] * schema . Resource
2022-03-24 16:08:47 +01:00
var subcluster string
if err := rows . Scan ( & raw , & subcluster ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while scanning rows" )
2022-03-24 10:32:08 +01:00
return nil , err
}
if err := json . Unmarshal ( raw , & resources ) ; err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while unmarshaling raw resources json" )
2022-03-24 10:32:08 +01:00
return nil , err
}
2022-03-24 16:08:47 +01:00
hosts , ok := subclusters [ subcluster ]
if ! ok {
hosts = make ( map [ string ] int )
subclusters [ subcluster ] = hosts
}
2022-03-24 10:32:08 +01:00
for _ , resource := range resources {
2022-03-24 16:08:47 +01:00
hosts [ resource . Hostname ] += 1
2022-03-24 10:32:08 +01:00
}
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer AllocatedNodes %s" , time . Since ( start ) )
2022-03-24 16:08:47 +01:00
return subclusters , nil
2022-03-24 10:32:08 +01:00
}
2022-04-07 09:50:32 +02:00
func ( r * JobRepository ) StopJobsExceedingWalltimeBy ( seconds int ) error {
2022-10-04 10:12:35 +02:00
2023-02-20 15:08:23 +01:00
start := time . Now ( )
2022-04-07 09:50:32 +02:00
res , err := sq . Update ( "job" ) .
Set ( "monitoring_status" , schema . MonitoringStatusArchivingFailed ) .
Set ( "duration" , 0 ) .
Set ( "job_state" , schema . JobStateFailed ) .
Where ( "job.job_state = 'running'" ) .
2022-05-09 11:53:41 +02:00
Where ( "job.walltime > 0" ) .
2022-04-07 09:50:32 +02:00
Where ( fmt . Sprintf ( "(%d - job.start_time) > (job.walltime + %d)" , time . Now ( ) . Unix ( ) , seconds ) ) .
RunWith ( r . DB ) . Exec ( )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while stopping jobs exceeding walltime" )
2022-04-07 09:50:32 +02:00
return err
}
rowsAffected , err := res . RowsAffected ( )
if err != nil {
2023-02-01 11:58:27 +01:00
log . Warn ( "Error while fetching affected rows after stopping due to exceeded walltime" )
2022-04-07 09:50:32 +02:00
return err
}
if rowsAffected > 0 {
2023-02-15 11:50:51 +01:00
log . Infof ( "%d jobs have been marked as failed due to running too long" , rowsAffected )
2022-04-07 09:50:32 +02:00
}
2023-02-21 12:08:06 +01:00
log . Infof ( "Timer StopJobsExceedingWalltimeBy %s" , time . Since ( start ) )
2022-04-07 09:50:32 +02:00
return nil
}
2023-02-13 13:53:24 +01:00
2023-06-10 07:49:02 +02:00
func ( r * JobRepository ) FindJobsBetween ( startTimeBegin int64 , startTimeEnd int64 ) ( [ ] * schema . Job , error ) {
2023-05-09 16:33:26 +02:00
2023-06-10 07:49:02 +02:00
var query sq . SelectBuilder
2023-05-09 16:33:26 +02:00
2023-06-10 07:49:02 +02:00
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
return nil , errors . New ( "startTimeBegin is equal or larger startTimeEnd" )
}
if startTimeBegin == 0 {
query = sq . Select ( jobColumns ... ) . From ( "job" ) . Where ( fmt . Sprintf (
"job.start_time < %d" , startTimeEnd ) )
} else {
query = sq . Select ( jobColumns ... ) . From ( "job" ) . Where ( fmt . Sprintf (
"job.start_time BETWEEN %d AND %d" , startTimeBegin , startTimeEnd ) )
2023-05-09 16:33:26 +02:00
}
rows , err := query . RunWith ( r . stmtCache ) . Query ( )
if err != nil {
log . Error ( "Error while running query" )
return nil , err
}
jobs := make ( [ ] * schema . Job , 0 , 50 )
for rows . Next ( ) {
job , err := scanJob ( rows )
if err != nil {
rows . Close ( )
log . Warn ( "Error while scanning rows" )
return nil , err
}
jobs = append ( jobs , job )
}
return jobs , nil
}
2023-04-28 08:49:58 +02:00
const NamedJobInsert string = ` INSERT INTO job (
job_id , user , project , cluster , subcluster , ` + " ` partition ` " + ` , array_job_id , num_nodes , num_hwthreads , num_acc ,
exclusive , monitoring_status , smt , job_state , start_time , duration , walltime , resources , meta_data ,
mem_used_max , flops_any_avg , mem_bw_avg , load_avg , net_bw_avg , net_data_vol_total , file_bw_avg , file_data_vol_total
) VALUES (
: job_id , : user , : project , : cluster , : subcluster , : partition , : array_job_id , : num_nodes , : num_hwthreads , : num_acc ,
: exclusive , : monitoring_status , : smt , : job_state , : start_time , : duration , : walltime , : resources , : meta_data ,
: mem_used_max , : flops_any_avg , : mem_bw_avg , : load_avg , : net_bw_avg , : net_data_vol_total , : file_bw_avg , : file_data_vol_total
) ; `
func ( r * JobRepository ) InsertJob ( job * schema . Job ) ( int64 , error ) {
res , err := r . DB . NamedExec ( NamedJobInsert , job )
if err != nil {
log . Warn ( "Error while NamedJobInsert" )
return 0 , err
}
id , err := res . LastInsertId ( )
if err != nil {
log . Warn ( "Error while getting last insert ID" )
return 0 , err
}
return id , nil
}