Merge pull request #397 from ClusterCockpit/134-job-tagging

134 job tagging
Jan Eitzinger 2025-05-27 13:14:50 +02:00 committed by GitHub
commit bdffe73f59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1327 additions and 137 deletions

View File

@ -7,7 +7,8 @@ package main
import "flag"
var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
@ -21,6 +22,7 @@ func cliInit() {
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit")
flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit")
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")

View File

@ -19,7 +19,9 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/tagger"
"github.com/ClusterCockpit/cc-backend/internal/taskManager"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv"
@ -211,11 +213,22 @@ func main() {
}
}
if config.Keys.EnableJobTaggers {
tagger.Init()
}
if flagApplyTags {
if err := tagger.RunTaggers(); err != nil {
log.Abortf("Running job taggers.\nError: %s\n", err.Error())
}
}
if !flagServer {
log.Exit("No errors, server flag not set. Exiting cc-backend.")
}
archiver.Start(repository.GetJobRepository())
taskManager.Start()
serverInit()
@ -237,6 +250,8 @@ func main() {
serverShutdown()
util.FsWatcherShutdown()
taskManager.Shutdown()
}()

go.mod (4 changes)
View File

@ -9,6 +9,8 @@ require (
github.com/ClusterCockpit/cc-units v0.4.0
github.com/Masterminds/squirrel v1.5.4
github.com/coreos/go-oidc/v3 v3.12.0
github.com/expr-lang/expr v1.17.3
github.com/fsnotify/fsnotify v1.9.0
github.com/go-co-op/gocron/v2 v2.16.0
github.com/go-ldap/ldap/v3 v3.4.10
github.com/go-sql-driver/mysql v1.9.0
@ -20,6 +22,7 @@ require (
github.com/gorilla/sessions v1.4.0
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/jmoiron/sqlx v1.4.0
github.com/joho/godotenv v1.5.1
github.com/mattn/go-sqlite3 v1.14.24
github.com/prometheus/client_golang v1.21.0
github.com/prometheus/common v0.62.0
@ -58,7 +61,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect

go.sum (4 changes)
View File

@ -53,8 +53,12 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/expr-lang/expr v1.17.3 h1:myeTTuDFz7k6eFe/JPlep/UsiIjVhG61FMHFu63U7j0=
github.com/expr-lang/expr v1.17.3/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk=
github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs=

View File

@ -123,7 +123,7 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil {
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0666); err != nil {
t.Fatal(err)
}
@ -204,7 +204,7 @@ func TestRestApi(t *testing.T) {
restapi.MountApiRoutes(r)
var TestJobId int64 = 123
var TestClusterName string = "testcluster"
TestClusterName := "testcluster"
var TestStartTime int64 = 123456789
const startJobBody string = `{
@ -221,7 +221,6 @@ func TestRestApi(t *testing.T) {
"exclusive": 1,
"monitoringStatus": 1,
"smt": 1,
"tags": [{ "type": "testTagType", "name": "testTagName", "scope": "testuser" }],
"resources": [
{
"hostname": "host123",
@ -252,16 +251,17 @@ func TestRestApi(t *testing.T) {
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
resolver := graph.GetResolverInstance()
// resolver := graph.GetResolverInstance()
restapi.JobRepository.SyncJobs()
job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
if err != nil {
t.Fatal(err)
}
job.Tags, err = resolver.Job().Tags(ctx, job)
if err != nil {
t.Fatal(err)
}
// job.Tags, err = resolver.Job().Tags(ctx, job)
// if err != nil {
// t.Fatal(err)
// }
if job.JobID != 123 ||
job.User != "testuser" ||
@ -282,9 +282,9 @@ func TestRestApi(t *testing.T) {
t.Fatalf("unexpected job properties: %#v", job)
}
if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
t.Fatalf("unexpected tags: %#v", job.Tags)
}
// if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
// t.Fatalf("unexpected tags: %#v", job.Tags)
// }
}); !ok {
return
}
@ -352,7 +352,7 @@ func TestRestApi(t *testing.T) {
t.Run("CheckDoubleStart", func(t *testing.T) {
// Starting a job with the same jobId and cluster should only be allowed if the startTime is far apart!
body := strings.Replace(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`, -1)
body := strings.ReplaceAll(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`)
req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(body)))
recorder := httptest.NewRecorder()
@ -402,6 +402,7 @@ func TestRestApi(t *testing.T) {
}
time.Sleep(1 * time.Second)
restapi.JobRepository.SyncJobs()
const stopJobBodyFailed string = `{
"jobId": 12345,

View File

@ -820,7 +820,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
}
rw.WriteHeader(http.StatusOK)
rw.Write([]byte(fmt.Sprintf("Deleted Tags from DB: %d successful of %d requested\n", currentCount, targetCount)))
fmt.Fprintf(rw, "Deleted Tags from DB: %d successful of %d requested\n", currentCount, targetCount)
}
// startJob godoc
@ -846,6 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return
}
log.Printf("REST: %s\n", req.GoString())
req.State = schema.JobStateRunning
if err := importer.SanityChecks(&req.BaseJob); err != nil {
@ -930,10 +931,14 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req)
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime)
// FIXME: Previous error is hidden
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
}
api.checkAndHandleStopJob(rw, job, req)
}
@ -1097,10 +1102,15 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State
api.JobRepository.Mutex.Lock()
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
api.JobRepository.Mutex.Unlock()
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return
}
}
api.JobRepository.Mutex.Unlock()
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)

View File

@ -72,7 +72,11 @@ func archivingWorker() {
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
repository.CallJobStopHooks(job)
archivePending.Done()
default:
continue
}
}
}

View File

@ -237,7 +237,7 @@ func (auth *Authentication) Login(
limiter := getIPUserLimiter(ip, username)
if !limiter.Allow() {
log.Warnf("AUTH/RATE > Too many login attempts for combination IP: %s, Username: %s", ip, username)
onfailure(rw, r, errors.New("Too many login attempts, try again in a few minutes."))
onfailure(rw, r, errors.New("too many login attempts, try again in a few minutes"))
return
}

View File

@ -143,7 +143,7 @@ func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name s
return &schema.Tag{ID: id, Type: typeArg, Name: name, Scope: scope}, nil
} else {
log.Warnf("Not authorized to create tag with scope: %s", scope)
return nil, fmt.Errorf("Not authorized to create tag with scope: %s", scope)
return nil, fmt.Errorf("not authorized to create tag with scope: %s", scope)
}
}
@ -179,7 +179,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
_, _, tscope, exists := r.Repo.TagInfo(tid)
if !exists {
log.Warnf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("tag does not exist (ID): %d", tid)
}
// Test Access: Admins && Admin Tag OR Support/Admin and Global Tag OR Everyone && Private Tag
@ -193,7 +193,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
}
} else {
log.Warnf("Not authorized to add tag: %d", tid)
return nil, fmt.Errorf("Not authorized to add tag: %d", tid)
return nil, fmt.Errorf("not authorized to add tag: %d", tid)
}
}
@ -226,7 +226,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
_, _, tscope, exists := r.Repo.TagInfo(tid)
if !exists {
log.Warnf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("tag does not exist (ID): %d", tid)
}
// Test Access: Admins && Admin Tag OR Support/Admin and Global Tag OR Everyone && Private Tag
@ -240,7 +240,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
}
} else {
log.Warnf("Not authorized to remove tag: %d", tid)
return nil, fmt.Errorf("Not authorized to remove tag: %d", tid)
return nil, fmt.Errorf("not authorized to remove tag: %d", tid)
}
}
@ -269,7 +269,7 @@ func (r *mutationResolver) RemoveTagFromList(ctx context.Context, tagIds []strin
_, _, tscope, exists := r.Repo.TagInfo(tid)
if !exists {
log.Warnf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("Tag does not exist (ID): %d", tid)
return nil, fmt.Errorf("tag does not exist (ID): %d", tid)
}
// Test Access: Admins && Admin Tag OR Everyone && Private Tag
@ -283,7 +283,7 @@ func (r *mutationResolver) RemoveTagFromList(ctx context.Context, tagIds []strin
}
} else {
log.Warnf("Not authorized to remove tag: %d", tid)
return nil, fmt.Errorf("Not authorized to remove tag: %d", tid)
return nil, fmt.Errorf("not authorized to remove tag: %d", tid)
}
}
return tags, nil
@ -499,10 +499,7 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
return nil, err
}
hasNextPage := false
if len(nextJobs) == 1 {
hasNextPage = true
}
hasNextPage := len(nextJobs) == 1
return &model.JobResultList{Items: jobs, Count: &count, HasNextPage: &hasNextPage}, nil
}
@ -513,8 +510,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
var stats []*model.JobsStatistics
// Top Level Defaults
var defaultDurationBins string = "1h"
var defaultMetricBins int = 10
defaultDurationBins := "1h"
defaultMetricBins := 10
if requireField(ctx, "totalJobs") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
@ -779,9 +776,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
type metricValueResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver }
type (
clusterResolver struct{ *Resolver }
jobResolver struct{ *Resolver }
metricValueResolver struct{ *Resolver }
mutationResolver struct{ *Resolver }
queryResolver struct{ *Resolver }
subClusterResolver struct{ *Resolver }
)

View File

@ -166,7 +166,7 @@ func TestHandleImportFlag(t *testing.T) {
}
result := readResult(t, testname)
job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime)
if err != nil {
t.Fatal(err)
}

View File

@ -9,12 +9,12 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"math"
"strconv"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@ -33,6 +33,7 @@ type JobRepository struct {
stmtCache *sq.StmtCache
cache *lrucache.Cache
driver string
Mutex sync.Mutex
}
func GetJobRepository() *JobRepository {
@ -51,17 +52,29 @@ func GetJobRepository() *JobRepository {
}
var jobColumns []string = []string{
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.cluster_partition", "job.array_job_id",
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy",
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster",
"job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes",
"job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status",
"job.smt", "job.job_state", "job.duration", "job.walltime", "job.resources",
"job.footprint", "job.energy",
}
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
var jobCacheColumns []string = []string{
"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster",
"job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition",
"job_cache.array_job_id", "job_cache.num_nodes", "job_cache.num_hwthreads",
"job_cache.num_acc", "job_cache.exclusive", "job_cache.monitoring_status", "job_cache.smt",
"job_cache.job_state", "job_cache.duration", "job_cache.walltime", "job_cache.resources",
"job_cache.footprint", "job_cache.energy",
}
func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
job := &schema.Job{}
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
&job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
&job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
log.Warnf("Error while scanning rows (Job): %v", err)
return nil, err
@ -138,17 +151,6 @@ func (r *JobRepository) Flush() error {
return nil
}
func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
jobLink := &model.JobLink{}
if err := row.Scan(
&jobLink.ID, &jobLink.JobID); err != nil {
log.Warn("Error while scanning rows (jobLink)")
return nil, err
}
return jobLink, nil
}
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID)
@ -189,9 +191,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
if job.MetaData != nil {
cpy := make(map[string]string, len(job.MetaData)+1)
for k, v := range job.MetaData {
cpy[k] = v
}
maps.Copy(cpy, job.MetaData)
cpy[key] = val
job.MetaData = cpy
} else {
@ -389,7 +389,7 @@ func (r *JobRepository) FindColumnValues(user *schema.User, query string, table
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
return nil, 0, 1000
@ -477,6 +477,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}
// FIXME: Reconsider filtering short jobs with hardcoded threshold
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).

View File

@ -13,6 +13,14 @@ import (
sq "github.com/Masterminds/squirrel"
)
const NamedJobCacheInsert string = `INSERT INTO job_cache (
job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
) VALUES (
:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`
const NamedJobInsert string = `INSERT INTO job (
job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
@ -22,7 +30,9 @@ const NamedJobInsert string = `INSERT INTO job (
);`
func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
res, err := r.DB.NamedExec(NamedJobInsert, job)
r.Mutex.Lock()
res, err := r.DB.NamedExec(NamedJobCacheInsert, job)
r.Mutex.Unlock()
if err != nil {
log.Warn("Error while NamedJobInsert")
return 0, err
@ -36,6 +46,45 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
return id, nil
}
func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
query := sq.Select(jobCacheColumns...).From("job_cache")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query %v", err)
return nil, err
}
jobs := make([]*schema.Job, 0, 50)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
_, err = r.DB.Exec(
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
if err != nil {
log.Warnf("Error while Job sync: %v", err)
return nil, err
}
_, err = r.DB.Exec("DELETE FROM job_cache")
if err != nil {
log.Warnf("Error while Job cache clean: %v", err)
return nil, err
}
return jobs, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transferred!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
@ -73,3 +122,19 @@ func (r *JobRepository) Stop(
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
func (r *JobRepository) StopCached(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job_cache").
Set("job_state", state).
Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}

View File

@ -43,6 +43,26 @@ func (r *JobRepository) Find(
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
func (r *JobRepository) FindCached(
jobId *int64,
cluster *string,
startTime *int64,
) (*schema.Job, error) {
q := sq.Select(jobCacheColumns...).From("job_cache").
Where("job_cache.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job_cache.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job_cache.start_time = ?", *startTime)
}
q = q.OrderBy("job_cache.id DESC") // always use newest matching job by db id if more than one match
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
@ -83,6 +103,35 @@ func (r *JobRepository) FindAll(
return jobs, nil
}
// GetJobList returns the complete list of job database ids for all non-running jobs.
// This is useful for processing large job counts and is intended to be used
// together with FindById to process jobs one by one.
func (r *JobRepository) GetJobList() ([]int64, error) {
query := sq.Select("id").From("job").
Where("job.job_state != 'running'")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jl := make([]int64, 0, 1000)
for rows.Next() {
var id int64
err := rows.Scan(&id)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jl = append(jl, id)
}
log.Infof("Return job count %d", len(jl))
return jl, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.

View File

@ -0,0 +1,57 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"sync"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type JobHook interface {
JobStartCallback(job *schema.Job)
JobStopCallback(job *schema.Job)
}
var (
initOnce sync.Once
hooks []JobHook
)
func RegisterJobJook(hook JobHook) {
initOnce.Do(func() {
hooks = make([]JobHook, 0)
})
if hook != nil {
hooks = append(hooks, hook)
}
}
func CallJobStartHooks(jobs []*schema.Job) {
if hooks == nil {
return
}
for _, hook := range hooks {
if hook != nil {
for _, job := range jobs {
hook.JobStartCallback(job)
}
}
}
}
func CallJobStopHooks(job *schema.Job) {
if hooks == nil {
return
}
for _, hook := range hooks {
if hook != nil {
hook.JobStopCallback(job)
}
}
}
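
The hook API above is what the new tagger package plugs into (see internal/tagger/tagger.go further down): CallJobStartHooks is invoked from the job-cache sync task and CallJobStopHooks from the archiving worker. As a minimal sketch, assuming only the JobHook interface and the RegisterJobJook function shown in this file, a hypothetical consumer (package and type names invented for illustration, not part of this PR) could register a hook like this:

package audit // hypothetical consumer, not part of this PR

import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// auditHook only logs job lifecycle transitions.
type auditHook struct{}

func (h *auditHook) JobStartCallback(job *schema.Job) {
    log.Infof("job %d started on %s", job.JobID, job.Cluster)
}

func (h *auditHook) JobStopCallback(job *schema.Job) {
    log.Infof("job %d stopped in state %s", job.JobID, job.State)
}

func init() {
    // Registered hooks are called for every synced (start) and archived (stop) job.
    repository.RegisterJobJook(&auditHook{})
}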

View File

@ -148,9 +148,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
}
if filter.DbID != nil {
dbIDs := make([]string, len(filter.DbID))
for i, val := range filter.DbID {
dbIDs[i] = val
}
copy(dbIDs, filter.DbID)
query = query.Where(sq.Eq{"job.id": dbIDs})
}
if filter.JobID != nil {

View File

@ -16,7 +16,7 @@ import (
"github.com/golang-migrate/migrate/v4/source/iofs"
)
const Version uint = 8
const Version uint = 9
//go:embed migrations/*
var migrationFiles embed.FS
@ -115,8 +115,17 @@ func MigrateDB(backend string, db string) error {
}
v, dirty, err := m.Version()
if err != nil {
if err == migrate.ErrNilVersion {
log.Warn("Legacy database without version or missing database file!")
} else {
return err
}
}
if v < Version {
log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
}
if dirty {
return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS job_cache;

View File

@ -0,0 +1,31 @@
CREATE TABLE "job_cache" (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
hpc_user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
cluster_partition VARCHAR(255),
array_job_id BIGINT,
duration INT NOT NULL,
walltime INT NOT NULL,
job_state VARCHAR(255) NOT NULL
CHECK (job_state IN (
'running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory'
)),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT,
num_acc INT,
smt TINYINT NOT NULL DEFAULT 1 CHECK (smt IN (0, 1)),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK (exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1
CHECK (monitoring_status IN (0, 1, 2, 3)),
energy REAL NOT NULL DEFAULT 0.0,
energy_footprint TEXT DEFAULT NULL,
footprint TEXT DEFAULT NULL,
UNIQUE (job_id, cluster, start_time)
);

View File

@ -45,6 +45,36 @@ func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*sche
return tags, archive.UpdateTags(j, archiveTags)
}
func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdDirect(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
return nil, err
}
tags, err := r.GetTagsDirect(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
archiveTags, err := r.getArchiveTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, archiveTags)
}
// Removes a tag from a job by tag id
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job)
@ -82,7 +112,7 @@ func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagT
tagID, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return nil, fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return nil, fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
}
// Get Job
@ -122,7 +152,7 @@ func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagSc
tagID, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
}
// Handle Delete JobTagTable
@ -291,6 +321,37 @@ func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType s
return tagId, nil
}
func (r *JobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) {
tagScope := "global"
tagId, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
tagId, err = r.CreateTag(tagType, tagName, tagScope)
if err != nil {
return 0, err
}
}
if _, err := r.AddTagDirect(jobId, tagId); err != nil {
return 0, err
}
return tagId, nil
}
func (r *JobRepository) HasTag(jobId int64, tagType string, tagName string) bool {
var id int64
q := sq.Select("id").From("tag").Join("jobtag ON jobtag.tag_id = tag.id").
Where("jobtag.job_id = ?", jobId).Where("tag.tag_type = ?", tagType).
Where("tag.tag_name = ?", tagName)
err := q.RunWith(r.stmtCache).QueryRow().Scan(&id)
if err != nil {
return false
} else {
return true
}
}
// TagId returns the database id of the tag with the specified type and name.
func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) (tagId int64, exists bool) {
exists = true
@ -346,6 +407,32 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
return tags, nil
}
func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
if job != nil {
q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}
tags := make([]*schema.Tag, 0)
for rows.Next() {
tag := &schema.Tag{}
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
tags = append(tags, tag)
}
return tags, nil
}
// GetArchiveTags returns a list of all tags *regardless of scope* for archiving if job is nil or of the tags that the job with that database ID has.
func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")

View File

@ -0,0 +1,4 @@
GROMACS
gromacs
GMX
mdrun

View File

@ -0,0 +1 @@
openfoam

View File

@ -0,0 +1,3 @@
python
anaconda
conda

View File

@ -0,0 +1,2 @@
VASP
vasp

View File

@ -0,0 +1,322 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"bytes"
"embed"
"encoding/json"
"fmt"
"maps"
"os"
"strings"
"text/template"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
)
//go:embed jobclasses/*
var jobclassFiles embed.FS
type Variable struct {
Name string `json:"name"`
Expr string `json:"expr"`
}
type ruleVariable struct {
name string
expr *vm.Program
}
type RuleFormat struct {
Name string `json:"name"`
Tag string `json:"tag"`
Parameters []string `json:"parameters"`
Metrics []string `json:"metrics"`
Requirements []string `json:"requirements"`
Variables []Variable `json:"variables"`
Rule string `json:"rule"`
Hint string `json:"hint"`
}
type ruleInfo struct {
env map[string]any
metrics []string
requirements []*vm.Program
variables []ruleVariable
rule *vm.Program
hint *template.Template
}
type JobClassTagger struct {
rules map[string]ruleInfo
parameters map[string]any
tagType string
cfgPath string
}
func (t *JobClassTagger) prepareRule(b []byte, fns string) {
var rule RuleFormat
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil {
log.Warn("Error while decoding raw job meta json")
return
}
ri := ruleInfo{}
ri.env = make(map[string]any)
ri.metrics = make([]string, 0)
ri.requirements = make([]*vm.Program, 0)
ri.variables = make([]ruleVariable, 0)
// check if all required parameters are available
for _, p := range rule.Parameters {
param, ok := t.parameters[p]
if !ok {
log.Warnf("prepareRule() > missing parameter %s in rule %s", p, fns)
return
}
ri.env[p] = param
}
// set all required metrics
ri.metrics = append(ri.metrics, rule.Metrics...)
// compile requirements
for _, r := range rule.Requirements {
req, err := expr.Compile(r, expr.AsBool())
if err != nil {
log.Errorf("error compiling requirement %s: %#v", r, err)
return
}
ri.requirements = append(ri.requirements, req)
}
// compile variables
for _, v := range rule.Variables {
req, err := expr.Compile(v.Expr, expr.AsFloat64())
if err != nil {
log.Errorf("error compiling requirement %s: %#v", v.Name, err)
return
}
ri.variables = append(ri.variables, ruleVariable{name: v.Name, expr: req})
}
// compile rule
exp, err := expr.Compile(rule.Rule, expr.AsBool())
if err != nil {
log.Errorf("error compiling rule %s: %#v", fns, err)
return
}
ri.rule = exp
// prepare hint template
ri.hint, err = template.New(fns).Parse(rule.Hint)
if err != nil {
log.Errorf("error processing template %s: %#v", fns, err)
}
log.Infof("prepareRule() > processing %s with %d requirements and %d variables", fns, len(ri.requirements), len(ri.variables))
t.rules[rule.Tag] = ri
}
func (t *JobClassTagger) EventMatch(s string) bool {
return strings.Contains(s, "jobclasses")
}
// FIXME: Only process the file that caused the event
func (t *JobClassTagger) EventCallback() {
files, err := os.ReadDir(t.cfgPath)
if err != nil {
log.Fatal(err)
}
if util.CheckFileExists(t.cfgPath + "/parameters.json") {
log.Info("Merge parameters")
b, err := os.ReadFile(t.cfgPath + "/parameters.json")
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
}
var paramTmp map[string]any
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&paramTmp); err != nil {
log.Warn("Error while decoding parameters.json")
}
maps.Copy(t.parameters, paramTmp)
}
for _, fn := range files {
fns := fn.Name()
if fns != "parameters.json" {
log.Debugf("Process: %s", fns)
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns)
b, err := os.ReadFile(filename)
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return
}
t.prepareRule(b, fns)
}
}
}
func (t *JobClassTagger) initParameters() error {
log.Info("Initialize parameters")
b, err := jobclassFiles.ReadFile("jobclasses/parameters.json")
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return err
}
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil {
log.Warn("Error while decoding parameters.json")
return err
}
return nil
}
func (t *JobClassTagger) Register() error {
t.cfgPath = "./var/tagger/jobclasses"
t.tagType = "jobClass"
err := t.initParameters()
if err != nil {
log.Warnf("error reading parameters.json: %v", err)
return err
}
files, err := jobclassFiles.ReadDir("jobclasses")
if err != nil {
return fmt.Errorf("error reading app folder: %#v", err)
}
t.rules = make(map[string]ruleInfo, 0)
for _, fn := range files {
fns := fn.Name()
if fns != "parameters.json" {
filename := fmt.Sprintf("jobclasses/%s", fns)
log.Infof("Process: %s", fns)
b, err := jobclassFiles.ReadFile(filename)
if err != nil {
log.Warnf("prepareRule() > open file error: %v", err)
return err
}
t.prepareRule(b, fns)
}
}
if util.CheckFileExists(t.cfgPath) {
t.EventCallback()
log.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
return nil
}
func (t *JobClassTagger) Match(job *schema.Job) {
r := repository.GetJobRepository()
jobstats, err := archive.GetStatistics(job)
metricsList := archive.GetMetricConfigSubCluster(job.Cluster, job.SubCluster)
log.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID)
if err != nil {
log.Errorf("job classification failed for job %d: %#v", job.JobID, err)
return
}
for tag, ri := range t.rules {
env := make(map[string]any)
maps.Copy(env, ri.env)
log.Infof("Try to match rule %s for job %d", tag, job.JobID)
// Initialize environment
env["job"] = map[string]any{
"exclusive": job.Exclusive,
"duration": job.Duration,
"numCores": job.NumHWThreads,
"numNodes": job.NumNodes,
"jobState": job.State,
"numAcc": job.NumAcc,
"smt": job.SMT,
}
// add metrics to env
for _, m := range ri.metrics {
stats, ok := jobstats[m]
if !ok {
log.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m)
return
}
env[m] = map[string]any{
"min": stats.Min,
"max": stats.Max,
"avg": stats.Avg,
"limits": map[string]float64{
"peak": metricsList[m].Peak,
"normal": metricsList[m].Normal,
"caution": metricsList[m].Caution,
"alert": metricsList[m].Alert,
},
}
}
// check rule requirements apply
for _, r := range ri.requirements {
ok, err := expr.Run(r, env)
if err != nil {
log.Errorf("error running requirement for rule %s: %#v", tag, err)
return
}
if !ok.(bool) {
log.Infof("requirement for rule %s not met", tag)
return
}
}
// validate rule expression
for _, v := range ri.variables {
value, err := expr.Run(v.expr, env)
if err != nil {
log.Errorf("error running rule %s: %#v", tag, err)
return
}
env[v.name] = value
}
// dump.P(env)
match, err := expr.Run(ri.rule, env)
if err != nil {
log.Errorf("error running rule %s: %#v", tag, err)
return
}
if match.(bool) {
log.Info("Rule matches!")
id := job.ID
if !r.HasTag(id, t.tagType, tag) {
r.AddTagOrCreateDirect(id, t.tagType, tag)
}
// process hint template
var msg bytes.Buffer
if err := ri.hint.Execute(&msg, env); err != nil {
log.Errorf("Template error: %s", err.Error())
return
}
// FIXME: Handle case where multiple tags apply
r.UpdateMetadata(job, "message", msg.String())
} else {
log.Info("Rule does not match!")
}
}
}

View File

@ -0,0 +1,125 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"bufio"
"embed"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
//go:embed apps/*
var appFiles embed.FS
type appInfo struct {
tag string
strings []string
}
type AppTagger struct {
apps map[string]appInfo
tagType string
cfgPath string
}
func (t *AppTagger) scanApp(f fs.File, fns string) {
scanner := bufio.NewScanner(f)
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
for scanner.Scan() {
ai.strings = append(ai.strings, scanner.Text())
}
delete(t.apps, ai.tag)
t.apps[ai.tag] = ai
}
func (t *AppTagger) EventMatch(s string) bool {
return strings.Contains(s, "apps")
}
// FIXME: Only process the file that caused the event
func (t *AppTagger) EventCallback() {
files, err := os.ReadDir(t.cfgPath)
if err != nil {
log.Fatal(err)
}
for _, fn := range files {
fns := fn.Name()
log.Debugf("Process: %s", fns)
f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
if err != nil {
log.Errorf("error opening app file %s: %#v", fns, err)
}
t.scanApp(f, fns)
}
}
func (t *AppTagger) Register() error {
t.cfgPath = "./var/tagger/apps"
t.tagType = "app"
files, err := appFiles.ReadDir("apps")
if err != nil {
return fmt.Errorf("error reading app folder: %#v", err)
}
t.apps = make(map[string]appInfo, 0)
for _, fn := range files {
fns := fn.Name()
log.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err)
}
defer f.Close()
t.scanApp(f, fns)
}
if util.CheckFileExists(t.cfgPath) {
t.EventCallback()
log.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
return nil
}
func (t *AppTagger) Match(job *schema.Job) {
r := repository.GetJobRepository()
metadata, err := r.FetchMetadata(job)
if err != nil {
log.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster)
return
}
jobscript, ok := metadata["jobScript"]
if ok {
id := job.ID
out:
for _, a := range t.apps {
tag := a.tag
for _, s := range a.strings {
if strings.Contains(jobscript, s) {
if !r.HasTag(id, t.tagType, tag) {
r.AddTagOrCreateDirect(id, t.tagType, tag)
break out
}
}
}
}
} else {
log.Infof("Cannot extract job script for job: %d on %s", job.JobID, job.Cluster)
}
}

View File

@ -0,0 +1,59 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"testing"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
func setup(tb testing.TB) *repository.JobRepository {
tb.Helper()
log.Init("warn", true)
dbfile := "../repository/testdata/job.db"
err := repository.MigrateDB("sqlite3", dbfile)
noErr(tb, err)
repository.Connect("sqlite3", dbfile)
return repository.GetJobRepository()
}
func noErr(tb testing.TB, err error) {
tb.Helper()
if err != nil {
tb.Fatal("Error is not nil:", err)
}
}
func TestRegister(t *testing.T) {
var tagger AppTagger
err := tagger.Register()
noErr(t, err)
if len(tagger.apps) != 4 {
t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 3", len(tagger.apps))
}
}
func TestMatch(t *testing.T) {
r := setup(t)
job, err := r.FindByIdDirect(5)
noErr(t, err)
var tagger AppTagger
err = tagger.Register()
noErr(t, err)
tagger.Match(job)
if !r.HasTag(5, "app", "vasp") {
t.Errorf("missing tag vasp")
}
}

View File

@ -0,0 +1,27 @@
{
"name": "Excessive CPU load",
"tag": "excessiveload",
"comment": "Assumptions: all nodes have the same number of cores.",
"parameters": [
"excessivecpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
"job.exclusive == 1",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "load_threshold",
"expr": "(job.numCores / job.numNodes) * excessivecpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "cpu_load.avg / load_threshold"
}
],
"rule": "cpu_load > load_threshold",
"hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load}} falls above the threshold {{.load_threshold}}."
}
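
For orientation: the parameters, variables, and rule strings above are compiled and evaluated with expr-lang/expr, as shown in JobClassTagger.prepareRule() and Match() earlier in this diff. The following standalone sketch (sample job values are hypothetical, not taken from this PR) illustrates the environment shape the tagger builds and how this rule evaluates against it:

package main // illustrative only, not part of this PR

import (
    "fmt"

    "github.com/expr-lang/expr"
)

func main() {
    // Environment shaped like the one built in JobClassTagger.Match():
    // job properties plus per-metric min/max/avg statistics and the merged parameters.
    env := map[string]any{
        "job":      map[string]any{"exclusive": 1, "duration": 1200, "numCores": 72, "numNodes": 1},
        "cpu_load": map[string]any{"min": 60.0, "max": 80.0, "avg": 74.0},
        "excessivecpuload_threshold_factor": 1.1,
        "job_min_duration_seconds":          600.0,
    }

    // Variable from the rule file above (compiled with expr.AsFloat64() in prepareRule()).
    loadThreshold, _ := expr.Eval("(job.numCores / job.numNodes) * excessivecpuload_threshold_factor", env)
    env["load_threshold"] = loadThreshold

    // The rule itself, compiled as a boolean expression.
    prog, err := expr.Compile("cpu_load.avg > load_threshold", expr.AsBool())
    if err != nil {
        panic(err)
    }
    match, _ := expr.Run(prog, env)
    fmt.Println("excessiveload matches:", match) // false here: avg 74.0 is below the threshold of 79.2
}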

View File

@ -0,0 +1,26 @@
{
"name": "Low CPU load",
"tag": "lowload",
"parameters": [
"lowcpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
"job.exclusive == 1",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "load_threshold",
"expr": "job.numCores * lowcpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "1.0 - (cpu_load / load_threshold)"
}
],
"rule": "cpu_load.avg < load_threshold",
"hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.load_threshold}}."
}

View File

@ -0,0 +1,14 @@
{
"lowcpuload_threshold_factor": 0.9,
"excessivecpuload_threshold_factor": 1.1,
"highmemoryusage_threshold_factor": 0.9,
"node_load_imbalance_threshold_factor": 0.1,
"core_load_imbalance_threshold_factor": 0.1,
"high_memory_load_threshold_factor": 0.9,
"lowgpuload_threshold_factor": 0.7,
"memory_leak_slope_threshold": 0.1,
"job_min_duration_seconds": 600.0,
"sampling_interval_seconds": 30.0,
"cpu_load_pre_cutoff_samples": 11.0,
"cpu_load_core_pre_cutoff_samples": 6.0
}

View File

@ -0,0 +1,21 @@
{
"and": [
{
"in": [
"a40",
{
"var": "metaData.jobScript"
}
]
},
{
">": [
{
"var": "statistics.clock.min"
},
2000
]
}
]
}

internal/tagger/tagger.go (new file, 88 lines)
View File

@ -0,0 +1,88 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"sync"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
type Tagger interface {
Register() error
Match(job *schema.Job)
}
var (
initOnce sync.Once
jobTagger *JobTagger
)
type JobTagger struct {
startTaggers []Tagger
stopTaggers []Tagger
}
func newTagger() {
jobTagger = &JobTagger{}
jobTagger.startTaggers = make([]Tagger, 0)
jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{})
jobTagger.stopTaggers = make([]Tagger, 0)
jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{})
for _, tagger := range jobTagger.startTaggers {
tagger.Register()
}
for _, tagger := range jobTagger.stopTaggers {
tagger.Register()
}
}
func Init() {
initOnce.Do(func() {
newTagger()
repository.RegisterJobJook(jobTagger)
})
}
func (jt *JobTagger) JobStartCallback(job *schema.Job) {
for _, tagger := range jt.startTaggers {
tagger.Match(job)
}
}
func (jt *JobTagger) JobStopCallback(job *schema.Job) {
for _, tagger := range jt.stopTaggers {
tagger.Match(job)
}
}
func RunTaggers() error {
newTagger()
r := repository.GetJobRepository()
jl, err := r.GetJobList()
if err != nil {
log.Errorf("Error while getting job list %s", err)
return err
}
for _, id := range jl {
job, err := r.FindByIdDirect(id)
if err != nil {
log.Errorf("Error while getting job %s", err)
return err
}
for _, tagger := range jobTagger.startTaggers {
tagger.Match(job)
}
for _, tagger := range jobTagger.stopTaggers {
log.Infof("Run stop tagger for job %d", job.ID)
tagger.Match(job)
}
}
return nil
}
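
Taggers are pluggable behind the startTaggers and stopTaggers slices above. As a purely hypothetical sketch (type name and threshold invented for illustration, not part of this PR), an additional tagger only needs to satisfy the Tagger interface and be appended in newTagger():

package tagger

import (
    "github.com/ClusterCockpit/cc-backend/internal/repository"
    "github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// energyTagger is a hypothetical additional Tagger.
type energyTagger struct{}

func (t *energyTagger) Register() error {
    // Load thresholds, set up file watches, etc.
    return nil
}

func (t *energyTagger) Match(job *schema.Job) {
    r := repository.GetJobRepository()
    // Illustrative fixed threshold; a real tagger would derive it from configuration.
    if job.Energy > 1000.0 && !r.HasTag(job.ID, "energy", "highenergy") {
        r.AddTagOrCreateDirect(job.ID, "energy", "highenergy")
    }
}

// Wiring, inside newTagger():
//   jobTagger.stopTaggers = append(jobTagger.stopTaggers, &energyTagger{})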

View File

@ -0,0 +1,31 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"testing"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func TestInit(t *testing.T) {
Init()
}
func TestJobStartCallback(t *testing.T) {
Init()
r := setup(t)
job, err := r.FindByIdDirect(2)
noErr(t, err)
jobs := make([]*schema.Job, 0, 1)
jobs = append(jobs, job)
repository.CallJobStartHooks(jobs)
if !r.HasTag(2, "app", "python") {
t.Errorf("missing tag python")
}
}

View File

@ -0,0 +1,35 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/go-co-op/gocron/v2"
)
func RegisterCommitJobService() {
var frequency string
if config.Keys.CronFrequency != nil && config.Keys.CronFrequency.CommitJobWorker != "" {
frequency = config.Keys.CronFrequency.CommitJobWorker
} else {
frequency = "2m"
}
d, _ := time.ParseDuration(frequency)
log.Infof("Register commitJob service with %s interval", frequency)
s.NewJob(gocron.DurationJob(d),
gocron.NewTask(
func() {
start := time.Now()
log.Printf("Jobcache sync started at %s", start.Format(time.RFC3339))
jobs, _ := jobRepo.SyncJobs()
repository.CallJobStartHooks(jobs)
log.Printf("Jobcache sync and job callbacks are done and took %s", time.Since(start))
}))
}

View File

@ -81,6 +81,7 @@ func Start() {
RegisterFootprintWorker()
RegisterUpdateDurationWorker()
RegisterCommitJobService()
s.Start()
}

View File

@ -0,0 +1,75 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package util
import (
"sync"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/fsnotify/fsnotify"
)
type Listener interface {
EventCallback()
EventMatch(event string) bool
}
var (
initOnce sync.Once
w *fsnotify.Watcher
listeners []Listener
)
func AddListener(path string, l Listener) {
var err error
initOnce.Do(func() {
var err error
w, err = fsnotify.NewWatcher()
if err != nil {
log.Error("creating a new watcher: %w", err)
}
listeners = make([]Listener, 0)
go watchLoop(w)
})
listeners = append(listeners, l)
err = w.Add(path)
if err != nil {
log.Warnf("%q: %s", path, err)
}
}
func FsWatcherShutdown() {
if w != nil {
w.Close()
}
}
func watchLoop(w *fsnotify.Watcher) {
for {
select {
// Read from Errors.
case err, ok := <-w.Errors:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
return
}
log.Errorf("watch event loop: %s", err)
// Read from Events.
case e, ok := <-w.Events:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
return
}
log.Infof("Event %s", e)
for _, l := range listeners {
if l.EventMatch(e.String()) {
l.EventCallback()
}
}
}
}
}
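
The watcher is intentionally generic: listeners self-select relevant events via EventMatch and re-read their configuration in EventCallback, which is exactly how the tagger package uses it. A minimal hypothetical listener outside the tagger package (names invented for illustration), assuming only the Listener interface and AddListener shown above:

package example // hypothetical, not part of this PR

import (
    "strings"

    "github.com/ClusterCockpit/cc-backend/internal/util"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
)

// dirReloader re-reads directory-based configuration whenever the directory changes.
type dirReloader struct {
    dir string
}

// EventMatch selects events by path; fsnotify event strings contain the affected path.
func (d *dirReloader) EventMatch(event string) bool {
    return strings.Contains(event, d.dir)
}

func (d *dirReloader) EventCallback() {
    log.Infof("change detected below %s, reloading", d.dir)
    // ... re-read files here ...
}

func watchDir(path string) {
    r := &dirReloader{dir: path}
    // Creates the shared fsnotify watcher on first use and registers the listener;
    // util.FsWatcherShutdown() is called on server shutdown (see the cc-backend.go hunk near the top).
    util.AddListener(path, r)
}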

View File

@ -69,16 +69,18 @@ func initClusterConfig() error {
for _, sc := range cluster.SubClusters {
newMetric := &schema.MetricConfig{
Unit: mc.Unit,
Energy: mc.Energy,
Metric: schema.Metric{
Name: mc.Name,
Scope: mc.Scope,
Aggregation: mc.Aggregation,
Unit: mc.Unit,
Peak: mc.Peak,
Normal: mc.Normal,
Caution: mc.Caution,
Alert: mc.Alert,
},
Energy: mc.Energy,
Scope: mc.Scope,
Aggregation: mc.Aggregation,
Timestep: mc.Timestep,
Normal: mc.Normal,
LowerIsBetter: mc.LowerIsBetter,
}
@ -167,6 +169,45 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
return nil, fmt.Errorf("subcluster '%v' not found for cluster '%v', or cluster '%v' not configured", subcluster, cluster, cluster)
}
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
metrics := make(map[string]*schema.Metric)
for _, c := range Clusters {
if c.Name == cluster {
for _, m := range c.MetricConfig {
for _, s := range m.SubClusters {
if s.Name == subcluster {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: s.Unit,
Peak: s.Peak,
Normal: s.Normal,
Caution: s.Caution,
Alert: s.Alert,
}
break
}
}
_, ok := metrics[m.Name]
if !ok {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: m.Unit,
Peak: m.Peak,
Normal: m.Normal,
Caution: m.Caution,
Alert: m.Alert,
}
}
}
break
}
}
return metrics
}
func GetMetricConfig(cluster, metric string) *schema.MetricConfig {
for _, c := range Clusters {
if c.Name == cluster {

View File

@ -59,14 +59,13 @@ func getDirectory(
func getPath(
job *schema.Job,
rootPath string,
file string) string {
file string,
) string {
return filepath.Join(
getDirectory(job, rootPath), file)
}
func loadJobMeta(filename string) (*schema.JobMeta, error) {
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err)
@ -83,7 +82,6 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) {
func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
@ -117,7 +115,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
f, err := os.Open(filename)
if err != nil {
log.Errorf("fsBackend LoadJobStats()- %v", err)
return nil, err
@ -150,7 +147,6 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er
}
func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warnf("Init() > Unmarshal error: %#v", err)
@ -276,7 +272,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool {
}
func (fsa *FsArchive) Clean(before int64, after int64) {
if after == 0 {
after = math.MaxInt64
}
@ -392,7 +387,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
}
func (fsa *FsArchive) CompressLast(starttime int64) int64 {
filename := filepath.Join(fsa.path, "compress.txt")
b, err := os.ReadFile(filename)
if err != nil {
@ -441,7 +435,6 @@ func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
}
func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil {
log.Errorf("LoadClusterCfg() > open file error: %v", err)
@ -456,7 +449,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
}
func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
ch := make(chan JobContainer)
go func() {
clustersDir, err := os.ReadDir(fsa.path)
@ -527,7 +519,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
}
func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
@ -556,8 +547,8 @@ func (fsa *FsArchive) GetClusters() []string {
func (fsa *FsArchive) ImportJob(
jobMeta *schema.JobMeta,
jobData *schema.JobData) error {
jobData *schema.JobData,
) error {
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
@ -583,28 +574,6 @@ func (fsa *FsArchive) ImportJob(
return err
}
// var isCompressed bool = true
// // TODO Use shortJob Config for check
// if jobMeta.Duration < 300 {
// isCompressed = false
// f, err = os.Create(path.Join(dir, "data.json"))
// } else {
// f, err = os.Create(path.Join(dir, "data.json.gz"))
// }
// if err != nil {
// return err
// }
//
// if isCompressed {
// if err := EncodeJobData(gzip.NewWriter(f), jobData); err != nil {
// return err
// }
// } else {
// if err := EncodeJobData(f, jobData); err != nil {
// return err
// }
// }
f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
log.Error("Error while creating filepath for data.json")

View File

@ -61,7 +61,7 @@ func (nl *NodeList) PrintList() []string {
}
func (nl *NodeList) NodeCount() int {
var out int = 0
out := 0
for _, term := range *nl {
if len(term) == 1 { // If only String-Part in Term: Single Node Name -> add one
out += 1
@ -160,7 +160,7 @@ func (nle NLExprIntRange) limits() []map[string]int {
m["start"] = int(nle.start)
m["end"] = int(nle.end)
m["digits"] = int(nle.digits)
if nle.zeroPadded == true {
if nle.zeroPadded {
m["zeroPadded"] = 1
} else {
m["zeroPadded"] = 0
@ -183,14 +183,15 @@ func ParseNodeList(raw string) (NodeList, error) {
rawterms := []string{}
prevterm := 0
for i := 0; i < len(raw); i++ {
if raw[i] == '[' {
switch raw[i] {
case '[':
for i < len(raw) && raw[i] != ']' {
i++
}
if i == len(raw) {
return nil, fmt.Errorf("ARCHIVE/NODELIST > unclosed '['")
}
} else if raw[i] == ',' {
case ',':
rawterms = append(rawterms, raw[prevterm:i])
prevterm = i + 1
}

View File

@ -45,31 +45,31 @@ type SubCluster struct {
ThreadsPerCore int `json:"threadsPerCore"`
}
type SubClusterConfig struct {
type Metric struct {
Name string `json:"name"`
Footprint string `json:"footprint,omitempty"`
Energy string `json:"energy"`
Unit Unit `json:"unit"`
Peak float64 `json:"peak"`
Normal float64 `json:"normal"`
Caution float64 `json:"caution"`
Alert float64 `json:"alert"`
}
type SubClusterConfig struct {
Metric
Footprint string `json:"footprint,omitempty"`
Energy string `json:"energy"`
Remove bool `json:"remove"`
LowerIsBetter bool `json:"lowerIsBetter"`
}
type MetricConfig struct {
Unit Unit `json:"unit"`
Metric
Energy string `json:"energy"`
Name string `json:"name"`
Scope MetricScope `json:"scope"`
Aggregation string `json:"aggregation"`
Footprint string `json:"footprint,omitempty"`
SubClusters []*SubClusterConfig `json:"subClusters,omitempty"`
Peak float64 `json:"peak"`
Caution float64 `json:"caution"`
Alert float64 `json:"alert"`
Timestep int `json:"timestep"`
Normal float64 `json:"normal"`
LowerIsBetter bool `json:"lowerIsBetter"`
}

View File

@ -89,6 +89,8 @@ type ResampleConfig struct {
}
type CronFrequency struct {
// Commit Job Worker [Defaults to '2m']
CommitJobWorker string `json:"commit-job-worker"`
// Duration Update Worker [Defaults to '5m']
DurationWorker string `json:"duration-worker"`
// Metric-Footprint Update Worker [Defaults to '10m']
@ -129,6 +131,8 @@ type ProgramConfig struct {
// do not write to the job-archive.
DisableArchive bool `json:"disable-archive"`
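// Enable automatic tagging of jobs at start and stop time (see internal/tagger)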
EnableJobTaggers bool `json:"enable-job-taggers"`
// Validate json input against schema
Validate bool `json:"validate"`
@ -150,7 +154,7 @@ type ProgramConfig struct {
// If overwritten, at least all the options in the defaults below must
// be provided! Most options here can be overwritten by the user.
UiDefaults map[string]interface{} `json:"ui-defaults"`
UiDefaults map[string]any `json:"ui-defaults"`
// If exists, will enable dynamic zoom in frontend metric plots using the configured values
EnableResampling *ResampleConfig `json:"enable-resampling"`

View File

@ -145,7 +145,12 @@ const (
JobStateOutOfMemory JobState = "out_of_memory"
)
func (e *JobState) UnmarshalGQL(v interface{}) error {
func (j JobMeta) GoString() string {
return fmt.Sprintf("JobMeta{ID:%d, StartTime:%d, JobID:%v, BaseJob:%v}",
j.ID, j.StartTime, j.JobID, j.BaseJob)
}
func (e *JobState) UnmarshalGQL(v any) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("SCHEMA/JOB > enums must be strings")

View File

@ -28,12 +28,13 @@ const (
//go:embed schemas/*
var schemaFiles embed.FS
func Validate(k Kind, r io.Reader) (err error) {
func Validate(k Kind, r io.Reader) error {
jsonschema.Loaders["embedfs"] = func(s string) (io.ReadCloser, error) {
f := filepath.Join("schemas", strings.Split(s, "//")[1])
return schemaFiles.Open(f)
}
var s *jsonschema.Schema
var err error
switch k {
case Meta:
@ -54,7 +55,7 @@ func Validate(k Kind, r io.Reader) (err error) {
}
var v interface{}
if err := json.NewDecoder(r).Decode(&v); err != nil {
if err = json.NewDecoder(r).Decode(&v); err != nil {
log.Warnf("Error while decoding raw json schema: %#v", err)
return err
}