Merge branch 'master' into 40_45_82_update_roles

This commit is contained in:
Christoph Kluge
2023-02-21 17:17:41 +01:00
66 changed files with 3132 additions and 826 deletions

View File

@@ -5,12 +5,15 @@
package repository
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"github.com/qustavo/sqlhooks/v2"
)
var (
@@ -19,7 +22,8 @@ var (
)
type DBConnection struct {
DB *sqlx.DB
DB *sqlx.DB
Driver string
}
func Connect(driver string, db string) {
@@ -28,7 +32,9 @@ func Connect(driver string, db string) {
dbConnOnce.Do(func() {
if driver == "sqlite3" {
dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db))
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db))
// dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db))
if err != nil {
log.Fatal(err)
}
@@ -39,7 +45,7 @@ func Connect(driver string, db string) {
} else if driver == "mysql" {
dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db))
if err != nil {
log.Fatal(err)
log.Fatalf("sqlx.Open() error: %v", err)
}
dbHandle.SetConnMaxLifetime(time.Minute * 3)
@@ -49,7 +55,8 @@ func Connect(driver string, db string) {
log.Fatalf("unsupported database driver: %s", driver)
}
dbConnInstance = &DBConnection{DB: dbHandle}
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
checkDBVersion(driver, dbHandle.DB)
})
}

View File

@@ -0,0 +1,28 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// Hooks satisfies the sqlhook.Hooks interface
type Hooks struct{}
// Before hook will print the query with it's args and return the context with the timestamp
func (h *Hooks) Before(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
log.Infof("SQL query %s %q", query, args)
return context.WithValue(ctx, "begin", time.Now()), nil
}
// After hook will get the timestamp registered on the Before hook and print the elapsed time
func (h *Hooks) After(ctx context.Context, query string, args ...interface{}) (context.Context, error) {
begin := ctx.Value("begin").(time.Time)
log.Infof("Took: %s\n", time.Since(begin))
return ctx, nil
}

View File

