mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-11-04 01:25:06 +01:00 
			
		
		
		
	Extend Job Hooks and add unit tests
Add job tagger control
This commit is contained in:
		internal
api
repository
tagger
taskManager
@@ -1126,7 +1126,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	repository.CallJobStopHooks()
 | 
			
		||||
	repository.CallJobStopHooks(job)
 | 
			
		||||
 | 
			
		||||
	// Trigger async archiving
 | 
			
		||||
	archiver.TriggerArchiving(job)
 | 
			
		||||
 
 | 
			
		||||
@@ -46,23 +46,43 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
 | 
			
		||||
	return id, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *JobRepository) SyncJobs() error {
 | 
			
		||||
func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 | 
			
		||||
	r.Mutex.Lock()
 | 
			
		||||
	defer r.Mutex.Unlock()
 | 
			
		||||
	_, err := r.DB.Exec(
 | 
			
		||||
 | 
			
		||||
	query := sq.Select(jobColumns...).From("job_cache")
 | 
			
		||||
 | 
			
		||||
	rows, err := query.RunWith(r.stmtCache).Query()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("Error while running query %v", err)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	jobs := make([]*schema.Job, 0, 50)
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		job, err := scanJob(rows)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			rows.Close()
 | 
			
		||||
			log.Warn("Error while scanning rows")
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		jobs = append(jobs, job)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = r.DB.Exec(
 | 
			
		||||
		"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warnf("Error while Job sync: %v", err)
 | 
			
		||||
		return err
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = r.DB.Exec("DELETE FROM job_cache")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("Error while Job cache clean")
 | 
			
		||||
		return err
 | 
			
		||||
		log.Warnf("Error while Job cache clean: %v", err)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
	return jobs, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start inserts a new job in the table, returning the unique job ID.
 | 
			
		||||
 
 | 
			
		||||
@@ -4,31 +4,54 @@
 | 
			
		||||
// license that can be found in the LICENSE file.
 | 
			
		||||
package repository
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type JobHook interface {
 | 
			
		||||
	jobStartCallback()
 | 
			
		||||
	jobStopCallback()
 | 
			
		||||
	JobStartCallback(job *schema.Job)
 | 
			
		||||
	JobStopCallback(job *schema.Job)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var hooks []JobHook
 | 
			
		||||
var (
 | 
			
		||||
	initOnce sync.Once
 | 
			
		||||
	hooks    []JobHook
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func RegisterJobJook(hook JobHook) {
 | 
			
		||||
	initOnce.Do(func() {
 | 
			
		||||
		hooks = make([]JobHook, 0)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if hook != nil {
 | 
			
		||||
		hooks = append(hooks, hook)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CallJobStartHooks() {
 | 
			
		||||
func CallJobStartHooks(jobs []*schema.Job) {
 | 
			
		||||
	if hooks == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, hook := range hooks {
 | 
			
		||||
		if hook != nil {
 | 
			
		||||
			hook.jobStartCallback()
 | 
			
		||||
			for _, job := range jobs {
 | 
			
		||||
				hook.JobStartCallback(job)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CallJobStopHooks() {
 | 
			
		||||
func CallJobStopHooks(job *schema.Job) {
 | 
			
		||||
	if hooks == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, hook := range hooks {
 | 
			
		||||
		if hook != nil {
 | 
			
		||||
			hook.jobStopCallback()
 | 
			
		||||
			hook.JobStopCallback(job)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										3
									
								
								internal/tagger/apps/python.txt
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										3
									
								
								internal/tagger/apps/python.txt
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,3 @@
 | 
			
		||||
python
 | 
			
		||||
anaconda
 | 
			
		||||
conda
 | 
			
		||||
@@ -8,6 +8,7 @@ import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"embed"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/fs"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
@@ -27,16 +28,10 @@ type appInfo struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type AppTagger struct {
 | 
			
		||||
	apps []appInfo
 | 
			
		||||
	apps map[string]appInfo
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *AppTagger) Register() error {
 | 
			
		||||
	files, err := appFiles.ReadDir("apps")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error reading app folder: %#v", err)
 | 
			
		||||
	}
 | 
			
		||||
	t.apps = make([]appInfo, 0)
 | 
			
		||||
 | 
			
		||||
func (t *AppTagger) scanApps(files []fs.DirEntry) error {
 | 
			
		||||
	for _, fn := range files {
 | 
			
		||||
		fns := fn.Name()
 | 
			
		||||
		log.Debugf("Process: %s", fns)
 | 
			
		||||
@@ -50,12 +45,25 @@ func (t *AppTagger) Register() error {
 | 
			
		||||
		for scanner.Scan() {
 | 
			
		||||
			ai.strings = append(ai.strings, scanner.Text())
 | 
			
		||||
		}
 | 
			
		||||
		t.apps = append(t.apps, ai)
 | 
			
		||||
		delete(t.apps, ai.tag)
 | 
			
		||||
		t.apps[ai.tag] = ai
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// func (t *AppTagger) Reload() error {
 | 
			
		||||
//
 | 
			
		||||
// }
 | 
			
		||||
 | 
			
		||||
func (t *AppTagger) Register() error {
 | 
			
		||||
	files, err := appFiles.ReadDir("apps")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error reading app folder: %#v", err)
 | 
			
		||||
	}
 | 
			
		||||
	t.apps = make(map[string]appInfo, 0)
 | 
			
		||||
	return t.scanApps(files)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *AppTagger) Match(job *schema.Job) {
 | 
			
		||||
	r := repository.GetJobRepository()
 | 
			
		||||
	meta, err := r.FetchMetadata(job)
 | 
			
		||||
 
 | 
			
		||||
@@ -35,7 +35,7 @@ func TestRegister(t *testing.T) {
 | 
			
		||||
	err := tagger.Register()
 | 
			
		||||
	noErr(t, err)
 | 
			
		||||
 | 
			
		||||
	if len(tagger.apps) != 3 {
 | 
			
		||||
	if len(tagger.apps) != 4 {
 | 
			
		||||
		t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 3", len(tagger.apps))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -4,14 +4,48 @@
 | 
			
		||||
// license that can be found in the LICENSE file.
 | 
			
		||||
package tagger
 | 
			
		||||
 | 
			
		||||
import "github.com/ClusterCockpit/cc-backend/pkg/schema"
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/ClusterCockpit/cc-backend/internal/repository"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Tagger interface {
 | 
			
		||||
	Register() error
 | 
			
		||||
	Match(job *schema.Job)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Init() error {
 | 
			
		||||
var (
 | 
			
		||||
	initOnce  sync.Once
 | 
			
		||||
	jobTagger *JobTagger
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
type JobTagger struct {
 | 
			
		||||
	startTaggers []Tagger
 | 
			
		||||
	stopTaggers  []Tagger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Init() {
 | 
			
		||||
	initOnce.Do(func() {
 | 
			
		||||
		jobTagger = &JobTagger{}
 | 
			
		||||
		jobTagger.startTaggers = make([]Tagger, 0)
 | 
			
		||||
		jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
 | 
			
		||||
 | 
			
		||||
		for _, tagger := range jobTagger.startTaggers {
 | 
			
		||||
			tagger.Register()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// jobTagger.stopTaggers = make([]Tagger, 0)
 | 
			
		||||
		repository.RegisterJobJook(jobTagger)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (jt *JobTagger) JobStartCallback(job *schema.Job) {
 | 
			
		||||
	for _, tagger := range jobTagger.startTaggers {
 | 
			
		||||
		tagger.Match(job)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (jt *JobTagger) JobStopCallback(job *schema.Job) {
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										31
									
								
								internal/tagger/tagger_test.go
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										31
									
								
								internal/tagger/tagger_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
			
		||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
 | 
			
		||||
// All rights reserved.
 | 
			
		||||
// Use of this source code is governed by a MIT-style
 | 
			
		||||
// license that can be found in the LICENSE file.
 | 
			
		||||
package tagger
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/ClusterCockpit/cc-backend/internal/repository"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestInit(t *testing.T) {
 | 
			
		||||
	Init()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestJobStartCallback(t *testing.T) {
 | 
			
		||||
	Init()
 | 
			
		||||
	r := setup(t)
 | 
			
		||||
	job, err := r.FindByIdDirect(2)
 | 
			
		||||
	noErr(t, err)
 | 
			
		||||
 | 
			
		||||
	jobs := make([]*schema.Job, 0, 1)
 | 
			
		||||
	jobs = append(jobs, job)
 | 
			
		||||
 | 
			
		||||
	repository.CallJobStartHooks(jobs)
 | 
			
		||||
	if !r.HasTag(2, "app", "python") {
 | 
			
		||||
		t.Errorf("missing tag python")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -28,8 +28,8 @@ func RegisterCommitJobService() {
 | 
			
		||||
			func() {
 | 
			
		||||
				start := time.Now()
 | 
			
		||||
				log.Printf("Jobcache sync started at %s", start.Format(time.RFC3339))
 | 
			
		||||
				jobRepo.SyncJobs()
 | 
			
		||||
				repository.CallJobStartHooks()
 | 
			
		||||
				log.Printf("Jobcache sync is done and took %s", time.Since(start))
 | 
			
		||||
				jobs, _ := jobRepo.SyncJobs()
 | 
			
		||||
				repository.CallJobStartHooks(jobs)
 | 
			
		||||
				log.Printf("Jobcache sync and job callbacks are done and took %s", time.Since(start))
 | 
			
		||||
			}))
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user