Merge branch 'master' into 97_107_mark_and_show_shared

- Solves query.go conflict by splitting the QueryJobLinks function as well
Christoph Kluge 2023-06-01 17:48:43 +02:00
commit a6cb833843
16 changed files with 290 additions and 60 deletions

2
.gitignore vendored
View File

@@ -12,3 +12,5 @@
 /.vscode/*
 /archive-migration
 /archive-manager
+var/job.db-shm
+var/job.db-wal

View File

@@ -5,15 +5,11 @@
 package repository
 
 import (
-    "database/sql"
-    "fmt"
     "log"
     "sync"
     "time"
 
     "github.com/jmoiron/sqlx"
-    "github.com/mattn/go-sqlite3"
-    "github.com/qustavo/sqlhooks/v2"
 )
 
 var (
@@ -26,35 +22,56 @@ type DBConnection struct {
     Driver string
 }
 
+type DatabaseOptions struct {
+    URL                   string
+    MaxOpenConnections    int
+    MaxIdleConnections    int
+    ConnectionMaxLifetime time.Duration
+    ConnectionMaxIdleTime time.Duration
+}
+
 func Connect(driver string, db string) {
     var err error
     var dbHandle *sqlx.DB
 
     dbConnOnce.Do(func() {
-        if driver == "sqlite3" {
-            sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
-            dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db))
-            // dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db))
+        opts := DatabaseOptions{
+            URL:                   db,
+            MaxOpenConnections:    4,
+            MaxIdleConnections:    4,
+            ConnectionMaxLifetime: time.Hour,
+            ConnectionMaxIdleTime: time.Hour,
+        }
+
+        switch driver {
+        case "sqlite3":
+            // sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
+            // - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run)
+            // - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately
+            // - Enable foreign key checks
+            opts.URL += "?_journal=WAL&_timeout=5000&_fk=true"
+            // dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db))
+            dbHandle, err = sqlx.Open("sqlite3", opts.URL)
             if err != nil {
                 log.Fatal(err)
             }
-
-            // sqlite does not multithread. Having more than one connection open would just mean
-            // waiting for locks.
-            dbHandle.SetMaxOpenConns(1)
-        } else if driver == "mysql" {
-            dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db))
+        case "mysql":
+            opts.URL += "?multiStatements=true"
+            dbHandle, err = sqlx.Open("mysql", opts.URL)
             if err != nil {
                 log.Fatalf("sqlx.Open() error: %v", err)
             }
-
-            dbHandle.SetConnMaxLifetime(time.Minute * 3)
-            dbHandle.SetMaxOpenConns(10)
-            dbHandle.SetMaxIdleConns(10)
-        } else {
+        default:
             log.Fatalf("unsupported database driver: %s", driver)
         }
 
+        dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
+        dbHandle.SetMaxIdleConns(opts.MaxIdleConnections)
+        dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime)
+        dbHandle.SetConnMaxIdleTime(opts.ConnectionMaxIdleTime)
+
         dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
         err = checkDBVersion(driver, dbHandle.DB)
         if err != nil {
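For context, the three DSN parameters appended here are connection-string shorthands understood by mattn/go-sqlite3: _journal maps to PRAGMA journal_mode, _timeout to busy_timeout (milliseconds), and _fk to foreign_keys. Below is a minimal standalone sketch of the resulting connection, assuming the same driver; the database path and pool limits just mirror the defaults introduced above and are illustrative.

package main

import (
    "fmt"
    "log"

    "github.com/jmoiron/sqlx"
    _ "github.com/mattn/go-sqlite3"
)

func main() {
    // DSN as assembled by the new Connect() for SQLite.
    dsn := "var/job.db" + "?_journal=WAL&_timeout=5000&_fk=true"

    db, err := sqlx.Open("sqlite3", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Same pool limits as the DatabaseOptions defaults in the commit.
    db.SetMaxOpenConns(4)
    db.SetMaxIdleConns(4)

    // Verify the journal mode actually took effect.
    var mode string
    if err := db.QueryRow("PRAGMA journal_mode").Scan(&mode); err != nil {
        log.Fatal(err)
    }
    fmt.Println("journal_mode:", mode) // expect "wal"
}

WAL lets readers proceed while a single writer appends to the log, and the 5000 ms busy timeout makes concurrent writers queue instead of failing immediately with SQLITE_BUSY, which is exactly what the comments in the diff describe.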

View File

@@ -74,7 +74,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
     &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
     &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
     &job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil {
-        log.Warn("Error while scanning rows (Job)")
+        log.Warnf("Error while scanning rows (Job): %v", err)
         return nil, err
     }

View File

@@ -8,21 +8,9 @@ import (
     "fmt"
     "testing"
 
-    "github.com/ClusterCockpit/cc-backend/pkg/log"
     _ "github.com/mattn/go-sqlite3"
 )
 
-func setup(t *testing.T) *JobRepository {
-    log.Init("info", true)
-    dbfilepath := "testdata/test.db"
-    err := MigrateDB("sqlite3", dbfilepath)
-    if err != nil {
-        t.Fatal(err)
-    }
-    Connect("sqlite3", dbfilepath)
-    return GetJobRepository()
-}
-
 func TestFind(t *testing.T) {
     r := setup(t)

View File

@@ -2,4 +2,7 @@ DROP INDEX IF EXISTS job_stats;
 DROP INDEX IF EXISTS job_by_user;
 DROP INDEX IF EXISTS job_by_starttime;
 DROP INDEX IF EXISTS job_by_job_id;
-DROP INDEX IF EXISTS job_by_state;
+DROP INDEX IF EXISTS job_list;
+DROP INDEX IF EXISTS job_list_user;
+DROP INDEX IF EXISTS job_list_users;
+DROP INDEX IF EXISTS job_list_users_start;

View File

@@ -2,4 +2,7 @@ CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
 CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
 CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
 CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
-CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);
+CREATE INDEX IF NOT EXISTS job_list ON job (cluster, job_state);
+CREATE INDEX IF NOT EXISTS job_list_user ON job (user, cluster, job_state);
+CREATE INDEX IF NOT EXISTS job_list_users ON job (user, job_state);
+CREATE INDEX IF NOT EXISTS job_list_users_start ON job (start_time, user, job_state);

View File

@@ -2,4 +2,7 @@ DROP INDEX IF EXISTS job_stats;
 DROP INDEX IF EXISTS job_by_user;
 DROP INDEX IF EXISTS job_by_starttime;
 DROP INDEX IF EXISTS job_by_job_id;
-DROP INDEX IF EXISTS job_by_state;
+DROP INDEX IF EXISTS job_list;
+DROP INDEX IF EXISTS job_list_user;
+DROP INDEX IF EXISTS job_list_users;
+DROP INDEX IF EXISTS job_list_users_start;

View File

@@ -1,5 +1,8 @@
 CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
 CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
 CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
-CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
-CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state);
+CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id, cluster, start_time);
+CREATE INDEX IF NOT EXISTS job_list ON job (cluster, job_state);
+CREATE INDEX IF NOT EXISTS job_list_user ON job (user, cluster, job_state);
+CREATE INDEX IF NOT EXISTS job_list_users ON job (user, job_state);
+CREATE INDEX IF NOT EXISTS job_list_users_start ON job (start_time, user, job_state);
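The new composite indexes appear to target the job-list query shapes: filtering by cluster and job state, optionally narrowed to a user and ordered by start time. As a hedged sketch (the exact query shape is inferred from the index columns, not stated in the commit), here is the kind of SELECT the job_list_user index (user, cluster, job_state) could cover, built with the squirrel builder already used in query.go; the filter values are placeholders.

package main

import (
    "fmt"

    sq "github.com/Masterminds/squirrel"
)

func main() {
    // Filter a single user's running jobs on one cluster, newest first.
    // Column names come from the migration above.
    query := sq.Select("job.id", "job.job_id").From("job").
        Where(sq.Eq{"job.user": "someuser"}).
        Where(sq.Eq{"job.cluster": "fritz"}).
        Where(sq.Eq{"job.job_state": "running"}).
        OrderBy("job.start_time DESC")

    sql, args, _ := query.ToSql()
    fmt.Println(sql, args)
}

Whether SQLite actually picks a given index still depends on its query planner and on which of these columns appear in the WHERE clause.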

View File

@@ -19,19 +19,13 @@ import (
     sq "github.com/Masterminds/squirrel"
 )
 
-// QueryJobs returns a list of jobs matching the provided filters. page and order are optional-
-func (r *JobRepository) QueryJobs(
-    ctx context.Context,
+// SecurityCheck-less, private: Returns a list of jobs matching the provided filters. page and order are optional-
+func (r *JobRepository) queryJobs(
+    query sq.SelectBuilder,
     filters []*model.JobFilter,
     page *model.PageRequest,
     order *model.OrderByInput) ([]*schema.Job, error) {
-
-    query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
-    if qerr != nil {
-        return nil, qerr
-    }
 
     if order != nil {
         field := toSnakeCase(order.Field)
@@ -81,17 +75,38 @@ func (r *JobRepository) QueryJobs(
     return jobs, nil
 }
 
-// QueryJobLinks returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters.
-func (r *JobRepository) QueryJobLinks(
-    ctx context.Context,
-    filters []*model.JobFilter) ([]*model.JobLink, error) {
-
-    query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
+// testFunction for queryJobs
+func (r *JobRepository) testQueryJobs(
+    filters []*model.JobFilter,
+    page *model.PageRequest,
+    order *model.OrderByInput) ([]*schema.Job, error) {
+
+    return r.queryJobs(sq.Select(jobColumns...).From("job"),
+        filters, page, order)
+}
+
+// Public function with added securityCheck, calls private queryJobs function above
+func (r *JobRepository) QueryJobs(
+    ctx context.Context,
+    filters []*model.JobFilter,
+    page *model.PageRequest,
+    order *model.OrderByInput) ([]*schema.Job, error) {
+
+    query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
     if qerr != nil {
         return nil, qerr
     }
 
+    return r.queryJobs(query,
+        filters, page, order)
+}
+
+// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters.
+func (r *JobRepository) queryJobLinks(
+    query sq.SelectBuilder,
+    filters []*model.JobFilter) ([]*model.JobLink, error) {
+
     for _, f := range filters {
         query = BuildWhereClause(f, query)
     }
@@ -123,21 +138,41 @@ func (r *JobRepository) QueryJobLinks(
     return jobLinks, nil
 }
 
-// CountJobs counts the number of jobs matching the filters.
-func (r *JobRepository) CountJobs(
-    ctx context.Context,
-    filters []*model.JobFilter) (int, error) {
-
-    // count all jobs:
-    query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
+// testFunction for queryJobLinks
+func (r *JobRepository) testQueryJobLinks(
+    filters []*model.JobFilter) ([]*model.JobLink, error) {
+
+    return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters)
+}
+
+func (r *JobRepository) QueryJobLinks(
+    ctx context.Context,
+    filters []*model.JobFilter) ([]*model.JobLink, error) {
+
+    query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
     if qerr != nil {
-        return 0, qerr
+        return nil, qerr
     }
 
+    return r.queryJobLinks(query, filters)
+}
+
+// SecurityCheck-less, private: Returns the number of jobs matching the filters
+func (r *JobRepository) countJobs(query sq.SelectBuilder,
+    filters []*model.JobFilter) (int, error) {
+
     for _, f := range filters {
         query = BuildWhereClause(f, query)
     }
 
+    sql, args, err := query.ToSql()
+    if err != nil {
+        log.Warn("Error while converting query to sql")
+        return 0, nil
+    }
+
+    log.Debugf("SQL query: `%s`, args: %#v", sql, args)
     var count int
     if err := query.RunWith(r.DB).Scan(&count); err != nil {
         return 0, err
@@ -146,6 +181,27 @@ func (r *JobRepository) CountJobs(
     return count, nil
 }
 
+// testFunction for countJobs
+func (r *JobRepository) testCountJobs(
+    filters []*model.JobFilter) (int, error) {
+
+    return r.countJobs(sq.Select("count(*)").From("job"), filters)
+}
+
+// Public function with added securityCheck, calls private countJobs function above
+func (r *JobRepository) CountJobs(
+    ctx context.Context,
+    filters []*model.JobFilter) (int, error) {
+
+    query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
+    if qerr != nil {
+        return 0, qerr
+    }
+
+    return r.countJobs(query, filters)
+}
+
 func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (queryOut sq.SelectBuilder, err error) {
     user := auth.GetUser(ctx)
     if user == nil || user.HasAnyRole([]auth.Role{auth.RoleAdmin, auth.RoleSupport, auth.RoleApi}) { // Admin & Co. : All jobs
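To illustrate the split this commit makes, here is a small usage sketch (not part of the commit): the exported QueryJobs keeps the context-based SecurityCheck and delegates to the private queryJobs, while the unexported test helpers feed in the base SELECT directly, as the benchmark file below does. Filter construction mirrors that benchmark file; the surrounding main function and database path are illustrative, and the internal packages can only be imported from within the cc-backend module itself.

package main

import (
    "context"
    "fmt"

    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
    "github.com/ClusterCockpit/cc-backend/internal/repository"
)

func main() {
    // Illustrative wiring; in the real server Connect() happens during startup.
    repository.Connect("sqlite3", "var/job.db")
    repo := repository.GetJobRepository()

    // Filter built the same way as in the benchmarks.
    cluster := "fritz"
    filter := &model.JobFilter{Cluster: &model.StringInput{Eq: &cluster}}
    filter.State = append(filter.State, "running")

    page := &model.PageRequest{ItemsPerPage: 50, Page: 1}
    order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}

    // Exported wrapper: SecurityCheck reads the user from ctx and narrows the
    // base SELECT before delegating to the private queryJobs. In the real API
    // the context carries the authenticated user from the HTTP request.
    jobs, err := repo.QueryJobs(context.Background(), []*model.JobFilter{filter}, page, order)
    if err != nil {
        panic(err)
    }
    fmt.Println("visible jobs:", len(jobs))
}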

View File

@@ -0,0 +1,145 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository

import (
    "testing"

    "github.com/ClusterCockpit/cc-backend/internal/graph/model"
    "github.com/ClusterCockpit/cc-backend/pkg/log"
    _ "github.com/mattn/go-sqlite3"
)

func TestPragma(t *testing.T) {
    t.Run("sets up a new DB", func(t *testing.T) {
        db := setup(t)

        for _, pragma := range []string{"synchronous", "journal_mode", "busy_timeout", "auto_vacuum", "foreign_keys"} {
            t.Log("PRAGMA", pragma, getPragma(db, pragma))
        }
    })
}

func getPragma(db *JobRepository, name string) string {
    var s string
    if err := db.DB.QueryRow(`PRAGMA ` + name).Scan(&s); err != nil {
        panic(err)
    }
    return s
}

func BenchmarkSelect1(b *testing.B) {
    db := setup(b)
    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            _, err := db.DB.Exec(`select 1`)
            noErr(b, err)
        }
    })
}

func BenchmarkDB_FindJobById(b *testing.B) {
    var jobId int64 = 1677322

    b.Run("FindJobById", func(b *testing.B) {
        db := setup(b)
        b.ResetTimer()

        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                _, err := db.FindById(jobId)
                noErr(b, err)
            }
        })
    })
}

func BenchmarkDB_FindJob(b *testing.B) {
    var jobId int64 = 107266
    var startTime int64 = 1657557241
    var cluster = "fritz"

    b.Run("FindJob", func(b *testing.B) {
        db := setup(b)
        b.ResetTimer()

        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                _, err := db.Find(&jobId, &cluster, &startTime)
                noErr(b, err)
            }
        })
    })
}

func BenchmarkDB_CountJobs(b *testing.B) {
    filter := &model.JobFilter{}
    filter.State = append(filter.State, "running")
    cluster := "fritz"
    filter.Cluster = &model.StringInput{Eq: &cluster}
    user := "mppi133h"
    filter.User = &model.StringInput{Eq: &user}

    b.Run("CountJobs", func(b *testing.B) {
        db := setup(b)
        b.ResetTimer()

        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                _, err := db.testCountJobs([]*model.JobFilter{filter})
                noErr(b, err)
            }
        })
    })
}

func BenchmarkDB_QueryJobs(b *testing.B) {
    filter := &model.JobFilter{}
    filter.State = append(filter.State, "running")
    cluster := "fritz"
    filter.Cluster = &model.StringInput{Eq: &cluster}
    user := "mppi133h"
    filter.User = &model.StringInput{Eq: &user}
    page := &model.PageRequest{ItemsPerPage: 50, Page: 1}
    order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}

    b.Run("QueryJobs", func(b *testing.B) {
        db := setup(b)
        b.ResetTimer()

        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                _, err := db.testQueryJobs([]*model.JobFilter{filter}, page, order)
                noErr(b, err)
            }
        })
    })
}

func setup(tb testing.TB) *JobRepository {
    tb.Helper()
    log.Init("warn", true)
    dbfile := "testdata/job.db"
    err := MigrateDB("sqlite3", dbfile)
    noErr(tb, err)
    Connect("sqlite3", dbfile)
    return GetJobRepository()
}

func noErr(tb testing.TB, err error) {
    tb.Helper()

    if err != nil {
        tb.Fatal("Error is not nil:", err)
    }
}
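Assuming a populated testdata/job.db, these benchmarks run with the standard Go tooling from the repository root, for example: go test -run='^$' -bench=. ./internal/repository. Note that RunParallel drives the repository from several goroutines at once, which is the concurrent access pattern the WAL journal and busy-timeout settings in the new Connect() are meant to handle.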

BIN
internal/repository/testdata/job.db vendored Normal file

Binary file not shown.

BIN
internal/repository/testdata/job.db-shm vendored Normal file

Binary file not shown.

View File

Binary file not shown.

View File

@@ -44,6 +44,7 @@ var (
 /* CONFIG */
 
 func Init(lvl string, logdate bool) {
     switch lvl {
     case "crit":
         ErrWriter = io.Discard
@@ -70,6 +71,12 @@ func Init(lvl string, logdate bool) {
         WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile)
         ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile)
         CritLog = log.New(CritWriter, CritPrefix, log.Llongfile)
+    } else {
+        DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags)
+        InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile)
+        WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile)
+        ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
+        CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
     }
 }

View File

@@ -162,10 +162,13 @@ func (topo *Topology) GetMemoryDomainsFromHWThreads(
 
 // Temporary fix to convert back from int id to string id for accelerators
 func (topo *Topology) GetAcceleratorID(id int) (string, error) {
-    if id < len(topo.Accelerators) {
+    if id < 0 {
+        fmt.Printf("ID smaller than 0!\n")
+        return topo.Accelerators[0].ID, nil
+    } else if id < len(topo.Accelerators) {
         return topo.Accelerators[id].ID, nil
     } else {
-        return "", fmt.Errorf("Index %d out of range", id)
+        return "", fmt.Errorf("index %d out of range", id)
     }
 }