mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-06-08 16:43:49 +02:00
323 lines
7.9 KiB
Go
323 lines
7.9 KiB
Go
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
|
// All rights reserved. This file is part of cc-backend.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
package tagger
|
|
|
|
import (
|
|
"bytes"
|
|
"embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"maps"
|
|
"os"
|
|
"strings"
|
|
"text/template"
|
|
|
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
|
"github.com/expr-lang/expr"
|
|
"github.com/expr-lang/expr/vm"
|
|
)
|
|
|
|
//go:embed jobclasses/*
|
|
var jobclassFiles embed.FS
|
|
|
|
type Variable struct {
|
|
Name string `json:"name"`
|
|
Expr string `json:"expr"`
|
|
}
|
|
|
|
type ruleVariable struct {
|
|
name string
|
|
expr *vm.Program
|
|
}
|
|
|
|
type RuleFormat struct {
|
|
Name string `json:"name"`
|
|
Tag string `json:"tag"`
|
|
Parameters []string `json:"parameters"`
|
|
Metrics []string `json:"metrics"`
|
|
Requirements []string `json:"requirements"`
|
|
Variables []Variable `json:"variables"`
|
|
Rule string `json:"rule"`
|
|
Hint string `json:"hint"`
|
|
}
|
|
|
|
type ruleInfo struct {
|
|
env map[string]any
|
|
metrics []string
|
|
requirements []*vm.Program
|
|
variables []ruleVariable
|
|
rule *vm.Program
|
|
hint *template.Template
|
|
}
|
|
|
|
type JobClassTagger struct {
|
|
rules map[string]ruleInfo
|
|
parameters map[string]any
|
|
tagType string
|
|
cfgPath string
|
|
}
|
|
|
|
func (t *JobClassTagger) prepareRule(b []byte, fns string) {
|
|
var rule RuleFormat
|
|
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil {
|
|
log.Warn("Error while decoding raw job meta json")
|
|
return
|
|
}
|
|
|
|
ri := ruleInfo{}
|
|
ri.env = make(map[string]any)
|
|
ri.metrics = make([]string, 0)
|
|
ri.requirements = make([]*vm.Program, 0)
|
|
ri.variables = make([]ruleVariable, 0)
|
|
|
|
// check if all required parameters are available
|
|
for _, p := range rule.Parameters {
|
|
param, ok := t.parameters[p]
|
|
if !ok {
|
|
log.Warnf("prepareRule() > missing parameter %s in rule %s", p, fns)
|
|
return
|
|
}
|
|
ri.env[p] = param
|
|
}
|
|
|
|
// set all required metrics
|
|
ri.metrics = append(ri.metrics, rule.Metrics...)
|
|
|
|
// compile requirements
|
|
for _, r := range rule.Requirements {
|
|
req, err := expr.Compile(r, expr.AsBool())
|
|
if err != nil {
|
|
log.Errorf("error compiling requirement %s: %#v", r, err)
|
|
return
|
|
}
|
|
ri.requirements = append(ri.requirements, req)
|
|
}
|
|
|
|
// compile variables
|
|
for _, v := range rule.Variables {
|
|
req, err := expr.Compile(v.Expr, expr.AsFloat64())
|
|
if err != nil {
|
|
log.Errorf("error compiling requirement %s: %#v", v.Name, err)
|
|
return
|
|
}
|
|
ri.variables = append(ri.variables, ruleVariable{name: v.Name, expr: req})
|
|
}
|
|
|
|
// compile rule
|
|
exp, err := expr.Compile(rule.Rule, expr.AsBool())
|
|
if err != nil {
|
|
log.Errorf("error compiling rule %s: %#v", fns, err)
|
|
return
|
|
}
|
|
ri.rule = exp
|
|
|
|
// prepare hint template
|
|
ri.hint, err = template.New(fns).Parse(rule.Hint)
|
|
if err != nil {
|
|
log.Errorf("error processing template %s: %#v", fns, err)
|
|
}
|
|
log.Infof("prepareRule() > processing %s with %d requirements and %d variables", fns, len(ri.requirements), len(ri.variables))
|
|
|
|
t.rules[rule.Tag] = ri
|
|
}
|
|
|
|
func (t *JobClassTagger) EventMatch(s string) bool {
|
|
return strings.Contains(s, "jobclasses")
|
|
}
|
|
|
|
// FIXME: Only process the file that caused the event
|
|
func (t *JobClassTagger) EventCallback() {
|
|
files, err := os.ReadDir(t.cfgPath)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
if util.CheckFileExists(t.cfgPath + "/parameters.json") {
|
|
log.Info("Merge parameters")
|
|
b, err := os.ReadFile(t.cfgPath + "/parameters.json")
|
|
if err != nil {
|
|
log.Warnf("prepareRule() > open file error: %v", err)
|
|
}
|
|
|
|
var paramTmp map[string]any
|
|
if err := json.NewDecoder(bytes.NewReader(b)).Decode(¶mTmp); err != nil {
|
|
log.Warn("Error while decoding parameters.json")
|
|
}
|
|
|
|
maps.Copy(t.parameters, paramTmp)
|
|
}
|
|
|
|
for _, fn := range files {
|
|
fns := fn.Name()
|
|
if fns != "parameters.json" {
|
|
log.Debugf("Process: %s", fns)
|
|
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns)
|
|
b, err := os.ReadFile(filename)
|
|
if err != nil {
|
|
log.Warnf("prepareRule() > open file error: %v", err)
|
|
return
|
|
}
|
|
t.prepareRule(b, fns)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *JobClassTagger) initParameters() error {
|
|
log.Info("Initialize parameters")
|
|
b, err := jobclassFiles.ReadFile("jobclasses/parameters.json")
|
|
if err != nil {
|
|
log.Warnf("prepareRule() > open file error: %v", err)
|
|
return err
|
|
}
|
|
|
|
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil {
|
|
log.Warn("Error while decoding parameters.json")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *JobClassTagger) Register() error {
|
|
t.cfgPath = "./var/tagger/jobclasses"
|
|
t.tagType = "jobClass"
|
|
|
|
err := t.initParameters()
|
|
if err != nil {
|
|
log.Warnf("error reading parameters.json: %v", err)
|
|
return err
|
|
}
|
|
|
|
files, err := jobclassFiles.ReadDir("jobclasses")
|
|
if err != nil {
|
|
return fmt.Errorf("error reading app folder: %#v", err)
|
|
}
|
|
t.rules = make(map[string]ruleInfo, 0)
|
|
for _, fn := range files {
|
|
fns := fn.Name()
|
|
if fns != "parameters.json" {
|
|
filename := fmt.Sprintf("jobclasses/%s", fns)
|
|
log.Infof("Process: %s", fns)
|
|
|
|
b, err := jobclassFiles.ReadFile(filename)
|
|
if err != nil {
|
|
log.Warnf("prepareRule() > open file error: %v", err)
|
|
return err
|
|
}
|
|
t.prepareRule(b, fns)
|
|
}
|
|
}
|
|
|
|
if util.CheckFileExists(t.cfgPath) {
|
|
t.EventCallback()
|
|
log.Infof("Setup file watch for %s", t.cfgPath)
|
|
util.AddListener(t.cfgPath, t)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *JobClassTagger) Match(job *schema.Job) {
|
|
r := repository.GetJobRepository()
|
|
jobstats, err := archive.GetStatistics(job)
|
|
metricsList := archive.GetMetricConfigSubCluster(job.Cluster, job.SubCluster)
|
|
log.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID)
|
|
if err != nil {
|
|
log.Errorf("job classification failed for job %d: %#v", job.JobID, err)
|
|
return
|
|
}
|
|
|
|
for tag, ri := range t.rules {
|
|
env := make(map[string]any)
|
|
maps.Copy(env, ri.env)
|
|
log.Infof("Try to match rule %s for job %d", tag, job.JobID)
|
|
|
|
// Initialize environment
|
|
env["job"] = map[string]any{
|
|
"exclusive": job.Exclusive,
|
|
"duration": job.Duration,
|
|
"numCores": job.NumHWThreads,
|
|
"numNodes": job.NumNodes,
|
|
"jobState": job.State,
|
|
"numAcc": job.NumAcc,
|
|
"smt": job.SMT,
|
|
}
|
|
|
|
// add metrics to env
|
|
for _, m := range ri.metrics {
|
|
stats, ok := jobstats[m]
|
|
if !ok {
|
|
log.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m)
|
|
return
|
|
}
|
|
env[m] = map[string]any{
|
|
"min": stats.Min,
|
|
"max": stats.Max,
|
|
"avg": stats.Avg,
|
|
"limits": map[string]float64{
|
|
"peak": metricsList[m].Peak,
|
|
"normal": metricsList[m].Normal,
|
|
"caution": metricsList[m].Caution,
|
|
"alert": metricsList[m].Alert,
|
|
},
|
|
}
|
|
}
|
|
|
|
// check rule requirements apply
|
|
for _, r := range ri.requirements {
|
|
ok, err := expr.Run(r, env)
|
|
if err != nil {
|
|
log.Errorf("error running requirement for rule %s: %#v", tag, err)
|
|
return
|
|
}
|
|
if !ok.(bool) {
|
|
log.Infof("requirement for rule %s not met", tag)
|
|
return
|
|
}
|
|
}
|
|
|
|
// validate rule expression
|
|
for _, v := range ri.variables {
|
|
value, err := expr.Run(v.expr, env)
|
|
if err != nil {
|
|
log.Errorf("error running rule %s: %#v", tag, err)
|
|
return
|
|
}
|
|
env[v.name] = value
|
|
}
|
|
|
|
// dump.P(env)
|
|
|
|
match, err := expr.Run(ri.rule, env)
|
|
if err != nil {
|
|
log.Errorf("error running rule %s: %#v", tag, err)
|
|
return
|
|
}
|
|
if match.(bool) {
|
|
log.Info("Rule matches!")
|
|
id := *job.ID
|
|
if !r.HasTag(id, t.tagType, tag) {
|
|
r.AddTagOrCreateDirect(id, t.tagType, tag)
|
|
}
|
|
|
|
// process hint template
|
|
var msg bytes.Buffer
|
|
if err := ri.hint.Execute(&msg, env); err != nil {
|
|
log.Errorf("Template error: %s", err.Error())
|
|
return
|
|
}
|
|
|
|
// FIXME: Handle case where multiple tags apply
|
|
r.UpdateMetadata(job, "message", msg.String())
|
|
} else {
|
|
log.Info("Rule does not match!")
|
|
}
|
|
}
|
|
}
|