Remove embedded tagger rules

This commit is contained in:
2026-01-13 07:20:26 +01:00
parent 4cec933349
commit 42809e3f75
25 changed files with 166 additions and 78 deletions

0
configs/tagger/README.md Normal file
View File

View File

@@ -2,15 +2,16 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package tagger package tagger
import ( import (
"bytes" "bytes"
"embed"
"encoding/json" "encoding/json"
"fmt" "fmt"
"maps" "maps"
"os" "os"
"path/filepath"
"strings" "strings"
"text/template" "text/template"
@@ -23,8 +24,16 @@ import (
"github.com/expr-lang/expr/vm" "github.com/expr-lang/expr/vm"
) )
//go:embed jobclasses/* const (
var jobClassFiles embed.FS // defaultJobClassConfigPath is the default path for job classification configuration
defaultJobClassConfigPath = "./var/tagger/jobclasses"
// tagTypeJobClass is the tag type identifier for job classification tags
tagTypeJobClass = "jobClass"
// jobClassConfigDirMatch is the directory name used for matching filesystem events
jobClassConfigDirMatch = "jobclasses"
// parametersFileName is the name of the parameters configuration file
parametersFileName = "parameters.json"
)
// Variable defines a named expression that can be computed and reused in rules. // Variable defines a named expression that can be computed and reused in rules.
// Variables are evaluated before the main rule and their results are added to the environment. // Variables are evaluated before the main rule and their results are added to the environment.
@@ -45,21 +54,21 @@ type ruleVariable struct {
// and the final rule expression that determines if the job matches the classification. // and the final rule expression that determines if the job matches the classification.
type RuleFormat struct { type RuleFormat struct {
// Name is a human-readable description of the rule // Name is a human-readable description of the rule
Name string `json:"name"` Name string `json:"name"`
// Tag is the classification tag to apply if the rule matches // Tag is the classification tag to apply if the rule matches
Tag string `json:"tag"` Tag string `json:"tag"`
// Parameters are shared values referenced in the rule (e.g., thresholds) // Parameters are shared values referenced in the rule (e.g., thresholds)
Parameters []string `json:"parameters"` Parameters []string `json:"parameters"`
// Metrics are the job metrics required for this rule (e.g., "cpu_load", "mem_used") // Metrics are the job metrics required for this rule (e.g., "cpu_load", "mem_used")
Metrics []string `json:"metrics"` Metrics []string `json:"metrics"`
// Requirements are boolean expressions that must be true for the rule to apply // Requirements are boolean expressions that must be true for the rule to apply
Requirements []string `json:"requirements"` Requirements []string `json:"requirements"`
// Variables are computed values used in the rule expression // Variables are computed values used in the rule expression
Variables []Variable `json:"variables"` Variables []Variable `json:"variables"`
// Rule is the boolean expression that determines if the job matches // Rule is the boolean expression that determines if the job matches
Rule string `json:"rule"` Rule string `json:"rule"`
// Hint is a template string that generates a message when the rule matches // Hint is a template string that generates a message when the rule matches
Hint string `json:"hint"` Hint string `json:"hint"`
} }
type ruleInfo struct { type ruleInfo struct {
@@ -75,29 +84,29 @@ type ruleInfo struct {
// This interface allows for easier testing and decoupling from the concrete repository implementation. // This interface allows for easier testing and decoupling from the concrete repository implementation.
type JobRepository interface { type JobRepository interface {
// HasTag checks if a job already has a specific tag // HasTag checks if a job already has a specific tag
HasTag(jobId int64, tagType string, tagName string) bool HasTag(jobID int64, tagType string, tagName string) bool
// AddTagOrCreateDirect adds a tag to a job or creates it if it doesn't exist // AddTagOrCreateDirect adds a tag to a job or creates it if it doesn't exist
AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error)
// UpdateMetadata updates job metadata with a key-value pair // UpdateMetadata updates job metadata with a key-value pair
UpdateMetadata(job *schema.Job, key, val string) (err error) UpdateMetadata(job *schema.Job, key, val string) (err error)
} }
// JobClassTagger classifies jobs based on configurable rules that evaluate job metrics and properties. // JobClassTagger classifies jobs based on configurable rules that evaluate job metrics and properties.
// Rules are loaded from embedded JSON files and can be dynamically reloaded from a watched directory. // Rules are loaded from an external configuration directory and can be dynamically reloaded when files change.
// When a job matches a rule, it is tagged with the corresponding classification and an optional hint message. // When a job matches a rule, it is tagged with the corresponding classification and an optional hint message.
type JobClassTagger struct { type JobClassTagger struct {
// rules maps classification tags to their compiled rule information // rules maps classification tags to their compiled rule information
rules map[string]ruleInfo rules map[string]ruleInfo
// parameters are shared values (e.g., thresholds) used across multiple rules // parameters are shared values (e.g., thresholds) used across multiple rules
parameters map[string]any parameters map[string]any
// tagType is the type of tag ("jobClass") // tagType is the type of tag ("jobClass")
tagType string tagType string
// cfgPath is the path to watch for configuration changes // cfgPath is the path to watch for configuration changes
cfgPath string cfgPath string
// repo provides access to job database operations // repo provides access to job database operations
repo JobRepository repo JobRepository
// getStatistics retrieves job statistics for analysis // getStatistics retrieves job statistics for analysis
getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error) getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error)
// getMetricConfig retrieves metric configuration (limits) for a cluster // getMetricConfig retrieves metric configuration (limits) for a cluster
getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric
} }
@@ -169,7 +178,7 @@ func (t *JobClassTagger) prepareRule(b []byte, fns string) {
// EventMatch checks if a filesystem event should trigger configuration reload. // EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "jobclasses". // It returns true if the event path contains "jobclasses".
func (t *JobClassTagger) EventMatch(s string) bool { func (t *JobClassTagger) EventMatch(s string) bool {
return strings.Contains(s, "jobclasses") return strings.Contains(s, jobClassConfigDirMatch)
} }
// EventCallback is triggered when the configuration directory changes. // EventCallback is triggered when the configuration directory changes.
@@ -181,9 +190,10 @@ func (t *JobClassTagger) EventCallback() {
cclog.Fatal(err) cclog.Fatal(err)
} }
if util.CheckFileExists(t.cfgPath + "/parameters.json") { parametersFile := filepath.Join(t.cfgPath, parametersFileName)
if util.CheckFileExists(parametersFile) {
cclog.Info("Merge parameters") cclog.Info("Merge parameters")
b, err := os.ReadFile(t.cfgPath + "/parameters.json") b, err := os.ReadFile(parametersFile)
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
} }
@@ -198,13 +208,13 @@ func (t *JobClassTagger) EventCallback() {
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
if fns != "parameters.json" { if fns != parametersFileName {
cclog.Debugf("Process: %s", fns) cclog.Debugf("Process: %s", fns)
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) filename := filepath.Join(t.cfgPath, fns)
b, err := os.ReadFile(filename) b, err := os.ReadFile(filename)
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
return continue
} }
t.prepareRule(b, fns) t.prepareRule(b, fns)
} }
@@ -213,7 +223,8 @@ func (t *JobClassTagger) EventCallback() {
func (t *JobClassTagger) initParameters() error { func (t *JobClassTagger) initParameters() error {
cclog.Info("Initialize parameters") cclog.Info("Initialize parameters")
b, err := jobClassFiles.ReadFile("jobclasses/parameters.json") parametersFile := filepath.Join(t.cfgPath, parametersFileName)
b, err := os.ReadFile(parametersFile)
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
return err return err
@@ -227,13 +238,20 @@ func (t *JobClassTagger) initParameters() error {
return nil return nil
} }
// Register initializes the JobClassTagger by loading parameters and classification rules. // Register initializes the JobClassTagger by loading parameters and classification rules from external folder.
// It loads embedded configuration files and sets up a file watch on ./var/tagger/jobclasses // It sets up a file watch on ./var/tagger/jobclasses if it exists, allowing for
// if it exists, allowing for dynamic configuration updates without restarting the application. // dynamic configuration updates without restarting the application.
// Returns an error if the embedded configuration files cannot be read or parsed. // Returns an error if the configuration path does not exist or cannot be read.
func (t *JobClassTagger) Register() error { func (t *JobClassTagger) Register() error {
t.cfgPath = "./var/tagger/jobclasses" if t.cfgPath == "" {
t.tagType = "jobClass" t.cfgPath = defaultJobClassConfigPath
}
t.tagType = tagTypeJobClass
t.rules = make(map[string]ruleInfo)
if !util.CheckFileExists(t.cfgPath) {
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
}
err := t.initParameters() err := t.initParameters()
if err != nil { if err != nil {
@@ -241,31 +259,28 @@ func (t *JobClassTagger) Register() error {
return err return err
} }
files, err := jobClassFiles.ReadDir("jobclasses") files, err := os.ReadDir(t.cfgPath)
if err != nil { if err != nil {
return fmt.Errorf("error reading app folder: %#v", err) return fmt.Errorf("error reading jobclasses folder: %#v", err)
} }
t.rules = make(map[string]ruleInfo)
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
if fns != "parameters.json" { if fns != parametersFileName {
filename := fmt.Sprintf("jobclasses/%s", fns)
cclog.Infof("Process: %s", fns) cclog.Infof("Process: %s", fns)
filename := filepath.Join(t.cfgPath, fns)
b, err := jobClassFiles.ReadFile(filename) b, err := os.ReadFile(filename)
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
return err continue
} }
t.prepareRule(b, fns) t.prepareRule(b, fns)
} }
} }
if util.CheckFileExists(t.cfgPath) { cclog.Infof("Setup file watch for %s", t.cfgPath)
t.EventCallback() util.AddListener(t.cfgPath, t)
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
t.repo = repository.GetJobRepository() t.repo = repository.GetJobRepository()
t.getStatistics = archive.GetStatistics t.getStatistics = archive.GetStatistics

View File

@@ -13,13 +13,13 @@ type MockJobRepository struct {
mock.Mock mock.Mock
} }
func (m *MockJobRepository) HasTag(jobId int64, tagType string, tagName string) bool { func (m *MockJobRepository) HasTag(jobID int64, tagType string, tagName string) bool {
args := m.Called(jobId, tagType, tagName) args := m.Called(jobID, tagType, tagName)
return args.Bool(0) return args.Bool(0)
} }
func (m *MockJobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) { func (m *MockJobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error) {
args := m.Called(jobId, tagType, tagName) args := m.Called(jobID, tagType, tagName)
return args.Get(0).(int64), args.Error(1) return args.Get(0).(int64), args.Error(1)
} }

View File

@@ -7,9 +7,7 @@ package tagger
import ( import (
"bufio" "bufio"
"embed"
"fmt" "fmt"
"io/fs"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
@@ -21,8 +19,14 @@ import (
"github.com/ClusterCockpit/cc-lib/v2/util" "github.com/ClusterCockpit/cc-lib/v2/util"
) )
//go:embed apps/* const (
var appFiles embed.FS // defaultConfigPath is the default path for application tagging configuration
defaultConfigPath = "./var/tagger/apps"
// tagTypeApp is the tag type identifier for application tags
tagTypeApp = "app"
// configDirMatch is the directory name used for matching filesystem events
configDirMatch = "apps"
)
type appInfo struct { type appInfo struct {
tag string tag string
@@ -30,19 +34,19 @@ type appInfo struct {
} }
// AppTagger detects applications by matching patterns in job scripts. // AppTagger detects applications by matching patterns in job scripts.
// It loads application patterns from embedded files and can dynamically reload // It loads application patterns from an external configuration directory and can dynamically reload
// configuration from a watched directory. When a job script matches a pattern, // configuration when files change. When a job script matches a pattern,
// the corresponding application tag is automatically applied. // the corresponding application tag is automatically applied.
type AppTagger struct { type AppTagger struct {
// apps maps application tags to their matching patterns // apps maps application tags to their matching patterns
apps map[string]appInfo apps map[string]appInfo
// tagType is the type of tag ("app") // tagType is the type of tag ("app")
tagType string tagType string
// cfgPath is the path to watch for configuration changes // cfgPath is the path to watch for configuration changes
cfgPath string cfgPath string
} }
func (t *AppTagger) scanApp(f fs.File, fns string) { func (t *AppTagger) scanApp(f *os.File, fns string) {
scanner := bufio.NewScanner(f) scanner := bufio.NewScanner(f)
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)} ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
@@ -56,7 +60,7 @@ func (t *AppTagger) scanApp(f fs.File, fns string) {
// EventMatch checks if a filesystem event should trigger configuration reload. // EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "apps". // It returns true if the event path contains "apps".
func (t *AppTagger) EventMatch(s string) bool { func (t *AppTagger) EventMatch(s string) bool {
return strings.Contains(s, "apps") return strings.Contains(s, configDirMatch)
} }
// EventCallback is triggered when the configuration directory changes. // EventCallback is triggered when the configuration directory changes.
@@ -71,43 +75,50 @@ func (t *AppTagger) EventCallback() {
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
cclog.Debugf("Process: %s", fns) cclog.Debugf("Process: %s", fns)
f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns)) f, err := os.Open(filepath.Join(t.cfgPath, fns))
if err != nil { if err != nil {
cclog.Errorf("error opening app file %s: %#v", fns, err) cclog.Errorf("error opening app file %s: %#v", fns, err)
continue
} }
t.scanApp(f, fns) t.scanApp(f, fns)
f.Close()
} }
} }
// Register initializes the AppTagger by loading application patterns from embedded files. // Register initializes the AppTagger by loading application patterns from external folder.
// It also sets up a file watch on ./var/tagger/apps if it exists, allowing for // It sets up a file watch on ./var/tagger/apps if it exists, allowing for
// dynamic configuration updates without restarting the application. // dynamic configuration updates without restarting the application.
// Returns an error if the embedded application files cannot be read. // Returns an error if the configuration path does not exist or cannot be read.
func (t *AppTagger) Register() error { func (t *AppTagger) Register() error {
t.cfgPath = "./var/tagger/apps" if t.cfgPath == "" {
t.tagType = "app" t.cfgPath = defaultConfigPath
}
t.tagType = tagTypeApp
t.apps = make(map[string]appInfo, 0)
files, err := appFiles.ReadDir("apps") if !util.CheckFileExists(t.cfgPath) {
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
}
files, err := os.ReadDir(t.cfgPath)
if err != nil { if err != nil {
return fmt.Errorf("error reading app folder: %#v", err) return fmt.Errorf("error reading app folder: %#v", err)
} }
t.apps = make(map[string]appInfo, 0)
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
cclog.Debugf("Process: %s", fns) cclog.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) f, err := os.Open(filepath.Join(t.cfgPath, fns))
if err != nil { if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err) cclog.Errorf("error opening app file %s: %#v", fns, err)
continue
} }
defer f.Close()
t.scanApp(f, fns) t.scanApp(f, fns)
f.Close()
} }
if util.CheckFileExists(t.cfgPath) { cclog.Infof("Setup file watch for %s", t.cfgPath)
t.EventCallback() util.AddListener(t.cfgPath, t)
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
return nil return nil
} }

