fix: Large heap allocations in sqlite driver. Sanitize sqlite config and make it configurable. Allow cancelling queries.

This commit is contained in:
2026-03-11 11:14:37 +01:00
parent c8d8f7084a
commit 00d2f97c4c
8 changed files with 93 additions and 17 deletions

View File

@@ -108,6 +108,26 @@ func initConfiguration() error {
} }
func initDatabase() error { func initDatabase() error {
if config.Keys.DbConfig != nil {
cfg := repository.DefaultConfig()
dc := config.Keys.DbConfig
if dc.CacheSizeMB > 0 {
cfg.DbCacheSizeMB = dc.CacheSizeMB
}
if dc.SoftHeapLimitMB > 0 {
cfg.DbSoftHeapLimitMB = dc.SoftHeapLimitMB
}
if dc.MaxOpenConnections > 0 {
cfg.MaxOpenConnections = dc.MaxOpenConnections
}
if dc.MaxIdleConnections > 0 {
cfg.MaxIdleConnections = dc.MaxIdleConnections
}
if dc.ConnectionMaxIdleTimeMins > 0 {
cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
}
repository.SetConfig(cfg)
}
repository.Connect(config.Keys.DB) repository.Connect(config.Keys.DB)
return nil return nil
} }

View File

@@ -77,6 +77,17 @@ type ProgramConfig struct {
// Node state retention configuration // Node state retention configuration
NodeStateRetention *NodeStateRetention `json:"nodestate-retention"` NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`
// Database tuning configuration
DbConfig *DbConfig `json:"db-config"`
}
type DbConfig struct {
CacheSizeMB int `json:"cache-size-mb"`
SoftHeapLimitMB int `json:"soft-heap-limit-mb"`
MaxOpenConnections int `json:"max-open-connections"`
MaxIdleConnections int `json:"max-idle-connections"`
ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
} }
type NodeStateRetention struct { type NodeStateRetention struct {

View File

@@ -177,6 +177,32 @@ var configSchema = `
} }
}, },
"required": ["policy"] "required": ["policy"]
},
"db-config": {
"description": "SQLite database tuning configuration.",
"type": "object",
"properties": {
"cache-size-mb": {
"description": "SQLite page cache size per connection in MB (default: 2048).",
"type": "integer"
},
"soft-heap-limit-mb": {
"description": "Process-wide SQLite soft heap limit in MB (default: 16384).",
"type": "integer"
},
"max-open-connections": {
"description": "Maximum number of open database connections (default: 4).",
"type": "integer"
},
"max-idle-connections": {
"description": "Maximum number of idle database connections (default: 4).",
"type": "integer"
},
"max-idle-time-minutes": {
"description": "Maximum idle time for a connection in minutes (default: 10).",
"type": "integer"
}
}
} }
} }
}` }`

View File

@@ -27,13 +27,25 @@ type RepositoryConfig struct {
ConnectionMaxLifetime time.Duration ConnectionMaxLifetime time.Duration
// ConnectionMaxIdleTime is the maximum amount of time a connection may be idle. // ConnectionMaxIdleTime is the maximum amount of time a connection may be idle.
// Default: 1 hour // Default: 10 minutes
ConnectionMaxIdleTime time.Duration ConnectionMaxIdleTime time.Duration
// MinRunningJobDuration is the minimum duration in seconds for a job to be // MinRunningJobDuration is the minimum duration in seconds for a job to be
// considered in "running jobs" queries. This filters out very short jobs. // considered in "running jobs" queries. This filters out very short jobs.
// Default: 600 seconds (10 minutes) // Default: 600 seconds (10 minutes)
MinRunningJobDuration int MinRunningJobDuration int
// DbCacheSizeMB is the SQLite page cache size per connection in MB.
// Uses negative PRAGMA cache_size notation (KiB). With MaxOpenConnections=4
// and DbCacheSizeMB=2048, total page cache is up to 8GB.
// Default: 2048 (2GB)
DbCacheSizeMB int
// DbSoftHeapLimitMB is the process-wide SQLite soft heap limit in MB.
// SQLite will try to release cache pages to stay under this limit.
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
// Default: 16384 (16GB)
DbSoftHeapLimitMB int
} }
// DefaultConfig returns the default repository configuration. // DefaultConfig returns the default repository configuration.
@@ -44,8 +56,10 @@ func DefaultConfig() *RepositoryConfig {
MaxOpenConnections: 4, MaxOpenConnections: 4,
MaxIdleConnections: 4, MaxIdleConnections: 4,
ConnectionMaxLifetime: time.Hour, ConnectionMaxLifetime: time.Hour,
ConnectionMaxIdleTime: time.Hour, ConnectionMaxIdleTime: 10 * time.Minute,
MinRunningJobDuration: 600, // 10 minutes MinRunningJobDuration: 600, // 10 minutes
DbCacheSizeMB: 2048, // 2GB per connection
DbSoftHeapLimitMB: 16384, // 16GB process-wide
} }
} }

View File

