Refactor tagger package

Fix issues
Add documentation
Add unit tests
This commit is contained in:
2025-11-19 16:58:48 +01:00
parent dd63e7157a
commit 39a2157d46
4 changed files with 297 additions and 16 deletions

View File

@@ -24,10 +24,14 @@ import (
) )
//go:embed jobclasses/* //go:embed jobclasses/*
var jobclassFiles embed.FS var jobClassFiles embed.FS
// Variable defines a named expression that can be computed and reused in rules.
// Variables are evaluated before the main rule and their results are added to the environment.
type Variable struct { type Variable struct {
// Name is the variable identifier used in rule expressions
Name string `json:"name"` Name string `json:"name"`
// Expr is the expression to evaluate (must return a numeric value)
Expr string `json:"expr"` Expr string `json:"expr"`
} }
@@ -36,14 +40,25 @@ type ruleVariable struct {
expr *vm.Program expr *vm.Program
} }
// RuleFormat defines the JSON structure for job classification rules.
// Each rule specifies requirements, metrics to analyze, variables to compute,
// and the final rule expression that determines if the job matches the classification.
type RuleFormat struct { type RuleFormat struct {
// Name is a human-readable description of the rule
Name string `json:"name"` Name string `json:"name"`
// Tag is the classification tag to apply if the rule matches
Tag string `json:"tag"` Tag string `json:"tag"`
// Parameters are shared values referenced in the rule (e.g., thresholds)
Parameters []string `json:"parameters"` Parameters []string `json:"parameters"`
// Metrics are the job metrics required for this rule (e.g., "cpu_load", "mem_used")
Metrics []string `json:"metrics"` Metrics []string `json:"metrics"`
// Requirements are boolean expressions that must be true for the rule to apply
Requirements []string `json:"requirements"` Requirements []string `json:"requirements"`
// Variables are computed values used in the rule expression
Variables []Variable `json:"variables"` Variables []Variable `json:"variables"`
// Rule is the boolean expression that determines if the job matches
Rule string `json:"rule"` Rule string `json:"rule"`
// Hint is a template string that generates a message when the rule matches
Hint string `json:"hint"` Hint string `json:"hint"`
} }
@@ -56,11 +71,35 @@ type ruleInfo struct {
hint *template.Template hint *template.Template
} }
// JobRepository defines the interface for job database operations needed by the tagger.
// This interface allows for easier testing and decoupling from the concrete repository implementation.
type JobRepository interface {
// HasTag checks if a job already has a specific tag
HasTag(jobId int64, tagType string, tagName string) bool
// AddTagOrCreateDirect adds a tag to a job or creates it if it doesn't exist
AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error)
// UpdateMetadata updates job metadata with a key-value pair
UpdateMetadata(job *schema.Job, key, val string) (err error)
}
// JobClassTagger classifies jobs based on configurable rules that evaluate job metrics and properties.
// Rules are loaded from embedded JSON files and can be dynamically reloaded from a watched directory.
// When a job matches a rule, it is tagged with the corresponding classification and an optional hint message.
type JobClassTagger struct { type JobClassTagger struct {
rules map[string]ruleInfo // rules maps classification tags to their compiled rule information
parameters map[string]any rules map[string]ruleInfo
tagType string // parameters are shared values (e.g., thresholds) used across multiple rules
cfgPath string parameters map[string]any
// tagType is the type of tag ("jobClass")
tagType string
// cfgPath is the path to watch for configuration changes
cfgPath string
// repo provides access to job database operations
repo JobRepository
// getStatistics retrieves job statistics for analysis
getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error)
// getMetricConfig retrieves metric configuration (limits) for a cluster
getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric
} }
func (t *JobClassTagger) prepareRule(b []byte, fns string) { func (t *JobClassTagger) prepareRule(b []byte, fns string) {
@@ -127,10 +166,14 @@ func (t *JobClassTagger) prepareRule(b []byte, fns string) {
t.rules[rule.Tag] = ri t.rules[rule.Tag] = ri
} }
// EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "jobclasses".
func (t *JobClassTagger) EventMatch(s string) bool { func (t *JobClassTagger) EventMatch(s string) bool {
return strings.Contains(s, "jobclasses") return strings.Contains(s, "jobclasses")
} }
// EventCallback is triggered when the configuration directory changes.
// It reloads parameters and all rule files from the watched directory.
// FIXME: Only process the file that caused the event // FIXME: Only process the file that caused the event
func (t *JobClassTagger) EventCallback() { func (t *JobClassTagger) EventCallback() {
files, err := os.ReadDir(t.cfgPath) files, err := os.ReadDir(t.cfgPath)
@@ -170,7 +213,7 @@ func (t *JobClassTagger) EventCallback() {
func (t *JobClassTagger) initParameters() error { func (t *JobClassTagger) initParameters() error {
cclog.Info("Initialize parameters") cclog.Info("Initialize parameters")
b, err := jobclassFiles.ReadFile("jobclasses/parameters.json") b, err := jobClassFiles.ReadFile("jobclasses/parameters.json")
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
return err return err
@@ -184,6 +227,10 @@ func (t *JobClassTagger) initParameters() error {
return nil return nil
} }
// Register initializes the JobClassTagger by loading parameters and classification rules.
// It loads embedded configuration files and sets up a file watch on ./var/tagger/jobclasses
// if it exists, allowing for dynamic configuration updates without restarting the application.
// Returns an error if the embedded configuration files cannot be read or parsed.
func (t *JobClassTagger) Register() error { func (t *JobClassTagger) Register() error {
t.cfgPath = "./var/tagger/jobclasses" t.cfgPath = "./var/tagger/jobclasses"
t.tagType = "jobClass" t.tagType = "jobClass"
@@ -194,18 +241,18 @@ func (t *JobClassTagger) Register() error {
return err return err
} }
files, err := jobclassFiles.ReadDir("jobclasses") files, err := jobClassFiles.ReadDir("jobclasses")
if err != nil { if err != nil {
return fmt.Errorf("error reading app folder: %#v", err) return fmt.Errorf("error reading app folder: %#v", err)
} }
t.rules = make(map[string]ruleInfo, 0) t.rules = make(map[string]ruleInfo)
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
if fns != "parameters.json" { if fns != "parameters.json" {
filename := fmt.Sprintf("jobclasses/%s", fns) filename := fmt.Sprintf("jobclasses/%s", fns)
cclog.Infof("Process: %s", fns) cclog.Infof("Process: %s", fns)
b, err := jobclassFiles.ReadFile(filename) b, err := jobClassFiles.ReadFile(filename)
if err != nil { if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err) cclog.Warnf("prepareRule() > open file error: %v", err)
return err return err
@@ -220,13 +267,30 @@ func (t *JobClassTagger) Register() error {
util.AddListener(t.cfgPath, t) util.AddListener(t.cfgPath, t)
} }
t.repo = repository.GetJobRepository()
t.getStatistics = archive.GetStatistics
t.getMetricConfig = archive.GetMetricConfigSubCluster
return nil return nil
} }
// Match evaluates all classification rules against a job and applies matching tags.
// It retrieves job statistics and metric configurations, then tests each rule's requirements
// and main expression. For each matching rule, it:
// - Applies the classification tag to the job
// - Generates and stores a hint message based on the rule's template
//
// The function constructs an evaluation environment containing:
// - Job properties (duration, cores, nodes, state, etc.)
// - Metric statistics (min, max, avg) and their configured limits
// - Shared parameters defined in parameters.json
// - Computed variables from the rule definition
//
// Rules are evaluated in arbitrary order. If multiple rules match, only the first
// encountered match is applied (FIXME: this should handle multiple matches).
func (t *JobClassTagger) Match(job *schema.Job) { func (t *JobClassTagger) Match(job *schema.Job) {
r := repository.GetJobRepository() jobStats, err := t.getStatistics(job)
jobstats, err := archive.GetStatistics(job) metricsList := t.getMetricConfig(job.Cluster, job.SubCluster)
metricsList := archive.GetMetricConfigSubCluster(job.Cluster, job.SubCluster)
cclog.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID) cclog.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID)
if err != nil { if err != nil {
cclog.Errorf("job classification failed for job %d: %#v", job.JobID, err) cclog.Errorf("job classification failed for job %d: %#v", job.JobID, err)
@@ -251,7 +315,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
// add metrics to env // add metrics to env
for _, m := range ri.metrics { for _, m := range ri.metrics {
stats, ok := jobstats[m] stats, ok := jobStats[m]
if !ok { if !ok {
cclog.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m) cclog.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m)
return return
@@ -302,8 +366,11 @@ func (t *JobClassTagger) Match(job *schema.Job) {
if match.(bool) { if match.(bool) {
cclog.Info("Rule matches!") cclog.Info("Rule matches!")
id := *job.ID id := *job.ID
if !r.HasTag(id, t.tagType, tag) { if !t.repo.HasTag(id, t.tagType, tag) {
r.AddTagOrCreateDirect(id, t.tagType, tag) _, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag)
if err != nil {
return
}
} }
// process hint template // process hint template
@@ -314,7 +381,11 @@ func (t *JobClassTagger) Match(job *schema.Job) {
} }
// FIXME: Handle case where multiple tags apply // FIXME: Handle case where multiple tags apply
r.UpdateMetadata(job, "message", msg.String()) // FIXME: Handle case where multiple tags apply
err = t.repo.UpdateMetadata(job, "message", msg.String())
if err != nil {
return
}
} else { } else {
cclog.Info("Rule does not match!") cclog.Info("Rule does not match!")
} }

View File

@@ -0,0 +1,162 @@
package tagger
import (
"testing"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// MockJobRepository is a mock implementation of the JobRepository interface
type MockJobRepository struct {
mock.Mock
}
func (m *MockJobRepository) HasTag(jobId int64, tagType string, tagName string) bool {
args := m.Called(jobId, tagType, tagName)
return args.Bool(0)
}
func (m *MockJobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) {
args := m.Called(jobId, tagType, tagName)
return args.Get(0).(int64), args.Error(1)
}
func (m *MockJobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) {
args := m.Called(job, key, val)
return args.Error(0)
}
func TestPrepareRule(t *testing.T) {
tagger := &JobClassTagger{
rules: make(map[string]ruleInfo),
parameters: make(map[string]any),
}
// Valid rule JSON
validRule := []byte(`{
"name": "Test Rule",
"tag": "test_tag",
"parameters": [],
"metrics": ["flops_any"],
"requirements": ["job.numNodes > 1"],
"variables": [{"name": "avg_flops", "expr": "flops_any.avg"}],
"rule": "avg_flops > 100",
"hint": "High FLOPS"
}`)
tagger.prepareRule(validRule, "test_rule.json")
assert.Contains(t, tagger.rules, "test_tag")
rule := tagger.rules["test_tag"]
assert.Equal(t, 1, len(rule.metrics))
assert.Equal(t, 1, len(rule.requirements))
assert.Equal(t, 1, len(rule.variables))
assert.NotNil(t, rule.rule)
assert.NotNil(t, rule.hint)
}
func TestClassifyJobMatch(t *testing.T) {
mockRepo := new(MockJobRepository)
tagger := &JobClassTagger{
rules: make(map[string]ruleInfo),
parameters: make(map[string]any),
tagType: "jobClass",
repo: mockRepo,
getStatistics: func(job *schema.Job) (map[string]schema.JobStatistics, error) {
return map[string]schema.JobStatistics{
"flops_any": {Min: 0, Max: 200, Avg: 150},
}, nil
},
getMetricConfig: func(cluster, subCluster string) map[string]*schema.Metric {
return map[string]*schema.Metric{
"flops_any": {Peak: 1000, Normal: 100, Caution: 50, Alert: 10},
}
},
}
// Add a rule manually or via prepareRule
validRule := []byte(`{
"name": "Test Rule",
"tag": "high_flops",
"parameters": [],
"metrics": ["flops_any"],
"requirements": [],
"variables": [{"name": "avg_flops", "expr": "flops_any.avg"}],
"rule": "avg_flops > 100",
"hint": "High FLOPS: {{.avg_flops}}"
}`)
tagger.prepareRule(validRule, "test_rule.json")
jobID := int64(123)
job := &schema.Job{
ID: &jobID,
JobID: 123,
Cluster: "test_cluster",
SubCluster: "test_subcluster",
NumNodes: 2,
NumHWThreads: 4,
State: schema.JobStateCompleted,
}
// Expectation: Rule matches
// 1. Check if tag exists (return false)
mockRepo.On("HasTag", jobID, "jobClass", "high_flops").Return(false)
// 2. Add tag
mockRepo.On("AddTagOrCreateDirect", jobID, "jobClass", "high_flops").Return(int64(1), nil)
// 3. Update metadata
mockRepo.On("UpdateMetadata", job, "message", mock.Anything).Return(nil)
tagger.Match(job)
mockRepo.AssertExpectations(t)
}
func TestMatch_NoMatch(t *testing.T) {
mockRepo := new(MockJobRepository)
tagger := &JobClassTagger{
rules: make(map[string]ruleInfo),
parameters: make(map[string]any),
tagType: "jobClass",
repo: mockRepo,
getStatistics: func(job *schema.Job) (map[string]schema.JobStatistics, error) {
return map[string]schema.JobStatistics{
"flops_any": {Min: 0, Max: 50, Avg: 20}, // Avg 20 < 100
}, nil
},
getMetricConfig: func(cluster, subCluster string) map[string]*schema.Metric {
return map[string]*schema.Metric{
"flops_any": {Peak: 1000, Normal: 100, Caution: 50, Alert: 10},
}
},
}
validRule := []byte(`{
"name": "Test Rule",
"tag": "high_flops",
"parameters": [],
"metrics": ["flops_any"],
"requirements": [],
"variables": [{"name": "avg_flops", "expr": "flops_any.avg"}],
"rule": "avg_flops > 100",
"hint": "High FLOPS"
}`)
tagger.prepareRule(validRule, "test_rule.json")
jobID := int64(123)
job := &schema.Job{
ID: &jobID,
JobID: 123,
Cluster: "test_cluster",
SubCluster: "test_subcluster",
NumNodes: 2,
NumHWThreads: 4,
State: schema.JobStateCompleted,
}
// Expectation: Rule does NOT match, so no repo calls
tagger.Match(job)
mockRepo.AssertExpectations(t)
}

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package tagger package tagger
import ( import (
@@ -28,9 +29,16 @@ type appInfo struct {
strings []string strings []string
} }
// AppTagger detects applications by matching patterns in job scripts.
// It loads application patterns from embedded files and can dynamically reload
// configuration from a watched directory. When a job script matches a pattern,
// the corresponding application tag is automatically applied.
type AppTagger struct { type AppTagger struct {
// apps maps application tags to their matching patterns
apps map[string]appInfo apps map[string]appInfo
// tagType is the type of tag ("app")
tagType string tagType string
// cfgPath is the path to watch for configuration changes
cfgPath string cfgPath string
} }
@@ -45,10 +53,14 @@ func (t *AppTagger) scanApp(f fs.File, fns string) {
t.apps[ai.tag] = ai t.apps[ai.tag] = ai
} }
// EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "apps".
func (t *AppTagger) EventMatch(s string) bool { func (t *AppTagger) EventMatch(s string) bool {
return strings.Contains(s, "apps") return strings.Contains(s, "apps")
} }
// EventCallback is triggered when the configuration directory changes.
// It reloads all application pattern files from the watched directory.
// FIXME: Only process the file that caused the event // FIXME: Only process the file that caused the event
func (t *AppTagger) EventCallback() { func (t *AppTagger) EventCallback() {
files, err := os.ReadDir(t.cfgPath) files, err := os.ReadDir(t.cfgPath)
@@ -67,6 +79,10 @@ func (t *AppTagger) EventCallback() {
} }
} }
// Register initializes the AppTagger by loading application patterns from embedded files.
// It also sets up a file watch on ./var/tagger/apps if it exists, allowing for
// dynamic configuration updates without restarting the application.
// Returns an error if the embedded application files cannot be read.
func (t *AppTagger) Register() error { func (t *AppTagger) Register() error {
t.cfgPath = "./var/tagger/apps" t.cfgPath = "./var/tagger/apps"
t.tagType = "app" t.tagType = "app"
@@ -96,6 +112,11 @@ func (t *AppTagger) Register() error {
return nil return nil
} }
// Match attempts to detect the application used by a job by analyzing its job script.
// It fetches the job metadata, extracts the job script, and matches it against
// all configured application patterns using regular expressions.
// If a match is found, the corresponding application tag is added to the job.
// Only the first matching application is tagged.
func (t *AppTagger) Match(job *schema.Job) { func (t *AppTagger) Match(job *schema.Job) {
r := repository.GetJobRepository() r := repository.GetJobRepository()
metadata, err := r.FetchMetadata(job) metadata, err := r.FetchMetadata(job)

View File

@@ -2,6 +2,11 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// Package tagger provides automatic job tagging functionality for cc-backend.
// It supports detecting applications and classifying jobs based on configurable rules.
// Tags are automatically applied when jobs start or stop, or can be applied retroactively
// to existing jobs using RunTaggers.
package tagger package tagger
import ( import (
@@ -12,8 +17,15 @@ import (
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
) )
// Tagger is the interface that must be implemented by all tagging components.
// Taggers can be registered at job start or stop events to automatically apply tags.
type Tagger interface { type Tagger interface {
// Register initializes the tagger and loads any required configuration.
// It should be called once before the tagger is used.
Register() error Register() error
// Match evaluates the tagger's rules against a job and applies appropriate tags.
// It is called for each job that needs to be evaluated.
Match(job *schema.Job) Match(job *schema.Job)
} }
@@ -22,8 +34,12 @@ var (
jobTagger *JobTagger jobTagger *JobTagger
) )
// JobTagger coordinates multiple taggers that run at different job lifecycle events.
// It maintains separate lists of taggers that run when jobs start and when they stop.
type JobTagger struct { type JobTagger struct {
// startTaggers are applied when a job starts (e.g., application detection)
startTaggers []Tagger startTaggers []Tagger
// stopTaggers are applied when a job completes (e.g., job classification)
stopTaggers []Tagger stopTaggers []Tagger
} }
@@ -42,6 +58,9 @@ func newTagger() {
} }
} }
// Init initializes the job tagger system and registers it with the job repository.
// This function is safe to call multiple times; initialization only occurs once.
// It should be called during application startup.
func Init() { func Init() {
initOnce.Do(func() { initOnce.Do(func() {
newTagger() newTagger()
@@ -49,18 +68,26 @@ func Init() {
}) })
} }
// JobStartCallback is called when a job starts.
// It runs all registered start taggers (e.g., application detection) on the job.
func (jt *JobTagger) JobStartCallback(job *schema.Job) { func (jt *JobTagger) JobStartCallback(job *schema.Job) {
for _, tagger := range jt.startTaggers { for _, tagger := range jt.startTaggers {
tagger.Match(job) tagger.Match(job)
} }
} }
// JobStopCallback is called when a job completes.
// It runs all registered stop taggers (e.g., job classification) on the job.
func (jt *JobTagger) JobStopCallback(job *schema.Job) { func (jt *JobTagger) JobStopCallback(job *schema.Job) {
for _, tagger := range jt.stopTaggers { for _, tagger := range jt.stopTaggers {
tagger.Match(job) tagger.Match(job)
} }
} }
// RunTaggers applies all configured taggers to all existing jobs in the repository.
// This is useful for retroactively applying tags to jobs that were created before
// the tagger system was initialized or when new tagging rules are added.
// It fetches all jobs and runs both start and stop taggers on each one.
func RunTaggers() error { func RunTaggers() error {
newTagger() newTagger()
r := repository.GetJobRepository() r := repository.GetJobRepository()