From dd887cbb1fe800f41f77dcd0202b4e388f6c4a4b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 6 Mar 2024 13:47:52 +0100 Subject: [PATCH 1/5] Do all tag queries with query builder Fix bug in mysql database initialization. Fixes #231 --- .../migrations/mysql/01_init-schema.up.sql | 2 +- internal/repository/tags.go | 23 ++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/internal/repository/migrations/mysql/01_init-schema.up.sql b/internal/repository/migrations/mysql/01_init-schema.up.sql index 3a6930c..4eb6268 100644 --- a/internal/repository/migrations/mysql/01_init-schema.up.sql +++ b/internal/repository/migrations/mysql/01_init-schema.up.sql @@ -36,7 +36,7 @@ CREATE TABLE IF NOT EXISTS job ( ); CREATE TABLE IF NOT EXISTS tag ( - id INTEGER PRIMARY KEY, + id INTEGER AUTO_INCREMENT PRIMARY KEY, tag_type VARCHAR(255) NOT NULL, tag_name VARCHAR(255) NOT NULL, UNIQUE (tag_type, tag_name)); diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 52bc836..e97bc65 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -15,8 +15,11 @@ import ( // Add the tag with id `tagId` to the job with the database id `jobId`. func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) { - if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil { - log.Error("Error while running query") + q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag) + + if _, err := q.RunWith(r.stmtCache).Exec(); err != nil { + s, _, _ := q.ToSql() + log.Errorf("Error adding tag with %s: %v", s, err) return nil, err } @@ -37,8 +40,11 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) { // Removes a tag from a job func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) { - if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil { - log.Error("Error while running query") + q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag) + + if _, err := q.RunWith(r.stmtCache).Exec(); err != nil { + s, _, _ := q.ToSql() + log.Errorf("Error adding tag with %s: %v", s, err) return nil, err } @@ -59,8 +65,12 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) { // CreateTag creates a new tag with the specified type and name and returns its database id. func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) { - res, err := r.stmtCache.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", tagType, tagName) + q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName) + + res, err := q.RunWith(r.stmtCache).Exec() if err != nil { + s, _, _ := q.ToSql() + log.Errorf("Error inserting tag with %s: %v", s, err) return 0, err } @@ -154,7 +164,8 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) { rows, err := q.RunWith(r.stmtCache).Query() if err != nil { - log.Error("Error while running query") + s, _, _ := q.ToSql() + log.Errorf("Error get tags with %s: %v", s, err) return nil, err } From aa6336ea1eccd00592f2557cb81e7d3dec640a92 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 6 Mar 2024 14:50:08 +0100 Subject: [PATCH 2/5] Refactor Reformat. Convert to query builder. Add descriptive error log messages. --- internal/repository/job.go | 46 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index e1a997a..db02283 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -223,8 +223,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er func (r *JobRepository) Find( jobId *int64, cluster *string, - startTime *int64) (*schema.Job, error) { - + startTime *int64, +) (*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -248,8 +248,8 @@ func (r *JobRepository) Find( func (r *JobRepository) FindAll( jobId *int64, cluster *string, - startTime *int64) ([]*schema.Job, error) { - + startTime *int64, +) ([]*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -292,7 +292,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { func (r *JobRepository) FindConcurrentJobs( ctx context.Context, - job *schema.Job) (*model.JobLinkResultList, error) { + job *schema.Job, +) (*model.JobLinkResultList, error) { if job == nil { return nil, nil } @@ -420,8 +421,8 @@ func (r *JobRepository) Stop( jobId int64, duration int32, state schema.JobState, - monitoringStatus int32) (err error) { - + monitoringStatus int32, +) (err error) { stmt := sq.Update("job"). Set("job_state", state). Set("duration", duration). @@ -434,11 +435,14 @@ func (r *JobRepository) Stop( func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { var cnt int - qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime) - err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement - _, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime) + q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime) + q.RunWith(r.DB).QueryRow().Scan(cnt) + qd := sq.Delete("job").Where("job.start_time < ?", startTime) + _, err := qd.RunWith(r.DB).Exec() + if err != nil { - log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err) + s, _, _ := qd.ToSql() + log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err) } else { log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt) } @@ -446,9 +450,12 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { } func (r *JobRepository) DeleteJobById(id int64) error { - _, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id) + qd := sq.Delete("job").Where("job.id = ?", id) + _, err := qd.RunWith(r.DB).Exec() + if err != nil { - log.Errorf("DeleteJobById(%d): error %#v", id, err) + s, _, _ := qd.ToSql() + log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err) } else { log.Debugf("DeleteJobById(%d): Success", id) } @@ -468,8 +475,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 func (r *JobRepository) MarkArchived( jobId int64, monitoringStatus int32, - metricStats map[string]schema.JobStatistics) error { - + metricStats map[string]schema.JobStatistics, +) error { stmt := sq.Update("job"). Set("monitoring_status", monitoringStatus). Where("job.id = ?", jobId) @@ -578,8 +585,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm } } -var ErrNotFound = errors.New("no such jobname, project or user") -var ErrForbidden = errors.New("not authorized") +var ( + ErrNotFound = errors.New("no such jobname, project or user") + ErrForbidden = errors.New("not authorized") +) func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) { compareStr := " = ?" @@ -663,7 +672,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // Hosts with zero jobs running on them will not show up! func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { - start := time.Now() subclusters := make(map[string]map[string]int) rows, err := sq.Select("resources", "subcluster").From("job"). @@ -706,7 +714,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in } func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { - start := time.Now() res, err := sq.Update("job"). Set("monitoring_status", schema.MonitoringStatusArchivingFailed). @@ -735,7 +742,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { } func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { - var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { From 105b7eabf063de39035528fca0c5a8256c1f9e8d Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 8 Mar 2024 10:47:32 +0100 Subject: [PATCH 3/5] Add migration and introduce dirty flag handling --- cmd/cc-backend/main.go | 11 ++++- internal/repository/migration.go | 43 ++++++++++++++----- .../migrations/mysql/01_init-schema.up.sql | 2 +- .../migrations/mysql/07_fix-tag-id.down.sql | 1 + .../migrations/mysql/07_fix-tag-id.up.sql | 1 + .../migrations/sqlite3/07_fix-tag-id.down.sql | 0 .../migrations/sqlite3/07_fix-tag-id.up.sql | 0 7 files changed, 46 insertions(+), 12 deletions(-) create mode 100644 internal/repository/migrations/mysql/07_fix-tag-id.down.sql create mode 100644 internal/repository/migrations/mysql/07_fix-tag-id.up.sql create mode 100644 internal/repository/migrations/sqlite3/07_fix-tag-id.down.sql create mode 100644 internal/repository/migrations/sqlite3/07_fix-tag-id.up.sql diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index e956503..6bb514f 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -134,7 +134,7 @@ func initEnv() { } func main() { - var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagDev, flagVersion, flagLogDateTime bool + var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize swlite database file, config.json and .env") flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") @@ -144,6 +144,7 @@ func main() { flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI") flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") + flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `:[admin,support,manager,api,user]:`") @@ -205,6 +206,14 @@ func main() { os.Exit(0) } + if flagForceDB { + err := repository.ForceDB(config.Keys.DBDriver, config.Keys.DB) + if err != nil { + log.Fatal(err) + } + os.Exit(0) + } + repository.Connect(config.Keys.DBDriver, config.Keys.DB) db := repository.GetConnection() diff --git a/internal/repository/migration.go b/internal/repository/migration.go index 0f37d0a..e71ed66 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -16,7 +16,7 @@ import ( "github.com/golang-migrate/migrate/v4/source/iofs" ) -const Version uint = 6 +const Version uint = 7 //go:embed migrations/* var migrationFiles embed.FS @@ -57,7 +57,7 @@ func checkDBVersion(backend string, db *sql.DB) error { log.Fatalf("unsupported database backend: %s", backend) } - v, _, err := m.Version() + v, dirty, err := m.Version() if err != nil { if err == migrate.ErrNilVersion { log.Warn("Legacy database without version or missing database file!") @@ -68,18 +68,18 @@ func checkDBVersion(backend string, db *sql.DB) error { if v < Version { return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version) + } else if v > Version { + return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version) } - if v > Version { - return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version) + if dirty { + return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version) } return nil } -func MigrateDB(backend string, db string) error { - var m *migrate.Migrate - +func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err error) { switch backend { case "sqlite3": d, err := iofs.New(migrationFiles, "migrations/sqlite3") @@ -89,22 +89,31 @@ func MigrateDB(backend string, db string) error { m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db)) if err != nil { - return err + return m, err } case "mysql": d, err := iofs.New(migrationFiles, "migrations/mysql") if err != nil { - return err + return m, err } m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db)) if err != nil { - return err + return m, err } default: log.Fatalf("unsupported database backend: %s", backend) } + return m, nil +} + +func MigrateDB(backend string, db string) error { + m, err := getMigrateInstance(backend, db) + if err != nil { + return err + } + if err := m.Up(); err != nil { if err == migrate.ErrNoChange { log.Info("DB already up to date!") @@ -116,3 +125,17 @@ func MigrateDB(backend string, db string) error { m.Close() return nil } + +func ForceDB(backend string, db string) error { + m, err := getMigrateInstance(backend, db) + if err != nil { + return err + } + + if err := m.Force(int(Version)); err != nil { + return err + } + + m.Close() + return nil +} diff --git a/internal/repository/migrations/mysql/01_init-schema.up.sql b/internal/repository/migrations/mysql/01_init-schema.up.sql index 4eb6268..3a6930c 100644 --- a/internal/repository/migrations/mysql/01_init-schema.up.sql +++ b/internal/repository/migrations/mysql/01_init-schema.up.sql @@ -36,7 +36,7 @@ CREATE TABLE IF NOT EXISTS job ( ); CREATE TABLE IF NOT EXISTS tag ( - id INTEGER AUTO_INCREMENT PRIMARY KEY, + id INTEGER PRIMARY KEY, tag_type VARCHAR(255) NOT NULL, tag_name VARCHAR(255) NOT NULL, UNIQUE (tag_type, tag_name)); diff --git a/internal/repository/migrations/mysql/07_fix-tag-id.down.sql b/internal/repository/migrations/mysql/07_fix-tag-id.down.sql new file mode 100644 index 0000000..4172f4e --- /dev/null +++ b/internal/repository/migrations/mysql/07_fix-tag-id.down.sql @@ -0,0 +1 @@ +ALTER TABLE tag MODIFY id INTEGER; diff --git a/internal/repository/migrations/mysql/07_fix-tag-id.up.sql b/internal/repository/migrations/mysql/07_fix-tag-id.up.sql new file mode 100644 index 0000000..f8d805f --- /dev/null +++ b/internal/repository/migrations/mysql/07_fix-tag-id.up.sql @@ -0,0 +1 @@ +ALTER TABLE tag MODIFY id INTEGER AUTO_INCREMENT; diff --git a/internal/repository/migrations/sqlite3/07_fix-tag-id.down.sql b/internal/repository/migrations/sqlite3/07_fix-tag-id.down.sql new file mode 100644 index 0000000..e69de29 diff --git a/internal/repository/migrations/sqlite3/07_fix-tag-id.up.sql b/internal/repository/migrations/sqlite3/07_fix-tag-id.up.sql new file mode 100644 index 0000000..e69de29 From 2c2c1accb53d44eb1d1d1ebf9c7b7f0ef95003c8 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 8 Mar 2024 10:58:45 +0100 Subject: [PATCH 4/5] Allow up and down migration of database --- internal/repository/migration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/migration.go b/internal/repository/migration.go index e71ed66..d7bfe13 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -114,7 +114,7 @@ func MigrateDB(backend string, db string) error { return err } - if err := m.Up(); err != nil { + if err := m.Migrate(Version); err != nil { if err == migrate.ErrNoChange { log.Info("DB already up to date!") } else { From 06d01962a65a175a7f0cf923bbf50be944c2f9a5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 8 Mar 2024 11:28:26 +0100 Subject: [PATCH 5/5] feat: Allow to revert db to previous version --- cmd/cc-backend/main.go | 11 ++++++++++- internal/repository/migration.go | 20 +++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 6bb514f..991fe6b 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -134,7 +134,7 @@ func initEnv() { } func main() { - var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool + var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize swlite database file, config.json and .env") flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") @@ -144,6 +144,7 @@ func main() { flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI") flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") + flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit") flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") @@ -206,6 +207,14 @@ func main() { os.Exit(0) } + if flagRevertDB { + err := repository.RevertDB(config.Keys.DBDriver, config.Keys.DB) + if err != nil { + log.Fatal(err) + } + os.Exit(0) + } + if flagForceDB { err := repository.ForceDB(config.Keys.DBDriver, config.Keys.DB) if err != nil { diff --git a/internal/repository/migration.go b/internal/repository/migration.go index d7bfe13..38a88f7 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -114,7 +114,25 @@ func MigrateDB(backend string, db string) error { return err } - if err := m.Migrate(Version); err != nil { + if err := m.Up(); err != nil { + if err == migrate.ErrNoChange { + log.Info("DB already up to date!") + } else { + return err + } + } + + m.Close() + return nil +} + +func RevertDB(backend string, db string) error { + m, err := getMigrateInstance(backend, db) + if err != nil { + return err + } + + if err := m.Migrate(Version - 1); err != nil { if err == migrate.ErrNoChange { log.Info("DB already up to date!") } else {