From 14bad81b9fd46ceca683aaffbe64566fb7b37972 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 19 May 2025 13:25:39 +0200 Subject: [PATCH] Extend Job Hooks and add unit tests Add job tagger control --- internal/api/rest.go | 2 +- internal/repository/jobCreate.go | 32 +++++++++++++++---- internal/repository/jobHooks.go | 37 +++++++++++++++++----- internal/tagger/apps/python.txt | 3 ++ internal/tagger/detectApp.go | 28 +++++++++++------ internal/tagger/detectApp_test.go | 2 +- internal/tagger/tagger.go | 40 ++++++++++++++++++++++-- internal/tagger/tagger_test.go | 31 ++++++++++++++++++ internal/taskManager/commitJobService.go | 6 ++-- 9 files changed, 150 insertions(+), 31 deletions(-) create mode 100644 internal/tagger/apps/python.txt create mode 100644 internal/tagger/tagger_test.go diff --git a/internal/api/rest.go b/internal/api/rest.go index e0804cb..6133a5e 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -1126,7 +1126,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo return } - repository.CallJobStopHooks() + repository.CallJobStopHooks(job) // Trigger async archiving archiver.TriggerArchiving(job) diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 3b997f3..a651db9 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -46,23 +46,43 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) { return id, nil } -func (r *JobRepository) SyncJobs() error { +func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { r.Mutex.Lock() defer r.Mutex.Unlock() - _, err := r.DB.Exec( + + query := sq.Select(jobColumns...).From("job_cache") + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query %v", err) + return nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + _, err = r.DB.Exec( "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") if err != nil { log.Warnf("Error while Job sync: %v", err) - return err + return nil, err } _, err = r.DB.Exec("DELETE FROM job_cache") if err != nil { - log.Warn("Error while Job cache clean") - return err + log.Warnf("Error while Job cache clean: %v", err) + return nil, err } - return nil + return jobs, nil } // Start inserts a new job in the table, returning the unique job ID. diff --git a/internal/repository/jobHooks.go b/internal/repository/jobHooks.go index d69874f..1016335 100644 --- a/internal/repository/jobHooks.go +++ b/internal/repository/jobHooks.go @@ -4,31 +4,54 @@ // license that can be found in the LICENSE file. package repository +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + type JobHook interface { - jobStartCallback() - jobStopCallback() + JobStartCallback(job *schema.Job) + JobStopCallback(job *schema.Job) } -var hooks []JobHook +var ( + initOnce sync.Once + hooks []JobHook +) func RegisterJobJook(hook JobHook) { + initOnce.Do(func() { + hooks = make([]JobHook, 0) + }) + if hook != nil { hooks = append(hooks, hook) } } -func CallJobStartHooks() { +func CallJobStartHooks(jobs []*schema.Job) { + if hooks == nil { + return + } + for _, hook := range hooks { if hook != nil { - hook.jobStartCallback() + for _, job := range jobs { + hook.JobStartCallback(job) + } } } } -func CallJobStopHooks() { +func CallJobStopHooks(job *schema.Job) { + if hooks == nil { + return + } + for _, hook := range hooks { if hook != nil { - hook.jobStopCallback() + hook.JobStopCallback(job) } } } diff --git a/internal/tagger/apps/python.txt b/internal/tagger/apps/python.txt new file mode 100644 index 0000000..7a5c661 --- /dev/null +++ b/internal/tagger/apps/python.txt @@ -0,0 +1,3 @@ +python +anaconda +conda diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index 339e398..44a08e0 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -8,6 +8,7 @@ import ( "bufio" "embed" "fmt" + "io/fs" "path/filepath" "strings" @@ -27,16 +28,10 @@ type appInfo struct { } type AppTagger struct { - apps []appInfo + apps map[string]appInfo } -func (t *AppTagger) Register() error { - files, err := appFiles.ReadDir("apps") - if err != nil { - return fmt.Errorf("error reading app folder: %#v", err) - } - t.apps = make([]appInfo, 0) - +func (t *AppTagger) scanApps(files []fs.DirEntry) error { for _, fn := range files { fns := fn.Name() log.Debugf("Process: %s", fns) @@ -50,12 +45,25 @@ func (t *AppTagger) Register() error { for scanner.Scan() { ai.strings = append(ai.strings, scanner.Text()) } - t.apps = append(t.apps, ai) + delete(t.apps, ai.tag) + t.apps[ai.tag] = ai } - return nil } +// func (t *AppTagger) Reload() error { +// +// } + +func (t *AppTagger) Register() error { + files, err := appFiles.ReadDir("apps") + if err != nil { + return fmt.Errorf("error reading app folder: %#v", err) + } + t.apps = make(map[string]appInfo, 0) + return t.scanApps(files) +} + func (t *AppTagger) Match(job *schema.Job) { r := repository.GetJobRepository() meta, err := r.FetchMetadata(job) diff --git a/internal/tagger/detectApp_test.go b/internal/tagger/detectApp_test.go index 8978e35..3b43cce 100644 --- a/internal/tagger/detectApp_test.go +++ b/internal/tagger/detectApp_test.go @@ -35,7 +35,7 @@ func TestRegister(t *testing.T) { err := tagger.Register() noErr(t, err) - if len(tagger.apps) != 3 { + if len(tagger.apps) != 4 { t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 3", len(tagger.apps)) } } diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index 52a369b..4fbbc9e 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -4,14 +4,48 @@ // license that can be found in the LICENSE file. package tagger -import "github.com/ClusterCockpit/cc-backend/pkg/schema" +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) type Tagger interface { Register() error Match(job *schema.Job) } -func Init() error { +var ( + initOnce sync.Once + jobTagger *JobTagger +) - return nil +type JobTagger struct { + startTaggers []Tagger + stopTaggers []Tagger +} + +func Init() { + initOnce.Do(func() { + jobTagger = &JobTagger{} + jobTagger.startTaggers = make([]Tagger, 0) + jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{}) + + for _, tagger := range jobTagger.startTaggers { + tagger.Register() + } + + // jobTagger.stopTaggers = make([]Tagger, 0) + repository.RegisterJobJook(jobTagger) + }) +} + +func (jt *JobTagger) JobStartCallback(job *schema.Job) { + for _, tagger := range jobTagger.startTaggers { + tagger.Match(job) + } +} + +func (jt *JobTagger) JobStopCallback(job *schema.Job) { } diff --git a/internal/tagger/tagger_test.go b/internal/tagger/tagger_test.go new file mode 100644 index 0000000..057ca17 --- /dev/null +++ b/internal/tagger/tagger_test.go @@ -0,0 +1,31 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package tagger + +import ( + "testing" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func TestInit(t *testing.T) { + Init() +} + +func TestJobStartCallback(t *testing.T) { + Init() + r := setup(t) + job, err := r.FindByIdDirect(2) + noErr(t, err) + + jobs := make([]*schema.Job, 0, 1) + jobs = append(jobs, job) + + repository.CallJobStartHooks(jobs) + if !r.HasTag(2, "app", "python") { + t.Errorf("missing tag python") + } +} diff --git a/internal/taskManager/commitJobService.go b/internal/taskManager/commitJobService.go index 7749348..c60acb3 100644 --- a/internal/taskManager/commitJobService.go +++ b/internal/taskManager/commitJobService.go @@ -28,8 +28,8 @@ func RegisterCommitJobService() { func() { start := time.Now() log.Printf("Jobcache sync started at %s", start.Format(time.RFC3339)) - jobRepo.SyncJobs() - repository.CallJobStartHooks() - log.Printf("Jobcache sync is done and took %s", time.Since(start)) + jobs, _ := jobRepo.SyncJobs() + repository.CallJobStartHooks(jobs) + log.Printf("Jobcache sync and job callbacks are done and took %s", time.Since(start)) })) }