From ca634bb70741697f7f34cd21df998d1100b2ff76 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Thu, 22 May 2025 07:10:41 +0200
Subject: [PATCH] Refactor taggers. Refine job hooks. Start job classifier

---
 go.mod                                     |   1 +
 go.sum                                     |   2 +
 internal/api/rest.go                       |   2 -
 internal/archiver/archiveWorker.go         |   7 ++
 internal/repository/jobHooks.go            |   4 +-
 internal/tagger/classifyJob.go             | 121 +++++++++++++++++++++
 internal/tagger/detectApp.go               |  35 +++---
 internal/tagger/detectApp_test.go          |   8 +-
 internal/tagger/jobclasses/highload.json   |  38 +++++++
 internal/tagger/jobclasses/highmem.json    |  40 +++++++
 internal/tagger/jobclasses/lowgpuload.json |  36 ++++++
 internal/tagger/jobclasses/lowload.json    |  38 +++++++
 internal/tagger/tagger.go                  |   6 +-
 pkg/archive/fsBackend.go                   |  39 +------
 14 files changed, 316 insertions(+), 61 deletions(-)
 create mode 100644 internal/tagger/classifyJob.go
 create mode 100644 internal/tagger/jobclasses/highload.json
 create mode 100644 internal/tagger/jobclasses/highmem.json
 create mode 100644 internal/tagger/jobclasses/lowgpuload.json
 create mode 100644 internal/tagger/jobclasses/lowload.json

diff --git a/go.mod b/go.mod
index f17ec18..062ee3e 100644
--- a/go.mod
+++ b/go.mod
@@ -43,6 +43,7 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
+	github.com/expr-lang/expr v1.17.3 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.9.0 // indirect
 	github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect
diff --git a/go.sum b/go.sum
index 57b1649..b4c3781 100644
--- a/go.sum
+++ b/go.sum
@@ -53,6 +53,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
 github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
+github.com/expr-lang/expr v1.17.3 h1:myeTTuDFz7k6eFe/JPlep/UsiIjVhG61FMHFu63U7j0=
+github.com/expr-lang/expr v1.17.3/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
diff --git a/internal/api/rest.go b/internal/api/rest.go
index 6133a5e..fe35942 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -1126,8 +1126,6 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
 		return
 	}
 
-	repository.CallJobStopHooks(job)
-
 	// Trigger async archiving
 	archiver.TriggerArchiving(job)
 }
diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go
index 628e36e..6e514cb 100644
--- a/internal/archiver/archiveWorker.go
+++ b/internal/archiver/archiveWorker.go
@@ -72,7 +72,14 @@ func archivingWorker() {
 			}
 			log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
 			log.Printf("archiving job (dbid: %d) successful", job.ID)
+
+			id := job.ID
+			jobMeta.ID = &id
+
+			repository.CallJobStopHooks(jobMeta)
 			archivePending.Done()
+		default:
+			continue
 		}
 	}
 }
diff --git a/internal/repository/jobHooks.go b/internal/repository/jobHooks.go
index 1016335..49535f7 100644
--- a/internal/repository/jobHooks.go
+++ b/internal/repository/jobHooks.go
@@ -12,7 +12,7 @@ import (
 
 type JobHook interface {
 	JobStartCallback(job *schema.Job)
-	JobStopCallback(job *schema.Job)
+	JobStopCallback(job *schema.JobMeta)
 }
 
 var (
@@ -44,7 +44,7 @@ func CallJobStartHooks(jobs []*schema.Job) {
 	}
 }
 
-func CallJobStopHooks(job *schema.Job) {
+func CallJobStopHooks(job *schema.JobMeta) {
 	if hooks == nil {
 		return
 	}
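With this change, stop hooks receive the archived *schema.JobMeta (whose database ID is set by the archive worker above) instead of the live *schema.Job. A minimal sketch of a conforming hook follows; the hook name is hypothetical, the imports come from cc-backend's pkg/schema and pkg/log, and the registration helper is assumed since the patch does not show it:

    // debugHook is a hypothetical JobHook illustrating the new signatures:
    // start callbacks keep *schema.Job, stop callbacks now take *schema.JobMeta.
    type debugHook struct{}

    func (h *debugHook) JobStartCallback(job *schema.Job) {
        log.Debugf("job %d started", job.JobID)
    }

    func (h *debugHook) JobStopCallback(job *schema.JobMeta) {
        // JobMeta embeds BaseJob, so JobID and Duration are available here.
        log.Debugf("job %d stopped after %d s", job.JobID, job.Duration)
    }

    // Registration presumably happens once at startup via a helper in
    // internal/repository, e.g.: repository.RegisterJobHook(&debugHook{})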
diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go
new file mode 100644
index 0000000..ec1e843
--- /dev/null
+++ b/internal/tagger/classifyJob.go
@@ -0,0 +1,121 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package tagger
+
+import (
+	"bytes"
+	"embed"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/internal/util"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/expr-lang/expr"
+	"github.com/expr-lang/expr/vm"
+)
+
+//go:embed jobclasses/*
+var jobclassFiles embed.FS
+
+type ruleInfo struct {
+	tag  string
+	rule *vm.Program
+}
+
+type JobClassTagger struct {
+	rules   map[string]ruleInfo
+	tagType string
+	cfgPath string
+}
+
+func (t *JobClassTagger) compileRule(f fs.File, fns string) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(f)
+	if err != nil {
+		log.Errorf("error reading rule file %s: %#v", fns, err)
+	}
+	prg, err := expr.Compile(buf.String(), expr.AsBool())
+	if err != nil {
+		log.Errorf("error compiling rule %s: %#v", fns, err)
+	}
+	ri := ruleInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), rule: prg}
+
+	delete(t.rules, ri.tag)
+	t.rules[ri.tag] = ri
+}
+
+func (t *JobClassTagger) EventMatch(s string) bool {
+	return strings.Contains(s, "jobclasses")
+}
+
+// FIXME: Only process the file that caused the event
+func (t *JobClassTagger) EventCallback() {
+	files, err := os.ReadDir(t.cfgPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	for _, fn := range files {
+		fns := fn.Name()
+		log.Debugf("Process: %s", fns)
+		f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
+		if err != nil {
+			log.Errorf("error opening jobclass file %s: %#v", fns, err)
+		}
+		t.compileRule(f, fns)
+	}
+}
+
+func (t *JobClassTagger) Register() error {
+	t.cfgPath = "./var/tagger/jobclasses"
+	t.tagType = "jobClass"
+
+	files, err := jobclassFiles.ReadDir("jobclasses")
+	if err != nil {
+		return fmt.Errorf("error reading jobclasses folder: %#v", err)
+	}
+	t.rules = make(map[string]ruleInfo, 0)
+	for _, fn := range files {
+		fns := fn.Name()
+		log.Debugf("Process: %s", fns)
+		f, err := jobclassFiles.Open(fmt.Sprintf("jobclasses/%s", fns))
+		if err != nil {
+			return fmt.Errorf("error opening jobclass file %s: %#v", fns, err)
+		}
+		defer f.Close()
+		t.compileRule(f, fns)
+	}
+
+	if util.CheckFileExists(t.cfgPath) {
+		t.EventCallback()
+		log.Infof("Setup file watch for %s", t.cfgPath)
+		util.AddListener(t.cfgPath, t)
+	}
+
+	return nil
+}
+
+func (t *JobClassTagger) Match(job *schema.JobMeta) {
+	r := repository.GetJobRepository()
+
+	for _, ri := range t.rules {
+		tag := ri.tag
+		output, err := expr.Run(ri.rule, job)
+		if err != nil {
+			log.Errorf("error running rule %s: %#v", tag, err)
+		}
+		if matched, ok := output.(bool); ok && matched {
+			id := job.ID
+			if !r.HasTag(*id, t.tagType, tag) {
+				r.AddTagOrCreateDirect(*id, t.tagType, tag)
+			}
+		}
+	}
+}
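compileRule compiles each file's entire contents as a single boolean expr expression and caches the compiled program per tag. For reference, a minimal standalone sketch of the compile-once/evaluate-per-job pattern this relies on; the rule text and environment values are made up for illustration, not taken from the rule files:

    package main

    import (
        "fmt"

        "github.com/expr-lang/expr"
    )

    func main() {
        // Compile once with expr.AsBool(), which enforces a boolean result,
        // exactly as compileRule does above.
        prg, err := expr.Compile("duration > 600 && exclusive == 1", expr.AsBool())
        if err != nil {
            panic(err)
        }

        // The environment may be a map or a struct; JobClassTagger passes the
        // *schema.JobMeta itself as the environment.
        out, err := expr.Run(prg, map[string]any{"duration": 1200, "exclusive": 1})
        if err != nil {
            panic(err)
        }
        fmt.Println(out.(bool)) // true
    }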
diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go
index d3d797d..8057aad 100644
--- a/internal/tagger/detectApp.go
+++ b/internal/tagger/detectApp.go
@@ -19,11 +19,6 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
-const (
-	tagType = "app"
-	appPath = "./var/tagger/apps"
-)
-
 //go:embed apps/*
 var appFiles embed.FS
 
@@ -33,7 +28,9 @@ type appInfo struct {
 }
 
 type AppTagger struct {
-	apps map[string]appInfo
+	apps    map[string]appInfo
+	tagType string
+	cfgPath string
 }
 
 func (t *AppTagger) scanApp(f fs.File, fns string) {
@@ -53,7 +50,7 @@ func (t *AppTagger) EventMatch(s string) bool {
 
 // FIXME: Only process the file that caused the event
 func (t *AppTagger) EventCallback() {
-	files, err := os.ReadDir(appPath)
+	files, err := os.ReadDir(t.cfgPath)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -61,7 +58,7 @@ func (t *AppTagger) EventCallback() {
 	for _, fn := range files {
 		fns := fn.Name()
 		log.Debugf("Process: %s", fns)
-		f, err := os.Open(fmt.Sprintf("%s/%s", appPath, fns))
+		f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
 		if err != nil {
 			log.Errorf("error opening app file %s: %#v", fns, err)
 		}
@@ -70,6 +67,9 @@ func (t *AppTagger) EventCallback() {
 }
 
 func (t *AppTagger) Register() error {
+	t.cfgPath = "./var/tagger/apps"
+	t.tagType = "app"
+
 	files, err := appFiles.ReadDir("apps")
 	if err != nil {
 		return fmt.Errorf("error reading app folder: %#v", err)
@@ -79,28 +79,25 @@ func (t *AppTagger) Register() error {
 		fns := fn.Name()
 		log.Debugf("Process: %s", fns)
 		f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
 		if err != nil {
 			return fmt.Errorf("error opening app file %s: %#v", fns, err)
 		}
+		defer f.Close()
 		t.scanApp(f, fns)
 	}
 
-	if util.CheckFileExists(appPath) {
+	if util.CheckFileExists(t.cfgPath) {
 		t.EventCallback()
-		log.Infof("Setup file watch for %s", appPath)
-		util.AddListener(appPath, t)
+		log.Infof("Setup file watch for %s", t.cfgPath)
+		util.AddListener(t.cfgPath, t)
 	}
 
 	return nil
 }
 
-func (t *AppTagger) Match(job *schema.Job) {
+func (t *AppTagger) Match(job *schema.JobMeta) {
 	r := repository.GetJobRepository()
-	meta, err := r.FetchMetadata(job)
-	if err != nil {
-		log.Error("cannot fetch meta data")
-	}
-	jobscript, ok := meta["jobScript"]
+	jobscript, ok := job.MetaData["jobScript"]
 
 	if ok {
 		id := job.ID
@@ -109,8 +106,8 @@
 			tag := a.tag
 			for _, s := range a.strings {
 				if strings.Contains(jobscript, s) {
-					if !r.HasTag(id, tagType, tag) {
-						r.AddTagOrCreateDirect(id, tagType, tag)
+					if !r.HasTag(*id, t.tagType, tag) {
+						r.AddTagOrCreateDirect(*id, t.tagType, tag)
 						break out
 					}
 				}
diff --git a/internal/tagger/detectApp_test.go b/internal/tagger/detectApp_test.go
index 3b43cce..56bd856 100644
--- a/internal/tagger/detectApp_test.go
+++ b/internal/tagger/detectApp_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
 func setup(tb testing.TB) *repository.JobRepository {
@@ -51,7 +52,12 @@ func TestMatch(t *testing.T) {
 	err = tagger.Register()
 	noErr(t, err)
 
-	tagger.Match(job)
+	jobMeta := &schema.JobMeta{
+		ID:        &job.ID,
+		BaseJob:   job.BaseJob,
+		StartTime: job.StartTime.Unix(),
+	}
+	tagger.Match(jobMeta)
 
 	if !r.HasTag(5, "app", "vasp") {
 		t.Errorf("missing tag vasp")
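This Job-to-JobMeta conversion duplicates the one added to archiveWorker.go above; a shared helper would keep both call sites in sync. A possible sketch, with a hypothetical name and placement, not part of this patch:

    // jobToJobMeta mirrors the conversions in archiveWorker.go and
    // detectApp_test.go: copy the embedded BaseJob, flatten StartTime to
    // Unix seconds, and carry the database ID so taggers can attach tags
    // to the right row.
    func jobToJobMeta(job *schema.Job) *schema.JobMeta {
        id := job.ID
        return &schema.JobMeta{
            ID:        &id,
            BaseJob:   job.BaseJob,
            StartTime: job.StartTime.Unix(),
        }
    }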
"parameters": [ + "excessivecpuload_threshold_factor", + "job_min_duration_seconds", + "sampling_interval_seconds" + ], + "metrics": [ + "cpu_load" + ], + "requirements": [ + "job.exclusive == 1", + "job.duration > job_min_duration_seconds", + "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds" + ], + "terms": [ + { + "load_mean": "cpu_load[cpu_load_pre_cutoff_samples].mean('all')" + }, + { + "load_threshold": "(job.numHwthreads/job.numNodes) * excessivecpuload_threshold_factor" + }, + { + "highload_nodes": "load_mean > load_threshold" + }, + { + "highload": "highload_nodes.any('all')" + }, + { + "load_perc": "load_mean / load_threshold" + } + ], + "output": "highload", + "output_scalar": "load_perc", + "template": "Job ({{ job.jobId }})\nThis job was detected as excessiveload because the mean cpu load {{ load_mean.array }} falls above the threshold {{ load_threshold }}." +} diff --git a/internal/tagger/jobclasses/highmem.json b/internal/tagger/jobclasses/highmem.json new file mode 100644 index 0000000..69ffcf3 --- /dev/null +++ b/internal/tagger/jobclasses/highmem.json @@ -0,0 +1,40 @@ +{ + "name": "High memory usage", + "tag": "high_memory_load", + "parameters": [ + "high_memory_load_threshold_factor", + "job_min_duration_seconds", + "sampling_interval_seconds" + ], + "metrics": [ + "mem_used" + ], + "requirements": [ + "job.duration > job_min_duration_seconds", + "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds", + "hasattr(job, \"allocated_memory\")" + ], + "terms": [ + { + "memory_alloc": "job.allocated_memory" + }, + { + "memory_used": "mem_used.max('time')" + }, + { + "load_threshold": "memory_alloc * high_memory_load_threshold_factor" + }, + { + "high_mem_nodes": "memory_used > load_threshold" + }, + { + "high_mem": "high_mem_nodes.any('all')" + }, + { + "load_perc": "memory_used / (memory_alloc * high_memory_load_threshold_factor)" + } + ], + "output": "high_mem", + "output_scalar": "load_perc", + "template": "Job ({{ job.jobId }})\nThis job was detected as high_memory_load because the memory usage {{ high_mem_nodes.array }} falls above the threshold {{ load_threshold }}." +} diff --git a/internal/tagger/jobclasses/lowgpuload.json b/internal/tagger/jobclasses/lowgpuload.json new file mode 100644 index 0000000..80339b2 --- /dev/null +++ b/internal/tagger/jobclasses/lowgpuload.json @@ -0,0 +1,36 @@ +{ + "name": "Low GPU load", + "tag": "lowgpuload", + "parameters": [ + "lowgpuload_threshold_factor", + "job_min_duration_seconds", + "sampling_interval_seconds" + ], + "metrics": [ + "nv_util" + ], + "requirements": [ + "job.duration > job_min_duration_seconds", + "required_metrics_min_samples > job_min_duration_seconds / sampling_interval_seconds" + ], + "terms": [ + { + "load_mean": "nv_util.mean('all')" + }, + { + "load_threshold": "job.numAcc * lowgpuload_threshold_factor" + }, + { + "lowload_nodes": "load_mean < load_threshold" + }, + { + "lowload": "lowload_nodes.any('all')" + }, + { + "load_perc": "1.0 - (load_mean / load_threshold)" + } + ], + "output": "lowload", + "output_scalar": "load_perc", + "template": "Job ({{ job.jobId }})\nThis job was detected as lowgpuload because the mean gpu load {{ load_mean }} falls below the threshold {{ load_threshold }}." 
diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go
index b336125..d5e42b1 100644
--- a/internal/tagger/tagger.go
+++ b/internal/tagger/tagger.go
@@ -13,7 +13,7 @@ import (
 
 type Tagger interface {
 	Register() error
-	Match(job *schema.Job)
+	Match(job *schema.JobMeta)
 }
 
 var (
@@ -31,6 +31,8 @@ func Init() {
 	jobTagger = &JobTagger{}
 	jobTagger.startTaggers = make([]Tagger, 0)
 	jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
+	jobTagger.stopTaggers = make([]Tagger, 0)
+	jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{})
 
 	for _, tagger := range jobTagger.startTaggers {
 		tagger.Register()
@@ -46,5 +48,5 @@ func (jt *JobTagger) JobStartCallback(job *schema.Job) {
 	}
 }
 
-func (jt *JobTagger) JobStopCallback(job *schema.Job) {
+func (jt *JobTagger) JobStopCallback(job *schema.JobMeta) {
 }
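JobStopCallback is still a stub here, so the registered stop taggers are never invoked yet, and, unlike the start taggers, their Register() is not called in Init(). The presumably intended dispatch, mirroring JobStartCallback (a sketch under that assumption, not part of this patch):

    func (jt *JobTagger) JobStopCallback(job *schema.JobMeta) {
        // Run every stop-side tagger (currently only JobClassTagger)
        // against the archived job metadata.
        for _, tagger := range jt.stopTaggers {
            tagger.Match(job)
        }
    }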
diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go
index 711b1f5..a59b663 100644
--- a/pkg/archive/fsBackend.go
+++ b/pkg/archive/fsBackend.go
@@ -59,14 +59,13 @@ func getDirectory(
 func getPath(
 	job *schema.Job,
 	rootPath string,
-	file string) string {
-
+	file string,
+) string {
 	return filepath.Join(
 		getDirectory(job, rootPath), file)
 }
 
 func loadJobMeta(filename string) (*schema.JobMeta, error) {
-
 	b, err := os.ReadFile(filename)
 	if err != nil {
 		log.Errorf("loadJobMeta() > open file error: %v", err)
@@ -83,7 +82,6 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
 
 func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
 	f, err := os.Open(filename)
-
 	if err != nil {
 		log.Errorf("fsBackend LoadJobData()- %v", err)
 		return nil, err
@@ -117,7 +115,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
 
 func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
 	f, err := os.Open(filename)
-
 	if err != nil {
 		log.Errorf("fsBackend LoadJobStats()- %v", err)
 		return nil, err
@@ -150,7 +147,6 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er
 }
 
 func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
-
 	var config FsArchiveConfig
 	if err := json.Unmarshal(rawConfig, &config); err != nil {
 		log.Warnf("Init() > Unmarshal error: %#v", err)
@@ -276,7 +272,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool {
 }
 
 func (fsa *FsArchive) Clean(before int64, after int64) {
-
 	if after == 0 {
 		after = math.MaxInt64
 	}
@@ -392,7 +387,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
 }
 
 func (fsa *FsArchive) CompressLast(starttime int64) int64 {
-
 	filename := filepath.Join(fsa.path, "compress.txt")
 	b, err := os.ReadFile(filename)
 	if err != nil {
@@ -441,7 +435,6 @@ func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
 }
 
 func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
-
 	b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
 	if err != nil {
 		log.Errorf("LoadClusterCfg() > open file error: %v", err)
@@ -456,7 +449,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
 }
 
 func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
-
 	ch := make(chan JobContainer)
 	go func() {
 		clustersDir, err := os.ReadDir(fsa.path)
@@ -527,7 +519,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
 }
 
 func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
-
 	job := schema.Job{
 		BaseJob:   jobMeta.BaseJob,
 		StartTime: time.Unix(jobMeta.StartTime, 0),
@@ -556,8 +547,8 @@ func (fsa *FsArchive) GetClusters() []string {
 
 func (fsa *FsArchive) ImportJob(
 	jobMeta *schema.JobMeta,
-	jobData *schema.JobData) error {
-
+	jobData *schema.JobData,
+) error {
 	job := schema.Job{
 		BaseJob:   jobMeta.BaseJob,
 		StartTime: time.Unix(jobMeta.StartTime, 0),
@@ -583,28 +574,6 @@ func (fsa *FsArchive) ImportJob(
 		return err
 	}
 
-	// var isCompressed bool = true
-	// // TODO Use shortJob Config for check
-	// if jobMeta.Duration < 300 {
-	// 	isCompressed = false
-	// 	f, err = os.Create(path.Join(dir, "data.json"))
-	// } else {
-	// 	f, err = os.Create(path.Join(dir, "data.json.gz"))
-	// }
-	// if err != nil {
-	// 	return err
-	// }
-	//
-	// if isCompressed {
-	// 	if err := EncodeJobData(gzip.NewWriter(f), jobData); err != nil {
-	// 		return err
-	// 	}
-	// } else {
-	// 	if err := EncodeJobData(f, jobData); err != nil {
-	// 		return err
-	// 	}
-	// }
-
 	f, err = os.Create(path.Join(dir, "data.json"))
 	if err != nil {
 		log.Error("Error while creating filepath for data.json")