mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-28 21:37:31 +01:00
Enable to run taggers from within admin web interface
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
package tagger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
@@ -29,11 +30,31 @@ type Tagger interface {
|
||||
Match(job *schema.Job)
|
||||
}
|
||||
|
||||
// TaggerInfo holds metadata about a tagger for JSON serialization.
|
||||
type TaggerInfo struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Running bool `json:"running"`
|
||||
}
|
||||
|
||||
var (
|
||||
initOnce sync.Once
|
||||
jobTagger *JobTagger
|
||||
initOnce sync.Once
|
||||
jobTagger *JobTagger
|
||||
statusMu sync.Mutex
|
||||
taggerStatus = map[string]bool{}
|
||||
)
|
||||
|
||||
// Known tagger definitions: name -> (type, factory)
|
||||
type taggerDef struct {
|
||||
ttype string
|
||||
factory func() Tagger
|
||||
}
|
||||
|
||||
var knownTaggers = map[string]taggerDef{
|
||||
"AppTagger": {ttype: "start", factory: func() Tagger { return &AppTagger{} }},
|
||||
"JobClassTagger": {ttype: "stop", factory: func() Tagger { return &JobClassTagger{} }},
|
||||
}
|
||||
|
||||
// JobTagger coordinates multiple taggers that run at different job lifecycle events.
|
||||
// It maintains separate lists of taggers that run when jobs start and when they stop.
|
||||
type JobTagger struct {
|
||||
@@ -88,6 +109,73 @@ func (jt *JobTagger) JobStopCallback(job *schema.Job) {
|
||||
}
|
||||
}
|
||||
|
||||
// ListTaggers returns information about all known taggers with their current running status.
|
||||
func ListTaggers() []TaggerInfo {
|
||||
statusMu.Lock()
|
||||
defer statusMu.Unlock()
|
||||
|
||||
result := make([]TaggerInfo, 0, len(knownTaggers))
|
||||
for name, def := range knownTaggers {
|
||||
result = append(result, TaggerInfo{
|
||||
Name: name,
|
||||
Type: def.ttype,
|
||||
Running: taggerStatus[name],
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// RunTaggerByName starts a tagger by name asynchronously on all jobs.
|
||||
// Returns an error if the name is unknown or the tagger is already running.
|
||||
func RunTaggerByName(name string) error {
|
||||
def, ok := knownTaggers[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown tagger: %s", name)
|
||||
}
|
||||
|
||||
statusMu.Lock()
|
||||
if taggerStatus[name] {
|
||||
statusMu.Unlock()
|
||||
return fmt.Errorf("tagger %s is already running", name)
|
||||
}
|
||||
taggerStatus[name] = true
|
||||
statusMu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
statusMu.Lock()
|
||||
taggerStatus[name] = false
|
||||
statusMu.Unlock()
|
||||
}()
|
||||
|
||||
t := def.factory()
|
||||
if err := t.Register(); err != nil {
|
||||
cclog.Errorf("Failed to register tagger %s: %s", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
r := repository.GetJobRepository()
|
||||
jl, err := r.GetJobList(0, 0)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error getting job list for tagger %s: %s", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
cclog.Infof("Running tagger %s on %d jobs", name, len(jl))
|
||||
for _, id := range jl {
|
||||
job, err := r.FindByIDDirect(id)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error getting job %d for tagger %s: %s", id, name, err)
|
||||
continue
|
||||
}
|
||||
t.Match(job)
|
||||
}
|
||||
cclog.Infof("Tagger %s completed", name)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunTaggers applies all configured taggers to all existing jobs in the repository.
|
||||
// This is useful for retroactively applying tags to jobs that were created before
|
||||
// the tagger system was initialized or when new tagging rules are added.
|
||||
|
||||
Reference in New Issue
Block a user