Merge branch 'dev' into 134-job-tagging

This commit is contained in:
2025-05-13 14:48:58 +02:00
232 changed files with 32360 additions and 16001 deletions

View File

@@ -59,17 +59,15 @@ func Connect(driver string, db string) {
} else {
dbHandle, err = sqlx.Open("sqlite3", opts.URL)
}
if err != nil {
log.Fatal(err)
}
case "mysql":
opts.URL += "?multiStatements=true"
dbHandle, err = sqlx.Open("mysql", opts.URL)
if err != nil {
log.Fatalf("sqlx.Open() error: %v", err)
}
default:
log.Fatalf("unsupported database driver: %s", driver)
log.Abortf("DB Connection: Unsupported database driver '%s'.\n", driver)
}
if err != nil {
log.Abortf("DB Connection: Could not connect to '%s' database with sqlx.Open().\nError: %s\n", driver, err.Error())
}
dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
@@ -80,7 +78,7 @@ func Connect(driver string, db string) {
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
err = checkDBVersion(driver, dbHandle.DB)
if err != nil {
log.Fatal(err)
log.Abortf("DB Connection: Failed DB version check.\nError: %s\n", err.Error())
}
})
}

View File

@@ -5,17 +5,16 @@
package repository
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@@ -30,12 +29,10 @@ var (
)
type JobRepository struct {
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
archiveChannel chan *schema.Job
driver string
archivePending sync.WaitGroup
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
driver string
}
func GetJobRepository() *JobRepository {
@@ -46,47 +43,48 @@ func GetJobRepository() *JobRepository {
DB: db.DB,
driver: db.Driver,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
}
// start archiving worker
go jobRepoInstance.archivingWorker()
})
return jobRepoInstance
}
var jobColumns []string = []string{
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.cluster_partition", "job.array_job_id",
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"job.duration", "job.walltime", "job.resources", "job.mem_used_max", "job.flops_any_avg", "job.mem_bw_avg", "job.load_avg", // "job.meta_data",
"job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy",
}
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
job := &schema.Job{}
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.MemUsedMax, &job.FlopsAnyAvg, &job.MemBwAvg, &job.LoadAvg /*&job.RawMetaData*/); err != nil {
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
log.Warnf("Error while scanning rows (Job): %v", err)
return nil, err
}
if err := json.Unmarshal(job.RawResources, &job.Resources); err != nil {
log.Warn("Error while unmarhsaling raw resources json")
log.Warn("Error while unmarshaling raw resources json")
return nil, err
}
job.RawResources = nil
// if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
// return nil, err
// }
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
log.Warnf("Error while unmarshaling raw footprint json: %v", err)
return nil, err
}
job.RawFootprint = nil
job.StartTime = time.Unix(job.StartTimeUnix, 0)
if job.Duration == 0 && job.State == schema.JobStateRunning {
// Always ensure accurate duration for running jobs
if job.State == schema.JobStateRunning {
job.Duration = int32(time.Since(job.StartTime).Seconds())
}
job.RawResources = nil
return job, nil
}
@@ -205,7 +203,10 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
return err
}
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
if _, err = sq.Update("job").
Set("meta_data", job.RawMetaData).
Where("job.id = ?", job.ID).
RunWith(r.stmtCache).Exec(); err != nil {
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err
}
@@ -214,222 +215,54 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
return archive.UpdateMetadata(job, job.MetaData)
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64,
) (*schema.Job, error) {
func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
log.Debugf("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil {
log.Warn("Error while scanning for job footprint")
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Debugf("Timer FindAll %s", time.Since(start))
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
if len(job.RawFootprint) == 0 {
return nil, nil
}
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
if qerr != nil {
return nil, qerr
}
query = query.Where("cluster = ?", job.Cluster)
var startTime int64
var stopTime int64
startTime = job.StartTimeUnix
hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix()
} else {
stopTime = startTime + int64(job.Duration)
}
// Add 200s overlap for jobs start time at the end
startTimeTail := startTime + 10
stopTimeTail := stopTime - 200
startTimeFront := startTime + 200
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
"running", startTimeTail, stopTimeTail, startTime)
queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
log.Warn("Error while unmarshaling raw footprint json")
return nil, err
}
items := make([]*model.JobLink, 0, 10)
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
log.Debugf("Timer FetchFootprint %s", time.Since(start))
return job.Footprint, nil
}
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) {
start := time.Now()
cachekey := fmt.Sprintf("energyFootprint:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
job.EnergyFootprint = cached.(map[string]float64)
return job.EnergyFootprint, nil
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
log.Warn("Error while scanning for job energy_footprint")
return nil, err
}
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
if len(job.RawEnergyFootprint) == 0 {
return nil, nil
}
cnt := len(items)
return &model.JobLinkResultList{
ListQuery: &queryString,
Items: items,
Count: &cnt,
}, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
log.Warn("Error while unmarshaling raw energy footprint json")
return nil, err
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
res, err := r.DB.NamedExec(`INSERT INTO job (
job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data
);`, job)
if err != nil {
return -1, err
}
return res.LastInsertId()
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)
log.Debugf("Timer FetchEnergyFootprint %s", time.Since(start))
return job.EnergyFootprint, nil
}
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
@@ -461,119 +294,22 @@ func (r *JobRepository) DeleteJobById(id int64) error {
return err
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) MarkArchived(
jobId int64,
monitoringStatus int32,
metricStats map[string]schema.JobStatistics,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
for metric, stats := range metricStats {
switch metric {
case "flops_any":
stmt = stmt.Set("flops_any_avg", stats.Avg)
case "mem_used":
stmt = stmt.Set("mem_used_max", stats.Max)
case "mem_bw":
stmt = stmt.Set("mem_bw_avg", stats.Avg)
case "load":
stmt = stmt.Set("load_avg", stats.Avg)
case "cpu_load":
stmt = stmt.Set("load_avg", stats.Avg)
case "net_bw":
stmt = stmt.Set("net_bw_avg", stats.Avg)
case "file_bw":
stmt = stmt.Set("file_bw_avg", stats.Avg)
default:
log.Debugf("MarkArchived() Metric '%v' unknown", metric)
}
}
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
}
// Archiving worker thread
func (r *JobRepository) archivingWorker() {
for {
select {
case job, ok := <-r.archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
r.archivePending.Done()
}
}
}
// Trigger async archiving
func (r *JobRepository) TriggerArchiving(job *schema.Job) {
r.archivePending.Add(1)
r.archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func (r *JobRepository) WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
r.archivePending.Wait()
}
func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string) {
if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
return searchterm, "", "", ""
} else { // Has to have letters and logged-in user for other guesses
if user != nil {
// Find username in jobs (match)
uresult, _ := r.FindColumnValue(user, searchterm, "job", "user", "user", false)
// Find username by username in job table (match)
uresult, _ := r.FindColumnValue(user, searchterm, "job", "hpc_user", "hpc_user", false)
if uresult != "" {
return "", uresult, "", ""
}
// Find username by name (like)
nresult, _ := r.FindColumnValue(user, searchterm, "user", "username", "name", true)
// Find username by real name in hpc_user table (like)
nresult, _ := r.FindColumnValue(user, searchterm, "hpc_user", "username", "name", true)
if nresult != "" {
return "", nresult, "", ""
}
// Find projectId in jobs (match)
// Find projectId by projectId in job table (match)
presult, _ := r.FindColumnValue(user, searchterm, "job", "project", "project", false)
if presult != "" {
return "", "", presult, ""
@@ -655,7 +391,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
return nil, 0, 1000
}
@@ -712,6 +448,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
return subclusters, nil
}
// FIXME: Set duration to requested walltime?
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job").
@@ -740,6 +477,46 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
Where("job.job_state = 'running'").
Where("job.duration > 600")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobs := make([]*schema.Job, 0, 50)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Infof("Return job count %d", len(jobs))
return jobs, nil
}
func (r *JobRepository) UpdateDuration() error {
stmnt := sq.Update("job").
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
Where("job_state = 'running'")
_, err := stmnt.RunWith(r.stmtCache).Exec()
if err != nil {
return err
}
return nil
}
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
var query sq.SelectBuilder
@@ -778,27 +555,118 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
return jobs, nil
}
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return 0, err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return 0, err
}
return id, nil
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
return err
}
return nil
}
func (r *JobRepository) MarkArchived(
stmt sq.UpdateBuilder,
monitoringStatus int32,
) sq.UpdateBuilder {
return stmt.Set("monitoring_status", monitoringStatus)
}
func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return stmt, err
}
energyFootprint := make(map[string]float64)
// Total Job Energy Outside Loop
totalEnergy := 0.0
for _, fp := range sc.EnergyFootprint {
// Always Init Metric Energy Inside Loop
metricEnergy := 0.0
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
// Note: For DB data, calculate and save as kWh
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules or Wh)
log.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp)
// FIXME: Needs sum as stats type
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
// Energy: Power (in Watts) * Time (in Seconds)
// Unit: (W * (s / 3600)) / 1000 = kWh
// Round 2 Digits: round(Energy * 100) / 100
// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
rawEnergy := ((LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.NumNodes)) * (float64(jobMeta.Duration) / 3600.0)) / 1000.0
metricEnergy = math.Round(rawEnergy*100.0) / 100.0
}
} else {
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
}
energyFootprint[fp] = metricEnergy
totalEnergy += metricEnergy
// log.Infof("Metric %s Average %f -> %f kWh | Job %d Total -> %f kWh", fp, LoadJobStat(jobMeta, fp, "avg"), energy, jobMeta.JobID, totalEnergy)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(energyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
return stmt, err
}
return stmt.Set("energy_footprint", string(rawFootprint)).Set("energy", (math.Round(totalEnergy*100.0) / 100.0)), nil
}
func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return stmt, err
}
footprint := make(map[string]float64)
for _, fp := range sc.Footprint {
var statType string
for _, gm := range archive.GlobalMetricList {
if gm.Name == fp {
statType = gm.Footprint
}
}
if statType != "avg" && statType != "min" && statType != "max" {
log.Warnf("unknown statType for footprint update: %s", statType)
return stmt, fmt.Errorf("unknown statType for footprint update: %s", statType)
}
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
statType = sc.MetricConfig[i].Footprint
}
name := fmt.Sprintf("%s_%s", fp, statType)
footprint[name] = LoadJobStat(jobMeta, fp, statType)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
return stmt, err
}
return stmt.Set("footprint", string(rawFootprint)), nil
}

View File

@@ -0,0 +1,75 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
const NamedJobInsert string = `INSERT INTO job (
job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
) VALUES (
:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`
func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
log.Warn("Error while NamedJobInsert")
return 0, err
}
id, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
return 0, err
}
return id, nil
}
// Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
job.RawFootprint, err = json.Marshal(job.Footprint)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
}
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
}
job.RawMetaData, err = json.Marshal(job.MetaData)
if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding metaData field failed: %w", err)
}
return r.InsertJob(job)
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}

