mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-16 12:57:30 +01:00
Reduce insert pressure in db. Increase sqlite timeout value
Entire-Checkpoint: a1e2931d4deb
This commit is contained in:
@@ -64,12 +64,19 @@ type NatsAPI struct {
|
|||||||
// RepositoryMutex protects job creation operations from race conditions
|
// RepositoryMutex protects job creation operations from race conditions
|
||||||
// when checking for duplicate jobs during startJob calls.
|
// when checking for duplicate jobs during startJob calls.
|
||||||
RepositoryMutex sync.Mutex
|
RepositoryMutex sync.Mutex
|
||||||
|
// jobSem limits concurrent NATS job event processing to prevent goroutine accumulation
|
||||||
|
// when the database is under contention.
|
||||||
|
jobSem chan struct{}
|
||||||
|
// nodeSem limits concurrent NATS node state processing.
|
||||||
|
nodeSem chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNatsAPI creates a new NatsAPI instance with default dependencies.
|
// NewNatsAPI creates a new NatsAPI instance with default dependencies.
|
||||||
func NewNatsAPI() *NatsAPI {
|
func NewNatsAPI() *NatsAPI {
|
||||||
return &NatsAPI{
|
return &NatsAPI{
|
||||||
JobRepository: repository.GetJobRepository(),
|
JobRepository: repository.GetJobRepository(),
|
||||||
|
jobSem: make(chan struct{}, 8),
|
||||||
|
nodeSem: make(chan struct{}, 2),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,6 +144,9 @@ func (api *NatsAPI) processJobEvent(msg lp.CCMessage) {
|
|||||||
//
|
//
|
||||||
// Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000
|
// Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000
|
||||||
func (api *NatsAPI) handleJobEvent(subject string, data []byte) {
|
func (api *NatsAPI) handleJobEvent(subject string, data []byte) {
|
||||||
|
api.jobSem <- struct{}{}
|
||||||
|
defer func() { <-api.jobSem }()
|
||||||
|
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
cclog.Warnf("NATS %s: received empty message", subject)
|
cclog.Warnf("NATS %s: received empty message", subject)
|
||||||
return
|
return
|
||||||
@@ -345,6 +355,7 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
|
|||||||
repo := repository.GetNodeRepository()
|
repo := repository.GetNodeRepository()
|
||||||
requestReceived := time.Now().Unix()
|
requestReceived := time.Now().Unix()
|
||||||
|
|
||||||
|
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
state := determineState(node.States)
|
state := determineState(node.States)
|
||||||
nodeState := schema.NodeStateDB{
|
nodeState := schema.NodeStateDB{
|
||||||
@@ -356,11 +367,15 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
|
|||||||
HealthState: schema.MonitoringStateFull,
|
HealthState: schema.MonitoringStateFull,
|
||||||
JobsRunning: node.JobsRunning,
|
JobsRunning: node.JobsRunning,
|
||||||
}
|
}
|
||||||
|
updates = append(updates, repository.NodeStateUpdate{
|
||||||
|
Hostname: node.Hostname,
|
||||||
|
Cluster: req.Cluster,
|
||||||
|
NodeState: &nodeState,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
|
if err := repo.BatchUpdateNodeStates(updates); err != nil {
|
||||||
cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v",
|
cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err)
|
||||||
node.Hostname, req.Cluster, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)
|
cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)
|
||||||
@@ -372,6 +387,9 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
|
|||||||
//
|
//
|
||||||
// Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000
|
// Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000
|
||||||
func (api *NatsAPI) handleNodeState(subject string, data []byte) {
|
func (api *NatsAPI) handleNodeState(subject string, data []byte) {
|
||||||
|
api.nodeSem <- struct{}{}
|
||||||
|
defer func() { <-api.nodeSem }()
|
||||||
|
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
cclog.Warnf("NATS %s: received empty message", subject)
|
cclog.Warnf("NATS %s: received empty message", subject)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -116,6 +116,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
||||||
startDB := time.Now()
|
startDB := time.Now()
|
||||||
|
|
||||||
|
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
state := determineState(node.States)
|
state := determineState(node.States)
|
||||||
healthState := schema.MonitoringStateFailed
|
healthState := schema.MonitoringStateFailed
|
||||||
@@ -134,11 +135,15 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
HealthMetrics: healthMetrics,
|
HealthMetrics: healthMetrics,
|
||||||
JobsRunning: node.JobsRunning,
|
JobsRunning: node.JobsRunning,
|
||||||
}
|
}
|
||||||
|
updates = append(updates, repository.NodeStateUpdate{
|
||||||
|
Hostname: node.Hostname,
|
||||||
|
Cluster: req.Cluster,
|
||||||
|
NodeState: &nodeState,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
|
if err := repo.BatchUpdateNodeStates(updates); err != nil {
|
||||||
cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
|
cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err)
|
||||||
node.Hostname, req.Cluster, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
|
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
|
||||||
|
|||||||
@@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
|
cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
|
||||||
if err := ur.AddUser(user); err != nil {
|
if err := ur.AddUserIfNotExists(user); err != nil {
|
||||||
cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
|
cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,12 @@ type RepositoryConfig struct {
|
|||||||
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
|
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
|
||||||
// Default: 16384 (16GB)
|
// Default: 16384 (16GB)
|
||||||
DbSoftHeapLimitMB int
|
DbSoftHeapLimitMB int
|
||||||
|
|
||||||
|
// BusyTimeoutMs is the SQLite busy_timeout in milliseconds.
|
||||||
|
// When a write is blocked by another writer, SQLite retries internally
|
||||||
|
// using a backoff mechanism for up to this duration before returning SQLITE_BUSY.
|
||||||
|
// Default: 60000 (60 seconds)
|
||||||
|
BusyTimeoutMs int
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig returns the default repository configuration.
|
// DefaultConfig returns the default repository configuration.
|
||||||
@@ -60,6 +66,7 @@ func DefaultConfig() *RepositoryConfig {
|
|||||||
MinRunningJobDuration: 600, // 10 minutes
|
MinRunningJobDuration: 600, // 10 minutes
|
||||||
DbCacheSizeMB: 2048, // 2GB per connection
|
DbCacheSizeMB: 2048, // 2GB per connection
|
||||||
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
||||||
|
BusyTimeoutMs: 60000, // 60 seconds
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ func Connect(db string) {
|
|||||||
connectionURLParams := make(url.Values)
|
connectionURLParams := make(url.Values)
|
||||||
connectionURLParams.Add("_txlock", "immediate")
|
connectionURLParams.Add("_txlock", "immediate")
|
||||||
connectionURLParams.Add("_journal_mode", "WAL")
|
connectionURLParams.Add("_journal_mode", "WAL")
|
||||||
connectionURLParams.Add("_busy_timeout", "5000")
|
connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs))
|
||||||
connectionURLParams.Add("_synchronous", "NORMAL")
|
connectionURLParams.Add("_synchronous", "NORMAL")
|
||||||
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
|
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
|
||||||
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
|
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
|
||||||
|
|||||||
@@ -92,20 +92,33 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
|
|||||||
jobs = append(jobs, job)
|
jobs = append(jobs, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Transfer cached jobs to main table and clear cache in a single transaction.
|
||||||
|
tx, err := r.DB.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("SyncJobs: begin transaction: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
|
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
|
||||||
_, err = r.DB.Exec(
|
_, err = tx.Exec(
|
||||||
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("Error while Job sync: %v", err)
|
cclog.Errorf("Error while Job sync: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = r.DB.Exec("DELETE FROM job_cache")
|
_, err = tx.Exec("DELETE FROM job_cache")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("Error while Job cache clean: %v", err)
|
cclog.Errorf("Error while Job cache clean: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
cclog.Errorf("SyncJobs: commit transaction: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Resolve correct job.id from the job table. The IDs read from job_cache
|
// Resolve correct job.id from the job table. The IDs read from job_cache
|
||||||
// are from a different auto-increment sequence and must not be used to
|
// are from a different auto-increment sequence and must not be used to
|
||||||
// query the job table.
|
// query the job table.
|
||||||
|
|||||||
@@ -244,6 +244,77 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeStateUpdate holds the data needed to update one node's state in a batch operation.
|
||||||
|
type NodeStateUpdate struct {
|
||||||
|
Hostname string
|
||||||
|
Cluster string
|
||||||
|
NodeState *schema.NodeStateDB
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction.
|
||||||
|
// For each node, it looks up (or creates) the node row, then inserts the state row.
|
||||||
|
// This reduces lock acquisitions from 2*N to 1 for N nodes.
|
||||||
|
func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error {
|
||||||
|
tx, err := r.DB.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err)
|
||||||
|
}
|
||||||
|
defer stmtLookup.Close()
|
||||||
|
|
||||||
|
stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err)
|
||||||
|
}
|
||||||
|
defer stmtInsertNode.Close()
|
||||||
|
|
||||||
|
stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err)
|
||||||
|
}
|
||||||
|
defer stmtInsertState.Close()
|
||||||
|
|
||||||
|
for _, u := range updates {
|
||||||
|
var id int64
|
||||||
|
if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname)
|
||||||
|
if scErr != nil {
|
||||||
|
cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
node := schema.NodeDB{
|
||||||
|
Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster,
|
||||||
|
}
|
||||||
|
res, insertErr := stmtInsertNode.Exec(&node)
|
||||||
|
if insertErr != nil {
|
||||||
|
cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
id, _ = res.LastInsertId()
|
||||||
|
} else {
|
||||||
|
cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u.NodeState.NodeID = id
|
||||||
|
if _, err := stmtInsertState.Exec(u.NodeState); err != nil {
|
||||||
|
cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
|
// func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
|
||||||
// if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
|
// if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
|
||||||
// cclog.Errorf("error while updating node '%d'", id)
|
// cclog.Errorf("error while updating node '%d'", id)
|
||||||
|
|||||||
@@ -188,6 +188,21 @@ func (r *UserRepository) AddUser(user *schema.User) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddUserIfNotExists inserts a user only if the username does not already exist.
|
||||||
|
// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is
|
||||||
|
// concurrently created (e.g., by a login while LDAP sync is running).
|
||||||
|
// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion.
|
||||||
|
func (r *UserRepository) AddUserIfNotExists(user *schema.User) error {
|
||||||
|
rolesJson, _ := json.Marshal(user.Roles)
|
||||||
|
projectsJson, _ := json.Marshal(user.Projects)
|
||||||
|
|
||||||
|
cols := "username, name, roles, projects, ldap"
|
||||||
|
_, err := r.DB.Exec(
|
||||||
|
`INSERT OR IGNORE INTO hpc_user (`+cols+`) VALUES (?, ?, ?, ?, ?)`,
|
||||||
|
user.Username, user.Name, string(rolesJson), string(projectsJson), int(user.AuthSource))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
|
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
|
||||||
// user contains updated info -> Apply to dbUser
|
// user contains updated info -> Apply to dbUser
|
||||||
// --- Simple Name Update ---
|
// --- Simple Name Update ---
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
|||||||
"PRAGMA journal_mode=WAL",
|
"PRAGMA journal_mode=WAL",
|
||||||
"PRAGMA synchronous=NORMAL",
|
"PRAGMA synchronous=NORMAL",
|
||||||
"PRAGMA cache_size=-64000", // 64MB cache
|
"PRAGMA cache_size=-64000", // 64MB cache
|
||||||
"PRAGMA busy_timeout=5000",
|
"PRAGMA busy_timeout=60000",
|
||||||
}
|
}
|
||||||
for _, pragma := range pragmas {
|
for _, pragma := range pragmas {
|
||||||
if _, err := sa.db.Exec(pragma); err != nil {
|
if _, err := sa.db.Exec(pragma); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user