diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 6e514cb..42a60b9 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -73,10 +73,7 @@ func archivingWorker() { log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) log.Printf("archiving job (dbid: %d) successful", job.ID) - id := job.ID - jobMeta.ID = &id - - repository.CallJobStopHooks(jobMeta) + repository.CallJobStopHooks(job) archivePending.Done() default: continue diff --git a/internal/repository/jobHooks.go b/internal/repository/jobHooks.go index 49535f7..1016335 100644 --- a/internal/repository/jobHooks.go +++ b/internal/repository/jobHooks.go @@ -12,7 +12,7 @@ import ( type JobHook interface { JobStartCallback(job *schema.Job) - JobStopCallback(job *schema.JobMeta) + JobStopCallback(job *schema.Job) } var ( @@ -44,7 +44,7 @@ func CallJobStartHooks(jobs []*schema.Job) { } } -func CallJobStopHooks(job *schema.JobMeta) { +func CallJobStopHooks(job *schema.Job) { if hooks == nil { return } diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go index ec1e843..f7195e3 100644 --- a/internal/tagger/classifyJob.go +++ b/internal/tagger/classifyJob.go @@ -7,14 +7,16 @@ package tagger import ( "bytes" "embed" + "encoding/json" "fmt" - "io/fs" + "maps" "os" - "path/filepath" "strings" + "text/template" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/expr-lang/expr" @@ -24,31 +26,100 @@ import ( //go:embed jobclasses/* var jobclassFiles embed.FS +type Variable struct { + Name string `json:"name"` + Expr string `json:"expr"` +} + +type ruleVariable struct { + name string + expr *vm.Program +} + +type RuleFormat struct { + Name string `json:"name"` + Tag string `json:"tag"` + 
Parameters []string `json:"parameters"` + Metrics []string `json:"metrics"` + Requirements []string `json:"requirements"` + Variables []Variable `json:"variables"` + Rule string `json:"rule"` + Hint string `json:"hint"` +} + type ruleInfo struct { - tag string - rule *vm.Program + env map[string]any + metrics []string + requirements []*vm.Program + variables []ruleVariable + rule *vm.Program + hint *template.Template } type JobClassTagger struct { - rules map[string]ruleInfo - tagType string - cfgPath string + rules map[string]ruleInfo + parameters map[string]any + tagType string + cfgPath string } -func (t *JobClassTagger) compileRule(f fs.File, fns string) { - buf := new(bytes.Buffer) - _, err := buf.ReadFrom(f) +func (t *JobClassTagger) prepareRule(filename string, fns string) { + b, err := os.ReadFile(filename) if err != nil { - log.Errorf("error reading rule file %s: %#v", fns, err) + log.Warnf("prepareRule() > open file error: %v", err) + return } - prg, err := expr.Compile(buf.String(), expr.AsBool()) + + var rule RuleFormat + if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil { + log.Warn("Error while decoding raw job meta json") + return + } + + ri := ruleInfo{} + ri.env = make(map[string]any) + ri.metrics = make([]string, 0) + ri.requirements = make([]*vm.Program, 0) + ri.variables = make([]ruleVariable, 0) + + // check if all required parameters are available + for _, p := range rule.Parameters { + param, ok := t.parameters[p] + if !ok { + log.Warnf("prepareRule() > missing parameter %s in rule %s", p, fns) + return + } + ri.env[p] = param + } + + // set all required metrics + for _, m := range rule.Metrics { + ri.metrics = append(ri.metrics, m) + } + + // compile requirements + for _, r := range rule.Requirements { + req, err := expr.Compile(r, expr.AsBool()) + if err != nil { + log.Errorf("error compiling requirement %s: %#v", r, err) + return + } + ri.requirements = append(ri.requirements, req) + } + + // compile rule + exp, err 
:= expr.Compile(rule.Rule, expr.AsBool()) if err != nil { log.Errorf("error compiling rule %s: %#v", fns, err) + return } - ri := ruleInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), rule: prg} + ri.rule = exp - delete(t.rules, ri.tag) - t.rules[ri.tag] = ri + // prepare hint template + ri.hint = template.Must(template.New(fns).Parse(rule.Hint)) + + delete(t.rules, rule.Tag) + t.rules[rule.Tag] = ri } func (t *JobClassTagger) EventMatch(s string) bool { @@ -65,11 +136,8 @@ func (t *JobClassTagger) EventCallback() { for _, fn := range files { fns := fn.Name() log.Debugf("Process: %s", fns) - f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns)) - if err != nil { - log.Errorf("error opening app file %s: %#v", fns, err) - } - t.compileRule(f, fns) + filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) + t.prepareRule(filename, fns) } } @@ -84,13 +152,23 @@ func (t *JobClassTagger) Register() error { t.rules = make(map[string]ruleInfo, 0) for _, fn := range files { fns := fn.Name() - log.Debugf("Process: %s", fns) - f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) - if err != nil { - return fmt.Errorf("error opening app file %s: %#v", fns, err) + filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) + + if fn.Name() == "parameters.json" { + b, err := os.ReadFile(filename) + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return err + } + + if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil { + log.Warn("Error while decoding parameters.json") + return err + } + continue } - defer f.Close() - t.compileRule(f, fns) + log.Debugf("Process: %s", fns) + t.prepareRule(filename, fns) } if util.CheckFileExists(t.cfgPath) { @@ -102,20 +180,69 @@ func (t *JobClassTagger) Register() error { return nil } -func (t *JobClassTagger) Match(job *schema.JobMeta) { +func (t *JobClassTagger) Match(job *schema.Job) { r := repository.GetJobRepository() + jobstats, err := archive.GetStatistics(job) + if err != nil { + log.Errorf("job 
classification failed for job %d: %#v", job.JobID, err)
+		return
+	}
 
-	for _, ri := range t.rules {
-		tag := ri.tag
-		output, err := expr.Run(ri.rule, job)
+	for tag, ri := range t.rules {
+		env := make(map[string]any)
+		maps.Copy(env, ri.env)
+		env["job"] = map[string]any{"exclusive": job.Exclusive, "duration": job.Duration, "numNodes": job.NumNodes, "numHwthreads": job.NumHWThreads, "numAcc": job.NumAcc}
+		// add metrics to env
+		for _, m := range ri.metrics {
+			stats, ok := jobstats[m]
+			if !ok {
+				log.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m)
+				return
+			}
+			env[m] = stats.Avg
+		}
+
+		// check rule requirements apply
+		for _, r := range ri.requirements {
+			ok, err := expr.Run(r, env)
+			if err != nil {
+				log.Errorf("error running requirement for rule %s: %#v", tag, err)
+				return
+			}
+			if !ok.(bool) {
+				log.Infof("requirement for rule %s not met", tag)
+				return
+			}
+		}
+
+		// validate rule expression
+		for _, v := range ri.variables {
+			value, err := expr.Run(v.expr, env)
+			if err != nil {
+				log.Errorf("error running rule %s: %#v", tag, err)
+				return
+			}
+			env[v.name] = value
+		}
+
+		match, err := expr.Run(ri.rule, env)
-		if err != nil {
-			log.Errorf("error running rule %s: %#v", tag, err)
-		}
+		if err != nil {
+			log.Errorf("error running rule %s: %#v", tag, err)
+			continue
+		}
-		if output.(bool) {
+		if match.(bool) {
 			id := job.ID
-			if !r.HasTag(*id, t.tagType, tag) {
-				r.AddTagOrCreateDirect(*id, t.tagType, tag)
+			if !r.HasTag(id, t.tagType, tag) {
+				r.AddTagOrCreateDirect(id, t.tagType, tag)
 			}
+			// process hint template
+			var msg bytes.Buffer
+			if err := ri.hint.Execute(&msg, env); err != nil {
+				log.Errorf("Template error: %s", err.Error())
+			}
+
+			// FIXME: Handle case where multiple tags apply
+			r.UpdateMetadata(job, "message", msg.String())
 		}
 	}
 }
diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go
index 8057aad..a37924e 100644
--- a/internal/tagger/detectApp.go
+++ b/internal/tagger/detectApp.go
@@ -95,7 +95,7 @@ func (t *AppTagger) Register() error {
 	return nil
 }
 
-func (t *AppTagger) Match(job *schema.JobMeta) {
+func (t *AppTagger) Match(job *schema.Job) {
 	r := repository.GetJobRepository()
 	jobscript, ok := job.MetaData["jobScript"]
 	if ok {
@@ 
-106,8 +106,8 @@ func (t *AppTagger) Match(job *schema.JobMeta) { tag := a.tag for _, s := range a.strings { if strings.Contains(jobscript, s) { - if !r.HasTag(*id, t.tagType, tag) { - r.AddTagOrCreateDirect(*id, t.tagType, tag) + if !r.HasTag(id, t.tagType, tag) { + r.AddTagOrCreateDirect(id, t.tagType, tag) break out } } diff --git a/internal/tagger/detectApp_test.go b/internal/tagger/detectApp_test.go index 56bd856..3b43cce 100644 --- a/internal/tagger/detectApp_test.go +++ b/internal/tagger/detectApp_test.go @@ -9,7 +9,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" ) func setup(tb testing.TB) *repository.JobRepository { @@ -52,12 +51,7 @@ func TestMatch(t *testing.T) { err = tagger.Register() noErr(t, err) - jobMeta := &schema.JobMeta{ - ID: &job.ID, - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - } - tagger.Match(jobMeta) + tagger.Match(job) if !r.HasTag(5, "app", "vasp") { t.Errorf("missing tag vasp") diff --git a/internal/tagger/jobclasses/highload.json b/internal/tagger/jobclasses/highload.json index a65f400..29d4026 100644 --- a/internal/tagger/jobclasses/highload.json +++ b/internal/tagger/jobclasses/highload.json @@ -12,27 +12,22 @@ ], "requirements": [ "job.exclusive == 1", - "job.duration > job_min_duration_seconds", - "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds" + "job.duration > job_min_duration_seconds" ], "terms": [ { + "name": "", "load_mean": "cpu_load[cpu_load_pre_cutoff_samples].mean('all')" }, { - "load_threshold": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor" + "name": "load_threshold", + "expr": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor" }, { - "highload_nodes": "load_mean > load_threshold" - }, - { - "highload": "highload_nodes.any('all')" - }, - { - "load_perc": "load_mean / load_threshold" + "name": "load_perc", 
+ "expr": "load_mean / load_threshold" } ], - "output": "highload", - "output_scalar": "load_perc", - "template": "Job ({{ job.jobId }})\nThis job was detected as excessiveload because the mean cpu load {{ load_mean.array }} falls above the threshold {{ load_threshold }}." + "rule": "cpu_load > load_threshold", + "hint": "This job was detected as excessiveload because the average cpu load {{ cpu_load }} falls above the threshold {{ load_threshold }}." } diff --git a/internal/tagger/jobclasses/highmem.json b/internal/tagger/jobclasses/highmem.json deleted file mode 100644 index 69ffcf3..0000000 --- a/internal/tagger/jobclasses/highmem.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "name": "High memory usage", - "tag": "high_memory_load", - "parameters": [ - "high_memory_load_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" - ], - "metrics": [ - "mem_used" - ], - "requirements": [ - "job.duration > job_min_duration_seconds", - "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds", - "hasattr(job, \"allocated_memory\")" - ], - "terms": [ - { - "memory_alloc": "job.allocated_memory" - }, - { - "memory_used": "mem_used.max('time')" - }, - { - "load_threshold": "memory_alloc * high_memory_load_threshold_factor" - }, - { - "high_mem_nodes": "memory_used > load_threshold" - }, - { - "high_mem": "high_mem_nodes.any('all')" - }, - { - "load_perc": "memory_used / (memory_alloc * high_memory_load_threshold_factor)" - } - ], - "output": "high_mem", - "output_scalar": "load_perc", - "template": "Job ({{ job.jobId }})\nThis job was detected as high_memory_load because the memory usage {{ high_mem_nodes.array }} falls above the threshold {{ load_threshold }}." 
-} diff --git a/internal/tagger/jobclasses/lowgpuload.json b/internal/tagger/jobclasses/lowgpuload.json deleted file mode 100644 index 80339b2..0000000 --- a/internal/tagger/jobclasses/lowgpuload.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Low GPU load", - "tag": "lowgpuload", - "parameters": [ - "lowgpuload_threshold_factor", - "job_min_duration_seconds", - "sampling_interval_seconds" - ], - "metrics": [ - "nv_util" - ], - "requirements": [ - "job.duration > job_min_duration_seconds", - "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds" - ], - "terms": [ - { - "load_mean": "nv_util.mean('all')" - }, - { - "load_threshold": "job.numAcc * lowgpuload_threshold_factor" - }, - { - "lowload_nodes": "load_mean < load_threshold" - }, - { - "lowload": "lowload_nodes.any('all')" - }, - { - "load_perc": "1.0 - (load_mean / load_threshold)" - } - ], - "output": "lowload", - "output_scalar": "load_perc", - "template": "Job ({{ job.jobId }})\nThis job was detected as lowgpuload because the mean gpu load {{ load_mean }} falls below the threshold {{ load_threshold }}." 
-} diff --git a/internal/tagger/jobclasses/lowload.json b/internal/tagger/jobclasses/lowload.json index e860361..3c5bd4d 100644 --- a/internal/tagger/jobclasses/lowload.json +++ b/internal/tagger/jobclasses/lowload.json @@ -11,28 +11,18 @@ ], "requirements": [ "job.exclusive == 1", - "job.duration > job_min_duration_seconds", - "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds" + "job.duration > job_min_duration_seconds" ], - "tagRule": [ + "variables": [ { - "load_mean": "cpu_load[cpu_load_pre_cutoff_samples:].mean('all')" + "name": "load_threshold", + "expr": "job.numHwthreads * lowcpuload_threshold_factor" }, { - "load_threshold": "job.numHwthreads * lowcpuload_threshold_factor" - }, - { - "lowload_nodes": "load_mean < load_threshold" - }, - { - "lowload": "lowload_nodes.any('all')" - }, - { - "load_perc": "1.0 - (load_mean / load_threshold)" + "name": "load_perc", + "expr": "1.0 - (cpu_load / load_threshold)" } ], - "valueRule": [], - "output": "lowload", - "output_scalar": "load_perc", - "hint": "Job ({{ job.jobId }})\nThis job was detected as lowload because the mean cpu load {{ load_mean }} falls below the threshold {{ load_threshold }}." + "rule": "cpu_load < load_threshold", + "hint": "This job was detected as lowload because the average cpu load {{ cpu_load }} falls below the threshold {{ load_threshold }}." 
} diff --git a/internal/tagger/jobclasses/parameters.json b/internal/tagger/jobclasses/parameters.json new file mode 100644 index 0000000..39e94c1 --- /dev/null +++ b/internal/tagger/jobclasses/parameters.json @@ -0,0 +1,14 @@ +{ + "lowcpuload_threshold_factor": 0.9, + "excessivecpuload_threshold_factor": 1.1, + "highmemoryusage_threshold_factor": 0.9, + "node_load_imbalance_threshold_factor": 0.1, + "core_load_imbalance_threshold_factor": 0.1, + "high_memory_load_threshold_factor": 0.9, + "lowgpuload_threshold_factor": 0.7, + "memory_leak_slope_threshold": 0.1, + "job_min_duration_seconds": 600.0, + "sampling_interval_seconds": 30.0, + "cpu_load_pre_cutoff_samples": 11.0, + "cpu_load_core_pre_cutoff_samples": 6.0 +} diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index d5e42b1..ffdd011 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -13,7 +13,7 @@ import ( type Tagger interface { Register() error - Match(job *schema.JobMeta) + Match(job *schema.Job) } var ( @@ -48,5 +48,8 @@ func (jt *JobTagger) JobStartCallback(job *schema.Job) { } } -func (jt *JobTagger) JobStopCallback(job *schema.JobMeta) { +func (jt *JobTagger) JobStopCallback(job *schema.Job) { + for _, tagger := range jobTagger.stopTaggers { + tagger.Match(job) + } }