From 85f17c0fd85fff07e14009f85566927955477f25 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 19 May 2025 16:08:43 +0200 Subject: [PATCH] Refactor Tagger package. Add fsNotify Service --- cmd/cc-backend/main.go | 5 +++ go.mod | 1 + go.sum | 2 + internal/repository/job.go | 6 ++- internal/tagger/detectApp.go | 64 ++++++++++++++++++++++--------- internal/tagger/tagger.go | 1 - internal/util/fswatcher.go | 73 ++++++++++++++++++++++++++++++++++++ 7 files changed, 131 insertions(+), 21 deletions(-) create mode 100644 internal/util/fswatcher.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 4b6d7f9..cbfccef 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -19,7 +19,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/tagger" "github.com/ClusterCockpit/cc-backend/internal/taskManager" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" @@ -216,6 +218,7 @@ func main() { } archiver.Start(repository.GetJobRepository()) + tagger.Init() taskManager.Start() serverInit() @@ -237,6 +240,8 @@ func main() { serverShutdown() + util.FsWatcherShutdown() + taskManager.Shutdown() }() diff --git a/go.mod b/go.mod index 98d1cab..f17ec18 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect github.com/go-jose/go-jose/v4 v4.0.5 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect diff --git a/go.sum b/go.sum index a76e112..57b1649 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk= github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs= diff --git a/internal/repository/job.go b/internal/repository/job.go index 29aa63e..73a2588 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -58,8 +58,10 @@ func GetJobRepository() *JobRepository { // } var jobColumns []string = []string{ - "id", "job_id", "hpc_user", "project", "cluster", "subcluster", "start_time", "cluster_partition", "array_job_id", "num_nodes", "num_hwthreads", "num_acc", "exclusive", "monitoring_status", "smt", "job_state", - "duration", "walltime", "resources", "footprint", "energy", + "id", "job_id", "hpc_user", "project", "cluster", "subcluster", "start_time", + "cluster_partition", "array_job_id", "num_nodes", "num_hwthreads", "num_acc", + "exclusive", "monitoring_status", "smt", "job_state", "duration", "walltime", + "resources", "footprint", "energy", } func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) { diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index 44a08e0..621e20c 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -9,15 +9,20 @@ import ( "embed" "fmt" "io/fs" + "os" "path/filepath" "strings" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -const tagType = "app" +const ( + tagType = "app" + appPath = "./var/tagger/apps" +) //go:embed apps/* var appFiles embed.FS @@ -31,37 +36,60 @@ type AppTagger struct { apps map[string]appInfo } -func (t *AppTagger) scanApps(files []fs.DirEntry) error { +func (t *AppTagger) scanApp(f fs.File, fns string) { + scanner := bufio.NewScanner(f) + ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)} + + for scanner.Scan() { + ai.strings = append(ai.strings, scanner.Text()) + } + delete(t.apps, ai.tag) + t.apps[ai.tag] = ai +} + +func (t *AppTagger) EventMatch(s string) bool { + return strings.Contains(s, "apps") +} + +func (t *AppTagger) EventCallback() { + files, err := os.ReadDir(appPath) + if err != nil { + log.Fatal(err) + } + for _, fn := range files { fns := fn.Name() log.Debugf("Process: %s", fns) - f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) + f, err := os.Open(fmt.Sprintf("%s/%s", appPath, fns)) if err != nil { - return fmt.Errorf("error opening app file %s: %#v", fns, err) + log.Errorf("error opening app file %s: %#v", fns, err) } - scanner := bufio.NewScanner(f) - ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)} - - for scanner.Scan() { - ai.strings = append(ai.strings, scanner.Text()) - } - delete(t.apps, ai.tag) - t.apps[ai.tag] = ai + t.scanApp(f, fns) } - return nil } -// func (t *AppTagger) Reload() error { -// -// } - func (t *AppTagger) Register() error { files, err := appFiles.ReadDir("apps") if err != nil { return fmt.Errorf("error reading app folder: %#v", err) } t.apps = make(map[string]appInfo, 0) - return t.scanApps(files) + for _, fn := range files { + fns := fn.Name() + log.Debugf("Process: %s", fns) + f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) + if err != nil { + return fmt.Errorf("error opening app file %s: %#v", fns, err) + } + t.scanApp(f, fns) + } + + if util.CheckFileExists(appPath) { + log.Infof("Setup file watch for %s", appPath) + util.AddListener(appPath, t) + } + + return nil } func (t *AppTagger) Match(job *schema.Job) { diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index 4fbbc9e..b336125 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -36,7 +36,6 @@ func Init() { tagger.Register() } - // jobTagger.stopTaggers = make([]Tagger, 0) repository.RegisterJobJook(jobTagger) }) } diff --git a/internal/util/fswatcher.go b/internal/util/fswatcher.go new file mode 100644 index 0000000..aaf3372 --- /dev/null +++ b/internal/util/fswatcher.go @@ -0,0 +1,73 @@ +// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package util + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/fsnotify/fsnotify" +) + +type Listener interface { + EventCallback() + EventMatch(event string) bool +} + +var ( + initOnce sync.Once + w *fsnotify.Watcher + listeners []Listener +) + +func AddListener(path string, l Listener) { + var err error + + initOnce.Do(func() { + var err error + w, err = fsnotify.NewWatcher() + if err != nil { + log.Error("creating a new watcher: %w", err) + } + listeners = make([]Listener, 0) + + go watchLoop(w) + }) + + listeners = append(listeners, l) + err = w.Add(path) + if err != nil { + log.Warnf("%q: %s", path, err) + } +} + +func FsWatcherShutdown() { + w.Close() +} + +func watchLoop(w *fsnotify.Watcher) { + for { + select { + // Read from Errors. + case err, ok := <-w.Errors: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + return + } + log.Errorf("watch event loop: %s", err) + // Read from Events. + case e, ok := <-w.Events: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + return + } + + log.Infof("Event %s", e) + for _, l := range listeners { + if l.EventMatch(e.String()) { + l.EventCallback() + } + } + } + } +}