diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index cd2d08d..ab07d28 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -213,6 +213,10 @@ func main() { } } + if config.Keys.EnableJobTaggers { + tagger.Init() + } + if flagApplyTags { if err := tagger.RunTaggers(); err != nil { log.Abortf("Running job taggers.\nError: %s\n", err.Error()) @@ -225,9 +229,6 @@ func main() { archiver.Start(repository.GetJobRepository()) - if config.Keys.EnableJobTaggers { - tagger.Init() - } taskManager.Start() serverInit() diff --git a/go.mod b/go.mod index 062ee3e..c57d9ed 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,8 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gookit/color v1.5.4 // indirect + github.com/gookit/goutil v0.6.18 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -80,6 +82,7 @@ require ( github.com/sosodev/duration v1.3.1 // indirect github.com/swaggo/files v1.0.1 // indirect github.com/urfave/cli/v2 v2.27.5 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/mod v0.23.0 // indirect diff --git a/go.sum b/go.sum index b4c3781..2102888 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,10 @@ github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark= github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= +github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= +github.com/gookit/goutil v0.6.18 h1:MUVj0G16flubWT8zYVicIuisUiHdgirPAkmnfD2kKgw= +github.com/gookit/goutil v0.6.18/go.mod h1:AY/5sAwKe7Xck+mEbuxj0n/bc3qwrGNe3Oeulln7zBA= github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -241,6 +245,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/vektah/gqlparser/v2 v2.5.22 h1:yaaeJ0fu+nv1vUMW0Hl+aS1eiv1vMfapBNjpffAda1I= github.com/vektah/gqlparser/v2 v2.5.22/go.mod h1:xMl+ta8a5M1Yo1A1Iwt/k7gSpscwSnHZdw7tfhEGfTM= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go index f7195e3..bf86894 100644 --- a/internal/tagger/classifyJob.go +++ b/internal/tagger/classifyJob.go @@ -21,6 +21,7 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" + "github.com/gookit/goutil/dump" ) //go:embed jobclasses/* @@ -63,13 +64,7 @@ type JobClassTagger struct { cfgPath string } -func (t *JobClassTagger) prepareRule(filename string, fns string) { - b, err := os.ReadFile(filename) - if err != nil { - log.Warnf("prepareRule() > open file error: %v", err) - return - } - +func (t *JobClassTagger) prepareRule(b []byte, fns string) { var rule RuleFormat if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil { log.Warn("Error while decoding raw job meta json") @@ -93,9 +88,7 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) { } // set all required metrics - for _, m := range rule.Metrics { - ri.metrics = append(ri.metrics, m) - } + ri.metrics = append(ri.metrics, rule.Metrics...) // compile requirements for _, r := range rule.Requirements { @@ -107,6 +100,16 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) { ri.requirements = append(ri.requirements, req) } + // compile variables + for _, v := range rule.Variables { + req, err := expr.Compile(v.Expr, expr.AsFloat64()) + if err != nil { + log.Errorf("error compiling requirement %s: %#v", v.Name, err) + return + } + ri.variables = append(ri.variables, ruleVariable{name: v.Name, expr: req}) + } + // compile rule exp, err := expr.Compile(rule.Rule, expr.AsBool()) if err != nil { @@ -116,7 +119,11 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) { ri.rule = exp // prepare hint template - ri.hint = template.Must(template.New(fns).Parse(rule.Hint)) + ri.hint, err = template.New(fns).Parse(rule.Hint) + if err != nil { + log.Errorf("error processing template %s: %#v", fns, err) + } + log.Infof("prepareRule() > processing %s with %d requirements and %d variables", fns, len(ri.requirements), len(ri.variables)) delete(t.rules, rule.Tag) t.rules[rule.Tag] = ri @@ -137,38 +144,59 @@ func (t *JobClassTagger) EventCallback() { fns := fn.Name() log.Debugf("Process: %s", fns) filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) - t.prepareRule(filename, fns) + b, err := os.ReadFile(filename) + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return + } + t.prepareRule(b, fns) } } +func (t *JobClassTagger) initParameters() error { + log.Info("Initialize parameters") + b, err := jobclassFiles.ReadFile("jobclasses/parameters.json") + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return err + } + + if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil { + log.Warn("Error while decoding parameters.json") + return err + } + + return nil +} + func (t *JobClassTagger) Register() error { t.cfgPath = "./var/tagger/jobclasses" t.tagType = "jobClass" - files, err := appFiles.ReadDir("jobclasses") + err := t.initParameters() + if err != nil { + log.Warnf("error reading parameters.json: %v", err) + return err + } + + files, err := jobclassFiles.ReadDir("jobclasses") if err != nil { return fmt.Errorf("error reading app folder: %#v", err) } t.rules = make(map[string]ruleInfo, 0) for _, fn := range files { fns := fn.Name() - filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) + if fns != "parameters.json" { + filename := fmt.Sprintf("jobclasses/%s", fns) + log.Infof("Process: %s", fns) - if fn.Name() == "parameters.json" { - b, err := os.ReadFile(filename) + b, err := jobclassFiles.ReadFile(filename) if err != nil { log.Warnf("prepareRule() > open file error: %v", err) return err } - - if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil { - log.Warn("Error while decoding parameters.json") - return err - } - continue + t.prepareRule(b, fns) } - log.Debugf("Process: %s", fns) - t.prepareRule(filename, fns) } if util.CheckFileExists(t.cfgPath) { @@ -183,6 +211,7 @@ func (t *JobClassTagger) Register() error { func (t *JobClassTagger) Match(job *schema.Job) { r := repository.GetJobRepository() jobstats, err := archive.GetStatistics(job) + log.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID) if err != nil { log.Errorf("job classification failed for job %d: %#v", job.JobID, err) return @@ -191,6 +220,16 @@ func (t *JobClassTagger) Match(job *schema.Job) { for tag, ri := range t.rules { env := make(map[string]any) maps.Copy(env, ri.env) + log.Infof("Try to match rule %s for job %d", tag, job.JobID) + env["job"] = map[string]any{ + "exclusive": job.Exclusive, + "duration": job.Duration, + "numCores": job.NumHWThreads, + "numNodes": job.NumNodes, + "jobState": job.State, + "numAcc": job.NumAcc, + "smt": job.SMT, + } // add metrics to env for _, m := range ri.metrics { @@ -225,21 +264,28 @@ func (t *JobClassTagger) Match(job *schema.Job) { env[v.name] = value } - match, err := expr.Run(ri.rule, job) + dump.P(env) + + match, err := expr.Run(ri.rule, env) if err != nil { log.Errorf("error running rule %s: %#v", tag, err) + return } if match.(bool) { + log.Info("Rule matches!") id := job.ID if !r.HasTag(id, t.tagType, tag) { r.AddTagOrCreateDirect(id, t.tagType, tag) } + } else { + log.Info("Rule does not match!") } // process hint template var msg bytes.Buffer if err := ri.hint.Execute(&msg, env); err != nil { log.Errorf("Template error: %s", err.Error()) + return } // FIXME: Handle case where multiple tags apply diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index a37924e..7945b48 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -79,10 +79,10 @@ func (t *AppTagger) Register() error { fns := fn.Name() log.Debugf("Process: %s", fns) f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) - defer f.Close() if err != nil { return fmt.Errorf("error opening app file %s: %#v", fns, err) } + defer f.Close() t.scanApp(f, fns) } @@ -97,7 +97,13 @@ func (t *AppTagger) Register() error { func (t *AppTagger) Match(job *schema.Job) { r := repository.GetJobRepository() - jobscript, ok := job.MetaData["jobScript"] + metadata, err := r.FetchMetadata(job) + if err != nil { + log.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster) + return + } + + jobscript, ok := metadata["jobScript"] if ok { id := job.ID diff --git a/internal/tagger/jobclasses/highload.json b/internal/tagger/jobclasses/highload.json index 29d4026..2715ee8 100644 --- a/internal/tagger/jobclasses/highload.json +++ b/internal/tagger/jobclasses/highload.json @@ -7,27 +7,21 @@ "job_min_duration_seconds", "sampling_interval_seconds" ], - "metrics": [ - "cpu_load" - ], + "metrics": ["cpu_load"], "requirements": [ "job.exclusive == 1", "job.duration > job_min_duration_seconds" ], - "terms": [ - { - "name": "", - "load_mean": "cpu_load[cpu_load_pre_cutoff_samples].mean('all')" - }, + "variables": [ { "name": "load_threshold", - "expr": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor" + "expr": "(job.numCores / job.numNodes) * excessivecpuload_threshold_factor" }, { "name": "load_perc", - "expr": "load_mean / load_threshold" + "expr": "cpu_load / load_threshold" } ], "rule": "cpu_load > load_threshold", - "hint": "This job was detected as excessiveload because the average cpu load {{ cpu_load }} falls above the threshold {{ load_threshold }}." + "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load}} falls above the threshold {{.load_threshold}}." } diff --git a/internal/tagger/jobclasses/lowload.json b/internal/tagger/jobclasses/lowload.json index 3c5bd4d..4c21a6b 100644 --- a/internal/tagger/jobclasses/lowload.json +++ b/internal/tagger/jobclasses/lowload.json @@ -6,9 +6,7 @@ "job_min_duration_seconds", "sampling_interval_seconds" ], - "metrics": [ - "cpu_load" - ], + "metrics": ["cpu_load"], "requirements": [ "job.exclusive == 1", "job.duration > job_min_duration_seconds" @@ -16,7 +14,7 @@ "variables": [ { "name": "load_threshold", - "expr": "job.numHwthreads * lowcpuload_threshold_factor" + "expr": "job.numCores * lowcpuload_threshold_factor" }, { "name": "load_perc", @@ -24,5 +22,5 @@ } ], "rule": "cpu_load < load_threshold", - "hint": "This job was detected as lowload because the average cpu load {{ cpu_load }} falls below the threshold {{ load_threshold }}." + "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.load_threshold}}." } diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index da32fc4..04edd49 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -32,11 +32,14 @@ func newTagger() { jobTagger.startTaggers = make([]Tagger, 0) jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{}) jobTagger.stopTaggers = make([]Tagger, 0) - jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{}) + jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{}) for _, tagger := range jobTagger.startTaggers { tagger.Register() } + for _, tagger := range jobTagger.stopTaggers { + tagger.Register() + } } func Init() { @@ -77,6 +80,7 @@ func RunTaggers() error { tagger.Match(job) } for _, tagger := range jobTagger.stopTaggers { + log.Infof("Run stop tagger for job %d", job.ID) tagger.Match(job) } }