From 51517f80314742186a41ceaa9d1f2b92aae66b43 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 16 Mar 2026 11:17:47 +0100 Subject: [PATCH] Reduce insert pressure in db. Increase sqlite timeout value Entire-Checkpoint: a1e2931d4deb --- internal/api/nats.go | 26 +++++++++-- internal/api/node.go | 13 ++++-- internal/auth/ldap.go | 2 +- internal/repository/config.go | 7 +++ internal/repository/dbConnection.go | 2 +- internal/repository/jobCreate.go | 17 ++++++- internal/repository/node.go | 71 +++++++++++++++++++++++++++++ internal/repository/user.go | 15 ++++++ pkg/archive/sqliteBackend.go | 2 +- 9 files changed, 142 insertions(+), 13 deletions(-) diff --git a/internal/api/nats.go b/internal/api/nats.go index efa4ab6f..1834ff28 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -64,12 +64,19 @@ type NatsAPI struct { // RepositoryMutex protects job creation operations from race conditions // when checking for duplicate jobs during startJob calls. RepositoryMutex sync.Mutex + // jobSem limits concurrent NATS job event processing to prevent goroutine accumulation + // when the database is under contention. + jobSem chan struct{} + // nodeSem limits concurrent NATS node state processing. + nodeSem chan struct{} } // NewNatsAPI creates a new NatsAPI instance with default dependencies. func NewNatsAPI() *NatsAPI { return &NatsAPI{ JobRepository: repository.GetJobRepository(), + jobSem: make(chan struct{}, 8), + nodeSem: make(chan struct{}, 2), } } @@ -137,6 +144,9 @@ func (api *NatsAPI) processJobEvent(msg lp.CCMessage) { // // Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000 func (api *NatsAPI) handleJobEvent(subject string, data []byte) { + api.jobSem <- struct{}{} + defer func() { <-api.jobSem }() + if len(data) == 0 { cclog.Warnf("NATS %s: received empty message", subject) return @@ -345,6 +355,7 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { repo := repository.GetNodeRepository() requestReceived := time.Now().Unix() + updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { state := determineState(node.States) nodeState := schema.NodeStateDB{ @@ -356,11 +367,15 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { HealthState: schema.MonitoringStateFull, JobsRunning: node.JobsRunning, } + updates = append(updates, repository.NodeStateUpdate{ + Hostname: node.Hostname, + Cluster: req.Cluster, + NodeState: &nodeState, + }) + } - if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { - cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v", - node.Hostname, req.Cluster, err) - } + if err := repo.BatchUpdateNodeStates(updates); err != nil { + cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err) } cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster) @@ -372,6 +387,9 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { // // Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000 func (api *NatsAPI) handleNodeState(subject string, data []byte) { + api.nodeSem <- struct{}{} + defer func() { <-api.nodeSem }() + if len(data) == 0 { cclog.Warnf("NATS %s: received empty message", subject) return diff --git a/internal/api/node.go b/internal/api/node.go index 5032ed7b..06787cd1 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -116,6 +116,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) startDB := time.Now() + updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { state := determineState(node.States) healthState := schema.MonitoringStateFailed @@ -134,11 +135,15 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { HealthMetrics: healthMetrics, JobsRunning: node.JobsRunning, } + updates = append(updates, repository.NodeStateUpdate{ + Hostname: node.Hostname, + Cluster: req.Cluster, + NodeState: &nodeState, + }) + } - if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { - cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v", - node.Hostname, req.Cluster, err) - } + if err := repo.BatchUpdateNodeStates(updates); err != nil { + cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err) } cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB)) diff --git a/internal/auth/ldap.go b/internal/auth/ldap.go index a174bb9d..b65e485f 100644 --- a/internal/auth/ldap.go +++ b/internal/auth/ldap.go @@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error { } cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name) - if err := ur.AddUser(user); err != nil { + if err := ur.AddUserIfNotExists(user); err != nil { cclog.Errorf("User '%s' LDAP: Insert into DB failed", username) return err } diff --git a/internal/repository/config.go b/internal/repository/config.go index 2bd4f749..a201bb6b 100644 --- a/internal/repository/config.go +++ b/internal/repository/config.go @@ -46,6 +46,12 @@ type RepositoryConfig struct { // It's a soft limit — queries won't fail, but cache eviction becomes more aggressive. // Default: 16384 (16GB) DbSoftHeapLimitMB int + + // BusyTimeoutMs is the SQLite busy_timeout in milliseconds. + // When a write is blocked by another writer, SQLite retries internally + // using a backoff mechanism for up to this duration before returning SQLITE_BUSY. + // Default: 60000 (60 seconds) + BusyTimeoutMs int } // DefaultConfig returns the default repository configuration. @@ -60,6 +66,7 @@ func DefaultConfig() *RepositoryConfig { MinRunningJobDuration: 600, // 10 minutes DbCacheSizeMB: 2048, // 2GB per connection DbSoftHeapLimitMB: 16384, // 16GB process-wide + BusyTimeoutMs: 60000, // 60 seconds } } diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 23a19c93..80df2869 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -70,7 +70,7 @@ func Connect(db string) { connectionURLParams := make(url.Values) connectionURLParams.Add("_txlock", "immediate") connectionURLParams.Add("_journal_mode", "WAL") - connectionURLParams.Add("_busy_timeout", "5000") + connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs)) connectionURLParams.Add("_synchronous", "NORMAL") cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB)) diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 7047f12f..9a4ab04d 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -92,20 +92,33 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { jobs = append(jobs, job) } + // Transfer cached jobs to main table and clear cache in a single transaction. + tx, err := r.DB.Beginx() + if err != nil { + cclog.Errorf("SyncJobs: begin transaction: %v", err) + return nil, err + } + defer tx.Rollback() + // Use INSERT OR IGNORE to skip jobs already transferred by the stop path - _, err = r.DB.Exec( + _, err = tx.Exec( "INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") if err != nil { cclog.Errorf("Error while Job sync: %v", err) return nil, err } - _, err = r.DB.Exec("DELETE FROM job_cache") + _, err = tx.Exec("DELETE FROM job_cache") if err != nil { cclog.Errorf("Error while Job cache clean: %v", err) return nil, err } + if err := tx.Commit(); err != nil { + cclog.Errorf("SyncJobs: commit transaction: %v", err) + return nil, err + } + // Resolve correct job.id from the job table. The IDs read from job_cache // are from a different auto-increment sequence and must not be used to // query the job table. diff --git a/internal/repository/node.go b/internal/repository/node.go index 116b9868..2e174e95 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -244,6 +244,77 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt return nil } +// NodeStateUpdate holds the data needed to update one node's state in a batch operation. +type NodeStateUpdate struct { + Hostname string + Cluster string + NodeState *schema.NodeStateDB +} + +// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction. +// For each node, it looks up (or creates) the node row, then inserts the state row. +// This reduces lock acquisitions from 2*N to 1 for N nodes. +func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error { + tx, err := r.DB.Beginx() + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err) + } + defer tx.Rollback() + + stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?") + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err) + } + defer stmtLookup.Close() + + stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert) + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err) + } + defer stmtInsertNode.Close() + + stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert) + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err) + } + defer stmtInsertState.Close() + + for _, u := range updates { + var id int64 + if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil { + if err == sql.ErrNoRows { + subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname) + if scErr != nil { + cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr) + continue + } + node := schema.NodeDB{ + Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster, + } + res, insertErr := stmtInsertNode.Exec(&node) + if insertErr != nil { + cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr) + continue + } + id, _ = res.LastInsertId() + } else { + cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err) + continue + } + } + + u.NodeState.NodeID = id + if _, err := stmtInsertState.Exec(u.NodeState); err != nil { + cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err) + } + return nil +} + // func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error { // if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil { // cclog.Errorf("error while updating node '%d'", id) diff --git a/internal/repository/user.go b/internal/repository/user.go index 8e45e16a..6323103c 100644 --- a/internal/repository/user.go +++ b/internal/repository/user.go @@ -188,6 +188,21 @@ func (r *UserRepository) AddUser(user *schema.User) error { return nil } +// AddUserIfNotExists inserts a user only if the username does not already exist. +// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is +// concurrently created (e.g., by a login while LDAP sync is running). +// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion. +func (r *UserRepository) AddUserIfNotExists(user *schema.User) error { + rolesJson, _ := json.Marshal(user.Roles) + projectsJson, _ := json.Marshal(user.Projects) + + cols := "username, name, roles, projects, ldap" + _, err := r.DB.Exec( + `INSERT OR IGNORE INTO hpc_user (`+cols+`) VALUES (?, ?, ?, ?, ?)`, + user.Username, user.Name, string(rolesJson), string(projectsJson), int(user.AuthSource)) + return err +} + func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error { // user contains updated info -> Apply to dbUser // --- Simple Name Update --- diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go index 3f214136..ff7bf333 100644 --- a/pkg/archive/sqliteBackend.go +++ b/pkg/archive/sqliteBackend.go @@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) { "PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL", "PRAGMA cache_size=-64000", // 64MB cache - "PRAGMA busy_timeout=5000", + "PRAGMA busy_timeout=60000", } for _, pragma := range pragmas { if _, err := sa.db.Exec(pragma); err != nil {