diff --git a/.gitignore b/.gitignore index 153e354..9f448aa 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ /.vscode/* /archive-migration /archive-manager +var/job.db-shm +var/job.db-wal diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 790ec74..da3d40e 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -5,15 +5,11 @@ package repository import ( - "database/sql" - "fmt" "log" "sync" "time" "github.com/jmoiron/sqlx" - "github.com/mattn/go-sqlite3" - "github.com/qustavo/sqlhooks/v2" ) var ( @@ -26,35 +22,56 @@ type DBConnection struct { Driver string } +type DatabaseOptions struct { + URL string + MaxOpenConnections int + MaxIdleConnections int + ConnectionMaxLifetime time.Duration + ConnectionMaxIdleTime time.Duration +} + func Connect(driver string, db string) { var err error var dbHandle *sqlx.DB dbConnOnce.Do(func() { - if driver == "sqlite3" { - sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{})) - dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db)) - // dbHandle, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", db)) + opts := DatabaseOptions{ + URL: db, + MaxOpenConnections: 4, + MaxIdleConnections: 4, + ConnectionMaxLifetime: time.Hour, + ConnectionMaxIdleTime: time.Hour, + } + + switch driver { + case "sqlite3": + // sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{})) + + // - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run) + // - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately + // - Enable foreign key checks + opts.URL += "?_journal=WAL&_timeout=5000&_fk=true" + + // dbHandle, err = sqlx.Open("sqlite3WithHooks", fmt.Sprintf("%s?_foreign_keys=on", db)) + dbHandle, err = sqlx.Open("sqlite3", opts.URL) if err != nil { log.Fatal(err) } - - // sqlite does not multithread. Having more than one connection open would just mean - // waiting for locks. - dbHandle.SetMaxOpenConns(1) - } else if driver == "mysql" { - dbHandle, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", db)) + case "mysql": + opts.URL += "?multiStatements=true" + dbHandle, err = sqlx.Open("mysql", opts.URL) if err != nil { log.Fatalf("sqlx.Open() error: %v", err) } - - dbHandle.SetConnMaxLifetime(time.Minute * 3) - dbHandle.SetMaxOpenConns(10) - dbHandle.SetMaxIdleConns(10) - } else { + default: log.Fatalf("unsupported database driver: %s", driver) } + dbHandle.SetMaxOpenConns(opts.MaxOpenConnections) + dbHandle.SetMaxIdleConns(opts.MaxIdleConnections) + dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime) + dbHandle.SetConnMaxIdleTime(opts.ConnectionMaxIdleTime) + dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver} err = checkDBVersion(driver, dbHandle.DB) if err != nil { diff --git a/internal/repository/job.go b/internal/repository/job.go index 4eee0d3..c5df610 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -74,7 +74,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, &job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil { - log.Warn("Error while scanning rows") + log.Warnf("Error while scanning rows: %v", err) return nil, err } diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go index d74dad5..bf491f4 100644 --- a/internal/repository/job_test.go +++ b/internal/repository/job_test.go @@ -8,21 +8,9 @@ import ( "fmt" "testing" - "github.com/ClusterCockpit/cc-backend/pkg/log" _ "github.com/mattn/go-sqlite3" ) -func setup(t *testing.T) *JobRepository { - log.Init("info", true) - dbfilepath := "testdata/test.db" - err := MigrateDB("sqlite3", dbfilepath) - if err != nil { - t.Fatal(err) - } - Connect("sqlite3", dbfilepath) - return GetJobRepository() -} - func TestFind(t *testing.T) { r := setup(t) diff --git a/internal/repository/migrations/mysql/02_add-index.down.sql b/internal/repository/migrations/mysql/02_add-index.down.sql index 8129772..1392c45 100644 --- a/internal/repository/migrations/mysql/02_add-index.down.sql +++ b/internal/repository/migrations/mysql/02_add-index.down.sql @@ -2,4 +2,7 @@ DROP INDEX IF EXISTS job_stats; DROP INDEX IF EXISTS job_by_user; DROP INDEX IF EXISTS job_by_starttime; DROP INDEX IF EXISTS job_by_job_id; -DROP INDEX IF EXISTS job_by_state; +DROP INDEX IF EXISTS job_list; +DROP INDEX IF EXISTS job_list_user; +DROP INDEX IF EXISTS job_list_users; +DROP INDEX IF EXISTS job_list_users_start; diff --git a/internal/repository/migrations/mysql/02_add-index.up.sql b/internal/repository/migrations/mysql/02_add-index.up.sql index 7d8d04a..2524bd9 100644 --- a/internal/repository/migrations/mysql/02_add-index.up.sql +++ b/internal/repository/migrations/mysql/02_add-index.up.sql @@ -2,4 +2,7 @@ CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user); CREATE INDEX IF NOT EXISTS job_by_user ON job (user); CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time); CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id); -CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state); +CREATE INDEX IF NOT EXISTS job_list ON job (cluster, job_state); +CREATE INDEX IF NOT EXISTS job_list_user ON job (user, cluster, job_state); +CREATE INDEX IF NOT EXISTS job_list_users ON job (user, job_state); +CREATE INDEX IF NOT EXISTS job_list_users_start ON job (start_time, user, job_state); diff --git a/internal/repository/migrations/sqlite3/02_add-index.down.sql b/internal/repository/migrations/sqlite3/02_add-index.down.sql index 8129772..1392c45 100644 --- a/internal/repository/migrations/sqlite3/02_add-index.down.sql +++ b/internal/repository/migrations/sqlite3/02_add-index.down.sql @@ -2,4 +2,7 @@ DROP INDEX IF EXISTS job_stats; DROP INDEX IF EXISTS job_by_user; DROP INDEX IF EXISTS job_by_starttime; DROP INDEX IF EXISTS job_by_job_id; -DROP INDEX IF EXISTS job_by_state; +DROP INDEX IF EXISTS job_list; +DROP INDEX IF EXISTS job_list_user; +DROP INDEX IF EXISTS job_list_users; +DROP INDEX IF EXISTS job_list_users_start; diff --git a/internal/repository/migrations/sqlite3/02_add-index.up.sql b/internal/repository/migrations/sqlite3/02_add-index.up.sql index 7d8d04a..db9792d 100644 --- a/internal/repository/migrations/sqlite3/02_add-index.up.sql +++ b/internal/repository/migrations/sqlite3/02_add-index.up.sql @@ -1,5 +1,8 @@ CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user); CREATE INDEX IF NOT EXISTS job_by_user ON job (user); CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time); -CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id); -CREATE INDEX IF NOT EXISTS job_by_state ON job (job_state); +CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id, cluster, start_time); +CREATE INDEX IF NOT EXISTS job_list ON job (cluster, job_state); +CREATE INDEX IF NOT EXISTS job_list_user ON job (user, cluster, job_state); +CREATE INDEX IF NOT EXISTS job_list_users ON job (user, job_state); +CREATE INDEX IF NOT EXISTS job_list_users_start ON job (start_time, user, job_state); diff --git a/internal/repository/query.go b/internal/repository/query.go index 60354c1..5513d51 100644 --- a/internal/repository/query.go +++ b/internal/repository/query.go @@ -19,19 +19,12 @@ import ( sq "github.com/Masterminds/squirrel" ) -// QueryJobs returns a list of jobs matching the provided filters. page and order are optional- -func (r *JobRepository) QueryJobs( - ctx context.Context, +func (r *JobRepository) queryJobs( + query sq.SelectBuilder, filters []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) ([]*schema.Job, error) { - query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job")) - - if qerr != nil { - return nil, qerr - } - if order != nil { field := toSnakeCase(order.Field) @@ -81,21 +74,46 @@ func (r *JobRepository) QueryJobs( return jobs, nil } -// CountJobs counts the number of jobs matching the filters. -func (r *JobRepository) CountJobs( - ctx context.Context, - filters []*model.JobFilter) (int, error) { +func (r *JobRepository) testQueryJobs( + filters []*model.JobFilter, + page *model.PageRequest, + order *model.OrderByInput) ([]*schema.Job, error) { - // count all jobs: - query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job")) + return r.queryJobs(sq.Select(jobColumns...).From("job"), + filters, page, order) +} + +// QueryJobs returns a list of jobs matching the provided filters. page and order are optional- +func (r *JobRepository) QueryJobs( + ctx context.Context, + filters []*model.JobFilter, + page *model.PageRequest, + order *model.OrderByInput) ([]*schema.Job, error) { + + query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job")) if qerr != nil { - return 0, qerr + return nil, qerr } + return r.queryJobs(query, + filters, page, order) +} + +func (r *JobRepository) countJobs(query sq.SelectBuilder, + filters []*model.JobFilter) (int, error) { + for _, f := range filters { query = BuildWhereClause(f, query) } + + sql, args, err := query.ToSql() + if err != nil { + log.Warn("Error while converting query to sql") + return 0, nil + } + + log.Debugf("SQL query: `%s`, args: %#v", sql, args) var count int if err := query.RunWith(r.DB).Scan(&count); err != nil { return 0, err @@ -104,6 +122,25 @@ func (r *JobRepository) CountJobs( return count, nil } +func (r *JobRepository) testCountJobs( + filters []*model.JobFilter) (int, error) { + + return r.countJobs(sq.Select("count(*)").From("job"), filters) +} + +func (r *JobRepository) CountJobs( + ctx context.Context, + filters []*model.JobFilter) (int, error) { + + query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job")) + + if qerr != nil { + return 0, qerr + } + + return r.countJobs(query, filters) +} + func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (queryOut sq.SelectBuilder, err error) { user := auth.GetUser(ctx) if user == nil || user.HasAnyRole([]auth.Role{auth.RoleAdmin, auth.RoleSupport, auth.RoleApi}) { // Admin & Co. : All jobs diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go new file mode 100644 index 0000000..efb5395 --- /dev/null +++ b/internal/repository/repository_test.go @@ -0,0 +1,145 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package repository + +import ( + "testing" + + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/pkg/log" + _ "github.com/mattn/go-sqlite3" +) + +func TestPragma(t *testing.T) { + t.Run("sets up a new DB", func(t *testing.T) { + db := setup(t) + + for _, pragma := range []string{"synchronous", "journal_mode", "busy_timeout", "auto_vacuum", "foreign_keys"} { + t.Log("PRAGMA", pragma, getPragma(db, pragma)) + } + }) +} + +func getPragma(db *JobRepository, name string) string { + var s string + if err := db.DB.QueryRow(`PRAGMA ` + name).Scan(&s); err != nil { + panic(err) + } + return s +} + +func BenchmarkSelect1(b *testing.B) { + db := setup(b) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := db.DB.Exec(`select 1`) + noErr(b, err) + } + }) +} + +func BenchmarkDB_FindJobById(b *testing.B) { + var jobId int64 = 1677322 + + b.Run("FindJobById", func(b *testing.B) { + db := setup(b) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := db.FindById(jobId) + noErr(b, err) + } + }) + }) +} + +func BenchmarkDB_FindJob(b *testing.B) { + var jobId int64 = 107266 + var startTime int64 = 1657557241 + var cluster = "fritz" + + b.Run("FindJob", func(b *testing.B) { + db := setup(b) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := db.Find(&jobId, &cluster, &startTime) + noErr(b, err) + } + }) + }) +} + +func BenchmarkDB_CountJobs(b *testing.B) { + filter := &model.JobFilter{} + filter.State = append(filter.State, "running") + cluster := "fritz" + filter.Cluster = &model.StringInput{Eq: &cluster} + user := "mppi133h" + filter.User = &model.StringInput{Eq: &user} + + b.Run("CountJobs", func(b *testing.B) { + db := setup(b) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := db.testCountJobs([]*model.JobFilter{filter}) + noErr(b, err) + } + }) + }) +} + +func BenchmarkDB_QueryJobs(b *testing.B) { + filter := &model.JobFilter{} + filter.State = append(filter.State, "running") + cluster := "fritz" + filter.Cluster = &model.StringInput{Eq: &cluster} + user := "mppi133h" + filter.User = &model.StringInput{Eq: &user} + page := &model.PageRequest{ItemsPerPage: 50, Page: 1} + order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc} + + b.Run("QueryJobs", func(b *testing.B) { + db := setup(b) + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := db.testQueryJobs([]*model.JobFilter{filter}, page, order) + noErr(b, err) + } + }) + }) +} + +func setup(tb testing.TB) *JobRepository { + tb.Helper() + log.Init("warn", true) + dbfile := "testdata/job.db" + err := MigrateDB("sqlite3", dbfile) + noErr(tb, err) + + Connect("sqlite3", dbfile) + return GetJobRepository() +} + +func noErr(tb testing.TB, err error) { + tb.Helper() + + if err != nil { + tb.Fatal("Error is not nil:", err) + } +} diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db new file mode 100644 index 0000000..c07fb32 Binary files /dev/null and b/internal/repository/testdata/job.db differ diff --git a/internal/repository/testdata/job.db-shm b/internal/repository/testdata/job.db-shm new file mode 100644 index 0000000..fe9ac28 Binary files /dev/null and b/internal/repository/testdata/job.db-shm differ diff --git a/internal/repository/testdata/job.db-wal b/internal/repository/testdata/job.db-wal new file mode 100644 index 0000000..e69de29 diff --git a/internal/repository/testdata/test.db b/internal/repository/testdata/test.db index e2dd7de..1e3c8a7 100644 Binary files a/internal/repository/testdata/test.db and b/internal/repository/testdata/test.db differ diff --git a/pkg/log/log.go b/pkg/log/log.go index 8240194..f514df9 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -44,6 +44,7 @@ var ( /* CONFIG */ func Init(lvl string, logdate bool) { + switch lvl { case "crit": ErrWriter = io.Discard @@ -70,6 +71,12 @@ func Init(lvl string, logdate bool) { WarnLog = log.New(WarnWriter, WarnPrefix, log.Lshortfile) ErrLog = log.New(ErrWriter, ErrPrefix, log.Llongfile) CritLog = log.New(CritWriter, CritPrefix, log.Llongfile) + } else { + DebugLog = log.New(DebugWriter, DebugPrefix, log.LstdFlags) + InfoLog = log.New(InfoWriter, InfoPrefix, log.LstdFlags|log.Lshortfile) + WarnLog = log.New(WarnWriter, WarnPrefix, log.LstdFlags|log.Lshortfile) + ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) + CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) } } diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go index 0724ada..bc7a86a 100644 --- a/pkg/schema/cluster.go +++ b/pkg/schema/cluster.go @@ -162,10 +162,13 @@ func (topo *Topology) GetMemoryDomainsFromHWThreads( // Temporary fix to convert back from int id to string id for accelerators func (topo *Topology) GetAcceleratorID(id int) (string, error) { - if id < len(topo.Accelerators) { + if id < 0 { + fmt.Printf("ID smaller than 0!\n") + return topo.Accelerators[0].ID, nil + } else if id < len(topo.Accelerators) { return topo.Accelerators[id].ID, nil } else { - return "", fmt.Errorf("Index %d out of range", id) + return "", fmt.Errorf("index %d out of range", id) } }