View File

@@ -5,6 +5,8 @@
package tagger package tagger
import ( import (
"os"
"path/filepath"
"testing" "testing"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -29,28 +31,88 @@ func noErr(tb testing.TB, err error) {
} }
} }
func TestRegister(t *testing.T) { func setupAppTaggerTestDir(t *testing.T) string {
var tagger AppTagger t.Helper()
err := tagger.Register() testDir := t.TempDir()
appsDir := filepath.Join(testDir, "apps")
err := os.MkdirAll(appsDir, 0o755)
noErr(t, err) noErr(t, err)
srcDir := "../../configs/tagger/apps"
files, err := os.ReadDir(srcDir)
noErr(t, err)
for _, file := range files {
if file.IsDir() {
continue
}
srcPath := filepath.Join(srcDir, file.Name())
dstPath := filepath.Join(appsDir, file.Name())
data, err := os.ReadFile(srcPath)
noErr(t, err)
err = os.WriteFile(dstPath, data, 0o644)
noErr(t, err)
}
return appsDir
}
func TestRegister(t *testing.T) {
appsDir := setupAppTaggerTestDir(t)
var tagger AppTagger
tagger.cfgPath = appsDir
tagger.tagType = tagTypeApp
tagger.apps = make(map[string]appInfo, 0)
files, err := os.ReadDir(appsDir)
noErr(t, err)
for _, fn := range files {
if fn.IsDir() {
continue
}
fns := fn.Name()
f, err := os.Open(filepath.Join(appsDir, fns))
noErr(t, err)
tagger.scanApp(f, fns)
f.Close()
}
if len(tagger.apps) != 16 { if len(tagger.apps) != 16 {
t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 16", len(tagger.apps)) t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 16", len(tagger.apps))
} }
} }
func TestMatch(t *testing.T) { func TestMatch(t *testing.T) {
appsDir := setupAppTaggerTestDir(t)
r := setup(t) r := setup(t)
job, err := r.FindByIDDirect(317) job, err := r.FindByIDDirect(317)
noErr(t, err) noErr(t, err)
var tagger AppTagger var tagger AppTagger
tagger.cfgPath = appsDir
tagger.tagType = tagTypeApp
tagger.apps = make(map[string]appInfo, 0)
err = tagger.Register() files, err := os.ReadDir(appsDir)
noErr(t, err) noErr(t, err)
for _, fn := range files {
if fn.IsDir() {
continue
}
fns := fn.Name()
f, err := os.Open(filepath.Join(appsDir, fns))
noErr(t, err)
tagger.scanApp(f, fns)
f.Close()
}
tagger.Match(job) tagger.Match(job)
if !r.HasTag(317, "app", "vasp") { if !r.HasTag(317, "app", "vasp") {