@@ -36,9 +36,10 @@ type DatabaseOptions struct {
ConnectionMaxIdleTime time.Duration ConnectionMaxIdleTime time.Duration
} }
func setupSqlite(db *sql.DB) error { func setupSqlite(db *sql.DB, cfg *RepositoryConfig) error {
pragmas := []string{ pragmas := []string{
"temp_store = memory", "temp_store = memory",
fmt.Sprintf("soft_heap_limit = %d", int64(cfg.DbSoftHeapLimitMB)*1024*1024),
} }
for _, pragma := range pragmas { for _, pragma := range pragmas {
@@ -79,7 +80,8 @@ func Connect(db string) {
connectionURLParams.Add("_journal_mode", "WAL") connectionURLParams.Add("_journal_mode", "WAL")
connectionURLParams.Add("_busy_timeout", "5000") connectionURLParams.Add("_busy_timeout", "5000")
connectionURLParams.Add("_synchronous", "NORMAL") connectionURLParams.Add("_synchronous", "NORMAL")
connectionURLParams.Add("_cache_size", "1000000000") cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
connectionURLParams.Add("_foreign_keys", "true") connectionURLParams.Add("_foreign_keys", "true")
opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode()) opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
@@ -94,11 +96,14 @@ func Connect(db string) {
cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error()) cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error())
} }
err = setupSqlite(dbHandle.DB) err = setupSqlite(dbHandle.DB, repoConfig)
if err != nil { if err != nil {
cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error()) cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
} }
cclog.Infof("SQLite config: cache_size=%dMB/conn, soft_heap_limit=%dMB, max_conns=%d",
repoConfig.DbCacheSizeMB, repoConfig.DbSoftHeapLimitMB, repoConfig.MaxOpenConnections)
dbHandle.SetMaxOpenConns(opts.MaxOpenConnections) dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
dbHandle.SetMaxIdleConns(opts.MaxIdleConnections) dbHandle.SetMaxIdleConns(opts.MaxIdleConnections)
dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime) dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime)

View File

@@ -171,7 +171,7 @@ func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job,
return nil, qerr return nil, qerr
} }
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
} }
// FindByIDWithUser executes a SQL query to find a specific batch job. // FindByIDWithUser executes a SQL query to find a specific batch job.
@@ -217,7 +217,7 @@ func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime
return nil, qerr return nil, qerr
} }
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
} }
// IsJobOwner checks if the specified user owns the batch job identified by jobID, // IsJobOwner checks if the specified user owns the batch job identified by jobID,

View File

@@ -84,7 +84,7 @@ func (r *JobRepository) QueryJobs(
query = BuildWhereClause(f, query) query = BuildWhereClause(f, query)
} }
rows, err := query.RunWith(r.stmtCache).Query() rows, err := query.RunWith(r.stmtCache).QueryContext(ctx)
if err != nil { if err != nil {
queryString, queryVars, _ := query.ToSql() queryString, queryVars, _ := query.ToSql()
return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err) return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
@@ -126,7 +126,7 @@ func (r *JobRepository) CountJobs(
} }
var count int var count int
if err := query.RunWith(r.DB).Scan(&count); err != nil { if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&count); err != nil {
return 0, fmt.Errorf("failed to count jobs: %w", err) return 0, fmt.Errorf("failed to count jobs: %w", err)
} }

View File

@@ -230,7 +230,7 @@ func (r *JobRepository) JobsStatsGrouped(
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit) query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
} }
rows, err := query.RunWith(r.DB).Query() rows, err := query.RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Warn("Error while querying DB for job statistics") cclog.Warn("Error while querying DB for job statistics")
return nil, err return nil, err
@@ -355,7 +355,7 @@ func (r *JobRepository) JobsStats(
return nil, err return nil, err
} }
row := query.RunWith(r.DB).QueryRow() row := query.RunWith(r.DB).QueryRowContext(ctx)
stats := make([]*model.JobsStatistics, 0, 1) stats := make([]*model.JobsStatistics, 0, 1)
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
@@ -440,7 +440,7 @@ func (r *JobRepository) JobCountGrouped(
if err != nil { if err != nil {
return nil, err return nil, err
} }
rows, err := query.RunWith(r.DB).Query() rows, err := query.RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Warn("Error while querying DB for job statistics") cclog.Warn("Error while querying DB for job statistics")
return nil, err return nil, err
@@ -501,7 +501,7 @@ func (r *JobRepository) AddJobCountGrouped(
if err != nil { if err != nil {
return nil, err return nil, err
} }
rows, err := query.RunWith(r.DB).Query() rows, err := query.RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Warn("Error while querying DB for job statistics") cclog.Warn("Error while querying DB for job statistics")
return nil, err return nil, err
@@ -566,7 +566,7 @@ func (r *JobRepository) AddJobCount(
return nil, err return nil, err
} }
var cnt sql.NullInt64 var cnt sql.NullInt64
if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&cnt); err != nil {
cclog.Warn("Error while querying DB for job count") cclog.Warn("Error while querying DB for job count")
return nil, err return nil, err
} }
@@ -755,7 +755,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
query = BuildWhereClause(f, query) query = BuildWhereClause(f, query)
} }
rows, err := query.GroupBy("value").RunWith(r.DB).Query() rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Error("Error while running query") cclog.Error("Error while running query")
return nil, err return nil, err
@@ -829,7 +829,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
query = BuildWhereClause(f, query) query = BuildWhereClause(f, query)
} }
rows, err := query.GroupBy("value").RunWith(r.DB).Query() rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Error("Error while running query") cclog.Error("Error while running query")
return nil, err return nil, err
@@ -959,7 +959,7 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
mainQuery = mainQuery.GroupBy("bin").OrderBy("bin") mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")
rows, err := mainQuery.RunWith(r.DB).Query() rows, err := mainQuery.RunWith(r.DB).QueryContext(ctx)
if err != nil { if err != nil {
cclog.Errorf("Error while running mainQuery: %s", err) cclog.Errorf("Error while running mainQuery: %s", err)
return nil, err return nil, err