Refactor Tagger package. Add fsNotify Service

This commit is contained in:
Jan Eitzinger 2025-05-19 16:08:43 +02:00
parent 14bad81b9f
commit 85f17c0fd8
7 changed files with 131 additions and 21 deletions

View File

@ -19,7 +19,9 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/tagger"
"github.com/ClusterCockpit/cc-backend/internal/taskManager" "github.com/ClusterCockpit/cc-backend/internal/taskManager"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv"
@ -216,6 +218,7 @@ func main() {
} }
archiver.Start(repository.GetJobRepository()) archiver.Start(repository.GetJobRepository())
tagger.Init()
taskManager.Start() taskManager.Start()
serverInit() serverInit()
@ -237,6 +240,8 @@ func main() {
serverShutdown() serverShutdown()
util.FsWatcherShutdown()
taskManager.Shutdown() taskManager.Shutdown()
}() }()

1
go.mod
View File

@ -44,6 +44,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect
github.com/go-jose/go-jose/v4 v4.0.5 // indirect github.com/go-jose/go-jose/v4 v4.0.5 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect

2
go.sum
View File

@ -55,6 +55,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk= github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk=
github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs= github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs=

View File

@ -58,8 +58,10 @@ func GetJobRepository() *JobRepository {
// } // }
var jobColumns []string = []string{ var jobColumns []string = []string{
"id", "job_id", "hpc_user", "project", "cluster", "subcluster", "start_time", "cluster_partition", "array_job_id", "num_nodes", "num_hwthreads", "num_acc", "exclusive", "monitoring_status", "smt", "job_state", "id", "job_id", "hpc_user", "project", "cluster", "subcluster", "start_time",
"duration", "walltime", "resources", "footprint", "energy", "cluster_partition", "array_job_id", "num_nodes", "num_hwthreads", "num_acc",
"exclusive", "monitoring_status", "smt", "job_state", "duration", "walltime",
"resources", "footprint", "energy",
} }
func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) { func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {

View File

@ -9,15 +9,20 @@ import (
"embed" "embed"
"fmt" "fmt"
"io/fs" "io/fs"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
) )
const tagType = "app" const (
tagType = "app"
appPath = "./var/tagger/apps"
)
//go:embed apps/* //go:embed apps/*
var appFiles embed.FS var appFiles embed.FS
@ -31,37 +36,60 @@ type AppTagger struct {
apps map[string]appInfo apps map[string]appInfo
} }
func (t *AppTagger) scanApps(files []fs.DirEntry) error { func (t *AppTagger) scanApp(f fs.File, fns string) {
scanner := bufio.NewScanner(f)
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
for scanner.Scan() {
ai.strings = append(ai.strings, scanner.Text())
}
delete(t.apps, ai.tag)
t.apps[ai.tag] = ai
}
func (t *AppTagger) EventMatch(s string) bool {
return strings.Contains(s, "apps")
}
func (t *AppTagger) EventCallback() {
files, err := os.ReadDir(appPath)
if err != nil {
log.Fatal(err)
}
for _, fn := range files { for _, fn := range files {
fns := fn.Name() fns := fn.Name()
log.Debugf("Process: %s", fns) log.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) f, err := os.Open(fmt.Sprintf("%s/%s", appPath, fns))
if err != nil { if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err) log.Errorf("error opening app file %s: %#v", fns, err)
} }
scanner := bufio.NewScanner(f) t.scanApp(f, fns)
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
for scanner.Scan() {
ai.strings = append(ai.strings, scanner.Text())
}
delete(t.apps, ai.tag)
t.apps[ai.tag] = ai
} }
return nil
} }
// func (t *AppTagger) Reload() error {
//
// }
func (t *AppTagger) Register() error { func (t *AppTagger) Register() error {
files, err := appFiles.ReadDir("apps") files, err := appFiles.ReadDir("apps")
if err != nil { if err != nil {
return fmt.Errorf("error reading app folder: %#v", err) return fmt.Errorf("error reading app folder: %#v", err)
} }
t.apps = make(map[string]appInfo, 0) t.apps = make(map[string]appInfo, 0)
return t.scanApps(files) for _, fn := range files {
fns := fn.Name()
log.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err)
}
t.scanApp(f, fns)
}
if util.CheckFileExists(appPath) {
log.Infof("Setup file watch for %s", appPath)
util.AddListener(appPath, t)
}
return nil
} }
func (t *AppTagger) Match(job *schema.Job) { func (t *AppTagger) Match(job *schema.Job) {

View File

@ -36,7 +36,6 @@ func Init() {
tagger.Register() tagger.Register()
} }
// jobTagger.stopTaggers = make([]Tagger, 0)
repository.RegisterJobJook(jobTagger) repository.RegisterJobJook(jobTagger)
}) })
} }

View File

@ -0,0 +1,73 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package util
import (
"sync"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/fsnotify/fsnotify"
)
type Listener interface {
EventCallback()
EventMatch(event string) bool
}
var (
initOnce sync.Once
w *fsnotify.Watcher
listeners []Listener
)
func AddListener(path string, l Listener) {
var err error
initOnce.Do(func() {
var err error
w, err = fsnotify.NewWatcher()
if err != nil {
log.Error("creating a new watcher: %w", err)
}
listeners = make([]Listener, 0)
go watchLoop(w)
})
listeners = append(listeners, l)
err = w.Add(path)
if err != nil {
log.Warnf("%q: %s", path, err)
}
}
func FsWatcherShutdown() {
w.Close()
}
func watchLoop(w *fsnotify.Watcher) {
for {
select {
// Read from Errors.
case err, ok := <-w.Errors:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
return
}
log.Errorf("watch event loop: %s", err)
// Read from Events.
case e, ok := <-w.Events:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
return
}
log.Infof("Event %s", e)
for _, l := range listeners {
if l.EventMatch(e.String()) {
l.EventCallback()
}
}
}
}
}