// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package tagger import ( "bytes" "embed" "encoding/json" "fmt" "maps" "os" "strings" "text/template" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" ) //go:embed jobclasses/* var jobclassFiles embed.FS type Variable struct { Name string `json:"name"` Expr string `json:"expr"` } type ruleVariable struct { name string expr *vm.Program } type RuleFormat struct { Name string `json:"name"` Tag string `json:"tag"` Parameters []string `json:"parameters"` Metrics []string `json:"metrics"` Requirements []string `json:"requirements"` Variables []Variable `json:"variables"` Rule string `json:"rule"` Hint string `json:"hint"` } type ruleInfo struct { env map[string]any metrics []string requirements []*vm.Program variables []ruleVariable rule *vm.Program hint *template.Template } type JobClassTagger struct { rules map[string]ruleInfo parameters map[string]any tagType string cfgPath string } func (t *JobClassTagger) prepareRule(filename string, fns string) { b, err := os.ReadFile(filename) if err != nil { log.Warnf("prepareRule() > open file error: %v", err) return } var rule RuleFormat if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil { log.Warn("Error while decoding raw job meta json") return } ri := ruleInfo{} ri.env = make(map[string]any) ri.metrics = make([]string, 0) ri.requirements = make([]*vm.Program, 0) ri.variables = make([]ruleVariable, 0) // check if all required parameters are available for _, p := range rule.Parameters { param, ok := t.parameters[p] if !ok { log.Warnf("prepareRule() > missing parameter %s in rule %s", p, fns) return } ri.env[p] = param } // set all required metrics for _, m := range rule.Metrics { ri.metrics = append(ri.metrics, m) } // compile requirements for _, r := range rule.Requirements { req, err := expr.Compile(r, expr.AsBool()) if err != nil { log.Errorf("error compiling requirement %s: %#v", r, err) return } ri.requirements = append(ri.requirements, req) } // compile rule exp, err := expr.Compile(rule.Rule, expr.AsBool()) if err != nil { log.Errorf("error compiling rule %s: %#v", fns, err) return } ri.rule = exp // prepare hint template ri.hint = template.Must(template.New(fns).Parse(rule.Hint)) delete(t.rules, rule.Tag) t.rules[rule.Tag] = ri } func (t *JobClassTagger) EventMatch(s string) bool { return strings.Contains(s, "jobclasses") } // FIXME: Only process the file that caused the event func (t *JobClassTagger) EventCallback() { files, err := os.ReadDir(t.cfgPath) if err != nil { log.Fatal(err) } for _, fn := range files { fns := fn.Name() log.Debugf("Process: %s", fns) filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) t.prepareRule(filename, fns) } } func (t *JobClassTagger) Register() error { t.cfgPath = "./var/tagger/jobclasses" t.tagType = "jobClass" files, err := appFiles.ReadDir("jobclasses") if err != nil { return fmt.Errorf("error reading app folder: %#v", err) } t.rules = make(map[string]ruleInfo, 0) for _, fn := range files { fns := fn.Name() filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) if fn.Name() == "parameters.json" { b, err := os.ReadFile(filename) if err != nil { log.Warnf("prepareRule() > open file error: %v", err) return err } if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil { log.Warn("Error while decoding parameters.json") return err } continue } log.Debugf("Process: %s", fns) t.prepareRule(filename, fns) } if util.CheckFileExists(t.cfgPath) { t.EventCallback() log.Infof("Setup file watch for %s", t.cfgPath) util.AddListener(t.cfgPath, t) } return nil } func (t *JobClassTagger) Match(job *schema.Job) { r := repository.GetJobRepository() jobstats, err := archive.GetStatistics(job) if err != nil { log.Errorf("job classification failed for job %d: %#v", job.JobID, err) return } for tag, ri := range t.rules { env := make(map[string]any) maps.Copy(env, ri.env) // add metrics to env for _, m := range ri.metrics { stats, ok := jobstats[m] if !ok { log.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m) return } env[m] = stats.Avg } // check rule requirements apply for _, r := range ri.requirements { ok, err := expr.Run(r, env) if err != nil { log.Errorf("error running requirement for rule %s: %#v", tag, err) return } if !ok.(bool) { log.Infof("requirement for rule %s not met", tag) return } } // validate rule expression for _, v := range ri.variables { value, err := expr.Run(v.expr, env) if err != nil { log.Errorf("error running rule %s: %#v", tag, err) return } env[v.name] = value } match, err := expr.Run(ri.rule, job) if err != nil { log.Errorf("error running rule %s: %#v", tag, err) } if match.(bool) { id := job.ID if !r.HasTag(id, t.tagType, tag) { r.AddTagOrCreateDirect(id, t.tagType, tag) } } // process hint template var msg bytes.Buffer if err := ri.hint.Execute(&msg, env); err != nil { log.Errorf("Template error: %s", err.Error()) } // FIXME: Handle case where multiple tags apply r.UpdateMetadata(job, "message", msg.String()) } }