diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go index 0317f81..4e46f37 100644 --- a/internal/tagger/classifyJob.go +++ b/internal/tagger/classifyJob.go @@ -24,10 +24,14 @@ import ( ) //go:embed jobclasses/* -var jobclassFiles embed.FS +var jobClassFiles embed.FS +// Variable defines a named expression that can be computed and reused in rules. +// Variables are evaluated before the main rule and their results are added to the environment. type Variable struct { + // Name is the variable identifier used in rule expressions Name string `json:"name"` + // Expr is the expression to evaluate (must return a numeric value) Expr string `json:"expr"` } @@ -36,14 +40,25 @@ type ruleVariable struct { expr *vm.Program } +// RuleFormat defines the JSON structure for job classification rules. +// Each rule specifies requirements, metrics to analyze, variables to compute, +// and the final rule expression that determines if the job matches the classification. type RuleFormat struct { + // Name is a human-readable description of the rule Name string `json:"name"` + // Tag is the classification tag to apply if the rule matches Tag string `json:"tag"` + // Parameters are shared values referenced in the rule (e.g., thresholds) Parameters []string `json:"parameters"` + // Metrics are the job metrics required for this rule (e.g., "cpu_load", "mem_used") Metrics []string `json:"metrics"` + // Requirements are boolean expressions that must be true for the rule to apply Requirements []string `json:"requirements"` + // Variables are computed values used in the rule expression Variables []Variable `json:"variables"` + // Rule is the boolean expression that determines if the job matches Rule string `json:"rule"` + // Hint is a template string that generates a message when the rule matches Hint string `json:"hint"` } @@ -56,11 +71,35 @@ type ruleInfo struct { hint *template.Template } +// JobRepository defines the interface for job database operations needed by the tagger. +// This interface allows for easier testing and decoupling from the concrete repository implementation. +type JobRepository interface { + // HasTag checks if a job already has a specific tag + HasTag(jobId int64, tagType string, tagName string) bool + // AddTagOrCreateDirect adds a tag to a job or creates it if it doesn't exist + AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) + // UpdateMetadata updates job metadata with a key-value pair + UpdateMetadata(job *schema.Job, key, val string) (err error) +} + +// JobClassTagger classifies jobs based on configurable rules that evaluate job metrics and properties. +// Rules are loaded from embedded JSON files and can be dynamically reloaded from a watched directory. +// When a job matches a rule, it is tagged with the corresponding classification and an optional hint message. type JobClassTagger struct { - rules map[string]ruleInfo - parameters map[string]any - tagType string - cfgPath string + // rules maps classification tags to their compiled rule information + rules map[string]ruleInfo + // parameters are shared values (e.g., thresholds) used across multiple rules + parameters map[string]any + // tagType is the type of tag ("jobClass") + tagType string + // cfgPath is the path to watch for configuration changes + cfgPath string + // repo provides access to job database operations + repo JobRepository + // getStatistics retrieves job statistics for analysis + getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error) + // getMetricConfig retrieves metric configuration (limits) for a cluster + getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric } func (t *JobClassTagger) prepareRule(b []byte, fns string) { @@ -127,10 +166,14 @@ func (t *JobClassTagger) prepareRule(b []byte, fns string) { t.rules[rule.Tag] = ri } +// EventMatch checks if a filesystem event should trigger configuration reload. +// It returns true if the event path contains "jobclasses". func (t *JobClassTagger) EventMatch(s string) bool { return strings.Contains(s, "jobclasses") } +// EventCallback is triggered when the configuration directory changes. +// It reloads parameters and all rule files from the watched directory. // FIXME: Only process the file that caused the event func (t *JobClassTagger) EventCallback() { files, err := os.ReadDir(t.cfgPath) @@ -170,7 +213,7 @@ func (t *JobClassTagger) EventCallback() { func (t *JobClassTagger) initParameters() error { cclog.Info("Initialize parameters") - b, err := jobclassFiles.ReadFile("jobclasses/parameters.json") + b, err := jobClassFiles.ReadFile("jobclasses/parameters.json") if err != nil { cclog.Warnf("prepareRule() > open file error: %v", err) return err @@ -184,6 +227,10 @@ func (t *JobClassTagger) initParameters() error { return nil } +// Register initializes the JobClassTagger by loading parameters and classification rules. +// It loads embedded configuration files and sets up a file watch on ./var/tagger/jobclasses +// if it exists, allowing for dynamic configuration updates without restarting the application. +// Returns an error if the embedded configuration files cannot be read or parsed. func (t *JobClassTagger) Register() error { t.cfgPath = "./var/tagger/jobclasses" t.tagType = "jobClass" @@ -194,18 +241,18 @@ func (t *JobClassTagger) Register() error { return err } - files, err := jobclassFiles.ReadDir("jobclasses") + files, err := jobClassFiles.ReadDir("jobclasses") if err != nil { return fmt.Errorf("error reading app folder: %#v", err) } - t.rules = make(map[string]ruleInfo, 0) + t.rules = make(map[string]ruleInfo) for _, fn := range files { fns := fn.Name() if fns != "parameters.json" { filename := fmt.Sprintf("jobclasses/%s", fns) cclog.Infof("Process: %s", fns) - b, err := jobclassFiles.ReadFile(filename) + b, err := jobClassFiles.ReadFile(filename) if err != nil { cclog.Warnf("prepareRule() > open file error: %v", err) return err @@ -220,13 +267,30 @@ func (t *JobClassTagger) Register() error { util.AddListener(t.cfgPath, t) } + t.repo = repository.GetJobRepository() + t.getStatistics = archive.GetStatistics + t.getMetricConfig = archive.GetMetricConfigSubCluster + return nil } +// Match evaluates all classification rules against a job and applies matching tags. +// It retrieves job statistics and metric configurations, then tests each rule's requirements +// and main expression. For each matching rule, it: +// - Applies the classification tag to the job +// - Generates and stores a hint message based on the rule's template +// +// The function constructs an evaluation environment containing: +// - Job properties (duration, cores, nodes, state, etc.) +// - Metric statistics (min, max, avg) and their configured limits +// - Shared parameters defined in parameters.json +// - Computed variables from the rule definition +// +// Rules are evaluated in arbitrary order. If multiple rules match, only the first +// encountered match is applied (FIXME: this should handle multiple matches). func (t *JobClassTagger) Match(job *schema.Job) { - r := repository.GetJobRepository() - jobstats, err := archive.GetStatistics(job) - metricsList := archive.GetMetricConfigSubCluster(job.Cluster, job.SubCluster) + jobStats, err := t.getStatistics(job) + metricsList := t.getMetricConfig(job.Cluster, job.SubCluster) cclog.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID) if err != nil { cclog.Errorf("job classification failed for job %d: %#v", job.JobID, err) @@ -251,7 +315,7 @@ func (t *JobClassTagger) Match(job *schema.Job) { // add metrics to env for _, m := range ri.metrics { - stats, ok := jobstats[m] + stats, ok := jobStats[m] if !ok { cclog.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m) return @@ -302,8 +366,11 @@ func (t *JobClassTagger) Match(job *schema.Job) { if match.(bool) { cclog.Info("Rule matches!") id := *job.ID - if !r.HasTag(id, t.tagType, tag) { - r.AddTagOrCreateDirect(id, t.tagType, tag) + if !t.repo.HasTag(id, t.tagType, tag) { + _, err := t.repo.AddTagOrCreateDirect(id, t.tagType, tag) + if err != nil { + return + } } // process hint template @@ -314,7 +381,11 @@ func (t *JobClassTagger) Match(job *schema.Job) { } // FIXME: Handle case where multiple tags apply - r.UpdateMetadata(job, "message", msg.String()) + // FIXME: Handle case where multiple tags apply + err = t.repo.UpdateMetadata(job, "message", msg.String()) + if err != nil { + return + } } else { cclog.Info("Rule does not match!") } diff --git a/internal/tagger/classifyJob_test.go b/internal/tagger/classifyJob_test.go new file mode 100644 index 0000000..3795a60 --- /dev/null +++ b/internal/tagger/classifyJob_test.go @@ -0,0 +1,162 @@ +package tagger + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/schema" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockJobRepository is a mock implementation of the JobRepository interface +type MockJobRepository struct { + mock.Mock +} + +func (m *MockJobRepository) HasTag(jobId int64, tagType string, tagName string) bool { + args := m.Called(jobId, tagType, tagName) + return args.Bool(0) +} + +func (m *MockJobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) { + args := m.Called(jobId, tagType, tagName) + return args.Get(0).(int64), args.Error(1) +} + +func (m *MockJobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) { + args := m.Called(job, key, val) + return args.Error(0) +} + +func TestPrepareRule(t *testing.T) { + tagger := &JobClassTagger{ + rules: make(map[string]ruleInfo), + parameters: make(map[string]any), + } + + // Valid rule JSON + validRule := []byte(`{ + "name": "Test Rule", + "tag": "test_tag", + "parameters": [], + "metrics": ["flops_any"], + "requirements": ["job.numNodes > 1"], + "variables": [{"name": "avg_flops", "expr": "flops_any.avg"}], + "rule": "avg_flops > 100", + "hint": "High FLOPS" + }`) + + tagger.prepareRule(validRule, "test_rule.json") + + assert.Contains(t, tagger.rules, "test_tag") + rule := tagger.rules["test_tag"] + assert.Equal(t, 1, len(rule.metrics)) + assert.Equal(t, 1, len(rule.requirements)) + assert.Equal(t, 1, len(rule.variables)) + assert.NotNil(t, rule.rule) + assert.NotNil(t, rule.hint) +} + +func TestClassifyJobMatch(t *testing.T) { + mockRepo := new(MockJobRepository) + tagger := &JobClassTagger{ + rules: make(map[string]ruleInfo), + parameters: make(map[string]any), + tagType: "jobClass", + repo: mockRepo, + getStatistics: func(job *schema.Job) (map[string]schema.JobStatistics, error) { + return map[string]schema.JobStatistics{ + "flops_any": {Min: 0, Max: 200, Avg: 150}, + }, nil + }, + getMetricConfig: func(cluster, subCluster string) map[string]*schema.Metric { + return map[string]*schema.Metric{ + "flops_any": {Peak: 1000, Normal: 100, Caution: 50, Alert: 10}, + } + }, + } + + // Add a rule manually or via prepareRule + validRule := []byte(`{ + "name": "Test Rule", + "tag": "high_flops", + "parameters": [], + "metrics": ["flops_any"], + "requirements": [], + "variables": [{"name": "avg_flops", "expr": "flops_any.avg"}], + "rule": "avg_flops > 100", + "hint": "High FLOPS: {{.avg_flops}}" + }`) + tagger.prepareRule(validRule, "test_rule.json") + + jobID := int64(123) + job := &schema.Job{ + ID: &jobID, + JobID: 123, + Cluster: "test_cluster", + SubCluster: "test_subcluster", + NumNodes: 2, + NumHWThreads: 4, + State: schema.JobStateCompleted, + } + + // Expectation: Rule matches + // 1. Check if tag exists (return false) + mockRepo.On("HasTag", jobID, "jobClass", "high_flops").Return(false) + // 2. Add tag + mockRepo.On("AddTagOrCreateDirect", jobID, "jobClass", "high_flops").Return(int64(1), nil) + // 3. Update metadata + mockRepo.On("UpdateMetadata", job, "message", mock.Anything).Return(nil) + + tagger.Match(job) + + mockRepo.AssertExpectations(t) +} + +func TestMatch_NoMatch(t *testing.T) { + mockRepo := new(MockJobRepository) + tagger := &JobClassTagger{ + rules: make(map[string]ruleInfo), + parameters: make(map[string]any), + tagType: "jobClass", + repo: mockRepo, + getStatistics: func(job *schema.Job) (map[string]schema.JobStatistics, error) { + return map[string]schema.JobStatistics{ + "flops_any": {Min: 0, Max: 50, Avg: 20}, // Avg 20 < 100 + }, nil + }, + getMetricConfig: func(cluster, subCluster string) map[string]*schema.Metric { + return map[string]*schema.Metric{ + "flops_any": {Peak: 1000, Normal: 100, Caution: 50, Alert: 10}, + } + }, + } + + validRule := []byte(`{ + "name": "Test Rule", + "tag": "high_flops", + "parameters": [], + "metrics": ["flops_any"], + "requirements": [], + "variables": [{"name": "avg_flops", "expr": "flops_any.avg"}], + "rule": "avg_flops > 100", + "hint": "High FLOPS" + }`) + tagger.prepareRule(validRule, "test_rule.json") + + jobID := int64(123) + job := &schema.Job{ + ID: &jobID, + JobID: 123, + Cluster: "test_cluster", + SubCluster: "test_subcluster", + NumNodes: 2, + NumHWThreads: 4, + State: schema.JobStateCompleted, + } + + // Expectation: Rule does NOT match, so no repo calls + tagger.Match(job) + + mockRepo.AssertExpectations(t) +} diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index c06fb72..4e8f858 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package tagger import ( @@ -28,9 +29,16 @@ type appInfo struct { strings []string } +// AppTagger detects applications by matching patterns in job scripts. +// It loads application patterns from embedded files and can dynamically reload +// configuration from a watched directory. When a job script matches a pattern, +// the corresponding application tag is automatically applied. type AppTagger struct { + // apps maps application tags to their matching patterns apps map[string]appInfo + // tagType is the type of tag ("app") tagType string + // cfgPath is the path to watch for configuration changes cfgPath string } @@ -45,10 +53,14 @@ func (t *AppTagger) scanApp(f fs.File, fns string) { t.apps[ai.tag] = ai } +// EventMatch checks if a filesystem event should trigger configuration reload. +// It returns true if the event path contains "apps". func (t *AppTagger) EventMatch(s string) bool { return strings.Contains(s, "apps") } +// EventCallback is triggered when the configuration directory changes. +// It reloads all application pattern files from the watched directory. // FIXME: Only process the file that caused the event func (t *AppTagger) EventCallback() { files, err := os.ReadDir(t.cfgPath) @@ -67,6 +79,10 @@ func (t *AppTagger) EventCallback() { } } +// Register initializes the AppTagger by loading application patterns from embedded files. +// It also sets up a file watch on ./var/tagger/apps if it exists, allowing for +// dynamic configuration updates without restarting the application. +// Returns an error if the embedded application files cannot be read. func (t *AppTagger) Register() error { t.cfgPath = "./var/tagger/apps" t.tagType = "app" @@ -96,6 +112,11 @@ func (t *AppTagger) Register() error { return nil } +// Match attempts to detect the application used by a job by analyzing its job script. +// It fetches the job metadata, extracts the job script, and matches it against +// all configured application patterns using regular expressions. +// If a match is found, the corresponding application tag is added to the job. +// Only the first matching application is tagged. func (t *AppTagger) Match(job *schema.Job) { r := repository.GetJobRepository() metadata, err := r.FetchMetadata(job) diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index af0ba19..7558914 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -2,6 +2,11 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + +// Package tagger provides automatic job tagging functionality for cc-backend. +// It supports detecting applications and classifying jobs based on configurable rules. +// Tags are automatically applied when jobs start or stop, or can be applied retroactively +// to existing jobs using RunTaggers. package tagger import ( @@ -12,8 +17,15 @@ import ( "github.com/ClusterCockpit/cc-lib/schema" ) +// Tagger is the interface that must be implemented by all tagging components. +// Taggers can be registered at job start or stop events to automatically apply tags. type Tagger interface { + // Register initializes the tagger and loads any required configuration. + // It should be called once before the tagger is used. Register() error + + // Match evaluates the tagger's rules against a job and applies appropriate tags. + // It is called for each job that needs to be evaluated. Match(job *schema.Job) } @@ -22,8 +34,12 @@ var ( jobTagger *JobTagger ) +// JobTagger coordinates multiple taggers that run at different job lifecycle events. +// It maintains separate lists of taggers that run when jobs start and when they stop. type JobTagger struct { + // startTaggers are applied when a job starts (e.g., application detection) startTaggers []Tagger + // stopTaggers are applied when a job completes (e.g., job classification) stopTaggers []Tagger } @@ -42,6 +58,9 @@ func newTagger() { } } +// Init initializes the job tagger system and registers it with the job repository. +// This function is safe to call multiple times; initialization only occurs once. +// It should be called during application startup. func Init() { initOnce.Do(func() { newTagger() @@ -49,18 +68,26 @@ func Init() { }) } +// JobStartCallback is called when a job starts. +// It runs all registered start taggers (e.g., application detection) on the job. func (jt *JobTagger) JobStartCallback(job *schema.Job) { for _, tagger := range jt.startTaggers { tagger.Match(job) } } +// JobStopCallback is called when a job completes. +// It runs all registered stop taggers (e.g., job classification) on the job. func (jt *JobTagger) JobStopCallback(job *schema.Job) { for _, tagger := range jt.stopTaggers { tagger.Match(job) } } +// RunTaggers applies all configured taggers to all existing jobs in the repository. +// This is useful for retroactively applying tags to jobs that were created before +// the tagger system was initialized or when new tagging rules are added. +// It fetches all jobs and runs both start and stop taggers on each one. func RunTaggers() error { newTagger() r := repository.GetJobRepository()