Fix bug in taggers. Allow to add multiple tags to one job

This commit is contained in:
2026-02-24 06:44:17 +01:00
parent 0dea959391
commit 998aff2345
2 changed files with 55 additions and 29 deletions

View File

@@ -190,6 +190,8 @@ func (t *JobClassTagger) EventCallback() {
cclog.Fatal(err) cclog.Fatal(err)
} }
t.rules = make(map[string]ruleInfo)
parametersFile := filepath.Join(t.cfgPath, parametersFileName) parametersFile := filepath.Join(t.cfgPath, parametersFileName)
if util.CheckFileExists(parametersFile) { if util.CheckFileExists(parametersFile) {
cclog.Info("Merge parameters") cclog.Info("Merge parameters")
@@ -301,17 +303,21 @@ func (t *JobClassTagger) Register() error {
// - Shared parameters defined in parameters.json // - Shared parameters defined in parameters.json
// - Computed variables from the rule definition // - Computed variables from the rule definition
// //
// Rules are evaluated in arbitrary order. If multiple rules match, only the first // Rules are evaluated in arbitrary order. Multiple rules can match and apply
// encountered match is applied (FIXME: this should handle multiple matches). // their tags to the same job. Hint messages from all matching rules are collected
// and stored as a combined message in the job metadata.
func (t *JobClassTagger) Match(job *schema.Job) { func (t *JobClassTagger) Match(job *schema.Job) {
jobStats, err := t.getStatistics(job) jobStats, err := t.getStatistics(job)
metricsList := t.getMetricConfig(job.Cluster, job.SubCluster) metricsList := t.getMetricConfig(job.Cluster, job.SubCluster)
cclog.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID) cclog.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID)
if err != nil { if err != nil {
cclog.Errorf("job classification failed for job %d: %#v", job.JobID, err) cclog.Errorf("job classification failed for job %d: %#v", job.JobID, err)
return return
} }
id := *job.ID
var messages []string
for tag, ri := range t.rules { for tag, ri := range t.rules {
env := make(map[string]any) env := make(map[string]any)
maps.Copy(env, ri.env) maps.Copy(env, ri.env)
@@ -329,11 +335,13 @@ func (t *JobClassTagger) Match(job *schema.Job) {
} }
// add metrics to env // add metrics to env
skipRule := false
for _, m := range ri.metrics { for _, m := range ri.metrics {
stats, ok := jobStats[m] stats, ok := jobStats[m]
if !ok { if !ok {
cclog.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m) cclog.Errorf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID)
return skipRule = true
break
} }
env[m] = map[string]any{ env[m] = map[string]any{
"min": stats.Min, "min": stats.Min,
@@ -347,44 +355,55 @@ func (t *JobClassTagger) Match(job *schema.Job) {
}, },
} }
} }
if skipRule {
continue
}
// check rule requirements apply // check rule requirements apply
requirementsMet := true
for _, r := range ri.requirements { for _, r := range ri.requirements {
ok, err := expr.Run(r, env) ok, err := expr.Run(r, env)
if err != nil { if err != nil {
cclog.Errorf("error running requirement for rule %s: %#v", tag, err) cclog.Errorf("error running requirement for rule %s: %#v", tag, err)
return requirementsMet = false
break
} }
if !ok.(bool) { if !ok.(bool) {
cclog.Infof("requirement for rule %s not met", tag) cclog.Infof("requirement for rule %s not met", tag)
return requirementsMet = false
break
} }
} }
if !requirementsMet {
continue
}
// validate rule expression // evaluate rule variables
varError := false
for _, v := range ri.variables { for _, v := range ri.variables {
value, err := expr.Run(v.expr, env) value, err := expr.Run(v.expr, env)
if err != nil { if err != nil {
cclog.Errorf("error running rule %s: %#v", tag, err) cclog.Errorf("error evaluating variable %s for rule %s: %#v", v.name, tag, err)
return varError = true
break
} }
env[v.name] = value env[v.name] = value
} }
if varError {
// dump.P(env) continue
}
match, err := expr.Run(ri.rule, env) match, err := expr.Run(ri.rule, env)
if err != nil { if err != nil {
cclog.Errorf("error running rule %s: %#v", tag, err) cclog.Errorf("error running rule %s: %#v", tag, err)
return continue
} }
if match.(bool) { if match.(bool) {
cclog.Info("Rule matches!") cclog.Info("Rule matches!")
id := *job.ID
if !t.repo.HasTag(id, t.tagType, tag) { if !t.repo.HasTag(id, t.tagType, tag) {
_, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag) if _, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag); err != nil {
if err != nil { cclog.Errorf("failed to add tag '%s' to job %d: %v", tag, id, err)
return continue
} }
} }
@@ -392,17 +411,18 @@ func (t *JobClassTagger) Match(job *schema.Job) {
var msg bytes.Buffer var msg bytes.Buffer
if err := ri.hint.Execute(&msg, env); err != nil { if err := ri.hint.Execute(&msg, env); err != nil {
cclog.Errorf("Template error: %s", err.Error()) cclog.Errorf("Template error: %s", err.Error())
return continue
}
// FIXME: Handle case where multiple tags apply
// FIXME: Handle case where multiple tags apply
err = t.repo.UpdateMetadata(job, "message", msg.String())
if err != nil {
return
} }
messages = append(messages, msg.String())
} else { } else {
cclog.Info("Rule does not match!") cclog.Info("Rule does not match!")
} }
} }
if len(messages) > 0 {
combined := strings.Join(messages, "\n")
if err := t.repo.UpdateMetadata(job, "message", combined); err != nil {
cclog.Errorf("failed to update metadata for job %d: %v", *job.ID, err)
}
}
} }

View File

@@ -98,6 +98,8 @@ func (t *AppTagger) EventCallback() {
cclog.Fatal(err) cclog.Fatal(err)
} }
t.apps = make([]appInfo, 0)
for _, fn := range files { for _, fn := range files {
if fn.IsDir() { if fn.IsDir() {
continue continue
@@ -163,7 +165,7 @@ func (t *AppTagger) Register() error {
// It fetches the job metadata, extracts the job script, and matches it against // It fetches the job metadata, extracts the job script, and matches it against
// all configured application patterns using regular expressions. // all configured application patterns using regular expressions.
// If a match is found, the corresponding application tag is added to the job. // If a match is found, the corresponding application tag is added to the job.
// Only the first matching application is tagged. // Multiple application tags can be applied if patterns for different apps match.
func (t *AppTagger) Match(job *schema.Job) { func (t *AppTagger) Match(job *schema.Job) {
r := repository.GetJobRepository() r := repository.GetJobRepository()
@@ -199,6 +201,7 @@ func (t *AppTagger) Match(job *schema.Job) {
jobscriptLower := strings.ToLower(jobscript) jobscriptLower := strings.ToLower(jobscript)
cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps)) cclog.Debugf("AppTagger: matching job %d (script length: %d) against %d apps", id, len(jobscriptLower), len(t.apps))
matched := false
for _, a := range t.apps { for _, a := range t.apps {
for _, re := range a.patterns { for _, re := range a.patterns {
if re.MatchString(jobscriptLower) { if re.MatchString(jobscriptLower) {
@@ -210,10 +213,13 @@ func (t *AppTagger) Match(job *schema.Job) {
cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err) cclog.Errorf("AppTagger: failed to add tag '%s' to job %d: %v", a.tag, id, err)
} }
} }
return matched = true
break // matched this app, move to next app
} }
} }
} }
cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster) if !matched {
cclog.Debugf("AppTagger: no pattern matched for job %d on %s", id, job.Cluster)
}
} }