View File

@@ -0,0 +1,263 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
q = q.OrderBy("job.id DESC") // always use newest matching job by db id if more than one match
log.Debugf("Timer Find %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// Find executes a SQL query to find a specific batch job.
// The job is queried using the batch job id, the cluster name,
// and the start time of the job in UNIX epoch time seconds.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobs := make([]*schema.Job, 0, 10)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Debugf("Timer FindAll %s", time.Since(start))
return jobs, nil
}
// FindById executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdWithUser executes a SQL query to find a specific batch job.
// The job is queried using the database id. The user is passed directly,
// instead as part of the context.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
q, qerr := SecurityCheckWithUser(user, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIdDirect executes a SQL query to find a specific batch job.
// The job is queried using the database id.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByIdDirect(jobId int64) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").Where("job.id = ?", jobId)
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByJobId executes a SQL query to find a specific batch job.
// The job is queried using the slurm id and the clustername.
// It returns a pointer to a schema.Job data structure and an error variable.
// To check if no job was found test err == sql.ErrNoRows
func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime int64, cluster string) (*schema.Job, error) {
q := sq.Select(jobColumns...).
From("job").
Where("job.job_id = ?", jobId).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
q, qerr := SecurityCheck(ctx, q)
if qerr != nil {
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// IsJobOwner executes a SQL query to find a specific batch job.
// The job is queried using the slurm id,a username and the cluster.
// It returns a bool.
// If job was found, user is owner: test err != sql.ErrNoRows
func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cluster string) bool {
q := sq.Select("id").
From("job").
Where("job.job_id = ?", jobId).
Where("job.hpc_user = ?", user).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())
return err != sql.ErrNoRows
}
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
if qerr != nil {
return nil, qerr
}
query = query.Where("cluster = ?", job.Cluster)
var startTime int64
var stopTime int64
startTime = job.StartTimeUnix
hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix()
} else {
stopTime = startTime + int64(job.Duration)
}
// Add 200s overlap for jobs start time at the end
startTimeTail := startTime + 10
stopTimeTail := stopTime - 200
startTimeFront := startTime + 200
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
"running", startTimeTail, stopTimeTail, startTime)
// Get At Least One Exact Hostname Match from JSON Resources Array in Database
queryRunning = queryRunning.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, '$.hostname') = ?)", hostname)
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
// Get At Least One Exact Hostname Match from JSON Resources Array in Database
query = query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, '$.hostname') = ?)", hostname)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
items := make([]*model.JobLink, 0, 10)
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
rows, err = queryRunning.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
for rows.Next() {
var id, jobId, startTime sql.NullInt64
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
items = append(items,
&model.JobLink{
ID: fmt.Sprint(id.Int64),
JobID: int(jobId.Int64),
})
}
}
cnt := len(items)
return &model.JobLinkResultList{
ListQuery: &queryString,
Items: items,
Count: &cnt,
}, nil
}

View File

@@ -0,0 +1,349 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
func (r *JobRepository) QueryJobs(
ctx context.Context,
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput,
) ([]*schema.Job, error) {
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
if qerr != nil {
return nil, qerr
}
if order != nil {
field := toSnakeCase(order.Field)
if order.Type == "col" {
// "col": Fixed column name query
switch order.Order {
case model.SortDirectionEnumAsc:
query = query.OrderBy(fmt.Sprintf("job.%s ASC", field))
case model.SortDirectionEnumDesc:
query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
default:
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order for column")
}
} else {
// "foot": Order by footprint JSON field values
// Verify and Search Only in Valid Jsons
query = query.Where("JSON_VALID(meta_data)")
switch order.Order {
case model.SortDirectionEnumAsc:
query = query.OrderBy(fmt.Sprintf("JSON_EXTRACT(footprint, \"$.%s\") ASC", field))
case model.SortDirectionEnumDesc:
query = query.OrderBy(fmt.Sprintf("JSON_EXTRACT(footprint, \"$.%s\") DESC", field))
default:
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order for footprint")
}
}
}
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
queryString, queryVars, _ := query.ToSql()
log.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
jobs := make([]*schema.Job, 0, 50)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows (Jobs)")
return nil, err
}
jobs = append(jobs, job)
}
return jobs, nil
}
func (r *JobRepository) CountJobs(
ctx context.Context,
filters []*model.JobFilter,
) (int, error) {
// DISTICT count for tags filters, does not affect other queries
query, qerr := SecurityCheck(ctx, sq.Select("count(DISTINCT job.id)").From("job"))
if qerr != nil {
return 0, qerr
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
var count int
if err := query.RunWith(r.DB).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) {
if user == nil {
var qnil sq.SelectBuilder
return qnil, fmt.Errorf("user context is nil")
}
switch {
case len(user.Roles) == 1 && user.HasRole(schema.RoleApi): // API-User : All jobs
return query, nil
case user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}): // Admin & Support : All jobs
return query, nil
case user.HasRole(schema.RoleManager): // Manager : Add filter for managed projects' jobs only + personal jobs
if len(user.Projects) != 0 {
return query.Where(sq.Or{sq.Eq{"job.project": user.Projects}, sq.Eq{"job.hpc_user": user.Username}}), nil
} else {
log.Debugf("Manager-User '%s' has no defined projects to lookup! Query only personal jobs ...", user.Username)
return query.Where("job.hpc_user = ?", user.Username), nil
}
case user.HasRole(schema.RoleUser): // User : Only personal jobs
return query.Where("job.hpc_user = ?", user.Username), nil
default: // No known Role, return error
var qnil sq.SelectBuilder
return qnil, fmt.Errorf("user has no or unknown roles")
}
}
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
return SecurityCheckWithUser(user, query)
}
// Build a sq.SelectBuilder out of a schema.JobFilter.
func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
if filter.Tags != nil {
// This is an OR-Logic query: Returns all distinct jobs with at least one of the requested tags; TODO: AND-Logic query?
query = query.Join("jobtag ON jobtag.job_id = job.id").Where(sq.Eq{"jobtag.tag_id": filter.Tags}).Distinct()
}
if filter.DbID != nil {
dbIDs := make([]string, len(filter.DbID))
for i, val := range filter.DbID {
dbIDs[i] = val
}
query = query.Where(sq.Eq{"job.id": dbIDs})
}
if filter.JobID != nil {
query = buildStringCondition("job.job_id", filter.JobID, query)
}
if filter.ArrayJobID != nil {
query = query.Where("job.array_job_id = ?", *filter.ArrayJobID)
}
if filter.User != nil {
query = buildStringCondition("job.hpc_user", filter.User, query)
}
if filter.Project != nil {
query = buildStringCondition("job.project", filter.Project, query)
}
if filter.JobName != nil {
query = buildMetaJsonCondition("jobName", filter.JobName, query)
}
if filter.Cluster != nil {
query = buildStringCondition("job.cluster", filter.Cluster, query)
}
if filter.Partition != nil {
query = buildStringCondition("job.cluster_partition", filter.Partition, query)
}
if filter.StartTime != nil {
query = buildTimeCondition("job.start_time", filter.StartTime, query)
}
if filter.Duration != nil {
query = buildIntCondition("job.duration", filter.Duration, query)
}
if filter.MinRunningFor != nil {
now := time.Now().Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs.
query = query.Where("(job.job_state != 'running' OR (? - job.start_time) > ?)", now, *filter.MinRunningFor)
}
if filter.Exclusive != nil {
query = query.Where("job.exclusive = ?", *filter.Exclusive)
}
if filter.State != nil {
states := make([]string, len(filter.State))
for i, val := range filter.State {
states[i] = string(val)
}
query = query.Where(sq.Eq{"job.job_state": states})
}
if filter.NumNodes != nil {
query = buildIntCondition("job.num_nodes", filter.NumNodes, query)
}
if filter.NumAccelerators != nil {
query = buildIntCondition("job.num_acc", filter.NumAccelerators, query)
}
if filter.NumHWThreads != nil {
query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query)
}
if filter.Node != nil {
query = buildResourceJsonCondition("hostname", filter.Node, query)
}
if filter.Energy != nil {
query = buildFloatCondition("job.energy", filter.Energy, query)
}
if filter.MetricStats != nil {
for _, ms := range filter.MetricStats {
query = buildFloatJsonCondition(ms.MetricName, ms.Range, query)
}
}
return query
}
func buildIntCondition(field string, cond *schema.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}
func buildTimeCondition(field string, cond *schema.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != nil && cond.To != nil {
return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
} else if cond.From != nil {
return query.Where("? <= "+field, cond.From.Unix())
} else if cond.To != nil {
return query.Where(field+" <= ?", cond.To.Unix())
} else if cond.Range != "" {
now := time.Now().Unix()
var then int64
switch cond.Range {
case "last6h":
then = now - (60 * 60 * 6)
case "last24h":
then = now - (60 * 60 * 24)
case "last7d":
then = now - (60 * 60 * 24 * 7)
case "last30d":
then = now - (60 * 60 * 24 * 30)
default:
log.Debugf("No known named timeRange: startTime.range = %s", cond.Range)
return query
}
return query.Where(field+" BETWEEN ? AND ?", then, now)
} else {
return query
}
}
func buildFloatJsonCondition(condName string, condRange *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
// Verify and Search Only in Valid Jsons
query = query.Where("JSON_VALID(footprint)")
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To)
}
func buildStringCondition(field string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder {
if cond.Eq != nil {
return query.Where(field+" = ?", *cond.Eq)
}
if cond.Neq != nil {
return query.Where(field+" != ?", *cond.Neq)
}
if cond.StartsWith != nil {
return query.Where(field+" LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
}
if cond.EndsWith != nil {
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.EndsWith))
}
if cond.Contains != nil {
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
}
if cond.In != nil {
queryElements := make([]string, len(cond.In))
copy(queryElements, cond.In)
return query.Where(sq.Or{sq.Eq{field: queryElements}})
}
return query
}
func buildMetaJsonCondition(jsonField string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder {
// Verify and Search Only in Valid Jsons
query = query.Where("JSON_VALID(meta_data)")
// add "AND" Sql query Block for field match
if cond.Eq != nil {
return query.Where("JSON_EXTRACT(meta_data, \"$."+jsonField+"\") = ?", *cond.Eq)
}
if cond.Neq != nil {
return query.Where("JSON_EXTRACT(meta_data, \"$."+jsonField+"\") != ?", *cond.Neq)
}
if cond.StartsWith != nil {
return query.Where("JSON_EXTRACT(meta_data, \"$."+jsonField+"\") LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
}
if cond.EndsWith != nil {
return query.Where("JSON_EXTRACT(meta_data, \"$."+jsonField+"\") LIKE ?", fmt.Sprint("%", *cond.EndsWith))
}
if cond.Contains != nil {
return query.Where("JSON_EXTRACT(meta_data, \"$."+jsonField+"\") LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
}
return query
}
func buildResourceJsonCondition(jsonField string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder {
// Verify and Search Only in Valid Jsons
query = query.Where("JSON_VALID(resources)")
// add "AND" Sql query Block for field match
if cond.Eq != nil {
return query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, \"$."+jsonField+"\") = ?)", *cond.Eq)
}
if cond.Neq != nil { // Currently Unused
return query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, \"$."+jsonField+"\") != ?)", *cond.Neq)
}
if cond.StartsWith != nil { // Currently Unused
return query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, \"$."+jsonField+"\")) LIKE ?)", fmt.Sprint(*cond.StartsWith, "%"))
}
if cond.EndsWith != nil { // Currently Unused
return query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, \"$."+jsonField+"\") LIKE ?)", fmt.Sprint("%", *cond.EndsWith))
}
if cond.Contains != nil {
return query.Where("EXISTS (SELECT 1 FROM json_each(job.resources) WHERE json_extract(value, \"$."+jsonField+"\") LIKE ?)", fmt.Sprint("%", *cond.Contains, "%"))
}
return query
}
var (
matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
)
func toSnakeCase(str string) string {
for _, c := range str {
if c == '\'' || c == '\\' {
log.Panic("toSnakeCase() attack vector!")
}
}
str = strings.ReplaceAll(str, "'", "")
str = strings.ReplaceAll(str, "\\", "")
snake := matchFirstCap.ReplaceAllString(str, "${1}_${2}")
snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
return strings.ToLower(snake)
}

