mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-27 03:39:05 +01:00
fix: Adapt tag db queries to also work with mysql/mariadb
231 sql statement syntax
This commit is contained in:
commit
1c7cc9e16f
@ -134,7 +134,7 @@ func initEnv() {
|
||||
}
|
||||
|
||||
func main() {
|
||||
var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagDev, flagVersion, flagLogDateTime bool
|
||||
var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool
|
||||
var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
|
||||
flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize swlite database file, config.json and .env")
|
||||
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)")
|
||||
@ -144,6 +144,8 @@ func main() {
|
||||
flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI")
|
||||
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
|
||||
flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit")
|
||||
flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit")
|
||||
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
|
||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,support,manager,api,user]:<password>`")
|
||||
@ -205,6 +207,22 @@ func main() {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if flagRevertDB {
|
||||
err := repository.RevertDB(config.Keys.DBDriver, config.Keys.DB)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if flagForceDB {
|
||||
err := repository.ForceDB(config.Keys.DBDriver, config.Keys.DB)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
repository.Connect(config.Keys.DBDriver, config.Keys.DB)
|
||||
db := repository.GetConnection()
|
||||
|
||||
|
@ -436,11 +436,14 @@ func (r *JobRepository) Stop(
|
||||
|
||||
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
var cnt int
|
||||
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
|
||||
err := r.DB.Get(&cnt, qs) // ignore error as it will also occur in delete statement
|
||||
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
||||
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
|
||||
q.RunWith(r.DB).QueryRow().Scan(cnt)
|
||||
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
|
||||
_, err := qd.RunWith(r.DB).Exec()
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
|
||||
s, _, _ := qd.ToSql()
|
||||
log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err)
|
||||
} else {
|
||||
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||
}
|
||||
@ -448,9 +451,12 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
}
|
||||
|
||||
func (r *JobRepository) DeleteJobById(id int64) error {
|
||||
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
|
||||
qd := sq.Delete("job").Where("job.id = ?", id)
|
||||
_, err := qd.RunWith(r.DB).Exec()
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("DeleteJobById(%d): error %#v", id, err)
|
||||
s, _, _ := qd.ToSql()
|
||||
log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err)
|
||||
} else {
|
||||
log.Debugf("DeleteJobById(%d): Success", id)
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ import (
|
||||
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||
)
|
||||
|
||||
const Version uint = 6
|
||||
const Version uint = 7
|
||||
|
||||
//go:embed migrations/*
|
||||
var migrationFiles embed.FS
|
||||
@ -57,7 +57,7 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
v, _, err := m.Version()
|
||||
v, dirty, err := m.Version()
|
||||
if err != nil {
|
||||
if err == migrate.ErrNilVersion {
|
||||
log.Warn("Legacy database without version or missing database file!")
|
||||
@ -68,18 +68,18 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
|
||||
if v < Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
|
||||
} else if v > Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
|
||||
}
|
||||
|
||||
if v > Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
|
||||
if dirty {
|
||||
return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) error {
|
||||
var m *migrate.Migrate
|
||||
|
||||
func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err error) {
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
@ -89,22 +89,31 @@ func MigrateDB(backend string, db string) error {
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
case "mysql":
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
default:
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Up(); err != nil {
|
||||
if err == migrate.ErrNoChange {
|
||||
log.Info("DB already up to date!")
|
||||
@ -116,3 +125,35 @@ func MigrateDB(backend string, db string) error {
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func RevertDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Migrate(Version - 1); err != nil {
|
||||
if err == migrate.ErrNoChange {
|
||||
log.Info("DB already up to date!")
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func ForceDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Force(int(Version)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
@ -0,0 +1 @@
|
||||
ALTER TABLE tag MODIFY id INTEGER;
|
@ -0,0 +1 @@
|
||||
ALTER TABLE tag MODIFY id INTEGER AUTO_INCREMENT;
|
@ -15,8 +15,11 @@ import (
|
||||
|
||||
// Add the tag with id `tagId` to the job with the database id `jobId`.
|
||||
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)
|
||||
|
||||
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error adding tag with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -37,8 +40,11 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
|
||||
// Removes a tag from a job
|
||||
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag)
|
||||
|
||||
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error adding tag with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -59,8 +65,12 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
|
||||
|
||||
// CreateTag creates a new tag with the specified type and name and returns its database id.
|
||||
func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
|
||||
res, err := r.stmtCache.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", tagType, tagName)
|
||||
q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)
|
||||
|
||||
res, err := q.RunWith(r.stmtCache).Exec()
|
||||
if err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error inserting tag with %s: %v", s, err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -154,7 +164,8 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
|
||||
|
||||
rows, err := q.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error get tags with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user