From 3c66840f953cdba46a23f7c32e71e6186e830489 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 23 May 2025 10:13:59 +0200 Subject: [PATCH] Add tagger config option and command line switch to run taggers on all jobs --- cmd/cc-backend/cli.go | 6 ++-- cmd/cc-backend/main.go | 11 ++++++- internal/repository/job.go | 1 + internal/repository/jobFind.go | 29 +++++++++++++++++++ internal/tagger/tagger.go | 53 ++++++++++++++++++++++++++-------- pkg/schema/config.go | 2 ++ 6 files changed, 87 insertions(+), 15 deletions(-) diff --git a/cmd/cc-backend/cli.go b/cmd/cc-backend/cli.go index 8d9e7e6..8b826bb 100644 --- a/cmd/cc-backend/cli.go +++ b/cmd/cc-backend/cli.go @@ -7,8 +7,9 @@ package main import "flag" var ( - flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool - flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string + flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, + flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool + flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string ) func cliInit() { @@ -21,6 +22,7 @@ func cliInit() { flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit") + flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit") flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index cbfccef..cd2d08d 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -213,12 +213,21 @@ func main() { } } + if flagApplyTags { + if err := tagger.RunTaggers(); err != nil { + log.Abortf("Running job taggers.\nError: %s\n", err.Error()) + } + } + if !flagServer { log.Exit("No errors, server flag not set. Exiting cc-backend.") } archiver.Start(repository.GetJobRepository()) - tagger.Init() + + if config.Keys.EnableJobTaggers { + tagger.Init() + } taskManager.Start() serverInit() diff --git a/internal/repository/job.go b/internal/repository/job.go index 73a2588..97ca280 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -472,6 +472,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +// FIXME: Reconsider filtering short jobs with harcoded threshold func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { query := sq.Select(jobColumns...).From("job"). Where(fmt.Sprintf("job.cluster = '%s'", cluster)). diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index ac09355..614b7c0 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -103,6 +103,35 @@ func (r *JobRepository) FindAll( return jobs, nil } +// Get complete joblist only consisting of db ids. +// This is useful to process large job counts and intended to be used +// together with FindById to process jobs one by one +func (r *JobRepository) GetJobList() ([]int64, error) { + query := sq.Select("id").From("job"). + Where("job.job_state != 'running'") + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + jl := make([]int64, 0, 1000) + for rows.Next() { + var id int64 + err := rows.Scan(&id) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jl = append(jl, id) + } + + log.Infof("Return job count %d", len(jl)) + return jl, nil +} + // FindById executes a SQL query to find a specific batch job. // The job is queried using the database id. // It returns a pointer to a schema.Job data structure and an error variable. diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index ffdd011..da32fc4 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -26,30 +27,58 @@ type JobTagger struct { stopTaggers []Tagger } +func newTagger() { + jobTagger = &JobTagger{} + jobTagger.startTaggers = make([]Tagger, 0) + jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{}) + jobTagger.stopTaggers = make([]Tagger, 0) + jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{}) + + for _, tagger := range jobTagger.startTaggers { + tagger.Register() + } +} + func Init() { initOnce.Do(func() { - jobTagger = &JobTagger{} - jobTagger.startTaggers = make([]Tagger, 0) - jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{}) - jobTagger.stopTaggers = make([]Tagger, 0) - jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{}) - - for _, tagger := range jobTagger.startTaggers { - tagger.Register() - } - + newTagger() repository.RegisterJobJook(jobTagger) }) } func (jt *JobTagger) JobStartCallback(job *schema.Job) { - for _, tagger := range jobTagger.startTaggers { + for _, tagger := range jt.startTaggers { tagger.Match(job) } } func (jt *JobTagger) JobStopCallback(job *schema.Job) { - for _, tagger := range jobTagger.stopTaggers { + for _, tagger := range jt.stopTaggers { tagger.Match(job) } } + +func RunTaggers() error { + newTagger() + r := repository.GetJobRepository() + jl, err := r.GetJobList() + if err != nil { + log.Errorf("Error while getting job list %s", err) + return err + } + + for _, id := range jl { + job, err := r.FindByIdDirect(id) + if err != nil { + log.Errorf("Error while getting job %s", err) + return err + } + for _, tagger := range jobTagger.startTaggers { + tagger.Match(job) + } + for _, tagger := range jobTagger.stopTaggers { + tagger.Match(job) + } + } + return nil +} diff --git a/pkg/schema/config.go b/pkg/schema/config.go index a5caa61..eda3d91 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -131,6 +131,8 @@ type ProgramConfig struct { // do not write to the job-archive. DisableArchive bool `json:"disable-archive"` + EnableJobTaggers bool `json:"enable-job-taggers"` + // Validate json input against schema Validate bool `json:"validate"`