@@ -19,67 +19,6 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// `AUTO_INCREMENT` is in a comment because of this hack:
// https://stackoverflow.com/a/41028314 (sqlite creates unique ids automatically)
const JobsDBSchema string = `
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tag;
CREATE TABLE job (
id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.-
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
`
// Indexes are created after the job-archive is traversed for faster inserts.
const JobsDbIndexes string = `
CREATE INDEX job_by_user ON job (user);
CREATE INDEX job_by_starttime ON job (start_time);
CREATE INDEX job_by_job_id ON job (job_id);
CREATE INDEX job_by_state ON job (job_state);
`
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
@@ -95,40 +34,44 @@ func HandleImportFlag(flag string) error {
for _, pair := range strings.Split(flag, ",") {
files := strings.Split(pair, ":")
if len(files) != 2 {
return fmt.Errorf("invalid import flag format")
return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
}
raw, err := os.ReadFile(files[0])
if err != nil {
log.Warn("Error while reading metadata file for import")
return err
}
if config.Keys.Validate {
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
return fmt.Errorf("validate job meta: %v", err)
return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
}
}
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := dec.Decode(&jobMeta); err != nil {
log.Warn("Error while decoding raw json metadata for import")
return err
}
raw, err = os.ReadFile(files[1])
if err != nil {
log.Warn("Error while reading jobdata file for import")
return err
}
if config.Keys.Validate {
if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
return fmt.Errorf("validate job data: %v", err)
return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
}
}
dec = json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobData := schema.JobData{}
if err := dec.Decode(&jobData); err != nil {
log.Warn("Error while decoding raw json jobdata for import")
return err
}
@@ -136,10 +79,11 @@ func HandleImportFlag(flag string) error {
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
if err != nil {
log.Warn("Error while finding job in jobRepository")
return err
}
return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
}
job := schema.Job{
@@ -155,38 +99,45 @@ func HandleImportFlag(flag string) error {
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Warn("Error while marshaling job resources")
return err
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
log.Warn("Error while marshaling job metadata")
return err
}
if err := SanityChecks(&job.BaseJob); err != nil {
log.Warn("BaseJob SanityChecks failed")
return err
}
if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
log.Error("Error while importing job")
return err
}
res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return err
}
for _, tag := range job.Tags {
if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
log.Error("Error while adding or creating tag")
return err
}
}
log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
}
return nil
}
@@ -198,21 +149,17 @@ func InitDB() error {
starttime := time.Now()
log.Print("Building job table...")
// Basic database structure:
_, err := db.DB.Exec(JobsDBSchema)
if err != nil {
return err
}
// Inserts are bundled into transactions because in sqlite,
// that speeds up inserts A LOT.
tx, err := db.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions")
return err
}
stmt, err := tx.PrepareNamed(NamedJobInsert)
if err != nil {
log.Warn("Error while preparing namedJobInsert")
return err
}
tags := make(map[string]int64)
@@ -232,12 +179,14 @@ func InitDB() error {
if i%10 == 0 {
if tx != nil {
if err := tx.Commit(); err != nil {
log.Warn("Error while committing transactions for jobMeta")
return err
}
}
tx, err = db.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions for jobMeta")
return err
}
@@ -260,34 +209,34 @@ func InitDB() error {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
if err := SanityChecks(&job.BaseJob); err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
res, err := stmt.Exec(job)
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
id, err := res.LastInsertId()
if err != nil {
log.Errorf("repository initDB()- %v", err)
log.Errorf("repository initDB(): %v", err)
errorOccured++
continue
}
@@ -298,16 +247,19 @@ func InitDB() error {
if !ok {
res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
if err != nil {
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
return err
}
tagId, err = res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return err
}
tags[tagstr] = tagId
}
if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", id, tagId)
return err
}
}
@@ -318,16 +270,11 @@ func InitDB() error {
}
if errorOccured > 0 {
log.Errorf("Error in import of %d jobs!", errorOccured)
log.Warnf("Error in import of %d jobs!", errorOccured)
}
if err := tx.Commit(); err != nil {
return err
}
// Create indexes after inserts so that they do not
// need to be continually updated.
if _, err := db.DB.Exec(JobsDbIndexes); err != nil {
log.Warn("Error while committing SQL transactions")
return err
}
@@ -338,13 +285,14 @@ func InitDB() error {
// This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error {
if c := archive.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %#v", job.Cluster)
return fmt.Errorf("no such cluster: %v", job.Cluster)
}
if err := archive.AssignSubCluster(job); err != nil {
log.Warn("Error while assigning subcluster to job")
return err
}
if !job.State.Valid() {
return fmt.Errorf("not a valid job state: %#v", job.State)
return fmt.Errorf("not a valid job state: %v", job.State)
}
if len(job.Resources) == 0 || len(job.User) == 0 {
return fmt.Errorf("'resources' and 'user' should not be empty")

View File

@@ -14,9 +14,11 @@ import (
"sync"
"time"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -30,7 +32,8 @@ var (
)
type JobRepository struct {
DB *sqlx.DB
DB *sqlx.DB
driver string
stmtCache *sq.StmtCache
cache *lrucache.Cache
@@ -44,9 +47,11 @@ func GetJobRepository() *JobRepository {
db := GetConnection()
jobRepoInstance = &JobRepository{
DB: db.DB,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
DB: db.DB,
driver: db.Driver,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
}
// start archiving worker
@@ -67,14 +72,20 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources /*&job.MetaData*/); err != nil {
&job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil {
log.Warn("Error while unmarhsaling raw resources json")
return nil, err
}
// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
// return nil, err
// }
job.StartTime = time.Unix(job.StartTimeUnix, 0)
if job.Duration == 0 && job.State == schema.JobStateRunning {
job.Duration = int32(time.Since(job.StartTime).Seconds())
@@ -84,11 +95,14 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
return job, nil
}
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string)
return job.MetaData, nil
if jobName := job.MetaData["jobName"]; jobName != "" {
return &jobName, nil
}
}
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
@@ -105,6 +119,40 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer FetchJobName %s", time.Since(start))
if jobName := job.MetaData["jobName"]; jobName != "" {
return &jobName, nil
} else {
return new(string), nil
}
}
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string)
return job.MetaData, nil
}
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
log.Warn("Error while scanning for job metadata")
return nil, err
}
if len(job.RawMetaData) == 0 {
return nil, nil
}
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
log.Warn("Error while unmarshaling raw metadata json")
return nil, err
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer FetchMetadata %s", time.Since(start))
return job.MetaData, nil
}
@@ -113,6 +161,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
r.cache.Del(cachekey)
if job.MetaData == nil {
if _, err = r.FetchMetadata(job); err != nil {
log.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID)
return err
}
}
@@ -129,10 +178,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
}
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
log.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID)
return err
}
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err
}
@@ -150,6 +201,7 @@ func (r *JobRepository) Find(
cluster *string,
startTime *int64) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@@ -160,6 +212,7 @@ func (r *JobRepository) Find(
q = q.Where("job.start_time = ?", *startTime)
}
log.Infof("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
@@ -173,6 +226,7 @@ func (r *JobRepository) FindAll(
cluster *string,
startTime *int64) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@@ -185,6 +239,7 @@ func (r *JobRepository) FindAll(
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@@ -192,10 +247,12 @@ func (r *JobRepository) FindAll(
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Infof("Timer FindAll %s", time.Since(start))
return jobs, nil
}
@@ -214,12 +271,12 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("encoding resources field failed: %w", err)
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("encoding metaData field failed: %w", err)
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
res, err := r.DB.NamedExec(`INSERT INTO job (
@@ -259,7 +316,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
if err != nil {
log.Warnf(" DeleteJobsBefore(%d): error %v", startTime, err)
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
} else {
log.Infof("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
}
@@ -269,7 +326,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
func (r *JobRepository) DeleteJobById(id int64) error {
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
if err != nil {
log.Warnf("DeleteJobById(%d): error %v", id, err)
log.Errorf("DeleteJobById(%d): error %#v", id, err)
} else {
log.Infof("DeleteJobById(%d): Success", id)
}
@@ -278,6 +335,7 @@ func (r *JobRepository) DeleteJobById(id int64) error {
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
start := time.Now()
if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate")
}
@@ -292,10 +350,12 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
now := time.Now().Unix()
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
default:
log.Infof("CountGroupedJobs() Weight %v unknown.", *weight)
}
}
q, qerr := SecurityCheck(ctx, sq.Select("job."+string(aggreg), count).From("job").GroupBy("job." + string(aggreg)).OrderBy("count DESC"))
q, qerr := SecurityCheck(ctx, sq.Select("job."+string(aggreg), count).From("job").GroupBy("job."+string(aggreg)).OrderBy("count DESC"))
if qerr != nil {
return nil, qerr
@@ -311,6 +371,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
counts := map[string]int{}
rows, err := q.RunWith(runner).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@@ -318,12 +379,14 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
var group string
var count int
if err := rows.Scan(&group, &count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
counts[group] = count
}
log.Infof("Timer CountGroupedJobs %s", time.Since(start))
return counts, nil
}
@@ -360,20 +423,23 @@ func (r *JobRepository) MarkArchived(
stmt = stmt.Set("net_bw_avg", stats.Avg)
case "file_bw":
stmt = stmt.Set("file_bw_avg", stats.Avg)
default:
log.Infof("MarkArchived() Metric '%v' unknown", metric)
}
}
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
}
// Archiving worker thread
func (r *JobRepository) archivingWorker(){
func (r *JobRepository) archivingWorker() {
for {
select {
case job, ok := <- r.archiveChannel:
case job, ok := <-r.archiveChannel:
if !ok {
break
}
@@ -407,54 +473,216 @@ func (r *JobRepository) archivingWorker(){
}
// Trigger async archiving
func (r *JobRepository) TriggerArchiving(job *schema.Job){
func (r *JobRepository) TriggerArchiving(job *schema.Job) {
r.archivePending.Add(1)
r.archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func (r *JobRepository) WaitForArchiving(){
func (r *JobRepository) WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
r.archivePending.Wait()
}
var ErrNotFound = errors.New("no such job or user")
var ErrNotFound = errors.New("no such jobname, project or user")
var ErrForbidden = errors.New("not authorized")
// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
// As 0 is a valid job id, check if username is "" instead in order to check what machted.
// FindJobnameOrUserOrProject returns a jobName or a username or a projectId if a jobName or user or project matches the search term.
// If query is found to be an integer (= conversion to INT datatype succeeds), skip back to parent call
// If nothing matches the search, `ErrNotFound` is returned.
func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (job int64, username string, err error) {
func (r *JobRepository) FindJobnameOrUserOrProject(ctx context.Context, searchterm string) (metasnip string, username string, project string, err error) {
user := auth.GetUser(ctx)
if id, err := strconv.Atoi(searchterm); err == nil {
qb := sq.Select("job.id").From("job").Where("job.job_id = ?", id)
if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
return "", "", "", nil
} else { // has to have letters
if user != nil && user.HasNotRoles([]string{auth.RoleAdmin, auth.RoleSupport}) {
qb = qb.Where("job.user = ?", user.Username)
err := sq.Select("job.user").Distinct().From("job").
Where("job.user = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&username)
if err != nil && err != sql.ErrNoRows {
return "", "", "", err
} else if err == nil {
return "", username, "", nil
}
if username == "" { // Try with Name2Username query
errtwo := sq.Select("user.username").Distinct().From("user").
Where("user.name LIKE ?", fmt.Sprint("%"+searchterm+"%")).
RunWith(r.stmtCache).QueryRow().Scan(&username)
if errtwo != nil && errtwo != sql.ErrNoRows {
return "", "", "", errtwo
} else if errtwo == nil {
return "", username, "", nil
}
}
}
err := qb.RunWith(r.stmtCache).QueryRow().Scan(&job)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("job.project").Distinct().From("job").
Where("job.project = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&project)
if err != nil && err != sql.ErrNoRows {
return "", "", "", err
} else if err == nil {
return "", "", project, nil
}
}
// All Authorizations: If unlabeled query not username or projectId, try for jobname: Match Metadata, on hit, parent method redirects to jobName GQL query
err := sq.Select("job.cluster").Distinct().From("job").
Where("job.meta_data LIKE ?", "%"+searchterm+"%").
RunWith(r.stmtCache).QueryRow().Scan(&metasnip)
if err != nil && err != sql.ErrNoRows {
return 0, "", err
return "", "", "", err
} else if err == nil {
return job, "", nil
return metasnip[0:1], "", "", nil
}
}
return "", "", "", ErrNotFound
}
}
func (r *JobRepository) FindUser(ctx context.Context, searchterm string) (username string, err error) {
user := auth.GetUser(ctx)
if user == nil || user.HasAnyRole([]string{auth.RoleAdmin, auth.RoleSupport}) {
err := sq.Select("job.user").Distinct().From("job").
Where("job.user = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&username)
if err != nil && err != sql.ErrNoRows {
return 0, "", err
return "", err
} else if err == nil {
return 0, username, nil
return username, nil
}
}
return "", ErrNotFound
return 0, "", ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Username -> %s: Forbidden", user.Name, searchterm)
return "", ErrForbidden
}
}
func (r *JobRepository) FindUserByName(ctx context.Context, searchterm string) (username string, err error) {
user := auth.GetUser(ctx)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("user.username").Distinct().From("user").
Where("user.name = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&username)
if err != nil && err != sql.ErrNoRows {
return "", err
} else if err == nil {
return username, nil
}
return "", ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Name -> %s: Forbidden", user.Name, searchterm)
return "", ErrForbidden
}
}
func (r *JobRepository) FindUsers(ctx context.Context, searchterm string) (usernames []string, err error) {
user := auth.GetUser(ctx)
emptyResult := make([]string, 0)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
rows, err := sq.Select("job.user").Distinct().From("job").
Where("job.user LIKE ?", fmt.Sprint("%", searchterm, "%")).
RunWith(r.stmtCache).Query()
if err != nil && err != sql.ErrNoRows {
return emptyResult, err
} else if err == nil {
for rows.Next() {
var name string
err := rows.Scan(&name)
if err != nil {
rows.Close()
log.Warnf("Error while scanning rows: %v", err)
return emptyResult, err
}
usernames = append(usernames, name)
}
return usernames, nil
}
return emptyResult, ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Usernames -> %s: Forbidden", user.Name, searchterm)
return emptyResult, ErrForbidden
}
}
func (r *JobRepository) FindUsersByName(ctx context.Context, searchterm string) (usernames []string, err error) {
user := auth.GetUser(ctx)
emptyResult := make([]string, 0)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
rows, err := sq.Select("user.username").Distinct().From("user").
Where("user.name LIKE ?", fmt.Sprint("%", searchterm, "%")).
RunWith(r.stmtCache).Query()
if err != nil && err != sql.ErrNoRows {
return emptyResult, err
} else if err == nil {
for rows.Next() {
var username string
err := rows.Scan(&username)
if err != nil {
rows.Close()
log.Warnf("Error while scanning rows: %v", err)
return emptyResult, err
}
usernames = append(usernames, username)
}
return usernames, nil
}
return emptyResult, ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query name -> %s: Forbidden", user.Name, searchterm)
return emptyResult, ErrForbidden
}
}
func (r *JobRepository) FindNameByUser(ctx context.Context, searchterm string) (name string, err error) {
user := auth.GetUser(ctx)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("user.name").Distinct().From("user").
Where("user.username = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&name)
if err != nil && err != sql.ErrNoRows {
return "", err
} else if err == nil {
return name, nil
}
return "", ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Name -> %s: Forbidden", user.Name, searchterm)
return "", ErrForbidden
}
}
func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (project string, err error) {
user := auth.GetUser(ctx)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("job.project").Distinct().From("job").
Where("job.project = ?", searchterm).
RunWith(r.stmtCache).QueryRow().Scan(&project)
if err != nil && err != sql.ErrNoRows {
return "", err
} else if err == nil {
return project, nil
}
return "", ErrNotFound
} else {
log.Infof("Non-Admin User %s : Requested Query Project -> %s: Forbidden", user.Name, project)
return "", ErrForbidden
}
}
func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
@@ -466,18 +694,22 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
if err != nil {
return nil, err
}
log.Infof("Timer Partitions %s", time.Since(start))
return partitions.([]string), nil
}
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
start := time.Now()
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Where("job.job_state = 'running'").
Where("job.cluster = ?", cluster).
RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@@ -488,9 +720,11 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
var resources []*schema.Resource
var subcluster string
if err := rows.Scan(&raw, &subcluster); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if err := json.Unmarshal(raw, &resources); err != nil {
log.Warn("Error while unmarshaling raw resources json")
return nil, err
}
@@ -505,10 +739,13 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
}
}
log.Infof("Timer AllocatedNodes %s", time.Since(start))
return subclusters, nil
}
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
Set("duration", 0).
@@ -518,16 +755,243 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
Where(fmt.Sprintf("(%d - job.start_time) > (job.walltime + %d)", time.Now().Unix(), seconds)).
RunWith(r.DB).Exec()
if err != nil {
log.Warn("Error while stopping jobs exceeding walltime")
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
log.Warn("Error while fetching affected rows after stopping due to exceeded walltime")
return err
}
if rowsAffected > 0 {
log.Warnf("%d jobs have been marked as failed due to running too long", rowsAffected)
log.Infof("%d jobs have been marked as failed due to running too long", rowsAffected)
}
log.Infof("Timer StopJobsExceedingWalltimeBy %s", time.Since(start))
return nil
}
// TODO: Move to config
const ShortJobDuration int = 5 * 60
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
func (r *JobRepository) JobsStatistics(ctx context.Context,
filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map[string]*model.JobsStatistics{}
var castType string
if r.driver == "sqlite3" {
castType = "int"
} else if r.driver == "mysql" {
castType = "unsigned"
}
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range archive.Clusters {
for _, subcluster := range cluster.SubClusters {
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)
var rawQuery sq.SelectBuilder
if groupBy == nil {
rawQuery = sq.Select(
"''",
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job")
} else {
col := groupBy2column[*groupBy]
rawQuery = sq.Select(
col,
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job").GroupBy(col)
}
rawQuery = rawQuery.
Where("job.cluster = ?", cluster.Name).
Where("job.subcluster = ?", subcluster.Name)
query, qerr := SecurityCheck(ctx, rawQuery)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
}
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query, qerr := SecurityCheck(ctx, query)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
log.Warn("Error while scanning rows for short job stats")
return nil, err
}
} else {
col := groupBy2column[*groupBy]
query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", ShortJobDuration)
query, qerr := SecurityCheck(ctx, query)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying jobs for short jobs")
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
log.Warn("Error while scanning rows for short jobs")
return nil, err
}
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver can not be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histDuration" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
}
}
log.Infof("Timer JobStatistics %s", time.Since(start))
return res, nil
}
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>".
func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
start := time.Now()
query := sq.Select(value, "COUNT(job.id) AS count").From("job")
query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job"))
if qerr != nil {
return nil, qerr
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
if len(id) != 0 && len(col) != 0 {
query = query.Where(col+" = ?", id)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
points := make([]*model.HistoPoint, 0)
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
points = append(points, &point)
}
log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}

View File

@@ -8,10 +8,12 @@ import (
"fmt"
"testing"
"github.com/ClusterCockpit/cc-backend/pkg/log"
_ "github.com/mattn/go-sqlite3"
)
func init() {
log.Init("info", true)
Connect("sqlite3", "../../test/test.db")
}

View File

@@ -0,0 +1,109 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"database/sql"
"embed"
"fmt"
"os"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/mysql"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
)
const supportedVersion uint = 2
//go:embed migrations/*
var migrationFiles embed.FS
func checkDBVersion(backend string, db *sql.DB) {
var m *migrate.Migrate
if backend == "sqlite3" {
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
if err != nil {
log.Fatal(err)
}
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithInstance("iofs", d, "sqlite3", driver)
if err != nil {
log.Fatal(err)
}
} else if backend == "mysql" {
driver, err := mysql.WithInstance(db, &mysql.Config{})
if err != nil {
log.Fatal(err)
}
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithInstance("iofs", d, "mysql", driver)
if err != nil {
log.Fatal(err)
}
}
v, _, err := m.Version()
if err != nil {
if err == migrate.ErrNilVersion {
log.Warn("Legacy database without version or missing database file!")
} else {
log.Fatal(err)
}
}
if v < supportedVersion {
log.Warnf("Unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, supportedVersion)
os.Exit(0)
}
if v > supportedVersion {
log.Warnf("Unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool!", v, supportedVersion)
os.Exit(0)
}
}
func MigrateDB(backend string, db string) {
var m *migrate.Migrate
if backend == "sqlite3" {
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
if err != nil {
log.Fatal(err)
}
} else if backend == "mysql" {
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
if err != nil {
log.Fatal(err)
}
}
if err := m.Up(); err != nil {
log.Fatal(err)
}
m.Close()
}

View File

@@ -0,0 +1,5 @@
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tags;
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS configuration;
DROP TABLE IF EXISTS user;

View File

@@ -0,0 +1,62 @@
CREATE TABLE IF NOT EXISTS job (
id INTEGER AUTO_INCREMENT PRIMARY KEY ,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
`partition` VARCHAR(255) NOT NULL,
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE IF NOT EXISTS jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
CREATE TABLE IF NOT EXISTS user (
username varchar(255) PRIMARY KEY NOT NULL,
password varchar(255) DEFAULT NULL,
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
name varchar(255) DEFAULT NULL,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);

View File

@@ -0,0 +1,5 @@
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_by_state;

View File

@@ -0,0 +1,5 @@
CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);

View File

@@ -0,0 +1,5 @@
DROP TABLE IF EXISTS job;
DROP TABLE IF EXISTS tags;
DROP TABLE IF EXISTS jobtag;
DROP TABLE IF EXISTS configuration;
DROP TABLE IF EXISTS user;

View File

@@ -0,0 +1,62 @@
CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
partition VARCHAR(255) NOT NULL,
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
mem_used_max REAL NOT NULL DEFAULT 0.0,
flops_any_avg REAL NOT NULL DEFAULT 0.0,
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
load_avg REAL NOT NULL DEFAULT 0.0,
net_bw_avg REAL NOT NULL DEFAULT 0.0,
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
file_bw_avg REAL NOT NULL DEFAULT 0.0,
file_data_vol_total REAL NOT NULL DEFAULT 0.0);
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
tag_type VARCHAR(255) NOT NULL,
tag_name VARCHAR(255) NOT NULL,
CONSTRAINT be_unique UNIQUE (tag_type, tag_name));
CREATE TABLE IF NOT EXISTS jobtag (
job_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (job_id, tag_id),
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
CREATE TABLE IF NOT EXISTS user (
username varchar(255) PRIMARY KEY NOT NULL,
password varchar(255) DEFAULT NULL,
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
name varchar(255) DEFAULT NULL,
roles varchar(255) NOT NULL DEFAULT "[]",
email varchar(255) DEFAULT NULL);

View File

@@ -0,0 +1,5 @@
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_by_state;

View File

@@ -0,0 +1,5 @@
CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);

View File

@@ -39,7 +39,7 @@ func (r *JobRepository) QueryJobs(
} else if order.Order == model.SortDirectionEnumDesc {
query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
} else {
return nil, errors.New("invalid sorting order")
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order")
}
}
@@ -54,12 +54,14 @@ func (r *JobRepository) QueryJobs(
sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return nil, err
}
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@@ -68,6 +70,7 @@ func (r *JobRepository) QueryJobs(
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
@@ -135,6 +138,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
if filter.Project != nil {
query = buildStringCondition("job.project", filter.Project, query)
}
if filter.JobName != nil {
query = buildStringCondition("job.meta_data", filter.JobName, query)
}
if filter.Cluster != nil {
query = buildStringCondition("job.cluster", filter.Cluster, query)
}
@@ -217,6 +223,13 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select
if cond.Contains != nil {
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
}
if cond.In != nil {
queryUsers := make([]string, len(cond.In))
for i, val := range cond.In {
queryUsers[i] = val
}
return query.Where(sq.Or{sq.Eq{"job.user": queryUsers}})
}
return query
}
@@ -226,7 +239,7 @@ var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
func toSnakeCase(str string) string {
for _, c := range str {
if c == '\'' || c == '\\' {
panic("A hacker (probably not)!!!")
log.Panic("toSnakeCase() attack vector!")
}
}

View File

@@ -9,22 +9,26 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/pkg/log"
sq "github.com/Masterminds/squirrel"
)
// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
log.Error("Error while running query")
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
@@ -34,16 +38,19 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
// Removes a tag from a job
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
log.Error("Error while running query")
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
@@ -144,6 +151,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
@@ -151,6 +159,7 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
for rows.Next() {
tag := &schema.Tag{}
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
tags = append(tags, tag)

View File

@@ -6,13 +6,13 @@ package repository
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/jmoiron/sqlx"
)
@@ -33,21 +33,9 @@ func GetUserCfgRepo() *UserCfgRepo {
userCfgRepoOnce.Do(func() {
db := GetConnection()
_, err := db.DB.Exec(`
CREATE TABLE IF NOT EXISTS configuration (
username varchar(255),
confkey varchar(255),
value varchar(255),
PRIMARY KEY (username, confkey),
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);`)
if err != nil {
log.Fatal(err)
}
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
log.Fatal(err)
log.Fatalf("db.DB.Preparex() error: %v", err)
}
userCfgRepoInstance = &UserCfgRepo{
@@ -82,6 +70,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
rows, err := uCfg.Lookup.Query(user.Username)
if err != nil {
log.Warnf("Error while looking up user config for user '%v'", user.Username)
return err, 0, 0
}
@@ -90,11 +79,13 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
for rows.Next() {
var key, rawval string
if err := rows.Scan(&key, &rawval); err != nil {
log.Warn("Error while scanning user config values")
return err, 0, 0
}
var val interface{}
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
log.Warn("Error while unmarshaling raw user config json")
return err, 0, 0
}
@@ -106,6 +97,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *auth.User) (map[string]interface{}, e
return config, 24 * time.Hour, size
})
if err, ok := data.(error); ok {
log.Error("Error in returned dataset")
return nil, err
}
@@ -122,6 +114,7 @@ func (uCfg *UserCfgRepo) UpdateConfig(
if user == nil {
var val interface{}
if err := json.Unmarshal([]byte(value), &val); err != nil {
log.Warn("Error while unmarshaling raw user config json")
return err
}
@@ -131,8 +124,8 @@ func (uCfg *UserCfgRepo) UpdateConfig(
return nil
}
if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`,
user.Username, key, value); err != nil {
if _, err := uCfg.DB.Exec(`REPLACE INTO configuration (username, confkey, value) VALUES (?, ?, ?)`, user, key, value); err != nil {
log.Warnf("Error while replacing user config in DB for user '%v'", user)
return err
}