From eab7961a83ef1604fc496d75d6cae95249dc815a Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Fri, 16 May 2025 17:32:19 +0200
Subject: [PATCH] Introduce caching table for faster job inserts

Fixes #392
---
 internal/repository/job.go                    | 22 ++-------
 internal/repository/jobCreate.go              | 47 ++++++++++++++++++-
 internal/repository/jobFind.go                | 20 ++++++++
 internal/repository/migration.go              | 13 ++++-
 .../sqlite3/09_add-job-cache.down.sql         |  1 +
 .../sqlite3/09_add-job-cache.up.sql           | 31 ++++++++++++
 6 files changed, 114 insertions(+), 20 deletions(-)
 create mode 100644 internal/repository/migrations/sqlite3/09_add-job-cache.down.sql
 create mode 100644 internal/repository/migrations/sqlite3/09_add-job-cache.up.sql

diff --git a/internal/repository/job.go b/internal/repository/job.go
index 84de6f7..54a436a 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -9,12 +9,12 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"maps"
 	"math"
 	"strconv"
 	"sync"
 	"time"
 
-	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@@ -33,6 +33,7 @@ type JobRepository struct {
 	stmtCache *sq.StmtCache
 	cache     *lrucache.Cache
 	driver    string
+	Mutex     sync.Mutex
 }
 
 func GetJobRepository() *JobRepository {
@@ -56,7 +57,7 @@ var jobColumns []string = []string{
 	"job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy",
 }
 
-func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
+func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
 	job := &schema.Job{}
 
 	if err := row.Scan(
@@ -138,17 +139,6 @@ func (r *JobRepository) Flush() error {
 	return nil
 }
 
-func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
-	jobLink := &model.JobLink{}
-	if err := row.Scan(
-		&jobLink.ID, &jobLink.JobID); err != nil {
-		log.Warn("Error while scanning rows (jobLink)")
-		return nil, err
-	}
-
-	return jobLink, nil
-}
-
 func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
 	start := time.Now()
 	cachekey := fmt.Sprintf("metadata:%d", job.ID)
@@ -189,9 +179,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
 
 	if job.MetaData != nil {
 		cpy := make(map[string]string, len(job.MetaData)+1)
-		for k, v := range job.MetaData {
-			cpy[k] = v
-		}
+		maps.Copy(cpy, job.MetaData)
 		cpy[key] = val
 		job.MetaData = cpy
 	} else {
@@ -389,7 +377,7 @@ func (r *JobRepository) FindColumnValues(user *schema.User, query string, table
 func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 	var err error
 	start := time.Now()
-	partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
+	partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
 		parts := []string{}
 		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
 			return nil, 0, 1000
diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go
index 9e47974..3b997f3 100644
--- a/internal/repository/jobCreate.go
+++ b/internal/repository/jobCreate.go
@@ -13,6 +13,14 @@ import (
 	sq "github.com/Masterminds/squirrel"
 )
 
+const NamedJobCacheInsert string = `INSERT INTO job_cache (
+	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
+	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
+) VALUES (
+	:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
+);`
+
 const NamedJobInsert string = `INSERT INTO job (
 	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
 	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
@@ -22,7 +30,9 @@ const NamedJobInsert string = `INSERT INTO job (
 );`
 
 func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
-	res, err := r.DB.NamedExec(NamedJobInsert, job)
+	r.Mutex.Lock()
+	res, err := r.DB.NamedExec(NamedJobCacheInsert, job)
+	r.Mutex.Unlock()
 	if err != nil {
 		log.Warn("Error while NamedJobInsert")
 		return 0, err
@@ -36,6 +46,25 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
 	return id, nil
 }
 
+func (r *JobRepository) SyncJobs() error {
+	r.Mutex.Lock()
+	defer r.Mutex.Unlock()
+	_, err := r.DB.Exec(
+		"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
+	if err != nil {
+		log.Warnf("Error while syncing job cache: %v", err)
+		return err
+	}
+
+	_, err = r.DB.Exec("DELETE FROM job_cache")
+	if err != nil {
+		log.Warn("Error while clearing job cache")
+		return err
+	}
+
+	return nil
+}
+
 // Start inserts a new job in the table, returning the unique job ID.
 // Statistics are not transfered!
 func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
@@ -73,3 +102,19 @@ func (r *JobRepository) Stop(
 	_, err = stmt.RunWith(r.stmtCache).Exec()
 	return
 }
+
+func (r *JobRepository) StopCached(
+	jobId int64,
+	duration int32,
+	state schema.JobState,
+	monitoringStatus int32,
+) (err error) {
+	stmt := sq.Update("job_cache").
+		Set("job_state", state).
+		Set("duration", duration).
+		Set("monitoring_status", monitoringStatus).
+		Where("job_cache.id = ?", jobId)
+
+	_, err = stmt.RunWith(r.stmtCache).Exec()
+	return
+}
diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go
index 1e2ccb8..ac09355 100644
--- a/internal/repository/jobFind.go
+++ b/internal/repository/jobFind.go
@@ -43,6 +43,26 @@ func (r *JobRepository) Find(
 	return scanJob(q.RunWith(r.stmtCache).QueryRow())
 }
 
+func (r *JobRepository) FindCached(
+	jobId *int64,
+	cluster *string,
+	startTime *int64,
+) (*schema.Job, error) {
+	q := sq.Select(jobColumns...).From("job_cache").
+		Where("job_cache.job_id = ?", *jobId)
+
+	if cluster != nil {
+		q = q.Where("job_cache.cluster = ?", *cluster)
+	}
+	if startTime != nil {
+		q = q.Where("job_cache.start_time = ?", *startTime)
+	}
+
+	q = q.OrderBy("job_cache.id DESC") // always use newest matching job by db id if more than one match
+
+	return scanJob(q.RunWith(r.stmtCache).QueryRow())
+}
+
 // Find executes a SQL query to find a specific batch job.
 // The job is queried using the batch job id, the cluster name,
 // and the start time of the job in UNIX epoch time seconds.
diff --git a/internal/repository/migration.go b/internal/repository/migration.go
index 0b2591e..c0693da 100644
--- a/internal/repository/migration.go
+++ b/internal/repository/migration.go
@@ -16,7 +16,7 @@ import (
 	"github.com/golang-migrate/migrate/v4/source/iofs"
 )
 
-const Version uint = 8
+const Version uint = 9
 
 //go:embed migrations/*
 var migrationFiles embed.FS
@@ -115,8 +115,17 @@ func MigrateDB(backend string, db string) error {
 	}
 
 	v, dirty, err := m.Version()
+	if err != nil {
+		if err == migrate.ErrNilVersion {
+			log.Warn("Legacy database without version or missing database file!")
+		} else {
+			return err
+		}
+	}
 
-	log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
+	if v < Version {
+		log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
+	}
 
 	if dirty {
 		return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)
diff --git a/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql b/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql
new file mode 100644
index 0000000..ef257cf
--- /dev/null
+++ b/internal/repository/migrations/sqlite3/09_add-job-cache.down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS job_cache;
diff --git a/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql
new file mode 100644
index 0000000..7840369
--- /dev/null
+++ b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql
@@ -0,0 +1,31 @@
+CREATE TABLE "job_cache" (
+    id INTEGER PRIMARY KEY,
+    job_id BIGINT NOT NULL,
+    cluster VARCHAR(255) NOT NULL,
+    subcluster VARCHAR(255) NOT NULL,
+    start_time BIGINT NOT NULL, -- Unix timestamp
+    hpc_user VARCHAR(255) NOT NULL,
+    project VARCHAR(255) NOT NULL,
+    cluster_partition VARCHAR(255),
+    array_job_id BIGINT,
+    duration INT NOT NULL,
+    walltime INT NOT NULL,
+    job_state VARCHAR(255) NOT NULL
+    CHECK (job_state IN (
+        'running', 'completed', 'failed', 'cancelled',
+        'stopped', 'timeout', 'preempted', 'out_of_memory'
+    )),
+    meta_data TEXT,          -- JSON
+    resources TEXT NOT NULL, -- JSON
+    num_nodes INT NOT NULL,
+    num_hwthreads INT,
+    num_acc INT,
+    smt TINYINT NOT NULL DEFAULT 1 CHECK (smt IN (0, 1)),
+    exclusive TINYINT NOT NULL DEFAULT 1 CHECK (exclusive IN (0, 1, 2)),
+    monitoring_status TINYINT NOT NULL DEFAULT 1
+    CHECK (monitoring_status IN (0, 1, 2, 3)),
+    energy REAL NOT NULL DEFAULT 0.0,
+    energy_footprint TEXT DEFAULT NULL,
+    footprint TEXT DEFAULT NULL,
+    UNIQUE (job_id, cluster, start_time)
+);
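
Usage note (not part of the patch): the diff adds SyncJobs but does not show
its call site, so the write path is only implied: InsertJob now fills the
lightweight job_cache table, and something must periodically flush that cache
into the fully indexed job table. Below is a minimal sketch of one way to
drive the flush, assuming a ticker-based background goroutine; the helper
startJobSyncer, the done channel, and the one-minute interval are
illustrative assumptions, not part of cc-backend.

package main

import (
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
)

// startJobSyncer flushes job_cache into job at a fixed interval until the
// done channel is closed. Hypothetical call site: the patch leaves both the
// trigger and the interval to the caller.
func startJobSyncer(done <-chan struct{}) {
	jobRepo := repository.GetJobRepository()
	ticker := time.NewTicker(time.Minute) // assumed interval, not from the patch

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				// SyncJobs holds JobRepository.Mutex while it copies every
				// row from job_cache into job and then empties job_cache,
				// so concurrent InsertJob calls cannot write into a
				// half-flushed cache.
				if err := jobRepo.SyncJobs(); err != nil {
					log.Warnf("job cache sync failed: %v", err)
				}
			}
		}
	}()
}

func main() {
	done := make(chan struct{})
	startJobSyncer(done)
	select {} // block forever in this sketch; close(done) on shutdown instead
}

The design keeps the hot insert path cheap: InsertJob only takes the
repository mutex around a single NamedExec into job_cache, while the fully
indexed job table is updated in one batched INSERT ... SELECT during the
flush, which is what makes job inserts faster (#392).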