View File

@@ -5,9 +5,11 @@
package repository
import (
"context"
"fmt"
"testing"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
_ "github.com/mattn/go-sqlite3"
)
@@ -28,8 +30,10 @@ func TestFind(t *testing.T) {
func TestFindById(t *testing.T) {
r := setup(t)
job, err := r.FindById(5)
noErr(t, err)
job, err := r.FindById(getContext(t), 5)
if err != nil {
t.Fatal(err)
}
// fmt.Printf("%+v", job)
@@ -41,8 +45,22 @@ func TestFindById(t *testing.T) {
func TestGetTags(t *testing.T) {
r := setup(t)
tags, counts, err := r.CountTags(nil)
noErr(t, err)
const contextUserKey ContextKey = "user"
contextUserValue := &schema.User{
Username: "testuser",
Projects: make([]string, 0),
Roles: []string{"user"},
AuthType: 0,
AuthSource: 2,
}
ctx := context.WithValue(getContext(t), contextUserKey, contextUserValue)
// Test Tag has Scope "global"
tags, counts, err := r.CountTags(GetUserFromContext(ctx))
if err != nil {
t.Fatal(err)
}
fmt.Printf("TAGS %+v \n", tags)
// fmt.Printf("COUNTS %+v \n", counts)

View File

@@ -16,7 +16,7 @@ import (
"github.com/golang-migrate/migrate/v4/source/iofs"
)
const Version uint = 7
const Version uint = 8
//go:embed migrations/*
var migrationFiles embed.FS
@@ -54,7 +54,7 @@ func checkDBVersion(backend string, db *sql.DB) error {
return err
}
default:
log.Fatalf("unsupported database backend: %s", backend)
log.Abortf("Migration: Unsupported database backend '%s'.\n", backend)
}
v, dirty, err := m.Version()
@@ -102,7 +102,7 @@ func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err erro
return m, err
}
default:
log.Fatalf("unsupported database backend: %s", backend)
log.Abortf("Migration: Unsupported database backend '%s'.\n", backend)
}
return m, nil
@@ -114,6 +114,14 @@ func MigrateDB(backend string, db string) error {
return err
}
v, dirty, err := m.Version()
log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
if dirty {
return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)
}
if err := m.Up(); err != nil {
if err == migrate.ErrNoChange {
log.Info("DB already up to date!")

View File

@@ -0,0 +1,83 @@
ALTER TABLE job DROP energy;
ALTER TABLE job DROP energy_footprint;
ALTER TABLE job ADD COLUMN flops_any_avg;
ALTER TABLE job ADD COLUMN mem_bw_avg;
ALTER TABLE job ADD COLUMN mem_used_max;
ALTER TABLE job ADD COLUMN load_avg;
ALTER TABLE job ADD COLUMN net_bw_avg;
ALTER TABLE job ADD COLUMN net_data_vol_total;
ALTER TABLE job ADD COLUMN file_bw_avg;
ALTER TABLE job ADD COLUMN file_data_vol_total;
UPDATE job SET flops_any_avg = json_extract(footprint, '$.flops_any_avg');
UPDATE job SET mem_bw_avg = json_extract(footprint, '$.mem_bw_avg');
UPDATE job SET mem_used_max = json_extract(footprint, '$.mem_used_max');
UPDATE job SET load_avg = json_extract(footprint, '$.cpu_load_avg');
UPDATE job SET net_bw_avg = json_extract(footprint, '$.net_bw_avg');
UPDATE job SET net_data_vol_total = json_extract(footprint, '$.net_data_vol_total');
UPDATE job SET file_bw_avg = json_extract(footprint, '$.file_bw_avg');
UPDATE job SET file_data_vol_total = json_extract(footprint, '$.file_data_vol_total');
ALTER TABLE job DROP footprint;
-- Do not use reserved keywords anymore
RENAME TABLE hpc_user TO `user`;
ALTER TABLE job RENAME COLUMN hpc_user TO `user`;
ALTER TABLE job RENAME COLUMN cluster_partition TO `partition`;
DROP INDEX IF EXISTS jobs_cluster;
DROP INDEX IF EXISTS jobs_cluster_user;
DROP INDEX IF EXISTS jobs_cluster_project;
DROP INDEX IF EXISTS jobs_cluster_subcluster;
DROP INDEX IF EXISTS jobs_cluster_starttime;
DROP INDEX IF EXISTS jobs_cluster_duration;
DROP INDEX IF EXISTS jobs_cluster_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition;
DROP INDEX IF EXISTS jobs_cluster_partition_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_jobstate;
DROP INDEX IF EXISTS jobs_cluster_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_user;
DROP INDEX IF EXISTS jobs_user_starttime;
DROP INDEX IF EXISTS jobs_user_duration;
DROP INDEX IF EXISTS jobs_user_numnodes;
DROP INDEX IF EXISTS jobs_project;
DROP INDEX IF EXISTS jobs_project_user;
DROP INDEX IF EXISTS jobs_project_starttime;
DROP INDEX IF EXISTS jobs_project_duration;
DROP INDEX IF EXISTS jobs_project_numnodes;
DROP INDEX IF EXISTS jobs_jobstate;
DROP INDEX IF EXISTS jobs_jobstate_user;
DROP INDEX IF EXISTS jobs_jobstate_project;
DROP INDEX IF EXISTS jobs_jobstate_starttime;
DROP INDEX IF EXISTS jobs_jobstate_duration;
DROP INDEX IF EXISTS jobs_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_arrayjobid_starttime;
DROP INDEX IF EXISTS jobs_cluster_arrayjobid_starttime;
DROP INDEX IF EXISTS jobs_starttime;
DROP INDEX IF EXISTS jobs_duration;
DROP INDEX IF EXISTS jobs_numnodes;
DROP INDEX IF EXISTS jobs_duration_starttime;
DROP INDEX IF EXISTS jobs_numnodes_starttime;
DROP INDEX IF EXISTS jobs_numacc_starttime;
DROP INDEX IF EXISTS jobs_energy_starttime;

View File

@@ -0,0 +1,123 @@
DROP INDEX IF EXISTS job_stats ON job;
DROP INDEX IF EXISTS job_by_user ON job;
DROP INDEX IF EXISTS job_by_starttime ON job;
DROP INDEX IF EXISTS job_by_job_id ON job;
DROP INDEX IF EXISTS job_list ON job;
DROP INDEX IF EXISTS job_list_user ON job;
DROP INDEX IF EXISTS job_list_users ON job;
DROP INDEX IF EXISTS job_list_users_start ON job;
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
ALTER TABLE job ADD COLUMN energy_footprint JSON;
ALTER TABLE job ADD COLUMN footprint JSON;
ALTER TABLE tag ADD COLUMN tag_scope TEXT NOT NULL DEFAULT 'global';
-- Do not use reserved keywords anymore
RENAME TABLE `user` TO hpc_user;
ALTER TABLE job RENAME COLUMN `user` TO hpc_user;
ALTER TABLE job RENAME COLUMN `partition` TO cluster_partition;
ALTER TABLE job MODIFY COLUMN cluster VARCHAR(50);
ALTER TABLE job MODIFY COLUMN hpc_user VARCHAR(50);
ALTER TABLE job MODIFY COLUMN subcluster VARCHAR(50);
ALTER TABLE job MODIFY COLUMN project VARCHAR(50);
ALTER TABLE job MODIFY COLUMN cluster_partition VARCHAR(50);
ALTER TABLE job MODIFY COLUMN job_state VARCHAR(25);
UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg);
UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) WHERE job.net_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) WHERE job.net_data_vol_total != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) WHERE job.file_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) WHERE job.file_data_vol_total != 0;
ALTER TABLE job DROP flops_any_avg;
ALTER TABLE job DROP mem_bw_avg;
ALTER TABLE job DROP mem_used_max;
ALTER TABLE job DROP load_avg;
ALTER TABLE job DROP net_bw_avg;
ALTER TABLE job DROP net_data_vol_total;
ALTER TABLE job DROP file_bw_avg;
ALTER TABLE job DROP file_data_vol_total;
-- Indices for: Single filters, combined filters, sorting, sorting with filters
-- Cluster Filter
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);
-- Cluster Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (cluster, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (cluster, num_nodes);
-- Cluster+Partition Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, cluster_partition);
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, cluster_partition, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (cluster, cluster_partition, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
-- Cluster+Partition+Jobstate Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, cluster_partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, cluster_partition, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, cluster_partition, job_state, project);
-- Cluster+Partition+Jobstate Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, cluster_partition, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_duration ON job (cluster, cluster_partition, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numnodes ON job (cluster, cluster_partition, job_state, num_nodes);
-- Cluster+JobState Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (cluster, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
-- User Filter
CREATE INDEX IF NOT EXISTS jobs_user ON job (hpc_user);
-- User Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (hpc_user, start_time);
CREATE INDEX IF NOT EXISTS jobs_user_duration ON job (hpc_user, duration);
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
-- Project Filter
CREATE INDEX IF NOT EXISTS jobs_project ON job (project);
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
-- Project Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
CREATE INDEX IF NOT EXISTS jobs_project_duration ON job (project, duration);
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
-- JobState Filter
CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state);
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster);
-- JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_jobstate_duration ON job (job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
-- ArrayJob Filter
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
-- Sorting without active filters
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes);
-- Single filters with default starttime sorting
CREATE INDEX IF NOT EXISTS jobs_duration_starttime ON job (duration, start_time);
CREATE INDEX IF NOT EXISTS jobs_numnodes_starttime ON job (num_nodes, start_time);
CREATE INDEX IF NOT EXISTS jobs_numacc_starttime ON job (num_acc, start_time);
CREATE INDEX IF NOT EXISTS jobs_energy_starttime ON job (energy, start_time);
-- Optimize DB index usage

