diff --git a/cmd/cc-backend/cli.go b/cmd/cc-backend/cli.go index 8d9e7e6..8b826bb 100644 --- a/cmd/cc-backend/cli.go +++ b/cmd/cc-backend/cli.go @@ -7,8 +7,9 @@ package main import "flag" var ( - flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool - flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string + flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, + flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool + flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string ) func cliInit() { @@ -21,6 +22,7 @@ func cliInit() { flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit") + flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit") flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 4b6d7f9..ab07d28 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -19,7 +19,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/tagger" "github.com/ClusterCockpit/cc-backend/internal/taskManager" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" @@ -211,11 +213,22 @@ func main() { } } + if config.Keys.EnableJobTaggers { + tagger.Init() + } + + if flagApplyTags { + if err := tagger.RunTaggers(); err != nil { + log.Abortf("Running job taggers.\nError: %s\n", err.Error()) + } + } + if !flagServer { log.Exit("No errors, server flag not set. 
Exiting cc-backend.") } archiver.Start(repository.GetJobRepository()) + taskManager.Start() serverInit() @@ -237,6 +250,8 @@ func main() { serverShutdown() + util.FsWatcherShutdown() + taskManager.Shutdown() }() diff --git a/go.mod b/go.mod index 98d1cab..6c92171 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,8 @@ require ( github.com/ClusterCockpit/cc-units v0.4.0 github.com/Masterminds/squirrel v1.5.4 github.com/coreos/go-oidc/v3 v3.12.0 + github.com/expr-lang/expr v1.17.3 + github.com/fsnotify/fsnotify v1.9.0 github.com/go-co-op/gocron/v2 v2.16.0 github.com/go-ldap/ldap/v3 v3.4.10 github.com/go-sql-driver/mysql v1.9.0 @@ -20,6 +22,7 @@ require ( github.com/gorilla/sessions v1.4.0 github.com/influxdata/influxdb-client-go/v2 v2.14.0 github.com/jmoiron/sqlx v1.4.0 + github.com/joho/godotenv v1.5.1 github.com/mattn/go-sqlite3 v1.14.24 github.com/prometheus/client_golang v1.21.0 github.com/prometheus/common v0.62.0 @@ -58,7 +61,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect - github.com/joho/godotenv v1.5.1 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect diff --git a/go.sum b/go.sum index a76e112..b4c3781 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,12 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/expr-lang/expr v1.17.3 h1:myeTTuDFz7k6eFe/JPlep/UsiIjVhG61FMHFu63U7j0= +github.com/expr-lang/expr v1.17.3/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk= github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index e67813c..3af37ad 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -123,7 +123,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), []byte(fmt.Sprintf("%d", 2)), 0666); err != nil { + if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0666); err != nil { t.Fatal(err) } @@ -204,11 +204,11 @@ func TestRestApi(t *testing.T) { restapi.MountApiRoutes(r) var TestJobId int64 = 123 - var TestClusterName string = "testcluster" + TestClusterName := "testcluster" var TestStartTime int64 = 123456789 const startJobBody string = `{ - "jobId": 123, + "jobId": 123, "user": "testuser", "project": "testproj", "cluster": "testcluster", @@ -221,7 +221,6 @@ func TestRestApi(t *testing.T) { "exclusive": 1, "monitoringStatus": 1, "smt": 1, - "tags": [{ "type": "testTagType", "name": 
"testTagName", "scope": "testuser" }], "resources": [ { "hostname": "host123", @@ -252,16 +251,17 @@ func TestRestApi(t *testing.T) { if response.StatusCode != http.StatusCreated { t.Fatal(response.Status, recorder.Body.String()) } - resolver := graph.GetResolverInstance() + // resolver := graph.GetResolverInstance() + restapi.JobRepository.SyncJobs() job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime) if err != nil { t.Fatal(err) } - job.Tags, err = resolver.Job().Tags(ctx, job) - if err != nil { - t.Fatal(err) - } + // job.Tags, err = resolver.Job().Tags(ctx, job) + // if err != nil { + // t.Fatal(err) + // } if job.JobID != 123 || job.User != "testuser" || @@ -282,9 +282,9 @@ func TestRestApi(t *testing.T) { t.Fatalf("unexpected job properties: %#v", job) } - if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" { - t.Fatalf("unexpected tags: %#v", job.Tags) - } + // if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" { + // t.Fatalf("unexpected tags: %#v", job.Tags) + // } }); !ok { return } @@ -352,7 +352,7 @@ func TestRestApi(t *testing.T) { t.Run("CheckDoubleStart", func(t *testing.T) { // Starting a job with the same jobId and cluster should only be allowed if the startTime is far appart! - body := strings.Replace(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`, -1) + body := strings.ReplaceAll(startJobBody, `"startTime": 123456789`, `"startTime": 123456790`) req := httptest.NewRequest(http.MethodPost, "/jobs/start_job/", bytes.NewBuffer([]byte(body))) recorder := httptest.NewRecorder() @@ -402,6 +402,7 @@ func TestRestApi(t *testing.T) { } time.Sleep(1 * time.Second) + restapi.JobRepository.SyncJobs() const stopJobBodyFailed string = `{ "jobId": 12345, diff --git a/internal/api/rest.go b/internal/api/rest.go index 669768e..fe35942 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -820,7 +820,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { } rw.WriteHeader(http.StatusOK) - rw.Write([]byte(fmt.Sprintf("Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount))) + fmt.Fprintf(rw, "Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount) } // startJob godoc @@ -846,6 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } + log.Printf("REST: %s\n", req.GoString()) req.State = schema.JobStateRunning if err := importer.SanityChecks(&req.BaseJob); err != nil { @@ -931,8 +932,12 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { // log.Printf("loading db job for stopJobByRequest... 
: stopJobApiRequest=%v", req) job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) if err != nil { - handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) - return + job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime) + // FIXME: Previous error is hidden + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } } api.checkAndHandleStopJob(rw, job, req) @@ -1097,10 +1102,15 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo // Mark job as stopped in the database (update state and duration) job.Duration = int32(req.StopTime - job.StartTime.Unix()) job.State = req.State + api.JobRepository.Mutex.Lock() if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) - return + if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + api.JobRepository.Mutex.Unlock() + handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + return + } } + api.JobRepository.Mutex.Unlock() log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 628e36e..42a60b9 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -72,7 +72,11 @@ func archivingWorker() { } log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) log.Printf("archiving job (dbid: %d) successful", job.ID) + + repository.CallJobStopHooks(job) archivePending.Done() + default: + continue } } } diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 5f88bbb..3e57768 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -237,7 +237,7 @@ func (auth *Authentication) Login( limiter := getIPUserLimiter(ip, username) if !limiter.Allow() { log.Warnf("AUTH/RATE > Too many login attempts for combination IP: %s, Username: %s", ip, username) - onfailure(rw, r, errors.New("Too many login attempts, try again in a few minutes.")) + onfailure(rw, r, errors.New("too many login attempts, try again in a few minutes")) return } diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index f3fc389..7e52b3d 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -143,7 +143,7 @@ func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name s return &schema.Tag{ID: id, Type: typeArg, Name: name, Scope: scope}, nil } else { log.Warnf("Not authorized to create tag with scope: %s", scope) - return nil, fmt.Errorf("Not authorized to create tag with scope: %s", scope) + return nil, fmt.Errorf("not authorized to create tag with scope: %s", scope) } } @@ -179,7 +179,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds _, _, tscope, exists := r.Repo.TagInfo(tid) if !exists { log.Warnf("Tag does not exist (ID): %d", tid) - return nil, fmt.Errorf("Tag does not exist 
(ID): %d", tid) + return nil, fmt.Errorf("tag does not exist (ID): %d", tid) } // Test Access: Admins && Admin Tag OR Support/Admin and Global Tag OR Everyone && Private Tag @@ -193,7 +193,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds } } else { log.Warnf("Not authorized to add tag: %d", tid) - return nil, fmt.Errorf("Not authorized to add tag: %d", tid) + return nil, fmt.Errorf("not authorized to add tag: %d", tid) } } @@ -226,7 +226,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta _, _, tscope, exists := r.Repo.TagInfo(tid) if !exists { log.Warnf("Tag does not exist (ID): %d", tid) - return nil, fmt.Errorf("Tag does not exist (ID): %d", tid) + return nil, fmt.Errorf("tag does not exist (ID): %d", tid) } // Test Access: Admins && Admin Tag OR Support/Admin and Global Tag OR Everyone && Private Tag @@ -240,7 +240,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta } } else { log.Warnf("Not authorized to remove tag: %d", tid) - return nil, fmt.Errorf("Not authorized to remove tag: %d", tid) + return nil, fmt.Errorf("not authorized to remove tag: %d", tid) } } @@ -269,7 +269,7 @@ func (r *mutationResolver) RemoveTagFromList(ctx context.Context, tagIds []strin _, _, tscope, exists := r.Repo.TagInfo(tid) if !exists { log.Warnf("Tag does not exist (ID): %d", tid) - return nil, fmt.Errorf("Tag does not exist (ID): %d", tid) + return nil, fmt.Errorf("tag does not exist (ID): %d", tid) } // Test Access: Admins && Admin Tag OR Everyone && Private Tag @@ -283,7 +283,7 @@ func (r *mutationResolver) RemoveTagFromList(ctx context.Context, tagIds []strin } } else { log.Warnf("Not authorized to remove tag: %d", tid) - return nil, fmt.Errorf("Not authorized to remove tag: %d", tid) + return nil, fmt.Errorf("not authorized to remove tag: %d", tid) } } return tags, nil @@ -499,10 +499,7 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag return nil, err } - hasNextPage := false - if len(nextJobs) == 1 { - hasNextPage = true - } + hasNextPage := len(nextJobs) == 1 return &model.JobResultList{Items: jobs, Count: &count, HasNextPage: &hasNextPage}, nil } @@ -513,8 +510,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF var stats []*model.JobsStatistics // Top Level Defaults - var defaultDurationBins string = "1h" - var defaultMetricBins int = 10 + defaultDurationBins := "1h" + defaultMetricBins := 10 if requireField(ctx, "totalJobs") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") || requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") { @@ -779,9 +776,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. 
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index 209b6be..d2bb0b4 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -166,7 +166,7 @@ func TestHandleImportFlag(t *testing.T) { } result := readResult(t, testname) - job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime) + job, err := r.FindCached(&result.JobId, &result.Cluster, &result.StartTime) if err != nil { t.Fatal(err) } diff --git a/internal/repository/job.go b/internal/repository/job.go index 84de6f7..3702099 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -9,12 +9,12 @@ import ( "encoding/json" "errors" "fmt" + "maps" "math" "strconv" "sync" "time" - "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" @@ -33,6 +33,7 @@ type JobRepository struct { stmtCache *sq.StmtCache cache *lrucache.Cache driver string + Mutex sync.Mutex } func GetJobRepository() *JobRepository { @@ -51,17 +52,29 @@ func GetJobRepository() *JobRepository { } var jobColumns []string = []string{ - "job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.cluster_partition", "job.array_job_id", - "job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", - "job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy", + "job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster", + "job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes", + "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", + "job.smt", "job.job_state", "job.duration", "job.walltime", "job.resources", + "job.footprint", "job.energy", } -func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { +var jobCacheColumns []string = []string{ + "job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster", + "job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition", + "job_cache.array_job_id", "job_cache.num_nodes", "job_cache.num_hwthreads", + "job_cache.num_acc", "job_cache.exclusive", "job_cache.monitoring_status", "job_cache.smt", + "job_cache.job_state", "job_cache.duration", "job_cache.walltime", "job_cache.resources", + "job_cache.footprint", "job_cache.energy", +} + +func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) { job := &schema.Job{} if err := row.Scan( - &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, - &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, + &job.ID, &job.JobID, &job.User, 
&job.Project, &job.Cluster, &job.SubCluster, + &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, + &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil { log.Warnf("Error while scanning rows (Job): %v", err) return nil, err @@ -138,17 +151,6 @@ func (r *JobRepository) Flush() error { return nil } -func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) { - jobLink := &model.JobLink{} - if err := row.Scan( - &jobLink.ID, &jobLink.JobID); err != nil { - log.Warn("Error while scanning rows (jobLink)") - return nil, err - } - - return jobLink, nil -} - func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) { start := time.Now() cachekey := fmt.Sprintf("metadata:%d", job.ID) @@ -189,9 +191,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er if job.MetaData != nil { cpy := make(map[string]string, len(job.MetaData)+1) - for k, v := range job.MetaData { - cpy[k] = v - } + maps.Copy(cpy, job.MetaData) cpy[key] = val job.MetaData = cpy } else { @@ -389,7 +389,7 @@ func (r *JobRepository) FindColumnValues(user *schema.User, query string, table func (r *JobRepository) Partitions(cluster string) ([]string, error) { var err error start := time.Now() - partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) { + partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) { parts := []string{} if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil { return nil, 0, 1000 @@ -477,6 +477,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } +// FIXME: Reconsider filtering short jobs with harcoded threshold func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { query := sq.Select(jobColumns...).From("job"). Where(fmt.Sprintf("job.cluster = '%s'", cluster)). 
diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 9e47974..f286c68 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -13,6 +13,14 @@ import ( sq "github.com/Masterminds/squirrel" ) +const NamedJobCacheInsert string = `INSERT INTO job_cache ( + job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data +) VALUES ( + :job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data +);` + const NamedJobInsert string = `INSERT INTO job ( job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data @@ -22,7 +30,9 @@ const NamedJobInsert string = `INSERT INTO job ( );` func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) { - res, err := r.DB.NamedExec(NamedJobInsert, job) + r.Mutex.Lock() + res, err := r.DB.NamedExec(NamedJobCacheInsert, job) + r.Mutex.Unlock() if err != nil { log.Warn("Error while NamedJobInsert") return 0, err @@ -36,6 +46,45 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) { return id, nil } +func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { + r.Mutex.Lock() + defer r.Mutex.Unlock() + + query := sq.Select(jobCacheColumns...).From("job_cache") + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Errorf("Error while running query %v", err) + return nil, err + } + + jobs := make([]*schema.Job, 0, 50) + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jobs = append(jobs, job) + } + + _, err = r.DB.Exec( + "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") + if err != nil { + log.Warnf("Error while Job sync: %v", err) + return nil, err + } + + _, err = r.DB.Exec("DELETE FROM job_cache") + if err != nil { + log.Warnf("Error while Job cache clean: %v", err) + return nil, err + } + + return jobs, nil +} + // Start inserts a new job in the table, returning the unique job ID. // Statistics are not transfered! func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { @@ -73,3 +122,19 @@ func (r *JobRepository) Stop( _, err = stmt.RunWith(r.stmtCache).Exec() return } + +func (r *JobRepository) StopCached( + jobId int64, + duration int32, + state schema.JobState, + monitoringStatus int32, +) (err error) { + stmt := sq.Update("job_cache"). + Set("job_state", state). + Set("duration", duration). + Set("monitoring_status", monitoringStatus). 
+ Where("job.id = ?", jobId) + + _, err = stmt.RunWith(r.stmtCache).Exec() + return +} diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index 1e2ccb8..b820084 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -43,6 +43,26 @@ func (r *JobRepository) Find( return scanJob(q.RunWith(r.stmtCache).QueryRow()) } +func (r *JobRepository) FindCached( + jobId *int64, + cluster *string, + startTime *int64, +) (*schema.Job, error) { + q := sq.Select(jobCacheColumns...).From("job_cache"). + Where("job_cache.job_id = ?", *jobId) + + if cluster != nil { + q = q.Where("job_cache.cluster = ?", *cluster) + } + if startTime != nil { + q = q.Where("job_cache.start_time = ?", *startTime) + } + + q = q.OrderBy("job_cache.id DESC") // always use newest matching job by db id if more than one match + + return scanJob(q.RunWith(r.stmtCache).QueryRow()) +} + // Find executes a SQL query to find a specific batch job. // The job is queried using the batch job id, the cluster name, // and the start time of the job in UNIX epoch time seconds. @@ -83,6 +103,35 @@ func (r *JobRepository) FindAll( return jobs, nil } +// Get complete joblist only consisting of db ids. +// This is useful to process large job counts and intended to be used +// together with FindById to process jobs one by one +func (r *JobRepository) GetJobList() ([]int64, error) { + query := sq.Select("id").From("job"). + Where("job.job_state != 'running'") + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + jl := make([]int64, 0, 1000) + for rows.Next() { + var id int64 + err := rows.Scan(&id) + if err != nil { + rows.Close() + log.Warn("Error while scanning rows") + return nil, err + } + jl = append(jl, id) + } + + log.Infof("Return job count %d", len(jl)) + return jl, nil +} + // FindById executes a SQL query to find a specific batch job. // The job is queried using the database id. // It returns a pointer to a schema.Job data structure and an error variable. diff --git a/internal/repository/jobHooks.go b/internal/repository/jobHooks.go new file mode 100644 index 0000000..1016335 --- /dev/null +++ b/internal/repository/jobHooks.go @@ -0,0 +1,57 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package repository + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +type JobHook interface { + JobStartCallback(job *schema.Job) + JobStopCallback(job *schema.Job) +} + +var ( + initOnce sync.Once + hooks []JobHook +) + +func RegisterJobJook(hook JobHook) { + initOnce.Do(func() { + hooks = make([]JobHook, 0) + }) + + if hook != nil { + hooks = append(hooks, hook) + } +} + +func CallJobStartHooks(jobs []*schema.Job) { + if hooks == nil { + return + } + + for _, hook := range hooks { + if hook != nil { + for _, job := range jobs { + hook.JobStartCallback(job) + } + } + } +} + +func CallJobStopHooks(job *schema.Job) { + if hooks == nil { + return + } + + for _, hook := range hooks { + if hook != nil { + hook.JobStopCallback(job) + } + } +} diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index 6a2ddec..2f72e77 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -148,9 +148,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select } if filter.DbID != nil { dbIDs := make([]string, len(filter.DbID)) - for i, val := range filter.DbID { - dbIDs[i] = val - } + copy(dbIDs, filter.DbID) query = query.Where(sq.Eq{"job.id": dbIDs}) } if filter.JobID != nil { diff --git a/internal/repository/migration.go b/internal/repository/migration.go index 0b2591e..c0693da 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -16,7 +16,7 @@ import ( "github.com/golang-migrate/migrate/v4/source/iofs" ) -const Version uint = 8 +const Version uint = 9 //go:embed migrations/* var migrationFiles embed.FS @@ -115,8 +115,17 @@ func MigrateDB(backend string, db string) error { } v, dirty, err := m.Version() + if err != nil { + if err == migrate.ErrNilVersion { + log.Warn("Legacy database without version or missing database file!") + } else { + return err + } + } - log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version) + if v < Version { + log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version) + } if dirty { return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version) diff --git a/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql b/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql new file mode 100644 index 0000000..ef257cf --- /dev/null +++ b/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS job_cache; diff --git a/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql new file mode 100644 index 0000000..7840369 --- /dev/null +++ b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql @@ -0,0 +1,31 @@ +CREATE TABLE "job_cache" ( + id INTEGER PRIMARY KEY, + job_id BIGINT NOT NULL, + cluster VARCHAR(255) NOT NULL, + subcluster VARCHAR(255) NOT NULL, + start_time BIGINT NOT NULL, -- Unix timestamp + hpc_user VARCHAR(255) NOT NULL, + project VARCHAR(255) NOT NULL, + cluster_partition VARCHAR(255), + array_job_id BIGINT, + duration INT NOT NULL, + walltime INT NOT NULL, + job_state VARCHAR(255) NOT NULL + CHECK (job_state IN ( + 'running', 'completed', 'failed', 'cancelled', + 'stopped', 'timeout', 'preempted', 'out_of_memory' + )), + meta_data TEXT, -- JSON + resources TEXT NOT NULL, -- 
JSON + num_nodes INT NOT NULL, + num_hwthreads INT, + num_acc INT, + smt TINYINT NOT NULL DEFAULT 1 CHECK (smt IN (0, 1)), + exclusive TINYINT NOT NULL DEFAULT 1 CHECK (exclusive IN (0, 1, 2)), + monitoring_status TINYINT NOT NULL DEFAULT 1 + CHECK (monitoring_status IN (0, 1, 2, 3)), + energy REAL NOT NULL DEFAULT 0.0, + energy_footprint TEXT DEFAULT NULL, + footprint TEXT DEFAULT NULL, + UNIQUE (job_id, cluster, start_time) +); diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 544163e..a9416c4 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -45,6 +45,36 @@ func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*sche return tags, archive.UpdateTags(j, archiveTags) } +func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error) { + j, err := r.FindByIdDirect(job) + if err != nil { + log.Warn("Error while finding job by id") + return nil, err + } + + q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag) + + if _, err := q.RunWith(r.stmtCache).Exec(); err != nil { + s, _, _ := q.ToSql() + log.Errorf("Error adding tag with %s: %v", s, err) + return nil, err + } + + tags, err := r.GetTagsDirect(&job) + if err != nil { + log.Warn("Error while getting tags for job") + return nil, err + } + + archiveTags, err := r.getArchiveTags(&job) + if err != nil { + log.Warn("Error while getting tags for job") + return nil, err + } + + return tags, archive.UpdateTags(j, archiveTags) +} + // Removes a tag from a job by tag id func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) { j, err := r.FindByIdWithUser(user, job) @@ -82,7 +112,7 @@ func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagT tagID, exists := r.TagId(tagType, tagName, tagScope) if !exists { log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) - return nil, fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) + return nil, fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) } // Get Job @@ -122,7 +152,7 @@ func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagSc tagID, exists := r.TagId(tagType, tagName, tagScope) if !exists { log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) - return fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) + return fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) } // Handle Delete JobTagTable @@ -291,6 +321,37 @@ func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType s return tagId, nil } +func (r *JobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) { + tagScope := "global" + + tagId, exists := r.TagId(tagType, tagName, tagScope) + if !exists { + tagId, err = r.CreateTag(tagType, tagName, tagScope) + if err != nil { + return 0, err + } + } + + if _, err := r.AddTagDirect(jobId, tagId); err != nil { + return 0, err + } + + return tagId, nil +} + +func (r *JobRepository) HasTag(jobId int64, tagType string, tagName string) bool { + var id int64 + q := sq.Select("id").From("tag").Join("jobtag ON jobtag.tag_id = tag.id"). + Where("jobtag.job_id = ?", jobId).Where("tag.tag_type = ?", tagType). 
+ Where("tag.tag_name = ?", tagName) + err := q.RunWith(r.stmtCache).QueryRow().Scan(&id) + if err != nil { + return false + } else { + return true + } +} + // TagId returns the database id of the tag with the specified type and name. func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) (tagId int64, exists bool) { exists = true @@ -346,6 +407,32 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e return tags, nil } +func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) { + q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag") + if job != nil { + q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job) + } + + rows, err := q.RunWith(r.stmtCache).Query() + if err != nil { + s, _, _ := q.ToSql() + log.Errorf("Error get tags with %s: %v", s, err) + return nil, err + } + + tags := make([]*schema.Tag, 0) + for rows.Next() { + tag := &schema.Tag{} + if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + tags = append(tags, tag) + } + + return tags, nil +} + // GetArchiveTags returns a list of all tags *regardless of scope* for archiving if job is nil or of the tags that the job with that database ID has. func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) { q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag") diff --git a/internal/tagger/apps/gromacs.txt b/internal/tagger/apps/gromacs.txt new file mode 100644 index 0000000..c5d939b --- /dev/null +++ b/internal/tagger/apps/gromacs.txt @@ -0,0 +1,4 @@ +GROMACS +gromacs +GMX +mdrun diff --git a/internal/tagger/apps/openfoam.txt b/internal/tagger/apps/openfoam.txt new file mode 100644 index 0000000..542d645 --- /dev/null +++ b/internal/tagger/apps/openfoam.txt @@ -0,0 +1 @@ +openfoam diff --git a/internal/tagger/apps/python.txt b/internal/tagger/apps/python.txt new file mode 100644 index 0000000..7a5c661 --- /dev/null +++ b/internal/tagger/apps/python.txt @@ -0,0 +1,3 @@ +python +anaconda +conda diff --git a/internal/tagger/apps/vasp.txt b/internal/tagger/apps/vasp.txt new file mode 100644 index 0000000..eec9092 --- /dev/null +++ b/internal/tagger/apps/vasp.txt @@ -0,0 +1,2 @@ +VASP +vasp diff --git a/internal/tagger/classifyJob.go b/internal/tagger/classifyJob.go new file mode 100644 index 0000000..6fd3fae --- /dev/null +++ b/internal/tagger/classifyJob.go @@ -0,0 +1,322 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package tagger + +import ( + "bytes" + "embed" + "encoding/json" + "fmt" + "maps" + "os" + "strings" + "text/template" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" +) + +//go:embed jobclasses/* +var jobclassFiles embed.FS + +type Variable struct { + Name string `json:"name"` + Expr string `json:"expr"` +} + +type ruleVariable struct { + name string + expr *vm.Program +} + +type RuleFormat struct { + Name string `json:"name"` + Tag string `json:"tag"` + Parameters []string `json:"parameters"` + Metrics []string `json:"metrics"` + Requirements []string `json:"requirements"` + Variables []Variable `json:"variables"` + Rule string `json:"rule"` + Hint string `json:"hint"` +} + +type ruleInfo struct { + env map[string]any + metrics []string + requirements []*vm.Program + variables []ruleVariable + rule *vm.Program + hint *template.Template +} + +type JobClassTagger struct { + rules map[string]ruleInfo + parameters map[string]any + tagType string + cfgPath string +} + +func (t *JobClassTagger) prepareRule(b []byte, fns string) { + var rule RuleFormat + if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil { + log.Warn("Error while decoding raw job meta json") + return + } + + ri := ruleInfo{} + ri.env = make(map[string]any) + ri.metrics = make([]string, 0) + ri.requirements = make([]*vm.Program, 0) + ri.variables = make([]ruleVariable, 0) + + // check if all required parameters are available + for _, p := range rule.Parameters { + param, ok := t.parameters[p] + if !ok { + log.Warnf("prepareRule() > missing parameter %s in rule %s", p, fns) + return + } + ri.env[p] = param + } + + // set all required metrics + ri.metrics = append(ri.metrics, rule.Metrics...) 
+ + // compile requirements + for _, r := range rule.Requirements { + req, err := expr.Compile(r, expr.AsBool()) + if err != nil { + log.Errorf("error compiling requirement %s: %#v", r, err) + return + } + ri.requirements = append(ri.requirements, req) + } + + // compile variables + for _, v := range rule.Variables { + req, err := expr.Compile(v.Expr, expr.AsFloat64()) + if err != nil { + log.Errorf("error compiling requirement %s: %#v", v.Name, err) + return + } + ri.variables = append(ri.variables, ruleVariable{name: v.Name, expr: req}) + } + + // compile rule + exp, err := expr.Compile(rule.Rule, expr.AsBool()) + if err != nil { + log.Errorf("error compiling rule %s: %#v", fns, err) + return + } + ri.rule = exp + + // prepare hint template + ri.hint, err = template.New(fns).Parse(rule.Hint) + if err != nil { + log.Errorf("error processing template %s: %#v", fns, err) + } + log.Infof("prepareRule() > processing %s with %d requirements and %d variables", fns, len(ri.requirements), len(ri.variables)) + + t.rules[rule.Tag] = ri +} + +func (t *JobClassTagger) EventMatch(s string) bool { + return strings.Contains(s, "jobclasses") +} + +// FIXME: Only process the file that caused the event +func (t *JobClassTagger) EventCallback() { + files, err := os.ReadDir(t.cfgPath) + if err != nil { + log.Fatal(err) + } + + if util.CheckFileExists(t.cfgPath + "/parameters.json") { + log.Info("Merge parameters") + b, err := os.ReadFile(t.cfgPath + "/parameters.json") + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + } + + var paramTmp map[string]any + if err := json.NewDecoder(bytes.NewReader(b)).Decode(¶mTmp); err != nil { + log.Warn("Error while decoding parameters.json") + } + + maps.Copy(t.parameters, paramTmp) + } + + for _, fn := range files { + fns := fn.Name() + if fns != "parameters.json" { + log.Debugf("Process: %s", fns) + filename := fmt.Sprintf("%s/%s", t.cfgPath, fns) + b, err := os.ReadFile(filename) + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return + } + t.prepareRule(b, fns) + } + } +} + +func (t *JobClassTagger) initParameters() error { + log.Info("Initialize parameters") + b, err := jobclassFiles.ReadFile("jobclasses/parameters.json") + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return err + } + + if err := json.NewDecoder(bytes.NewReader(b)).Decode(&t.parameters); err != nil { + log.Warn("Error while decoding parameters.json") + return err + } + + return nil +} + +func (t *JobClassTagger) Register() error { + t.cfgPath = "./var/tagger/jobclasses" + t.tagType = "jobClass" + + err := t.initParameters() + if err != nil { + log.Warnf("error reading parameters.json: %v", err) + return err + } + + files, err := jobclassFiles.ReadDir("jobclasses") + if err != nil { + return fmt.Errorf("error reading app folder: %#v", err) + } + t.rules = make(map[string]ruleInfo, 0) + for _, fn := range files { + fns := fn.Name() + if fns != "parameters.json" { + filename := fmt.Sprintf("jobclasses/%s", fns) + log.Infof("Process: %s", fns) + + b, err := jobclassFiles.ReadFile(filename) + if err != nil { + log.Warnf("prepareRule() > open file error: %v", err) + return err + } + t.prepareRule(b, fns) + } + } + + if util.CheckFileExists(t.cfgPath) { + t.EventCallback() + log.Infof("Setup file watch for %s", t.cfgPath) + util.AddListener(t.cfgPath, t) + } + + return nil +} + +func (t *JobClassTagger) Match(job *schema.Job) { + r := repository.GetJobRepository() + jobstats, err := archive.GetStatistics(job) + metricsList := 
archive.GetMetricConfigSubCluster(job.Cluster, job.SubCluster) + log.Infof("Enter match rule with %d rules for job %d", len(t.rules), job.JobID) + if err != nil { + log.Errorf("job classification failed for job %d: %#v", job.JobID, err) + return + } + + for tag, ri := range t.rules { + env := make(map[string]any) + maps.Copy(env, ri.env) + log.Infof("Try to match rule %s for job %d", tag, job.JobID) + + // Initialize environment + env["job"] = map[string]any{ + "exclusive": job.Exclusive, + "duration": job.Duration, + "numCores": job.NumHWThreads, + "numNodes": job.NumNodes, + "jobState": job.State, + "numAcc": job.NumAcc, + "smt": job.SMT, + } + + // add metrics to env + for _, m := range ri.metrics { + stats, ok := jobstats[m] + if !ok { + log.Errorf("job classification failed for job %d: missing metric '%s'", job.JobID, m) + return + } + env[m] = map[string]any{ + "min": stats.Min, + "max": stats.Max, + "avg": stats.Avg, + "limits": map[string]float64{ + "peak": metricsList[m].Peak, + "normal": metricsList[m].Normal, + "caution": metricsList[m].Caution, + "alert": metricsList[m].Alert, + }, + } + } + + // check rule requirements apply + for _, r := range ri.requirements { + ok, err := expr.Run(r, env) + if err != nil { + log.Errorf("error running requirement for rule %s: %#v", tag, err) + return + } + if !ok.(bool) { + log.Infof("requirement for rule %s not met", tag) + return + } + } + + // validate rule expression + for _, v := range ri.variables { + value, err := expr.Run(v.expr, env) + if err != nil { + log.Errorf("error running rule %s: %#v", tag, err) + return + } + env[v.name] = value + } + + // dump.P(env) + + match, err := expr.Run(ri.rule, env) + if err != nil { + log.Errorf("error running rule %s: %#v", tag, err) + return + } + if match.(bool) { + log.Info("Rule matches!") + id := job.ID + if !r.HasTag(id, t.tagType, tag) { + r.AddTagOrCreateDirect(id, t.tagType, tag) + } + + // process hint template + var msg bytes.Buffer + if err := ri.hint.Execute(&msg, env); err != nil { + log.Errorf("Template error: %s", err.Error()) + return + } + + // FIXME: Handle case where multiple tags apply + r.UpdateMetadata(job, "message", msg.String()) + } else { + log.Info("Rule does not match!") + } + } +} diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go new file mode 100644 index 0000000..7945b48 --- /dev/null +++ b/internal/tagger/detectApp.go @@ -0,0 +1,125 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package tagger + +import ( + "bufio" + "embed" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +//go:embed apps/* +var appFiles embed.FS + +type appInfo struct { + tag string + strings []string +} + +type AppTagger struct { + apps map[string]appInfo + tagType string + cfgPath string +} + +func (t *AppTagger) scanApp(f fs.File, fns string) { + scanner := bufio.NewScanner(f) + ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)} + + for scanner.Scan() { + ai.strings = append(ai.strings, scanner.Text()) + } + delete(t.apps, ai.tag) + t.apps[ai.tag] = ai +} + +func (t *AppTagger) EventMatch(s string) bool { + return strings.Contains(s, "apps") +} + +// FIXME: Only process the file that caused the event +func (t *AppTagger) EventCallback() { + files, err := os.ReadDir(t.cfgPath) + if err != nil { + log.Fatal(err) + } + + for _, fn := range files { + fns := fn.Name() + log.Debugf("Process: %s", fns) + f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns)) + if err != nil { + log.Errorf("error opening app file %s: %#v", fns, err) + } + t.scanApp(f, fns) + } +} + +func (t *AppTagger) Register() error { + t.cfgPath = "./var/tagger/apps" + t.tagType = "app" + + files, err := appFiles.ReadDir("apps") + if err != nil { + return fmt.Errorf("error reading app folder: %#v", err) + } + t.apps = make(map[string]appInfo, 0) + for _, fn := range files { + fns := fn.Name() + log.Debugf("Process: %s", fns) + f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns)) + if err != nil { + return fmt.Errorf("error opening app file %s: %#v", fns, err) + } + defer f.Close() + t.scanApp(f, fns) + } + + if util.CheckFileExists(t.cfgPath) { + t.EventCallback() + log.Infof("Setup file watch for %s", t.cfgPath) + util.AddListener(t.cfgPath, t) + } + + return nil +} + +func (t *AppTagger) Match(job *schema.Job) { + r := repository.GetJobRepository() + metadata, err := r.FetchMetadata(job) + if err != nil { + log.Infof("Cannot fetch metadata for job: %d on %s", job.JobID, job.Cluster) + return + } + + jobscript, ok := metadata["jobScript"] + if ok { + id := job.ID + + out: + for _, a := range t.apps { + tag := a.tag + for _, s := range a.strings { + if strings.Contains(jobscript, s) { + if !r.HasTag(id, t.tagType, tag) { + r.AddTagOrCreateDirect(id, t.tagType, tag) + break out + } + } + } + } + } else { + log.Infof("Cannot extract job script for job: %d on %s", job.JobID, job.Cluster) + } +} diff --git a/internal/tagger/detectApp_test.go b/internal/tagger/detectApp_test.go new file mode 100644 index 0000000..3b43cce --- /dev/null +++ b/internal/tagger/detectApp_test.go @@ -0,0 +1,59 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package tagger + +import ( + "testing" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func setup(tb testing.TB) *repository.JobRepository { + tb.Helper() + log.Init("warn", true) + dbfile := "../repository/testdata/job.db" + err := repository.MigrateDB("sqlite3", dbfile) + noErr(tb, err) + repository.Connect("sqlite3", dbfile) + return repository.GetJobRepository() +} + +func noErr(tb testing.TB, err error) { + tb.Helper() + + if err != nil { + tb.Fatal("Error is not nil:", err) + } +} + +func TestRegister(t *testing.T) { + var tagger AppTagger + + err := tagger.Register() + noErr(t, err) + + if len(tagger.apps) != 4 { + t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 3", len(tagger.apps)) + } +} + +func TestMatch(t *testing.T) { + r := setup(t) + + job, err := r.FindByIdDirect(5) + noErr(t, err) + + var tagger AppTagger + + err = tagger.Register() + noErr(t, err) + + tagger.Match(job) + + if !r.HasTag(5, "app", "vasp") { + t.Errorf("missing tag vasp") + } +} diff --git a/internal/tagger/jobclasses/highload.json b/internal/tagger/jobclasses/highload.json new file mode 100644 index 0000000..444ca4d --- /dev/null +++ b/internal/tagger/jobclasses/highload.json @@ -0,0 +1,27 @@ +{ + "name": "Excessive CPU load", + "tag": "excessiveload", + "comment": "Assumptions: all nodes have the same number of cores.", + "parameters": [ + "excessivecpuload_threshold_factor", + "job_min_duration_seconds", + "sampling_interval_seconds" + ], + "metrics": ["cpu_load"], + "requirements": [ + "job.exclusive == 1", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "load_threshold", + "expr": "(job.numCores / job.numNodes) * excessivecpuload_threshold_factor" + }, + { + "name": "load_perc", + "expr": "cpu_load.avg / load_threshold" + } + ], + "rule": "cpu_load > load_threshold", + "hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load}} falls above the threshold {{.load_threshold}}." +} diff --git a/internal/tagger/jobclasses/lowload.json b/internal/tagger/jobclasses/lowload.json new file mode 100644 index 0000000..1d7e041 --- /dev/null +++ b/internal/tagger/jobclasses/lowload.json @@ -0,0 +1,26 @@ +{ + "name": "Low CPU load", + "tag": "lowload", + "parameters": [ + "lowcpuload_threshold_factor", + "job_min_duration_seconds", + "sampling_interval_seconds" + ], + "metrics": ["cpu_load"], + "requirements": [ + "job.exclusive == 1", + "job.duration > job_min_duration_seconds" + ], + "variables": [ + { + "name": "load_threshold", + "expr": "job.numCores * lowcpuload_threshold_factor" + }, + { + "name": "load_perc", + "expr": "1.0 - (cpu_load / load_threshold)" + } + ], + "rule": "cpu_load.avg < load_threshold", + "hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.load_threshold}}." 
+} diff --git a/internal/tagger/jobclasses/parameters.json b/internal/tagger/jobclasses/parameters.json new file mode 100644 index 0000000..39e94c1 --- /dev/null +++ b/internal/tagger/jobclasses/parameters.json @@ -0,0 +1,14 @@ +{ + "lowcpuload_threshold_factor": 0.9, + "excessivecpuload_threshold_factor": 1.1, + "highmemoryusage_threshold_factor": 0.9, + "node_load_imbalance_threshold_factor": 0.1, + "core_load_imbalance_threshold_factor": 0.1, + "high_memory_load_threshold_factor": 0.9, + "lowgpuload_threshold_factor": 0.7, + "memory_leak_slope_threshold": 0.1, + "job_min_duration_seconds": 600.0, + "sampling_interval_seconds": 30.0, + "cpu_load_pre_cutoff_samples": 11.0, + "cpu_load_core_pre_cutoff_samples": 6.0 +} diff --git a/internal/tagger/rules.json b/internal/tagger/rules.json new file mode 100644 index 0000000..c88afb4 --- /dev/null +++ b/internal/tagger/rules.json @@ -0,0 +1,21 @@ +{ + "and": [ + { + "in": [ + "a40", + { + "var": "metaData.jobScript" + } + ] + }, + { + ">": [ + { + "var": "statistics.clock.min" + }, + 2000 + ] + } + ] + } + \ No newline at end of file diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go new file mode 100644 index 0000000..04edd49 --- /dev/null +++ b/internal/tagger/tagger.go @@ -0,0 +1,88 @@ +// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package tagger + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +type Tagger interface { + Register() error + Match(job *schema.Job) +} + +var ( + initOnce sync.Once + jobTagger *JobTagger +) + +type JobTagger struct { + startTaggers []Tagger + stopTaggers []Tagger +} + +func newTagger() { + jobTagger = &JobTagger{} + jobTagger.startTaggers = make([]Tagger, 0) + jobTagger.startTaggers = append(jobTagger.startTaggers, &AppTagger{}) + jobTagger.stopTaggers = make([]Tagger, 0) + jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{}) + + for _, tagger := range jobTagger.startTaggers { + tagger.Register() + } + for _, tagger := range jobTagger.stopTaggers { + tagger.Register() + } +} + +func Init() { + initOnce.Do(func() { + newTagger() + repository.RegisterJobJook(jobTagger) + }) +} + +func (jt *JobTagger) JobStartCallback(job *schema.Job) { + for _, tagger := range jt.startTaggers { + tagger.Match(job) + } +} + +func (jt *JobTagger) JobStopCallback(job *schema.Job) { + for _, tagger := range jt.stopTaggers { + tagger.Match(job) + } +} + +func RunTaggers() error { + newTagger() + r := repository.GetJobRepository() + jl, err := r.GetJobList() + if err != nil { + log.Errorf("Error while getting job list %s", err) + return err + } + + for _, id := range jl { + job, err := r.FindByIdDirect(id) + if err != nil { + log.Errorf("Error while getting job %s", err) + return err + } + for _, tagger := range jobTagger.startTaggers { + tagger.Match(job) + } + for _, tagger := range jobTagger.stopTaggers { + log.Infof("Run stop tagger for job %d", job.ID) + tagger.Match(job) + } + } + return nil +} diff --git a/internal/tagger/tagger_test.go b/internal/tagger/tagger_test.go new file mode 100644 index 0000000..057ca17 --- /dev/null +++ b/internal/tagger/tagger_test.go @@ -0,0 +1,31 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. 
+// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package tagger + +import ( + "testing" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func TestInit(t *testing.T) { + Init() +} + +func TestJobStartCallback(t *testing.T) { + Init() + r := setup(t) + job, err := r.FindByIdDirect(2) + noErr(t, err) + + jobs := make([]*schema.Job, 0, 1) + jobs = append(jobs, job) + + repository.CallJobStartHooks(jobs) + if !r.HasTag(2, "app", "python") { + t.Errorf("missing tag python") + } +} diff --git a/internal/taskManager/commitJobService.go b/internal/taskManager/commitJobService.go new file mode 100644 index 0000000..c60acb3 --- /dev/null +++ b/internal/taskManager/commitJobService.go @@ -0,0 +1,35 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package taskManager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/go-co-op/gocron/v2" +) + +func RegisterCommitJobService() { + var frequency string + if config.Keys.CronFrequency != nil && config.Keys.CronFrequency.CommitJobWorker != "" { + frequency = config.Keys.CronFrequency.CommitJobWorker + } else { + frequency = "2m" + } + d, _ := time.ParseDuration(frequency) + log.Infof("Register commitJob service with %s interval", frequency) + + s.NewJob(gocron.DurationJob(d), + gocron.NewTask( + func() { + start := time.Now() + log.Printf("Jobcache sync started at %s", start.Format(time.RFC3339)) + jobs, _ := jobRepo.SyncJobs() + repository.CallJobStartHooks(jobs) + log.Printf("Jobcache sync and job callbacks are done and took %s", time.Since(start)) + })) +} diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index 2004e0d..7d9a3a2 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -81,6 +81,7 @@ func Start() { RegisterFootprintWorker() RegisterUpdateDurationWorker() + RegisterCommitJobService() s.Start() } diff --git a/internal/util/fswatcher.go b/internal/util/fswatcher.go new file mode 100644 index 0000000..5d13462 --- /dev/null +++ b/internal/util/fswatcher.go @@ -0,0 +1,75 @@ +// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package util + +import ( + "sync" + + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/fsnotify/fsnotify" +) + +type Listener interface { + EventCallback() + EventMatch(event string) bool +} + +var ( + initOnce sync.Once + w *fsnotify.Watcher + listeners []Listener +) + +func AddListener(path string, l Listener) { + var err error + + initOnce.Do(func() { + var err error + w, err = fsnotify.NewWatcher() + if err != nil { + log.Error("creating a new watcher: %w", err) + } + listeners = make([]Listener, 0) + + go watchLoop(w) + }) + + listeners = append(listeners, l) + err = w.Add(path) + if err != nil { + log.Warnf("%q: %s", path, err) + } +} + +func FsWatcherShutdown() { + if w != nil { + w.Close() + } +} + +func watchLoop(w *fsnotify.Watcher) { + for { + select { + // Read from Errors. + case err, ok := <-w.Errors: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). 
+ return + } + log.Errorf("watch event loop: %s", err) + // Read from Events. + case e, ok := <-w.Events: + if !ok { // Channel was closed (i.e. Watcher.Close() was called). + return + } + + log.Infof("Event %s", e) + for _, l := range listeners { + if l.EventMatch(e.String()) { + l.EventCallback() + } + } + } + } +} diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index d53941b..95520a0 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -69,16 +69,18 @@ func initClusterConfig() error { for _, sc := range cluster.SubClusters { newMetric := &schema.MetricConfig{ - Unit: mc.Unit, + Metric: schema.Metric{ + Name: mc.Name, + Unit: mc.Unit, + Peak: mc.Peak, + Normal: mc.Normal, + Caution: mc.Caution, + Alert: mc.Alert, + }, Energy: mc.Energy, - Name: mc.Name, Scope: mc.Scope, Aggregation: mc.Aggregation, - Peak: mc.Peak, - Caution: mc.Caution, - Alert: mc.Alert, Timestep: mc.Timestep, - Normal: mc.Normal, LowerIsBetter: mc.LowerIsBetter, } @@ -167,6 +169,45 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) { return nil, fmt.Errorf("subcluster '%v' not found for cluster '%v', or cluster '%v' not configured", subcluster, cluster, cluster) } +func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric { + metrics := make(map[string]*schema.Metric) + + for _, c := range Clusters { + if c.Name == cluster { + for _, m := range c.MetricConfig { + for _, s := range m.SubClusters { + if s.Name == subcluster { + metrics[m.Name] = &schema.Metric{ + Name: m.Name, + Unit: s.Unit, + Peak: s.Peak, + Normal: s.Normal, + Caution: s.Caution, + Alert: s.Alert, + } + break + } + } + + _, ok := metrics[m.Name] + if !ok { + metrics[m.Name] = &schema.Metric{ + Name: m.Name, + Unit: m.Unit, + Peak: m.Peak, + Normal: m.Normal, + Caution: m.Caution, + Alert: m.Alert, + } + } + } + break + } + } + + return metrics +} + func GetMetricConfig(cluster, metric string) *schema.MetricConfig { for _, c := range Clusters { if c.Name == cluster { diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 711b1f5..a59b663 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -59,14 +59,13 @@ func getDirectory( func getPath( job *schema.Job, rootPath string, - file string) string { - + file string, +) string { return filepath.Join( getDirectory(job, rootPath), file) } func loadJobMeta(filename string) (*schema.JobMeta, error) { - b, err := os.ReadFile(filename) if err != nil { log.Errorf("loadJobMeta() > open file error: %v", err) @@ -83,7 +82,6 @@ func loadJobMeta(filename string) (*schema.JobMeta, error) { func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { f, err := os.Open(filename) - if err != nil { log.Errorf("fsBackend LoadJobData()- %v", err) return nil, err @@ -117,7 +115,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) { f, err := os.Open(filename) - if err != nil { log.Errorf("fsBackend LoadJobStats()- %v", err) return nil, err @@ -150,7 +147,6 @@ func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, er } func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { - var config FsArchiveConfig if err := json.Unmarshal(rawConfig, &config); err != nil { log.Warnf("Init() > Unmarshal error: %#v", err) @@ -276,7 +272,6 @@ func (fsa *FsArchive) Exists(job *schema.Job) bool { } func (fsa *FsArchive) Clean(before 
int64, after int64) { - if after == 0 { after = math.MaxInt64 } @@ -392,7 +387,6 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) { } func (fsa *FsArchive) CompressLast(starttime int64) int64 { - filename := filepath.Join(fsa.path, "compress.txt") b, err := os.ReadFile(filename) if err != nil { @@ -441,7 +435,6 @@ func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { } func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { - b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { log.Errorf("LoadClusterCfg() > open file error: %v", err) @@ -456,7 +449,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { } func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { - ch := make(chan JobContainer) go func() { clustersDir, err := os.ReadDir(fsa.path) @@ -527,7 +519,6 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { } func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error { - job := schema.Job{ BaseJob: jobMeta.BaseJob, StartTime: time.Unix(jobMeta.StartTime, 0), @@ -556,8 +547,8 @@ func (fsa *FsArchive) GetClusters() []string { func (fsa *FsArchive) ImportJob( jobMeta *schema.JobMeta, - jobData *schema.JobData) error { - + jobData *schema.JobData, +) error { job := schema.Job{ BaseJob: jobMeta.BaseJob, StartTime: time.Unix(jobMeta.StartTime, 0), @@ -583,28 +574,6 @@ func (fsa *FsArchive) ImportJob( return err } - // var isCompressed bool = true - // // TODO Use shortJob Config for check - // if jobMeta.Duration < 300 { - // isCompressed = false - // f, err = os.Create(path.Join(dir, "data.json")) - // } else { - // f, err = os.Create(path.Join(dir, "data.json.gz")) - // } - // if err != nil { - // return err - // } - // - // if isCompressed { - // if err := EncodeJobData(gzip.NewWriter(f), jobData); err != nil { - // return err - // } - // } else { - // if err := EncodeJobData(f, jobData); err != nil { - // return err - // } - // } - f, err = os.Create(path.Join(dir, "data.json")) if err != nil { log.Error("Error while creating filepath for data.json") diff --git a/pkg/archive/nodelist.go b/pkg/archive/nodelist.go index 7700185..26a15d2 100644 --- a/pkg/archive/nodelist.go +++ b/pkg/archive/nodelist.go @@ -61,7 +61,7 @@ func (nl *NodeList) PrintList() []string { } func (nl *NodeList) NodeCount() int { - var out int = 0 + out := 0 for _, term := range *nl { if len(term) == 1 { // If only String-Part in Term: Single Node Name -> add one out += 1 @@ -160,7 +160,7 @@ func (nle NLExprIntRange) limits() []map[string]int { m["start"] = int(nle.start) m["end"] = int(nle.end) m["digits"] = int(nle.digits) - if nle.zeroPadded == true { + if nle.zeroPadded { m["zeroPadded"] = 1 } else { m["zeroPadded"] = 0 @@ -183,14 +183,15 @@ func ParseNodeList(raw string) (NodeList, error) { rawterms := []string{} prevterm := 0 for i := 0; i < len(raw); i++ { - if raw[i] == '[' { + switch raw[i] { + case '[': for i < len(raw) && raw[i] != ']' { i++ } if i == len(raw) { return nil, fmt.Errorf("ARCHIVE/NODELIST > unclosed '['") } - } else if raw[i] == ',' { + case ',': rawterms = append(rawterms, raw[prevterm:i]) prevterm = i + 1 } diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index 322f308..1b9f2cc 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -45,31 +45,31 @@ type SubCluster struct { ThreadsPerCore int `json:"threadsPerCore"` } +type Metric struct { + Name string `json:"name"` + Unit Unit `json:"unit"` + Peak float64 `json:"peak"` + 
Normal float64 `json:"normal"` + Caution float64 `json:"caution"` + Alert float64 `json:"alert"` +} + type SubClusterConfig struct { - Name string `json:"name"` - Footprint string `json:"footprint,omitempty"` - Energy string `json:"energy"` - Peak float64 `json:"peak"` - Normal float64 `json:"normal"` - Caution float64 `json:"caution"` - Alert float64 `json:"alert"` - Remove bool `json:"remove"` - LowerIsBetter bool `json:"lowerIsBetter"` + Metric + Footprint string `json:"footprint,omitempty"` + Energy string `json:"energy"` + Remove bool `json:"remove"` + LowerIsBetter bool `json:"lowerIsBetter"` } type MetricConfig struct { - Unit Unit `json:"unit"` + Metric Energy string `json:"energy"` - Name string `json:"name"` Scope MetricScope `json:"scope"` Aggregation string `json:"aggregation"` Footprint string `json:"footprint,omitempty"` SubClusters []*SubClusterConfig `json:"subClusters,omitempty"` - Peak float64 `json:"peak"` - Caution float64 `json:"caution"` - Alert float64 `json:"alert"` Timestep int `json:"timestep"` - Normal float64 `json:"normal"` LowerIsBetter bool `json:"lowerIsBetter"` } @@ -127,7 +127,7 @@ func (topo *Topology) GetSocketsFromHWThreads( // those in the argument list are assigned to one of the sockets in the first // return value, return true as the second value. TODO: Optimize this, there // must be a more efficient way/algorithm. -func (topo *Topology) GetSocketsFromCores ( +func (topo *Topology) GetSocketsFromCores( cores []int, ) (sockets []int, exclusive bool) { socketsMap := map[int]int{} diff --git a/pkg/schema/config.go b/pkg/schema/config.go index 27d11be..eda3d91 100644 --- a/pkg/schema/config.go +++ b/pkg/schema/config.go @@ -89,6 +89,8 @@ type ResampleConfig struct { } type CronFrequency struct { + // Duration Update Worker [Defaults to '2m'] + CommitJobWorker string `json:"commit-job-worker"` // Duration Update Worker [Defaults to '5m'] DurationWorker string `json:"duration-worker"` // Metric-Footprint Update Worker [Defaults to '10m'] @@ -129,6 +131,8 @@ type ProgramConfig struct { // do not write to the job-archive. DisableArchive bool `json:"disable-archive"` + EnableJobTaggers bool `json:"enable-job-taggers"` + // Validate json input against schema Validate bool `json:"validate"` @@ -150,7 +154,7 @@ type ProgramConfig struct { // If overwritten, at least all the options in the defaults below must // be provided! Most options here can be overwritten by the user. 
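Note: with Peak/Normal/Caution/Alert grouped in the embedded schema.Metric, the new archive.GetMetricConfigSubCluster helper can return the effective thresholds per metric for one subcluster as a uniform map[string]*schema.Metric. A usage sketch follows; the cluster, subcluster, and metric names are placeholders, and the archive package must already have been initialized with a cluster configuration.

package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
)

func main() {
	// "fritz", "main" and "cpu_load" are placeholder names.
	metrics := archive.GetMetricConfigSubCluster("fritz", "main")
	if m, ok := metrics["cpu_load"]; ok {
		// Subcluster-specific overrides win; otherwise the cluster-wide
		// schema.Metric values are returned.
		fmt.Printf("%s: peak=%.1f normal=%.1f caution=%.1f alert=%.1f\n",
			m.Name, m.Peak, m.Normal, m.Caution, m.Alert)
	}
}

Embedding Metric in both SubClusterConfig and MetricConfig removes the duplicated threshold fields and lets cluster-level and subcluster-level values share one type, which is what makes this single return type possible.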
- UiDefaults map[string]interface{} `json:"ui-defaults"` + UiDefaults map[string]any `json:"ui-defaults"` // If exists, will enable dynamic zoom in frontend metric plots using the configured values EnableResampling *ResampleConfig `json:"enable-resampling"` diff --git a/pkg/schema/job.go b/pkg/schema/job.go index 5e3110b..df901b4 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -145,7 +145,12 @@ const ( JobStateOutOfMemory JobState = "out_of_memory" ) -func (e *JobState) UnmarshalGQL(v interface{}) error { +func (j JobMeta) GoString() string { + return fmt.Sprintf("JobMeta{ID:%d, StartTime:%d, JobID:%v, BaseJob:%v}", + j.ID, j.StartTime, j.JobID, j.BaseJob) +} + +func (e *JobState) UnmarshalGQL(v any) error { str, ok := v.(string) if !ok { return fmt.Errorf("SCHEMA/JOB > enums must be strings") diff --git a/pkg/schema/validate.go b/pkg/schema/validate.go index 3511936..d14adf5 100644 --- a/pkg/schema/validate.go +++ b/pkg/schema/validate.go @@ -28,12 +28,13 @@ const ( //go:embed schemas/* var schemaFiles embed.FS -func Validate(k Kind, r io.Reader) (err error) { +func Validate(k Kind, r io.Reader) error { jsonschema.Loaders["embedfs"] = func(s string) (io.ReadCloser, error) { f := filepath.Join("schemas", strings.Split(s, "//")[1]) return schemaFiles.Open(f) } var s *jsonschema.Schema + var err error switch k { case Meta: @@ -54,7 +55,7 @@ func Validate(k Kind, r io.Reader) (err error) { } var v interface{} - if err := json.NewDecoder(r).Decode(&v); err != nil { + if err = json.NewDecoder(r).Decode(&v); err != nil { log.Warnf("Error while decoding raw json schema: %#v", err) return err }
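Note: the refactored Validate keeps the same call shape, returning a plain error. A minimal usage sketch, assuming a placeholder file name "meta.json"; schema.Meta selects the embedded job meta-data schema as in the switch above.

package main

import (
	"os"

	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
	// "meta.json" is a placeholder path for a job meta-data file.
	f, err := os.Open("meta.json")
	if err != nil {
		log.Errorf("open meta.json: %s", err.Error())
		return
	}
	defer f.Close()

	if err := schema.Validate(schema.Meta, f); err != nil {
		log.Errorf("meta.json does not validate: %s", err.Error())
	}
}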