Add tagger config option and command line switch to run taggers on all jobs

This commit is contained in:
Jan Eitzinger 2025-05-23 10:13:59 +02:00
parent 733e3ea9d5
commit 3c66840f95
6 changed files with 87 additions and 15 deletions

View File

@ -7,8 +7,9 @@ package main
import "flag"
var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
func cliInit() {
@ -21,6 +22,7 @@ func cliInit() {
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit")
flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit")
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")

View File

@ -213,12 +213,21 @@ func main() {
}
}
if flagApplyTags {
if err := tagger.RunTaggers(); err != nil {
log.Abortf("Running job taggers.\nError: %s\n", err.Error())
}
}
if !flagServer {
log.Exit("No errors, server flag not set. Exiting cc-backend.")
}
archiver.Start(repository.GetJobRepository())
tagger.Init()
if config.Keys.EnableJobTaggers {
tagger.Init()
}
taskManager.Start()
serverInit()

View File

@ -472,6 +472,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}
// FIXME: Reconsider filtering short jobs with harcoded threshold
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).

View File

@ -103,6 +103,35 @@ func (r *JobRepository) FindAll(
return jobs, nil
}
// Get complete joblist only consisting of db ids.
// This is useful to process large job counts and intended to be used
// together with FindById to process jobs one by one
func (r *JobRepository) GetJobList() ([]int64, error) {
query := sq.Select("id").From("job").
Where("job.job_state != 'running'")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jl := make([]int64, 0, 1000)
for rows.Next() {
var id int64
err := rows.Scan(&id)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jl = append(jl, id)
}
log.Infof("Return job count %d", len(jl))
return jl, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.

View File

@ -8,6 +8,7 @@ import (
"sync"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
@ -26,30 +27,58 @@ type JobTagger struct {
stopTaggers []Tagger
}
func newTagger() {
jobTagger = &JobTagger{}
jobTagger.startTaggers = make([]Tagger, 0)
jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
jobTagger.stopTaggers = make([]Tagger, 0)
jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{})
for _, tagger := range jobTagger.startTaggers {
tagger.Register()
}
}
func Init() {
initOnce.Do(func() {
jobTagger = &JobTagger{}
jobTagger.startTaggers = make([]Tagger, 0)
jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
jobTagger.stopTaggers = make([]Tagger, 0)
jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{})
for _, tagger := range jobTagger.startTaggers {
tagger.Register()
}
newTagger()
repository.RegisterJobJook(jobTagger)
})
}
func (jt *JobTagger) JobStartCallback(job *schema.Job) {
for _, tagger := range jobTagger.startTaggers {
for _, tagger := range jt.startTaggers {
tagger.Match(job)
}
}
func (jt *JobTagger) JobStopCallback(job *schema.Job) {
for _, tagger := range jobTagger.stopTaggers {
for _, tagger := range jt.stopTaggers {
tagger.Match(job)
}
}
func RunTaggers() error {
newTagger()
r := repository.GetJobRepository()
jl, err := r.GetJobList()
if err != nil {
log.Errorf("Error while getting job list %s", err)
return err
}
for _, id := range jl {
job, err := r.FindByIdDirect(id)
if err != nil {
log.Errorf("Error while getting job %s", err)
return err
}
for _, tagger := range jobTagger.startTaggers {
tagger.Match(job)
}
for _, tagger := range jobTagger.stopTaggers {
tagger.Match(job)
}
}
return nil
}

View File

@ -131,6 +131,8 @@ type ProgramConfig struct {
// do not write to the job-archive.
DisableArchive bool `json:"disable-archive"`
EnableJobTaggers bool `json:"enable-job-taggers"`
// Validate json input against schema
Validate bool `json:"validate"`