View File

@@ -0,0 +1,103 @@
ALTER TABLE job DROP energy;
ALTER TABLE job DROP energy_footprint;
ALTER TABLE job ADD COLUMN flops_any_avg;
ALTER TABLE job ADD COLUMN mem_bw_avg;
ALTER TABLE job ADD COLUMN mem_used_max;
ALTER TABLE job ADD COLUMN load_avg;
ALTER TABLE job ADD COLUMN net_bw_avg;
ALTER TABLE job ADD COLUMN net_data_vol_total;
ALTER TABLE job ADD COLUMN file_bw_avg;
ALTER TABLE job ADD COLUMN file_data_vol_total;
UPDATE job SET flops_any_avg = json_extract(footprint, '$.flops_any_avg');
UPDATE job SET mem_bw_avg = json_extract(footprint, '$.mem_bw_avg');
UPDATE job SET mem_used_max = json_extract(footprint, '$.mem_used_max');
UPDATE job SET load_avg = json_extract(footprint, '$.cpu_load_avg');
UPDATE job SET net_bw_avg = json_extract(footprint, '$.net_bw_avg');
UPDATE job SET net_data_vol_total = json_extract(footprint, '$.net_data_vol_total');
UPDATE job SET file_bw_avg = json_extract(footprint, '$.file_bw_avg');
UPDATE job SET file_data_vol_total = json_extract(footprint, '$.file_data_vol_total');
ALTER TABLE job DROP footprint;
DROP INDEX IF EXISTS jobs_cluster;
DROP INDEX IF EXISTS jobs_cluster_user;
DROP INDEX IF EXISTS jobs_cluster_project;
DROP INDEX IF EXISTS jobs_cluster_subcluster;
DROP INDEX IF EXISTS jobs_cluster_starttime;
DROP INDEX IF EXISTS jobs_cluster_duration;
DROP INDEX IF EXISTS jobs_cluster_numnodes;
DROP INDEX IF EXISTS jobs_cluster_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_numacc;
DROP INDEX IF EXISTS jobs_cluster_energy;
DROP INDEX IF EXISTS jobs_cluster_partition;
DROP INDEX IF EXISTS jobs_cluster_partition_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_energy;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_energy;
DROP INDEX IF EXISTS jobs_cluster_jobstate;
DROP INDEX IF EXISTS jobs_cluster_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_jobstate_energy;
DROP INDEX IF EXISTS jobs_user;
DROP INDEX IF EXISTS jobs_user_starttime;
DROP INDEX IF EXISTS jobs_user_duration;
DROP INDEX IF EXISTS jobs_user_numnodes;
DROP INDEX IF EXISTS jobs_user_numhwthreads;
DROP INDEX IF EXISTS jobs_user_numacc;
DROP INDEX IF EXISTS jobs_user_energy;
DROP INDEX IF EXISTS jobs_project;
DROP INDEX IF EXISTS jobs_project_user;
DROP INDEX IF EXISTS jobs_project_starttime;
DROP INDEX IF EXISTS jobs_project_duration;
DROP INDEX IF EXISTS jobs_project_numnodes;
DROP INDEX IF EXISTS jobs_project_numhwthreads;
DROP INDEX IF EXISTS jobs_project_numacc;
DROP INDEX IF EXISTS jobs_project_energy;
DROP INDEX IF EXISTS jobs_jobstate;
DROP INDEX IF EXISTS jobs_jobstate_user;
DROP INDEX IF EXISTS jobs_jobstate_project;
DROP INDEX IF EXISTS jobs_jobstate_starttime;
DROP INDEX IF EXISTS jobs_jobstate_duration;
DROP INDEX IF EXISTS jobs_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_jobstate_numacc;
DROP INDEX IF EXISTS jobs_arrayjobid_starttime;
DROP INDEX IF EXISTS jobs_cluster_arrayjobid_starttime;
DROP INDEX IF EXISTS jobs_starttime;
DROP INDEX IF EXISTS jobs_duration;
DROP INDEX IF EXISTS jobs_numnodes;
DROP INDEX IF EXISTS jobs_numhwthreads;
DROP INDEX IF EXISTS jobs_numacc;
DROP INDEX IF EXISTS jobs_energy;
DROP INDEX IF EXISTS jobs_duration_starttime;
DROP INDEX IF EXISTS jobs_numnodes_starttime;
DROP INDEX IF EXISTS jobs_numhwthreads_starttime;
DROP INDEX IF EXISTS jobs_numacc_starttime;
DROP INDEX IF EXISTS jobs_energy_starttime;

View File

@@ -0,0 +1,142 @@
DROP INDEX IF EXISTS job_stats;
DROP INDEX IF EXISTS job_by_user;
DROP INDEX IF EXISTS job_by_starttime;
DROP INDEX IF EXISTS job_by_job_id;
DROP INDEX IF EXISTS job_list;
DROP INDEX IF EXISTS job_list_user;
DROP INDEX IF EXISTS job_list_users;
DROP INDEX IF EXISTS job_list_users_start;
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL;
ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL;
ALTER TABLE tag ADD COLUMN tag_scope TEXT NOT NULL DEFAULT 'global';
-- Do not use reserved keywords anymore
ALTER TABLE "user" RENAME TO hpc_user;
ALTER TABLE job RENAME COLUMN "user" TO hpc_user;
ALTER TABLE job RENAME COLUMN "partition" TO cluster_partition;
UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg);
UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) WHERE job.net_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) WHERE job.net_data_vol_total != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) WHERE job.file_bw_avg != 0;
UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) WHERE job.file_data_vol_total != 0;
ALTER TABLE job DROP flops_any_avg;
ALTER TABLE job DROP mem_bw_avg;
ALTER TABLE job DROP mem_used_max;
ALTER TABLE job DROP load_avg;
ALTER TABLE job DROP net_bw_avg;
ALTER TABLE job DROP net_data_vol_total;
ALTER TABLE job DROP file_bw_avg;
ALTER TABLE job DROP file_data_vol_total;
-- Indices for: Single filters, combined filters, sorting, sorting with filters
-- Cluster Filter
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);
-- Cluster Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (cluster, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (cluster, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_numhwthreads ON job (cluster, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_numacc ON job (cluster, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_energy ON job (cluster, energy);
-- Cluster+Partition Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, cluster_partition);
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, cluster_partition, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (cluster, cluster_partition, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
-- Cluster+Partition+Jobstate Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, cluster_partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, cluster_partition, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, cluster_partition, job_state, project);
-- Cluster+Partition+Jobstate Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, cluster_partition, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_duration ON job (cluster, cluster_partition, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numnodes ON job (cluster, cluster_partition, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numhwthreads ON job (cluster, cluster_partition, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numacc ON job (cluster, cluster_partition, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_energy ON job (cluster, cluster_partition, job_state, energy);
-- Cluster+JobState Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (cluster, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
-- User Filter
CREATE INDEX IF NOT EXISTS jobs_user ON job (hpc_user);
-- User Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (hpc_user, start_time);
CREATE INDEX IF NOT EXISTS jobs_user_duration ON job (hpc_user, duration);
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
-- Project Filter
CREATE INDEX IF NOT EXISTS jobs_project ON job (project);
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
-- Project Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
CREATE INDEX IF NOT EXISTS jobs_project_duration ON job (project, duration);
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
-- JobState Filter
CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state);
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster);
-- JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_jobstate_duration ON job (job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
-- ArrayJob Filter
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
-- Sorting without active filters
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes);
CREATE INDEX IF NOT EXISTS jobs_numhwthreads ON job (num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_numacc ON job (num_acc);
CREATE INDEX IF NOT EXISTS jobs_energy ON job (energy);
-- Single filters with default starttime sorting
CREATE INDEX IF NOT EXISTS jobs_duration_starttime ON job (duration, start_time);
CREATE INDEX IF NOT EXISTS jobs_numnodes_starttime ON job (num_nodes, start_time);
CREATE INDEX IF NOT EXISTS jobs_numhwthreads_starttime ON job (num_hwthreads, start_time);
CREATE INDEX IF NOT EXISTS jobs_numacc_starttime ON job (num_acc, start_time);
CREATE INDEX IF NOT EXISTS jobs_energy_starttime ON job (energy, start_time);
-- Optimize DB index usage
PRAGMA optimize;

View File

@@ -1,253 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
func (r *JobRepository) QueryJobs(
ctx context.Context,
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
if qerr != nil {
return nil, qerr
}
if order != nil {
field := toSnakeCase(order.Field)
switch order.Order {
case model.SortDirectionEnumAsc:
query = query.OrderBy(fmt.Sprintf("job.%s ASC", field))
case model.SortDirectionEnumDesc:
query = query.OrderBy(fmt.Sprintf("job.%s DESC", field))
default:
return nil, errors.New("REPOSITORY/QUERY > invalid sorting order")
}
}
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while running query: %v", err)
return nil, err
}
jobs := make([]*schema.Job, 0, 50)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows (Jobs)")
return nil, err
}
jobs = append(jobs, job)
}
return jobs, nil
}
func (r *JobRepository) CountJobs(
ctx context.Context,
filters []*model.JobFilter) (int, error) {
query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
if qerr != nil {
return 0, qerr
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
var count int
if err := query.RunWith(r.DB).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
if user == nil {
var qnil sq.SelectBuilder
return qnil, fmt.Errorf("user context is nil")
} else if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport, schema.RoleApi}) { // Admin & Co. : All jobs
return query, nil
} else if user.HasRole(schema.RoleManager) { // Manager : Add filter for managed projects' jobs only + personal jobs
if len(user.Projects) != 0 {
return query.Where(sq.Or{sq.Eq{"job.project": user.Projects}, sq.Eq{"job.user": user.Username}}), nil
} else {
log.Debugf("Manager-User '%s' has no defined projects to lookup! Query only personal jobs ...", user.Username)
return query.Where("job.user = ?", user.Username), nil
}
} else if user.HasRole(schema.RoleUser) { // User : Only personal jobs
return query.Where("job.user = ?", user.Username), nil
} else {
// Shortterm compatibility: Return User-Query if no roles:
return query.Where("job.user = ?", user.Username), nil
// // On the longterm: Return Error instead of fallback:
// var qnil sq.SelectBuilder
// return qnil, fmt.Errorf("user '%s' with unknown roles [%#v]", user.Username, user.Roles)
}
}
// Build a sq.SelectBuilder out of a schema.JobFilter.
func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
if filter.Tags != nil {
query = query.Join("jobtag ON jobtag.job_id = job.id").Where(sq.Eq{"jobtag.tag_id": filter.Tags})
}
if filter.JobID != nil {
query = buildStringCondition("job.job_id", filter.JobID, query)
}
if filter.ArrayJobID != nil {
query = query.Where("job.array_job_id = ?", *filter.ArrayJobID)
}
if filter.User != nil {
query = buildStringCondition("job.user", filter.User, query)
}
if filter.Project != nil {
query = buildStringCondition("job.project", filter.Project, query)
}
if filter.JobName != nil {
query = buildStringCondition("job.meta_data", filter.JobName, query)
}
if filter.Cluster != nil {
query = buildStringCondition("job.cluster", filter.Cluster, query)
}
if filter.Partition != nil {
query = buildStringCondition("job.partition", filter.Partition, query)
}
if filter.StartTime != nil {
query = buildTimeCondition("job.start_time", filter.StartTime, query)
}
if filter.Duration != nil {
now := time.Now().Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs.
query = query.Where("(CASE WHEN job.job_state = 'running' THEN (? - job.start_time) ELSE job.duration END) BETWEEN ? AND ?", now, filter.Duration.From, filter.Duration.To)
}
if filter.MinRunningFor != nil {
now := time.Now().Unix() // There does not seam to be a portable way to get the current unix timestamp accross different DBs.
query = query.Where("(job.job_state != 'running' OR (? - job.start_time) > ?)", now, *filter.MinRunningFor)
}
if filter.State != nil {
states := make([]string, len(filter.State))
for i, val := range filter.State {
states[i] = string(val)
}
query = query.Where(sq.Eq{"job.job_state": states})
}
if filter.NumNodes != nil {
query = buildIntCondition("job.num_nodes", filter.NumNodes, query)
}
if filter.NumAccelerators != nil {
query = buildIntCondition("job.num_acc", filter.NumAccelerators, query)
}
if filter.NumHWThreads != nil {
query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query)
}
if filter.Node != nil {
query = buildStringCondition("job.resources", filter.Node, query)
}
if filter.FlopsAnyAvg != nil {
query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
}
if filter.MemBwAvg != nil {
query = buildFloatCondition("job.mem_bw_avg", filter.MemBwAvg, query)
}
if filter.LoadAvg != nil {
query = buildFloatCondition("job.load_avg", filter.LoadAvg, query)
}
if filter.MemUsedMax != nil {
query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
}
return query
}
func buildIntCondition(field string, cond *schema.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}
func buildTimeCondition(field string, cond *schema.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != nil && cond.To != nil {
return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
} else if cond.From != nil {
return query.Where("? <= "+field, cond.From.Unix())
} else if cond.To != nil {
return query.Where(field+" <= ?", cond.To.Unix())
} else {
return query
}
}
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
}
func buildStringCondition(field string, cond *model.StringInput, query sq.SelectBuilder) sq.SelectBuilder {
if cond.Eq != nil {
return query.Where(field+" = ?", *cond.Eq)
}
if cond.Neq != nil {
return query.Where(field+" != ?", *cond.Neq)
}
if cond.StartsWith != nil {
return query.Where(field+" LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
}
if cond.EndsWith != nil {
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.EndsWith))
}
if cond.Contains != nil {
return query.Where(field+" LIKE ?", fmt.Sprint("%", *cond.Contains, "%"))
}
if cond.In != nil {
queryElements := make([]string, len(cond.In))
for i, val := range cond.In {
queryElements[i] = val
}
return query.Where(sq.Or{sq.Eq{field: queryElements}})
}
return query
}
var matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
func toSnakeCase(str string) string {
for _, c := range str {
if c == '\'' || c == '\\' {
log.Panic("toSnakeCase() attack vector!")
}
}
str = strings.ReplaceAll(str, "'", "")
str = strings.ReplaceAll(str, "\\", "")
snake := matchFirstCap.ReplaceAllString(str, "${1}_${2}")
snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
return strings.ToLower(snake)
}

