Fix bugs in job classifier and tagger infrastructure

This commit is contained in:
2025-05-26 13:08:03 +02:00
parent 3c66840f95
commit f14bdb3068
8 changed files with 105 additions and 47 deletions

View File

@@ -21,6 +21,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/gookit/goutil/dump"
)
//go:embed jobclasses/*
@@ -63,13 +64,7 @@ type JobClassTagger struct {
cfgPath string
}
func (t *JobClassTagger) prepareRule(filename string, fns string) {
b, err := os.ReadFile(filename)
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return
}
func (t *JobClassTagger) prepareRule(b []byte, fns string) {
var rule RuleFormat
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil {
log.Warn("Error while decoding raw job meta json")
@@ -93,9 +88,7 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) {
}
// set all required metrics
for _, m := range rule.Metrics {
ri.metrics = append(ri.metrics, m)
}
ri.metrics = append(ri.metrics, rule.Metrics...)
// compile requirements
for _, r := range rule.Requirements {
@@ -107,6 +100,16 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) {
ri.requirements = append(ri.requirements, req)
}
// compile variables
for _, v := range rule.Variables {
req, err := expr.Compile(v.Expr, expr.AsFloat64())
if err != nil {
log.Errorf("error compiling requirement %s: %#v", v.Name, err)
return
}
ri.variables = append(ri.variables, ruleVariable{name: v.Name, expr: req})
}
// compile rule
exp, err := expr.Compile(rule.Rule, expr.AsBool())
if err != nil {
@@ -116,7 +119,11 @@ func (t *JobClassTagger) prepareRule(filename string, fns string) {
ri.rule = exp
// prepare hint template
ri.hint = template.Must(template.New(fns).Parse(rule.Hint))
ri.hint, err = template.New(fns).Parse(rule.Hint)
if err != nil {
log.Errorf("error processing template %s: %#v", fns, err)
}
log.Infof("prepareRule() > processing %s with %d requirements and %d variables", fns, len(ri.requirements), len(ri.variables))
delete(t.rules, rule.Tag)
t.rules[rule.Tag] = ri
@@ -137,38 +144,59 @@ func (t *JobClassTagger) EventCallback() {
fns := fn.Name()
log.Debugf("Process: %s", fns)
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns)
t.prepareRule(filename, fns)
b, err := os.ReadFile(filename)
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return
}
t.prepareRule(b, fns)
}
}
// initParameters loads the shared rule parameters from the embedded
// jobclasses filesystem (jobclasses/parameters.json) into t.parameters,
// so that individual job-class rules can reference them later.
// Returns a non-nil error if the embedded file cannot be read or if its
// JSON content fails to decode.
func (t *JobClassTagger) initParameters() error {
log.Info("Initialize parameters")
// NOTE(review): the log message below says "prepareRule()" but we are in
// initParameters() — looks like a copy/paste leftover; confirm and fix.
b, err := jobclassFiles.ReadFile("jobclasses/parameters.json")
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return err
}
// Decode directly into t.parameters; a decode failure leaves any
// previously loaded parameters untouched.
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil {
log.Warn("Error while decoding parameters.json")
return err
}
return nil
}
func (t *JobClassTagger) Register() error {
t.cfgPath = "./var/tagger/jobclasses"
t.tagType = "jobClass"
files, err := appFiles.ReadDir("jobclasses")
err := t.initParameters()
if err != nil {
log.Warnf("error reading parameters.json: %v", err)
return err
}
files, err := jobclassFiles.ReadDir("jobclasses")
if err != nil {
return fmt.Errorf("error reading app folder: %#v", err)
}
t.rules = make(map[string]ruleInfo, 0)
for _, fn := range files {
fns := fn.Name()
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns)
if fns != "parameters.json" {
filename := fmt.Sprintf("jobclasses/%s", fns)
log.Infof("Process: %s", fns)
if fn.Name() == "parameters.json" {
b, err := os.ReadFile(filename)
b, err := jobclassFiles.ReadFile(filename)
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return err
}
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil {
log.Warn("Error while decoding parameters.json")
return err
}
continue
t.prepareRule(b, fns)
}
log.Debugf("Process: %s", fns)
t.prepareRule(filename, fns)
}
if util.CheckFileExists(t.cfgPath) {
@@ -183,6 +211,7 @@ func (t *JobClassTagger) Register() error {
func (t *JobClassTagger) Match(job *schema.Job) {
r := repository.GetJobRepository()
jobstats, err := archive.GetStatistics(job)
log.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID)
if err != nil {
log.Errorf("job classification failed for job %d: %#v", job.JobID, err)
return
@@ -191,6 +220,16 @@ func (t *JobClassTagger) Match(job *schema.Job) {
for tag, ri := range t.rules {
env := make(map[string]any)
maps.Copy(env, ri.env)
log.Infof("Try to match rule %s for job %d", tag, job.JobID)
env["job"] = map[string]any{
"exclusive": job.Exclusive,
"duration": job.Duration,
"numCores": job.NumHWThreads,
"numNodes": job.NumNodes,
"jobState": job.State,
"numAcc": job.NumAcc,
"smt": job.SMT,
}
// add metrics to env
for _, m := range ri.metrics {
@@ -225,21 +264,28 @@ func (t *JobClassTagger) Match(job *schema.Job) {
env[v.name] = value
}
match, err := expr.Run(ri.rule, job)
dump.P(env)
match, err := expr.Run(ri.rule, env)
if err != nil {
log.Errorf("error running rule %s: %#v", tag, err)
return
}
if match.(bool) {
log.Info("Rule matches!")
id := job.ID
if !r.HasTag(id, t.tagType, tag) {
r.AddTagOrCreateDirect(id, t.tagType, tag)
}
} else {
log.Info("Rule does not match!")
}
// process hint template
var msg bytes.Buffer
if err := ri.hint.Execute(&msg, env); err != nil {
log.Errorf("Template error: %s", err.Error())
return
}
// FIXME: Handle case where multiple tags apply

View File

@@ -79,10 +79,10 @@ func (t *AppTagger) Register() error {
fns := fn.Name()
log.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
defer f.Close()
if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err)
}
defer f.Close()
t.scanApp(f, fns)
}
@@ -97,7 +97,13 @@ func (t *AppTagger) Register() error {
func (t *AppTagger) Match(job *schema.Job) {
r := repository.GetJobRepository()
jobscript, ok := job.MetaData["jobScript"]
metadata, err := r.FetchMetadata(job)
if err != nil {
log.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster)
return
}
jobscript, ok := metadata["jobScript"]
if ok {
id := job.ID

View File

@@ -7,27 +7,21 @@
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": [
"cpu_load"
],
"metrics": ["cpu_load"],
"requirements": [
"job.exclusive == 1",
"job.duration > job_min_duration_seconds"
],
"terms": [
{
"name": "",
"load_mean": "cpu_load[cpu_load_pre_cutoff_samples].mean('all')"
},
"variables": [
{
"name": "load_threshold",
"expr": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor"
"expr": "(job.numCores / job.numNodes) * excessivecpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "load_mean / load_threshold"
"expr": "cpu_load / load_threshold"
}
],
"rule": "cpu_load > load_threshold",
"hint": "This job was detected as excessiveload because the average cpu load {{ cpu_load }} falls above the threshold {{ load_threshold }}."
"hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load}} falls above the threshold {{.load_threshold}}."
}

View File

@@ -6,9 +6,7 @@
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": [
"cpu_load"
],
"metrics": ["cpu_load"],
"requirements": [
"job.exclusive == 1",
"job.duration > job_min_duration_seconds"
@@ -16,7 +14,7 @@
"variables": [
{
"name": "load_threshold",
"expr": "job.numHwthreads * lowcpuload_threshold_factor"
"expr": "job.numCores * lowcpuload_threshold_factor"
},
{
"name": "load_perc",
@@ -24,5 +22,5 @@
}
],
"rule": "cpu_load < load_threshold",
"hint": "This job was detected as lowload because the average cpu load {{ cpu_load }} falls below the threshold {{ load_threshold }}."
"hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.load_threshold}}."
}

View File

@@ -32,11 +32,14 @@ func newTagger() {
jobTagger.startTaggers = make([]Tagger, 0)
jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
jobTagger.stopTaggers = make([]Tagger, 0)
jobTagger.stopTaggers = append(jobTagger.startTaggers, &JobClassTagger{})
jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{})
for _, tagger := range jobTagger.startTaggers {
tagger.Register()
}
for _, tagger := range jobTagger.stopTaggers {
tagger.Register()
}
}
func Init() {
@@ -77,6 +80,7 @@ func RunTaggers() error {
tagger.Match(job)
}
for _, tagger := range jobTagger.stopTaggers {
log.Infof("Run stop tagger for job %d", job.ID)
tagger.Match(job)
}
}