mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-15 12:27:30 +01:00
fix: Prevent memory explosion in SQLite and make DB options configurable
Entire-Checkpoint: e368e6d8abf3
This commit is contained in:
@@ -27,13 +27,25 @@ type RepositoryConfig struct {
|
||||
ConnectionMaxLifetime time.Duration
|
||||
|
||||
// ConnectionMaxIdleTime is the maximum amount of time a connection may be idle.
|
||||
// Default: 1 hour
|
||||
// Default: 10 minutes
|
||||
ConnectionMaxIdleTime time.Duration
|
||||
|
||||
// MinRunningJobDuration is the minimum duration in seconds for a job to be
|
||||
// considered in "running jobs" queries. This filters out very short jobs.
|
||||
// Default: 600 seconds (10 minutes)
|
||||
MinRunningJobDuration int
|
||||
|
||||
// DbCacheSizeMB is the SQLite page cache size per connection in MB.
|
||||
// Uses negative PRAGMA cache_size notation (KiB). With MaxOpenConnections=4
|
||||
// and DbCacheSizeMB=2048, total page cache is up to 8GB.
|
||||
// Default: 2048 (2GB)
|
||||
DbCacheSizeMB int
|
||||
|
||||
// DbSoftHeapLimitMB is the process-wide SQLite soft heap limit in MB.
|
||||
// SQLite will try to release cache pages to stay under this limit.
|
||||
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
|
||||
// Default: 16384 (16GB)
|
||||
DbSoftHeapLimitMB int
|
||||
}
|
||||
|
||||
// DefaultConfig returns the default repository configuration.
|
||||
@@ -44,8 +56,10 @@ func DefaultConfig() *RepositoryConfig {
|
||||
MaxOpenConnections: 4,
|
||||
MaxIdleConnections: 4,
|
||||
ConnectionMaxLifetime: time.Hour,
|
||||
ConnectionMaxIdleTime: time.Hour,
|
||||
MinRunningJobDuration: 600, // 10 minutes
|
||||
ConnectionMaxIdleTime: 10 * time.Minute,
|
||||
MinRunningJobDuration: 600, // 10 minutes
|
||||
DbCacheSizeMB: 2048, // 2GB per connection
|
||||
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,9 +36,10 @@ type DatabaseOptions struct {
|
||||
ConnectionMaxIdleTime time.Duration
|
||||
}
|
||||
|
||||
func setupSqlite(db *sql.DB) error {
|
||||
func setupSqlite(db *sql.DB, cfg *RepositoryConfig) error {
|
||||
pragmas := []string{
|
||||
"temp_store = memory",
|
||||
fmt.Sprintf("soft_heap_limit = %d", int64(cfg.DbSoftHeapLimitMB)*1024*1024),
|
||||
}
|
||||
|
||||
for _, pragma := range pragmas {
|
||||
@@ -79,7 +80,8 @@ func Connect(db string) {
|
||||
connectionURLParams.Add("_journal_mode", "WAL")
|
||||
connectionURLParams.Add("_busy_timeout", "5000")
|
||||
connectionURLParams.Add("_synchronous", "NORMAL")
|
||||
connectionURLParams.Add("_cache_size", "1000000000")
|
||||
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
|
||||
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
|
||||
connectionURLParams.Add("_foreign_keys", "true")
|
||||
opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
|
||||
|
||||
@@ -94,11 +96,14 @@ func Connect(db string) {
|
||||
cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error())
|
||||
}
|
||||
|
||||
err = setupSqlite(dbHandle.DB)
|
||||
err = setupSqlite(dbHandle.DB, repoConfig)
|
||||
if err != nil {
|
||||
cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
|
||||
}
|
||||
|
||||
cclog.Infof("SQLite config: cache_size=%dMB/conn, soft_heap_limit=%dMB, max_conns=%d",
|
||||
repoConfig.DbCacheSizeMB, repoConfig.DbSoftHeapLimitMB, repoConfig.MaxOpenConnections)
|
||||
|
||||
dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
|
||||
dbHandle.SetMaxIdleConns(opts.MaxIdleConnections)
|
||||
dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime)
|
||||
|
||||
@@ -171,7 +171,7 @@ func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job,
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
|
||||
}
|
||||
|
||||
// FindByIDWithUser executes a SQL query to find a specific batch job.
|
||||
@@ -217,7 +217,7 @@ func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime
|
||||
return nil, qerr
|
||||
}
|
||||
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||
return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
|
||||
}
|
||||
|
||||
// IsJobOwner checks if the specified user owns the batch job identified by jobID,
|
||||
|
||||
@@ -84,7 +84,7 @@ func (r *JobRepository) QueryJobs(
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
rows, err := query.RunWith(r.stmtCache).QueryContext(ctx)
|
||||
if err != nil {
|
||||
queryString, queryVars, _ := query.ToSql()
|
||||
return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
|
||||
@@ -126,7 +126,7 @@ func (r *JobRepository) CountJobs(
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := query.RunWith(r.DB).Scan(&count); err != nil {
|
||||
if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&count); err != nil {
|
||||
return 0, fmt.Errorf("failed to count jobs: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -230,7 +230,7 @@ func (r *JobRepository) JobsStatsGrouped(
|
||||
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
|
||||
}
|
||||
|
||||
rows, err := query.RunWith(r.DB).Query()
|
||||
rows, err := query.RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while querying DB for job statistics")
|
||||
return nil, err
|
||||
@@ -355,7 +355,7 @@ func (r *JobRepository) JobsStats(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
row := query.RunWith(r.DB).QueryRow()
|
||||
row := query.RunWith(r.DB).QueryRowContext(ctx)
|
||||
stats := make([]*model.JobsStatistics, 0, 1)
|
||||
|
||||
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
|
||||
@@ -440,7 +440,7 @@ func (r *JobRepository) JobCountGrouped(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := query.RunWith(r.DB).Query()
|
||||
rows, err := query.RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while querying DB for job statistics")
|
||||
return nil, err
|
||||
@@ -501,7 +501,7 @@ func (r *JobRepository) AddJobCountGrouped(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := query.RunWith(r.DB).Query()
|
||||
rows, err := query.RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while querying DB for job statistics")
|
||||
return nil, err
|
||||
@@ -566,7 +566,7 @@ func (r *JobRepository) AddJobCount(
|
||||
return nil, err
|
||||
}
|
||||
var cnt sql.NullInt64
|
||||
if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
|
||||
if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&cnt); err != nil {
|
||||
cclog.Warn("Error while querying DB for job count")
|
||||
return nil, err
|
||||
}
|
||||
@@ -755,7 +755,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
|
||||
rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
@@ -829,7 +829,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
|
||||
query = BuildWhereClause(f, query)
|
||||
}
|
||||
|
||||
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
|
||||
rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
@@ -959,7 +959,7 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
|
||||
|
||||
mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")
|
||||
|
||||
rows, err := mainQuery.RunWith(r.DB).Query()
|
||||
rows, err := mainQuery.RunWith(r.DB).QueryContext(ctx)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while running mainQuery: %s", err)
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user