View File

@@ -55,7 +55,7 @@ func BenchmarkDB_FindJobById(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := db.FindById(jobId)
_, err := db.FindById(getContext(b), jobId)
noErr(b, err)
}
})
@@ -111,7 +111,7 @@ func BenchmarkDB_QueryJobs(b *testing.B) {
user := "mppi133h"
filter.User = &model.StringInput{Eq: &user}
page := &model.PageRequest{ItemsPerPage: 50, Page: 1}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc}
b.Run("QueryJobs", func(b *testing.B) {
db := setup(b)

View File

@@ -8,12 +8,11 @@ import (
"context"
"database/sql"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -22,7 +21,7 @@ import (
// GraphQL validation should make sure that no unkown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateUser: "job.hpc_user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
@@ -41,8 +40,8 @@ var sortBy2column = map[model.SortByAggregate]string{
func (r *JobRepository) buildCountQuery(
filter []*model.JobFilter,
kind string,
col string) sq.SelectBuilder {
col string,
) sq.SelectBuilder {
var query sq.SelectBuilder
if col != "" {
@@ -69,16 +68,16 @@ func (r *JobRepository) buildCountQuery(
func (r *JobRepository) buildStatsQuery(
filter []*model.JobFilter,
col string) sq.SelectBuilder {
col string,
) sq.SelectBuilder {
var query sq.SelectBuilder
castType := r.getCastType()
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs",
// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs", "name",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
@@ -86,10 +85,9 @@ func (r *JobRepository) buildStatsQuery(
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
).From("job").GroupBy(col)
).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
} else {
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
// Scan columns: totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
@@ -108,15 +106,15 @@ func (r *JobRepository) buildStatsQuery(
return query
}
func (r *JobRepository) getUserName(ctx context.Context, id string) string {
user := GetUserFromContext(ctx)
name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
if name != "" {
return name
} else {
return "-"
}
}
// func (r *JobRepository) getUserName(ctx context.Context, id string) string {
// user := GetUserFromContext(ctx)
// name, _ := r.FindColumnValue(user, id, "hpc_user", "name", "username", false)
// if name != "" {
// return name
// } else {
// return "-"
// }
// }
func (r *JobRepository) getCastType() string {
var castType string
@@ -138,8 +136,8 @@ func (r *JobRepository) JobsStatsGrouped(
filter []*model.JobFilter,
page *model.PageRequest,
sortBy *model.SortByAggregate,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col)
@@ -168,14 +166,20 @@ func (r *JobRepository) JobsStatsGrouped(
for rows.Next() {
var id sql.NullString
var name sql.NullString
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
var personName string
if name.Valid {
personName = name.String
}
if jobs.Valid {
totalJobs = int(jobs.Int64)
@@ -205,12 +209,12 @@ func (r *JobRepository) JobsStatsGrouped(
totalAccHours = int(accHours.Int64)
}
if col == "job.user" {
name := r.getUserName(ctx, id.String)
if col == "job.hpc_user" {
// name := r.getUserName(ctx, id.String)
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
Name: name,
Name: personName,
TotalJobs: totalJobs,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,
@@ -218,7 +222,8 @@ func (r *JobRepository) JobsStatsGrouped(
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
TotalAccHours: totalAccHours,
})
} else {
stats = append(stats,
&model.JobsStatistics{
@@ -230,7 +235,8 @@ func (r *JobRepository) JobsStatsGrouped(
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
TotalAccHours: totalAccHours,
})
}
}
}
@@ -241,8 +247,8 @@ func (r *JobRepository) JobsStatsGrouped(
func (r *JobRepository) JobsStats(
ctx context.Context,
filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
filter []*model.JobFilter,
) ([]*model.JobsStatistics, error) {
start := time.Now()
query := r.buildStatsQuery(filter, "")
query, err := SecurityCheck(ctx, query)
@@ -277,18 +283,36 @@ func (r *JobRepository) JobsStats(
TotalWalltime: int(walltime.Int64),
TotalNodeHours: totalNodeHours,
TotalCoreHours: totalCoreHours,
TotalAccHours: totalAccHours})
TotalAccHours: totalAccHours,
})
}
log.Debugf("Timer JobStats %s", time.Since(start))
return stats, nil
}
func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 {
if stats, ok := job.Statistics[metric]; ok {
switch statType {
case "avg":
return stats.Avg
case "max":
return stats.Max
case "min":
return stats.Min
default:
log.Errorf("Unknown stat type %s", statType)
}
}
return 0.0
}
func (r *JobRepository) JobCountGrouped(
ctx context.Context,
filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
groupBy *model.Aggregate,
) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildCountQuery(filter, "", col)
@@ -315,7 +339,8 @@ func (r *JobRepository) JobCountGrouped(
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
TotalJobs: int(cnt.Int64)})
TotalJobs: int(cnt.Int64),
})
}
}
@@ -328,8 +353,8 @@ func (r *JobRepository) AddJobCountGrouped(
filter []*model.JobFilter,
groupBy *model.Aggregate,
stats []*model.JobsStatistics,
kind string) ([]*model.JobsStatistics, error) {
kind string,
) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildCountQuery(filter, kind, col)
@@ -376,8 +401,8 @@ func (r *JobRepository) AddJobCount(
ctx context.Context,
filter []*model.JobFilter,
stats []*model.JobsStatistics,
kind string) ([]*model.JobsStatistics, error) {
kind string,
) ([]*model.JobsStatistics, error) {
start := time.Now()
query := r.buildCountQuery(filter, kind, "")
query, err := SecurityCheck(ctx, query)
@@ -420,15 +445,41 @@ func (r *JobRepository) AddJobCount(
func (r *JobRepository) AddHistograms(
ctx context.Context,
filter []*model.JobFilter,
stat *model.JobsStatistics) (*model.JobsStatistics, error) {
stat *model.JobsStatistics,
durationBins *string,
) (*model.JobsStatistics, error) {
start := time.Now()
var targetBinCount int
var targetBinSize int
switch {
case *durationBins == "1m": // 1 Minute Bins + Max 60 Bins -> Max 60 Minutes
targetBinCount = 60
targetBinSize = 60
case *durationBins == "10m": // 10 Minute Bins + Max 72 Bins -> Max 12 Hours
targetBinCount = 72
targetBinSize = 600
case *durationBins == "1h": // 1 Hour Bins + Max 48 Bins -> Max 48 Hours
targetBinCount = 48
targetBinSize = 3600
case *durationBins == "6h": // 6 Hour Bins + Max 12 Bins -> Max 3 Days
targetBinCount = 12
targetBinSize = 21600
case *durationBins == "12h": // 12 hour Bins + Max 14 Bins -> Max 7 Days
targetBinCount = 14
targetBinSize = 43200
default: // 24h
targetBinCount = 24
targetBinSize = 3600
}
castType := r.getCastType()
var err error
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter)
// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as %s) as value`, time.Now().Unix(), targetBinSize, castType)
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
log.Warn("Error while loading job statistics histogram: job duration")
return nil, err
}
@@ -459,14 +510,16 @@ func (r *JobRepository) AddMetricHistograms(
ctx context.Context,
filter []*model.JobFilter,
metrics []string,
stat *model.JobsStatistics) (*model.JobsStatistics, error) {
stat *model.JobsStatistics,
targetBinCount *int,
) (*model.JobsStatistics, error) {
start := time.Now()
// Running Jobs Only: First query jobdata from sqlite, then query data and make bins
for _, f := range filter {
if f.State != nil {
if len(f.State) == 1 && f.State[0] == "running" {
stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter)
stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter, targetBinCount)
log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
return stat, nil
}
@@ -475,7 +528,7 @@ func (r *JobRepository) AddMetricHistograms(
// All other cases: Query and make bins in sqlite directly
for _, m := range metrics {
metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter)
metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter, targetBinCount)
if err != nil {
log.Warnf("Error while loading job metric statistics histogram: %s", m)
continue
@@ -491,8 +544,8 @@ func (r *JobRepository) AddMetricHistograms(
func (r *JobRepository) jobsStatisticsHistogram(
ctx context.Context,
value string,
filters []*model.JobFilter) ([]*model.HistoPoint, error) {
filters []*model.JobFilter,
) ([]*model.HistoPoint, error) {
start := time.Now()
query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(job.id) AS count").From("job"))
@@ -512,6 +565,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
}
points := make([]*model.HistoPoint, 0)
// is it possible to introduce zero values here? requires info about bincount
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
@@ -525,39 +579,79 @@ func (r *JobRepository) jobsStatisticsHistogram(
return points, nil
}
func (r *JobRepository) jobsDurationStatisticsHistogram(
ctx context.Context,
value string,
filters []*model.JobFilter,
binSizeSeconds int,
targetBinCount *int,
) ([]*model.HistoPoint, error) {
start := time.Now()
query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(job.id) AS count").From("job"))
if qerr != nil {
return nil, qerr
}
// Setup Array
points := make([]*model.HistoPoint, 0)
for i := 1; i <= *targetBinCount; i++ {
point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
points = append(points, &point)
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
// Fill Array at matching $Value
for rows.Next() {
point := model.HistoPoint{}
if err := rows.Scan(&point.Value, &point.Count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
for _, e := range points {
if e.Value == (point.Value * binSizeSeconds) {
// Note:
// Matching on unmodified integer value (and multiplying point.Value by binSizeSeconds after match)
// causes frontend to loop into highest targetBinCount, due to zoom condition instantly being fullfilled (cause unknown)
e.Count = point.Count
break
}
}
}
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
func (r *JobRepository) jobsMetricStatisticsHistogram(
ctx context.Context,
metric string,
filters []*model.JobFilter) (*model.MetricHistoPoints, error) {
var dbMetric string
switch metric {
case "cpu_load":
dbMetric = "load_avg"
case "flops_any":
dbMetric = "flops_any_avg"
case "mem_bw":
dbMetric = "mem_bw_avg"
case "mem_used":
dbMetric = "mem_used_max"
case "net_bw":
dbMetric = "net_bw_avg"
case "file_bw":
dbMetric = "file_bw_avg"
default:
return nil, fmt.Errorf("%s not implemented", metric)
}
filters []*model.JobFilter,
bins *int,
) (*model.MetricHistoPoints, error) {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
var peak float64
var unit string
var footprintStat string
for _, f := range filters {
if f.Cluster != nil {
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
footprintStat = metricConfig.Footprint
log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
}
}
@@ -572,58 +666,40 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
if unit == "" {
unit = m.Unit.Prefix + m.Unit.Base
}
if footprintStat == "" {
footprintStat = m.Footprint
}
}
}
}
}
// log.Debugf("Metric %s: DB %s, Peak %f, Unit %s", metric, dbMetric, peak, unit)
// Make bins, see https://jereze.com/code/sql-histogram/
// log.Debugf("Metric %s, Peak %f, Unit %s", metric, peak, unit)
// Make bins, see https://jereze.com/code/sql-histogram/ (Modified here)
start := time.Now()
crossJoinQuery := sq.Select(
fmt.Sprintf(`max(%s) as max`, dbMetric),
fmt.Sprintf(`min(%s) as min`, dbMetric),
).From("job").Where(
fmt.Sprintf(`%s is not null`, dbMetric),
).Where(
fmt.Sprintf(`%s <= %f`, dbMetric, peak),
)
crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery)
if cjqerr != nil {
return nil, cjqerr
}
for _, f := range filters {
crossJoinQuery = BuildWhereClause(f, crossJoinQuery)
}
crossJoinQuerySql, crossJoinQueryArgs, sqlerr := crossJoinQuery.ToSql()
if sqlerr != nil {
return nil, sqlerr
}
bins := 10
binQuery := fmt.Sprintf(`CAST( (case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * %d as INTEGER )`, dbMetric, dbMetric, bins)
// Find Jobs' Value Bin Number: Divide Value by Peak, Multiply by RequestedBins, then CAST to INT: Gets Bin-Number of Job
binQuery := fmt.Sprintf(`CAST(
((case when json_extract(footprint, "$.%s") = %f then %f*0.999999999 else json_extract(footprint, "$.%s") end) / %f)
* %v as INTEGER )`,
(metric + "_" + footprintStat), peak, peak, (metric + "_" + footprintStat), peak, *bins)
mainQuery := sq.Select(
fmt.Sprintf(`%s + 1 as bin`, binQuery),
fmt.Sprintf(`count(job.%s) as count`, dbMetric),
fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery),
).From("job").CrossJoin(
fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs...,
).Where(fmt.Sprintf(`job.%s is not null and job.%s <= %f`, dbMetric, dbMetric, peak))
fmt.Sprintf(`count(*) as count`),
// For Debug: // fmt.Sprintf(`CAST((%f / %d) as INTEGER ) * %s as min`, peak, *bins, binQuery),
// For Debug: // fmt.Sprintf(`CAST((%f / %d) as INTEGER ) * (%s + 1) as max`, peak, *bins, binQuery),
).From("job").Where(
"JSON_VALID(footprint)",
).Where(fmt.Sprintf(`json_extract(footprint, "$.%s") is not null and json_extract(footprint, "$.%s") <= %f`, (metric + "_" + footprintStat), (metric + "_" + footprintStat), peak))
// Only accessible Jobs...
mainQuery, qerr := SecurityCheck(ctx, mainQuery)
if qerr != nil {
return nil, qerr
}
// Filters...
for _, f := range filters {
mainQuery = BuildWhereClause(f, mainQuery)
}
@@ -637,18 +713,41 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
return nil, err
}
// Setup Return Array With Bin-Numbers for Match and Min/Max based on Peak
points := make([]*model.MetricHistoPoint, 0)
for rows.Next() {
point := model.MetricHistoPoint{}
if err := rows.Scan(&point.Bin, &point.Count, &point.Min, &point.Max); err != nil {
log.Warnf("Error while scanning rows for %s", metric)
return nil, err // Totally bricks cc-backend if returned and if all metrics requested?
}
points = append(points, &point)
binStep := int(peak) / *bins
for i := 1; i <= *bins; i++ {
binMin := (binStep * (i - 1))
binMax := (binStep * i)
epoint := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax}
points = append(points, &epoint)
}
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Data: points}
for rows.Next() { // Fill Count if Bin-No. Matches (Not every Bin exists in DB!)
rpoint := model.MetricHistoPoint{}
if err := rows.Scan(&rpoint.Bin, &rpoint.Count); err != nil { // Required for Debug: &rpoint.Min, &rpoint.Max
log.Warnf("Error while scanning rows for %s", metric)
return nil, err // FIXME: Totally bricks cc-backend if returned and if all metrics requested?
}
for _, e := range points {
if e.Bin != nil && rpoint.Bin != nil {
if *e.Bin == *rpoint.Bin {
e.Count = rpoint.Count
// Only Required For Debug: Check DB returned Min/Max against Backend Init above
// if rpoint.Min != nil {
// log.Warnf(">>>> Bin %d Min Set For %s to %d (Init'd with: %d)", *e.Bin, metric, *rpoint.Min, *e.Min)
// }
// if rpoint.Max != nil {
// log.Warnf(">>>> Bin %d Max Set For %s to %d (Init'd with: %d)", *e.Bin, metric, *rpoint.Max, *e.Max)
// }
break
}
}
}
}
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return &result, nil
@@ -657,7 +756,9 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
func (r *JobRepository) runningJobsMetricStatisticsHistogram(
ctx context.Context,
metrics []string,
filters []*model.JobFilter) []*model.MetricHistoPoints {
filters []*model.JobFilter,
bins *int,
) []*model.MetricHistoPoints {
// Get Jobs
jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
@@ -681,7 +782,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
continue
}
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
log.Errorf("Error while loading averages for histogram: %s", err)
return nil
}
@@ -692,15 +793,14 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
for idx, metric := range metrics {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
var peak float64
var unit string
for _, f := range filters {
if f.Cluster != nil {
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
}
}
@@ -720,28 +820,24 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
}
// Make and fill bins
bins := 10.0
peakBin := peak / bins
peakBin := int(peak) / *bins
points := make([]*model.MetricHistoPoint, 0)
for b := 0; b < 10; b++ {
for b := 0; b < *bins; b++ {
count := 0
bindex := b + 1
bmin := math.Round(peakBin * float64(b))
bmax := math.Round(peakBin * (float64(b) + 1.0))
bmin := peakBin * b
bmax := peakBin * (b + 1)
// Iterate AVG values for indexed metric and count for bins
for _, val := range avgs[idx] {
if float64(val) >= bmin && float64(val) < bmax {
if int(val) >= bmin && int(val) < bmax {
count += 1
}
}
bminint := int(bmin)
bmaxint := int(bmax)
// Append Bin to Metric Result Array
point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bminint, Max: &bmaxint}
point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bmin, Max: &bmax}
points = append(points, &point)
}

View File

@@ -5,6 +5,7 @@
package repository
import (
"fmt"
"strings"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
@@ -14,7 +15,13 @@ import (
)
// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
@@ -23,49 +30,153 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
tags, err := r.GetTags(user, &job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, tags)
archiveTags, err := r.getArchiveTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, archiveTags)
}
// Removes a tag from a job
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
// Removes a tag from a job by tag id
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
log.Errorf("Error removing tag with %s: %v", s, err)
return nil, err
}
j, err := r.FindById(job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
tags, err := r.GetTags(&job)
tags, err := r.GetTags(user, &job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, tags)
archiveTags, err := r.getArchiveTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, archiveTags)
}
// Removes a tag from a job by tag info
func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error) {
// Get Tag ID to delete
tagID, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return nil, fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
}
// Get Job
j, err := r.FindByIdWithUser(user, job)
if err != nil {
log.Warn("Error while finding job by id")
return nil, err
}
// Handle Delete
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tagID)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error removing tag from table 'jobTag' with %s: %v", s, err)
return nil, err
}
tags, err := r.GetTags(user, &job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
archiveTags, err := r.getArchiveTags(&job)
if err != nil {
log.Warn("Error while getting tags for job")
return nil, err
}
return tags, archive.UpdateTags(j, archiveTags)
}
// Removes a tag from db by tag info
func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagScope string) error {
// Get Tag ID to delete
tagID, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
log.Warnf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
return fmt.Errorf("Tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
}
// Handle Delete JobTagTable
qJobTag := sq.Delete("jobtag").Where("jobtag.tag_id = ?", tagID)
if _, err := qJobTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qJobTag.ToSql()
log.Errorf("Error removing tag from table 'jobTag' with %s: %v", s, err)
return err
}
// Handle Delete TagTable
qTag := sq.Delete("tag").Where("tag.id = ?", tagID)
if _, err := qTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qTag.ToSql()
log.Errorf("Error removing tag from table 'tag' with %s: %v", s, err)
return err
}
return nil
}
// Removes a tag from db by tag id
func (r *JobRepository) RemoveTagById(tagID int64) error {
// Handle Delete JobTagTable
qJobTag := sq.Delete("jobtag").Where("jobtag.tag_id = ?", tagID)
if _, err := qJobTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qJobTag.ToSql()
log.Errorf("Error removing tag from table 'jobTag' with %s: %v", s, err)
return err
}
// Handle Delete TagTable
qTag := sq.Delete("tag").Where("tag.id = ?", tagID)
if _, err := qTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qTag.ToSql()
log.Errorf("Error removing tag from table 'tag' with %s: %v", s, err)
return err
}
return nil
}
// CreateTag creates a new tag with the specified type and name and returns its database id.
func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)
func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope string) (tagId int64, err error) {
// Default to "Global" scope if none defined
if tagScope == "" {
tagScope = "global"
}
q := sq.Insert("tag").Columns("tag_type", "tag_name", "tag_scope").Values(tagType, tagName, tagScope)
res, err := q.RunWith(r.stmtCache).Exec()
if err != nil {
@@ -78,8 +189,9 @@ func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64,
}
func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error) {
// Fetch all Tags in DB for Display in Frontend Tag-View
tags = make([]schema.Tag, 0, 100)
xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name FROM tag")
xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name, tag_scope FROM tag")
if err != nil {
return nil, nil, err
}
@@ -89,22 +201,42 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
if err = xrows.StructScan(&t); err != nil {
return nil, nil, err
}
tags = append(tags, t)
// Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags
readable, err := r.checkScopeAuth(user, "read", t.Scope)
if err != nil {
return nil, nil, err
}
if readable {
tags = append(tags, t)
}
}
q := sq.Select("t.tag_name, count(jt.tag_id)").
// Query and Count Jobs with attached Tags
q := sq.Select("t.tag_name, t.id, count(jt.tag_id)").
From("tag t").
LeftJoin("jobtag jt ON t.id = jt.tag_id").
GroupBy("t.tag_name")
// Handle Scope Filtering
scopeList := "\"global\""
if user != nil {
scopeList += ",\"" + user.Username + "\""
}
if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) {
scopeList += ",\"admin\""
}
q = q.Where("t.tag_scope IN (" + scopeList + ")")
// Handle Job Ownership
if user != nil && user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) { // ADMIN || SUPPORT: Count all jobs
log.Debug("CountTags: User Admin or Support -> Count all Jobs for Tags")
// log.Debug("CountTags: User Admin or Support -> Count all Jobs for Tags")
// Unchanged: Needs to be own case still, due to UserRole/NoRole compatibility handling in else case
} else if user != nil && user.HasRole(schema.RoleManager) { // MANAGER: Count own jobs plus project's jobs
// Build ("project1", "project2", ...) list of variable length directly in SQL string
q = q.Where("jt.job_id IN (SELECT id FROM job WHERE job.user = ? OR job.project IN (\""+strings.Join(user.Projects, "\",\"")+"\"))", user.Username)
q = q.Where("jt.job_id IN (SELECT id FROM job WHERE job.hpc_user = ? OR job.project IN (\""+strings.Join(user.Projects, "\",\"")+"\"))", user.Username)
} else if user != nil { // USER OR NO ROLE (Compatibility): Only count own jobs
q = q.Where("jt.job_id IN (SELECT id FROM job WHERE job.user = ?)", user.Username)
q = q.Where("jt.job_id IN (SELECT id FROM job WHERE job.hpc_user = ?)", user.Username)
}
rows, err := q.RunWith(r.stmtCache).Query()
@@ -115,29 +247,44 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
counts = make(map[string]int)
for rows.Next() {
var tagName string
var tagId int
var count int
if err = rows.Scan(&tagName, &count); err != nil {
if err = rows.Scan(&tagName, &tagId, &count); err != nil {
return nil, nil, err
}
counts[tagName] = count
// Use tagId as second Map-Key component to differentiate tags with identical names
counts[fmt.Sprint(tagName, tagId)] = count
}
err = rows.Err()
return
return tags, counts, err
}
// AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`.
// If such a tag does not yet exist, it is created.
func (r *JobRepository) AddTagOrCreate(jobId int64, tagType string, tagName string) (tagId int64, err error) {
tagId, exists := r.TagId(tagType, tagName)
func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) {
// Default to "Global" scope if none defined
if tagScope == "" {
tagScope = "global"
}
writable, err := r.checkScopeAuth(user, "write", tagScope)
if err != nil {
return 0, err
}
if !writable {
return 0, fmt.Errorf("cannot write tag scope with current authorization")
}
tagId, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
tagId, err = r.CreateTag(tagType, tagName)
tagId, err = r.CreateTag(tagType, tagName, tagScope)
if err != nil {
return 0, err
}
}
if _, err := r.AddTag(jobId, tagId); err != nil {
if _, err := r.AddTag(user, jobId, tagId); err != nil {
return 0, err
}
@@ -158,19 +305,29 @@ func (r *JobRepository) HasTag(jobId int64, tagType string, tagName string) bool
}
// TagId returns the database id of the tag with the specified type and name.
func (r *JobRepository) TagId(tagType string, tagName string) (tagId int64, exists bool) {
func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) (tagId int64, exists bool) {
exists = true
if err := sq.Select("id").From("tag").
Where("tag.tag_type = ?", tagType).Where("tag.tag_name = ?", tagName).
Where("tag.tag_type = ?", tagType).Where("tag.tag_name = ?", tagName).Where("tag.tag_scope = ?", tagScope).
RunWith(r.stmtCache).QueryRow().Scan(&tagId); err != nil {
exists = false
}
return
}
// GetTags returns a list of all tags if job is nil or of the tags that the job with that database ID has.
func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name").From("tag")
// TagInfo returns the database infos of the tag with the specified id.
func (r *JobRepository) TagInfo(tagId int64) (tagType string, tagName string, tagScope string, exists bool) {
exists = true
if err := sq.Select("tag.tag_type", "tag.tag_name", "tag.tag_scope").From("tag").Where("tag.id = ?", tagId).
RunWith(r.stmtCache).QueryRow().Scan(&tagType, &tagName, &tagScope); err != nil {
exists = false
}
return
}
// GetTags returns a list of all scoped tags if job is nil or of the tags that the job with that database ID has.
func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
if job != nil {
q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
}
@@ -185,7 +342,41 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
tags := make([]*schema.Tag, 0)
for rows.Next() {
tag := &schema.Tag{}
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name); err != nil {
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
// Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags
readable, err := r.checkScopeAuth(user, "read", tag.Scope)
if err != nil {
return nil, err
}
if readable {
tags = append(tags, tag)
}
}
return tags, nil
}
// GetArchiveTags returns a list of all tags *regardless of scope* for archiving if job is nil or of the tags that the job with that database ID has.
func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
if job != nil {
q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
}
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}
tags := make([]*schema.Tag, 0)
for rows.Next() {
tag := &schema.Tag{}
if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
@@ -194,3 +385,59 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
return tags, nil
}
func (r *JobRepository) ImportTag(jobId int64, tagType string, tagName string, tagScope string) (err error) {
// Import has no scope ctx, only import from metafile to DB (No recursive archive update required), only returns err
tagId, exists := r.TagId(tagType, tagName, tagScope)
if !exists {
tagId, err = r.CreateTag(tagType, tagName, tagScope)
if err != nil {
return err
}
}
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(jobId, tagId)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag on import with %s: %v", s, err)
return err
}
return nil
}
func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scope string) (pass bool, err error) {
if user != nil {
switch {
case operation == "write" && scope == "admin":
if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) {
return true, nil
}
return false, nil
case operation == "write" && scope == "global":
if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) {
return true, nil
}
return false, nil
case operation == "write" && scope == user.Username:
return true, nil
case operation == "read" && scope == "admin":
return user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}), nil
case operation == "read" && scope == "global":
return true, nil
case operation == "read" && scope == user.Username:
return true, nil
default:
if operation == "read" || operation == "write" {
// No acceptable scope: deny tag
return false, nil
} else {
return false, fmt.Errorf("error while checking tag operation auth: unknown operation (%s)", operation)
}
}
} else {
return false, fmt.Errorf("error while checking tag operation auth: no user in context")
}
}

