Introduce caching table for faster job inserts

Fixes #392
Jan Eitzinger 2025-05-16 17:32:19 +02:00
parent 432e06e801
commit eab7961a83
6 changed files with 114 additions and 20 deletions
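
In short: InsertJob no longer writes straight into the job table. New jobs go into a separate job_cache table, serialized by a repository-level mutex, and SyncJobs() later copies all cached rows into job with a single INSERT ... SELECT before emptying the cache. The sketch below illustrates how a caller might drive that sync; the periodic loop, the 30-second interval, and the logging are assumptions for illustration only and are not part of this commit (it also assumes the database connection has already been initialized elsewhere).

// Illustrative sketch only, not part of this commit: a periodic sync loop
// showing the intended interplay between InsertJob (fast write into the
// job_cache table) and SyncJobs (bulk copy into the job table).
package main

import (
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
)

func main() {
	// Assumes repository.Connect (or equivalent setup) has already run.
	repo := repository.GetJobRepository()

	// Flush the cache into the main job table at a fixed interval;
	// the 30s interval is an arbitrary choice for this sketch.
	for range time.Tick(30 * time.Second) {
		if err := repo.SyncJobs(); err != nil {
			log.Warnf("syncing job cache failed: %v", err)
		}
	}
}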

View File

@@ -9,12 +9,12 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"maps"
 	"math"
 	"strconv"
 	"sync"
 	"time"

-	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@@ -33,6 +33,7 @@ type JobRepository struct {
 	stmtCache *sq.StmtCache
 	cache     *lrucache.Cache
 	driver    string
+	Mutex     sync.Mutex
 }

 func GetJobRepository() *JobRepository {
@@ -56,7 +57,7 @@ var jobColumns []string = []string{
 	"job.duration", "job.walltime", "job.resources", "job.footprint", "job.energy",
 }

-func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
+func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
 	job := &schema.Job{}
 	if err := row.Scan(
@@ -138,17 +139,6 @@ func (r *JobRepository) Flush() error {
 	return nil
 }

-func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
-	jobLink := &model.JobLink{}
-	if err := row.Scan(
-		&jobLink.ID, &jobLink.JobID); err != nil {
-		log.Warn("Error while scanning rows (jobLink)")
-		return nil, err
-	}
-
-	return jobLink, nil
-}
-
 func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
 	start := time.Now()
 	cachekey := fmt.Sprintf("metadata:%d", job.ID)
@@ -189,9 +179,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
 	if job.MetaData != nil {
 		cpy := make(map[string]string, len(job.MetaData)+1)
-		for k, v := range job.MetaData {
-			cpy[k] = v
-		}
+		maps.Copy(cpy, job.MetaData)
 		cpy[key] = val
 		job.MetaData = cpy
 	} else {
@@ -389,7 +377,7 @@ func (r *JobRepository) FindColumnValues(user *schema.User, query string, table
 func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 	var err error
 	start := time.Now()
-	partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
+	partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
 		parts := []string{}
 		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
 			return nil, 0, 1000

View File

@@ -13,6 +13,14 @@ import (
 	sq "github.com/Masterminds/squirrel"
 )

+const NamedJobCacheInsert string = `INSERT INTO job_cache (
+	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
+	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
+) VALUES (
+	:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
+);`
+
 const NamedJobInsert string = `INSERT INTO job (
 	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
 	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
@@ -22,7 +30,9 @@ const NamedJobInsert string = `INSERT INTO job (
 );`

 func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
-	res, err := r.DB.NamedExec(NamedJobInsert, job)
+	r.Mutex.Lock()
+	res, err := r.DB.NamedExec(NamedJobCacheInsert, job)
+	r.Mutex.Unlock()
 	if err != nil {
 		log.Warn("Error while NamedJobInsert")
 		return 0, err
@@ -36,6 +46,25 @@ func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) {
 	return id, nil
 }

+func (r *JobRepository) SyncJobs() error {
+	r.Mutex.Lock()
+	defer r.Mutex.Unlock()
+	_, err := r.DB.Exec(
+		"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
+	if err != nil {
+		log.Warnf("Error while Job sync: %v", err)
+		return err
+	}
+
+	_, err = r.DB.Exec("DELETE FROM job_cache")
+	if err != nil {
+		log.Warn("Error while Job cache clean")
+		return err
+	}
+
+	return nil
+}
+
 // Start inserts a new job in the table, returning the unique job ID.
 // Statistics are not transfered!
 func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
@@ -73,3 +102,19 @@ func (r *JobRepository) Stop(
 	_, err = stmt.RunWith(r.stmtCache).Exec()
 	return
 }
+
+func (r *JobRepository) StopCached(
+	jobId int64,
+	duration int32,
+	state schema.JobState,
+	monitoringStatus int32,
+) (err error) {
+	stmt := sq.Update("job_cache").
+		Set("job_state", state).
+		Set("duration", duration).
+		Set("monitoring_status", monitoringStatus).
+		Where("job_cache.id = ?", jobId)
+
+	_, err = stmt.RunWith(r.stmtCache).Exec()
+	return
+}

View File

@@ -43,6 +43,26 @@ func (r *JobRepository) Find(
 	return scanJob(q.RunWith(r.stmtCache).QueryRow())
 }

+func (r *JobRepository) FindCached(
+	jobId *int64,
+	cluster *string,
+	startTime *int64,
+) (*schema.Job, error) {
+	q := sq.Select(jobColumns...).From("job_cache").
+		Where("job_cache.job_id = ?", *jobId)
+
+	if cluster != nil {
+		q = q.Where("job_cache.cluster = ?", *cluster)
+	}
+	if startTime != nil {
+		q = q.Where("job_cache.start_time = ?", *startTime)
+	}
+
+	q = q.OrderBy("job_cache.id DESC") // always use newest matching job by db id if more than one match
+
+	return scanJob(q.RunWith(r.stmtCache).QueryRow())
+}
+
 // Find executes a SQL query to find a specific batch job.
 // The job is queried using the batch job id, the cluster name,
 // and the start time of the job in UNIX epoch time seconds.

View File

@@ -16,7 +16,7 @@ import (
 	"github.com/golang-migrate/migrate/v4/source/iofs"
 )

-const Version uint = 8
+const Version uint = 9

 //go:embed migrations/*
 var migrationFiles embed.FS
@@ -115,8 +115,17 @@ func MigrateDB(backend string, db string) error {
 	}

 	v, dirty, err := m.Version()
+	if err != nil {
+		if err == migrate.ErrNilVersion {
+			log.Warn("Legacy database without version or missing database file!")
+		} else {
+			return err
+		}
+	}

-	log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
+	if v < Version {
+		log.Infof("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
+	}

 	if dirty {
 		return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)

View File

@@ -0,0 +1 @@
DROP TABLE IF EXISTS job_cache;

View File

@@ -0,0 +1,31 @@
CREATE TABLE "job_cache" (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
start_time BIGINT NOT NULL, -- Unix timestamp
hpc_user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
cluster_partition VARCHAR(255),
array_job_id BIGINT,
duration INT NOT NULL,
walltime INT NOT NULL,
job_state VARCHAR(255) NOT NULL
CHECK (job_state IN (
'running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory'
)),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT,
num_acc INT,
smt TINYINT NOT NULL DEFAULT 1 CHECK (smt IN (0, 1)),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK (exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1
CHECK (monitoring_status IN (0, 1, 2, 3)),
energy REAL NOT NULL DEFAULT 0.0,
energy_footprint TEXT DEFAULT NULL,
footprint TEXT DEFAULT NULL,
UNIQUE (job_id, cluster, start_time)
);
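
What this commit does not show is the caller side for jobs that finish before SyncJobs has copied them out of job_cache. A hypothetical sketch of how the new FindCached and StopCached could be combined with the existing Find and Stop follows; the helper name and wiring are invented here, Find and Stop are assumed to take the same arguments as their cached counterparts, and imports of the repository and schema packages are assumed.

// Hypothetical helper, not part of this commit: stop a job that may still
// live only in job_cache because SyncJobs has not copied it yet.
func stopPossiblyUnsyncedJob(
	r *repository.JobRepository,
	jobID int64, cluster string, startTime int64,
	duration int32, state schema.JobState, monitoringStatus int32,
) error {
	// Try the regular job table first (Find is assumed to take the same
	// pointer arguments as the new FindCached).
	job, err := r.Find(&jobID, &cluster, &startTime)
	if err == nil {
		return r.Stop(job.ID, duration, state, monitoringStatus)
	}

	// Fall back to the cache table introduced by this commit; the row keeps
	// its final state there until the next SyncJobs run copies it over.
	job, err = r.FindCached(&jobID, &cluster, &startTime)
	if err != nil {
		return err
	}
	return r.StopCached(job.ID, duration, state, monitoringStatus)
}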