Binary file not shown.

Binary file not shown.

View File

@@ -6,7 +6,6 @@ package repository
import (
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/jmoiron/sqlx"
)
@@ -18,20 +17,12 @@ type Transaction struct {
func (r *JobRepository) TransactionInit() (*Transaction, error) {
var err error
t := new(Transaction)
// Inserts are bundled into transactions because in sqlite,
// that speeds up inserts A LOT.
t.tx, err = r.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions")
return nil, err
}
t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
if err != nil {
log.Warn("Error while preparing namedJobInsert")
return nil, err
}
return t, nil
}
@@ -50,7 +41,6 @@ func (r *JobRepository) TransactionCommit(t *Transaction) error {
return err
}
t.stmt = t.tx.NamedStmt(t.stmt)
return nil
}
@@ -59,14 +49,17 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error {
log.Warn("Error while committing SQL transactions")
return err
}
return nil
}
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
res, err := t.stmt.Exec(job)
func (r *JobRepository) TransactionAddNamed(
t *Transaction,
query string,
args ...interface{},
) (int64, error) {
res, err := t.tx.NamedExec(query, args)
if err != nil {
log.Errorf("repository initDB(): %v", err)
log.Errorf("Named Exec failed: %v", err)
return 0, err
}
@@ -79,26 +72,19 @@ func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, e
return id, nil
}
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...interface{}) (int64, error) {
res, err := t.tx.Exec(query, args...)
if err != nil {
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
return 0, err
}
tagId, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
log.Errorf("TransactionAdd(), Exec() Error: %v", err)
return 0, err
}
return tagId, nil
}
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
return err
id, err := res.LastInsertId()
if err != nil {
log.Errorf("TransactionAdd(), LastInsertId() Error: %v", err)
return 0, err
}
return nil
return id, nil
}

View File

@@ -19,6 +19,7 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"golang.org/x/crypto/bcrypt"
"github.com/ClusterCockpit/cc-backend/internal/config"
)
var (
@@ -46,8 +47,8 @@ func GetUserRepository() *UserRepository {
func (r *UserRepository) GetUser(username string) (*schema.User, error) {
user := &schema.User{Username: username}
var hashedPassword, name, rawRoles, email, rawProjects sql.NullString
if err := sq.Select("password", "ldap", "name", "roles", "email", "projects").From("user").
Where("user.username = ?", username).RunWith(r.DB).
if err := sq.Select("password", "ldap", "name", "roles", "email", "projects").From("hpc_user").
Where("hpc_user.username = ?", username).RunWith(r.DB).
QueryRow().Scan(&hashedPassword, &user.AuthSource, &name, &rawRoles, &email, &rawProjects); err != nil {
log.Warnf("Error while querying user '%v' from database", username)
return nil, err
@@ -72,9 +73,8 @@ func (r *UserRepository) GetUser(username string) (*schema.User, error) {
}
func (r *UserRepository) GetLdapUsernames() ([]string, error) {
var users []string
rows, err := r.DB.Query(`SELECT username FROM user WHERE user.ldap = 1`)
rows, err := r.DB.Query(`SELECT username FROM hpc_user WHERE hpc_user.ldap = 1`)
if err != nil {
log.Warn("Error while querying usernames")
return nil, err
@@ -122,18 +122,62 @@ func (r *UserRepository) AddUser(user *schema.User) error {
vals = append(vals, int(user.AuthSource))
}
if _, err := sq.Insert("user").Columns(cols...).Values(vals...).RunWith(r.DB).Exec(); err != nil {
if _, err := sq.Insert("hpc_user").Columns(cols...).Values(vals...).RunWith(r.DB).Exec(); err != nil {
log.Errorf("Error while inserting new user '%v' into DB", user.Username)
return err
}
log.Infof("new user %#v created (roles: %s, auth-source: %d, projects: %s)", user.Username, rolesJson, user.AuthSource, projectsJson)
defaultMetricsCfg, err := config.LoadDefaultMetricsConfig()
if err != nil {
log.Errorf("Error loading default metrics config: %v", err)
} else if defaultMetricsCfg != nil {
for _, cluster := range defaultMetricsCfg.Clusters {
metricsArray := config.ParseMetricsString(cluster.DefaultMetrics)
metricsJSON, err := json.Marshal(metricsArray)
if err != nil {
log.Errorf("Error marshaling default metrics for cluster %s: %v", cluster.Name, err)
continue
}
confKey := "job_view_selectedMetrics:" + cluster.Name
if _, err := sq.Insert("configuration").
Columns("username", "confkey", "value").
Values(user.Username, confKey, string(metricsJSON)).
RunWith(r.DB).Exec(); err != nil {
log.Errorf("Error inserting default job view metrics for user %s and cluster %s: %v", user.Username, cluster.Name, err)
} else {
log.Infof("Default job view metrics for user %s and cluster %s set to %s", user.Username, cluster.Name, string(metricsJSON))
}
}
}
return nil
}
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
// user contains updated info, apply to dbuser
// TODO: Discuss updatable fields
if dbUser.Name != user.Name {
if _, err := sq.Update("hpc_user").Set("name", user.Name).Where("hpc_user.username = ?", dbUser.Username).RunWith(r.DB).Exec(); err != nil {
log.Errorf("error while updating name of user '%s'", user.Username)
return err
}
}
// Toggled until greenlit
// if dbUser.HasRole(schema.RoleManager) && !reflect.DeepEqual(dbUser.Projects, user.Projects) {
// projects, _ := json.Marshal(user.Projects)
// if _, err := sq.Update("hpc_user").Set("projects", projects).Where("hpc_user.username = ?", dbUser.Username).RunWith(r.DB).Exec(); err != nil {
// return err
// }
// }
return nil
}
func (r *UserRepository) DelUser(username string) error {
_, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username)
_, err := r.DB.Exec(`DELETE FROM hpc_user WHERE hpc_user.username = ?`, username)
if err != nil {
log.Errorf("Error while deleting user '%s' from DB", username)
return err
@@ -143,8 +187,7 @@ func (r *UserRepository) DelUser(username string) error {
}
func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error) {
q := sq.Select("username", "name", "email", "roles", "projects").From("user")
q := sq.Select("username", "name", "email", "roles", "projects").From("hpc_user")
if specialsOnly {
q = q.Where("(roles != '[\"user\"]' AND roles != '[]')")
}
@@ -186,8 +229,8 @@ func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error) {
func (r *UserRepository) AddRole(
ctx context.Context,
username string,
queryrole string) error {
queryrole string,
) error {
newRole := strings.ToLower(queryrole)
user, err := r.GetUser(username)
if err != nil {
@@ -198,15 +241,15 @@ func (r *UserRepository) AddRole(
exists, valid := user.HasValidRole(newRole)
if !valid {
return fmt.Errorf("Supplied role is no valid option : %v", newRole)
return fmt.Errorf("supplied role is no valid option : %v", newRole)
}
if exists {
return fmt.Errorf("User %v already has role %v", username, newRole)
return fmt.Errorf("user %v already has role %v", username, newRole)
}
roles, _ := json.Marshal(append(user.Roles, newRole))
if _, err := sq.Update("user").Set("roles", roles).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
log.Errorf("Error while adding new role for user '%s'", user.Username)
if _, err := sq.Update("hpc_user").Set("roles", roles).Where("hpc_user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
log.Errorf("error while adding new role for user '%s'", user.Username)
return err
}
return nil
@@ -223,14 +266,14 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
exists, valid := user.HasValidRole(oldRole)
if !valid {
return fmt.Errorf("Supplied role is no valid option : %v", oldRole)
return fmt.Errorf("supplied role is no valid option : %v", oldRole)
}
if !exists {
return fmt.Errorf("Role already deleted for user '%v': %v", username, oldRole)
return fmt.Errorf("role already deleted for user '%v': %v", username, oldRole)
}
if oldRole == schema.GetRoleString(schema.RoleManager) && len(user.Projects) != 0 {
return fmt.Errorf("Cannot remove role 'manager' while user %s still has assigned project(s) : %v", username, user.Projects)
return fmt.Errorf("cannot remove role 'manager' while user %s still has assigned project(s) : %v", username, user.Projects)
}
var newroles []string
@@ -240,8 +283,8 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
}
}
var mroles, _ = json.Marshal(newroles)
if _, err := sq.Update("user").Set("roles", mroles).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
mroles, _ := json.Marshal(newroles)
if _, err := sq.Update("hpc_user").Set("roles", mroles).Where("hpc_user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
log.Errorf("Error while removing role for user '%s'", user.Username)
return err
}
@@ -251,15 +294,15 @@ func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryr
func (r *UserRepository) AddProject(
ctx context.Context,
username string,
project string) error {
project string,
) error {
user, err := r.GetUser(username)
if err != nil {
return err
}
if !user.HasRole(schema.RoleManager) {
return fmt.Errorf("user '%s' is not a manager!", username)
return fmt.Errorf("user '%s' is not a manager", username)
}
if user.HasProject(project) {
@@ -267,7 +310,7 @@ func (r *UserRepository) AddProject(
}
projects, _ := json.Marshal(append(user.Projects, project))
if _, err := sq.Update("user").Set("projects", projects).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
if _, err := sq.Update("hpc_user").Set("projects", projects).Where("hpc_user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
return err
}
@@ -281,11 +324,11 @@ func (r *UserRepository) RemoveProject(ctx context.Context, username string, pro
}
if !user.HasRole(schema.RoleManager) {
return fmt.Errorf("user '%#v' is not a manager!", username)
return fmt.Errorf("user '%#v' is not a manager", username)
}
if !user.HasProject(project) {
return fmt.Errorf("user '%#v': Cannot remove project '%#v' - Does not match!", username, project)
return fmt.Errorf("user '%#v': Cannot remove project '%#v' - Does not match", username, project)
}
var exists bool
@@ -298,14 +341,14 @@ func (r *UserRepository) RemoveProject(ctx context.Context, username string, pro
}
}
if exists == true {
if exists {
var result interface{}
if len(newprojects) == 0 {
result = "[]"
} else {
result, _ = json.Marshal(newprojects)
}
if _, err := sq.Update("user").Set("projects", result).Where("user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
if _, err := sq.Update("hpc_user").Set("projects", result).Where("hpc_user.username = ?", username).RunWith(r.DB).Exec(); err != nil {
return err
}
return nil
@@ -321,9 +364,10 @@ const ContextUserKey ContextKey = "user"
func GetUserFromContext(ctx context.Context) *schema.User {
x := ctx.Value(ContextUserKey)
if x == nil {
log.Warnf("no user retrieved from context")
return nil
}
// log.Infof("user retrieved from context: %v", x.(*schema.User))
return x.(*schema.User)
}
@@ -336,7 +380,7 @@ func (r *UserRepository) FetchUserInCtx(ctx context.Context, username string) (*
user := &model.User{Username: username}
var name, email sql.NullString
if err := sq.Select("name", "email").From("user").Where("user.username = ?", username).
if err := sq.Select("name", "email").From("hpc_user").Where("hpc_user.username = ?", username).
RunWith(r.DB).QueryRow().Scan(&name, &email); err != nil {
if err == sql.ErrNoRows {
/* This warning will be logged *often* for non-local users, i.e. users mentioned only in job-table or archive, */

View File

@@ -24,9 +24,9 @@ var (
type UserCfgRepo struct {
DB *sqlx.DB
Lookup *sqlx.Stmt
lock sync.RWMutex
uiDefaults map[string]interface{}
cache *lrucache.Cache
lock sync.RWMutex
}
func GetUserCfgRepo() *UserCfgRepo {
@@ -35,7 +35,7 @@ func GetUserCfgRepo() *UserCfgRepo {
lookupConfigStmt, err := db.DB.Preparex(`SELECT confkey, value FROM configuration WHERE configuration.username = ?`)
if err != nil {
log.Fatalf("db.DB.Preparex() error: %v", err)
log.Fatalf("User Config: Call 'db.DB.Preparex()' failed.\nError: %s\n", err.Error())
}
userCfgRepoInstance = &UserCfgRepo{
@@ -112,8 +112,8 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *schema.User) (map[string]interface{},
// configuration.
func (uCfg *UserCfgRepo) UpdateConfig(
key, value string,
user *schema.User) error {
user *schema.User,
) error {
if user == nil {
var val interface{}
if err := json.Unmarshal([]byte(value), &val); err != nil {

View File

@@ -25,6 +25,9 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
"jwts": {
"max-age": "2m"
},
"apiAllowedIPs": [
"*"
],
"clusters": [
{
"name": "testcluster",