1 Commit

Author SHA1 Message Date
dependabot[bot]
d5813b71c7 Bump devalue
Bumps the npm_and_yarn group with 1 update in the /web/frontend directory: [devalue](https://github.com/sveltejs/devalue).


Updates `devalue` from 5.6.3 to 5.6.4
- [Release notes](https://github.com/sveltejs/devalue/releases)
- [Changelog](https://github.com/sveltejs/devalue/blob/main/CHANGELOG.md)
- [Commits](https://github.com/sveltejs/devalue/compare/v5.6.3...v5.6.4)

---
updated-dependencies:
- dependency-name: devalue
  dependency-version: 5.6.4
  dependency-type: indirect
  dependency-group: npm_and_yarn
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-12 16:17:55 +00:00
38 changed files with 312 additions and 1413 deletions

View File

@@ -1,29 +0,0 @@
# Session Context
Session ID: 28147033-ddc8-4056-b064-e0558fbc614e
Commit Message: There are memory leaks in production on queries that trigger a full scan
## Prompts
### Prompt 1
There are memory leaks in production on queries that trigger a full scan. Review all queries for missing close and other bugs that could cause memory leaks in the sqlite-3 driver. Example for a query that triggers a massive allocation is a specific jobstate list with sorting on start_time on all jobs.
## Summary
All 31 `.Query()` calls now have proper `rows.Close()`. Build passes, tests pass, vet clean.
### Summary of changes
**`internal/repository/stats.go`** (7 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()` to `JobsStatsGrouped`, `JobCountGrouped`, `AddJobCountGrouped`, `jobsStatisticsHistogram`, `jobsDurationStatisticsHistogram`, `jobsMetricStatisticsHistogram`
- Rewrote `AddJobCount` to use `QueryRow()` instead of `Query()` (single-row result)
**`internal/repository/tags.go`** (5 fixes):
- Added `defer xrows.Close()` + `xrows.Err()` for `Queryx` in `CountTags`
- Added `defer rows.Close()` for second query in `CountTags`
- Added `defer rows.Close()` + `rows.Err()` to `GetTags`, `GetTagsDirect`, `getArchiveTags`
**`internal/repository/node.go`** (4 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()`, removed manual `rows.Close()` in `QueryNodes`, `QueryNodesWithMeta`, `CountStatesTimed`
- Rewrote `CountStates` to use `COUNT(*) ... GROUP BY` in SQL instead of fetching every row and counting in Go
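The fix pattern applied across these files can be sketched standalone. `rowsIface` and `fakeRows` below are illustrative stand-ins for `*sql.Rows` (the real code calls `db.Query(...)` directly); they make the close-on-every-path behavior observable without a database:

```go
package main

import "fmt"

// rowsIface abstracts the parts of *sql.Rows the fix pattern touches.
type rowsIface interface {
	Next() bool
	Scan(dest ...any) error
	Close() error
	Err() error
}

// fakeRows lets us observe that Close always runs, even on a Scan error.
type fakeRows struct {
	n      int
	failAt int
	closed bool
}

func (r *fakeRows) Next() bool { r.n++; return r.n <= 3 }
func (r *fakeRows) Scan(dest ...any) error {
	if r.n == r.failAt {
		return fmt.Errorf("scan failed at row %d", r.n)
	}
	if p, ok := dest[0].(*int); ok {
		*p = r.n
	}
	return nil
}
func (r *fakeRows) Close() error { r.closed = true; return nil }
func (r *fakeRows) Err() error   { return nil }

// collect mirrors the fixed repository pattern: defer rows.Close()
// immediately after obtaining the rows, and return rows.Err() after the loop.
func collect(rows rowsIface) ([]int, error) {
	defer rows.Close() // the missing line in the leaky queries
	var out []int
	for rows.Next() {
		var v int
		if err := rows.Scan(&v); err != nil {
			return nil, err // early return no longer leaks: defer still fires
		}
		out = append(out, v)
	}
	return out, rows.Err()
}

func main() {
	ok := &fakeRows{}
	vals, _ := collect(ok)
	fmt.Println(vals, ok.closed) // [1 2 3] true

	bad := &fakeRows{failAt: 2}
	_, err := collect(bad)
	fmt.Println(err != nil, bad.closed) // true true
}
```

Without the deferred `Close`, an early return (or a `Scan` error) keeps the result set, its buffers, and the underlying connection alive, which is exactly the leak profile described above.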

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
There are memory leaks in production on queries that trigger a full scan. Review all queries for missing close and other bugs that could cause memory leaks in the sqlite-3 driver. Example for a query that triggers a massive allocation is a specific jobstate list with sorting on start_time on all jobs.

View File

@@ -1,16 +0,0 @@
All 31 `.Query()` calls now have proper `rows.Close()`. Build passes, tests pass, vet clean.
### Summary of changes
**`internal/repository/stats.go`** (7 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()` to `JobsStatsGrouped`, `JobCountGrouped`, `AddJobCountGrouped`, `jobsStatisticsHistogram`, `jobsDurationStatisticsHistogram`, `jobsMetricStatisticsHistogram`
- Rewrote `AddJobCount` to use `QueryRow()` instead of `Query()` (single-row result)
**`internal/repository/tags.go`** (5 fixes):
- Added `defer xrows.Close()` + `xrows.Err()` for `Queryx` in `CountTags`
- Added `defer rows.Close()` for second query in `CountTags`
- Added `defer rows.Close()` + `rows.Err()` to `GetTags`, `GetTagsDirect`, `getArchiveTags`
**`internal/repository/node.go`** (4 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()`, removed manual `rows.Close()` in `QueryNodes`, `QueryNodesWithMeta`, `CountStatesTimed`
- Rewrote `CountStates` to use `COUNT(*) ... GROUP BY` in SQL instead of fetching every row and counting in Go

View File

@@ -1,40 +0,0 @@
# Session Context
Session ID: 42401d2e-7d1c-4c0e-abe6-356cb2d48747
Commit Message: There also have to be bugs in jobQuery.go . Especially the following que
## Prompts
### Prompt 1
There also have to be bugs in jobQuery.go . Especially the following query triggers the memory leak: SELECT * FROM job WHERE job.job_state IN ("completed", "running", "failed") ORDER BY job.start_time DESC LIMIT 1 OFFSET 10; Dig deeper to find the cause. Also investigate why no existing index is used for this query.
## Summary
All checks pass. Here's a summary of the findings and fixes:
---
## Findings
### Bug 1: Missing `defer rows.Close()` in `user.go:GetLdapUsernames()` (line 100)
**Fixed.** Every call leaked a database connection and its buffers.
### Bug 2: SQLite never runs `ANALYZE` — wrong index selection
**Fixed.** Added `ANALYZE` to `setupSqlite()` in `dbConnection.go`.
Without statistics in `sqlite_stat1`, SQLite's query planner **guessed** and chose `jobs_jobstate_energy` (a terrible index for this query) instead of scanning `jobs_starttime` in reverse. The result:
| | Before ANALYZE | After ANALYZE |
|---|---|---|
| Index used | `jobs_jobstate_energy` | `jobs_starttime` |
| Sort | **TEMP B-TREE** (materializes ALL rows) | None (index order) |
| Memory | Proportional to total matching rows | Constant |
| I/O | Full scan of all matching rows | Stops at OFFSET+LIMIT |
### Bug 3: `IN` clause + `ORDER BY` is fundamentally incompatible with composite indexes
Even with the "correct" index `(job_state, start_time)`, SQLite **cannot** merge-sort across 3 separate index range scans for `IN ('completed','running','failed')`. It always falls back to a temp B-tree sort. The only efficient plan is to use the standalone `jobs_starttime` index — which SQLite does automatically **after ANALYZE** because it realizes the 3 states cover virtually all rows, making the WHERE clause nearly a no-op.
### Observation: 79 indexes on the `job` table
This is excessive and actively harmful — it confuses the query planner (especially without ANALYZE) and slows writes. The `jobs_jobstate_starttime` index from migration 08 is also missing from the actual DB (only the 3-column `jobs_jobstate_starttime_duration` exists). This is worth investigating separately but is a schema/migration concern, not a code bug.

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
There also have to be bugs in jobQuery.go . Especially the following query triggers the memory leak: SELECT * FROM job WHERE job.job_state IN ("completed", "running", "failed") ORDER BY job.start_time DESC LIMIT 1 OFFSET 10; Dig deeper to find the cause. Also investigate why no existing index is used for this query.

View File

@@ -1,27 +0,0 @@
All checks pass. Here's a summary of the findings and fixes:
---
## Findings
### Bug 1: Missing `defer rows.Close()` in `user.go:GetLdapUsernames()` (line 100)
**Fixed.** Every call leaked a database connection and its buffers.
### Bug 2: SQLite never runs `ANALYZE` — wrong index selection
**Fixed.** Added `ANALYZE` to `setupSqlite()` in `dbConnection.go`.
Without statistics in `sqlite_stat1`, SQLite's query planner **guessed** and chose `jobs_jobstate_energy` (a terrible index for this query) instead of scanning `jobs_starttime` in reverse. The result:
| | Before ANALYZE | After ANALYZE |
|---|---|---|
| Index used | `jobs_jobstate_energy` | `jobs_starttime` |
| Sort | **TEMP B-TREE** (materializes ALL rows) | None (index order) |
| Memory | Proportional to total matching rows | Constant |
| I/O | Full scan of all matching rows | Stops at OFFSET+LIMIT |
### Bug 3: `IN` clause + `ORDER BY` is fundamentally incompatible with composite indexes
Even with the "correct" index `(job_state, start_time)`, SQLite **cannot** merge-sort across 3 separate index range scans for `IN ('completed','running','failed')`. It always falls back to a temp B-tree sort. The only efficient plan is to use the standalone `jobs_starttime` index — which SQLite does automatically **after ANALYZE** because it realizes the 3 states cover virtually all rows, making the WHERE clause nearly a no-op.
### Observation: 79 indexes on the `job` table
This is excessive and actively harmful — it confuses the query planner (especially without ANALYZE) and slows writes. The `jobs_jobstate_starttime` index from migration 08 is also missing from the actual DB (only the 3-column `jobs_jobstate_starttime_duration` exists). This is worth investigating separately but is a schema/migration concern, not a code bug.

View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend
FRONTEND = ./web/frontend
VERSION = 1.5.1
VERSION = 1.5.0
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -133,92 +133,6 @@ ln -s <your-existing-job-archive> ./var/job-archive
./cc-backend -help
```
## Database Configuration
cc-backend uses SQLite as its database. For large installations, SQLite memory
usage can be tuned via the optional `db-config` section in config.json under
`main`:
```json
{
"main": {
"db": "./var/job.db",
"db-config": {
"cache-size-mb": 2048,
"soft-heap-limit-mb": 16384,
"max-open-connections": 4,
"max-idle-connections": 4,
"max-idle-time-minutes": 10
}
}
}
```
All fields are optional. If `db-config` is omitted entirely, built-in defaults
are used.
### Options
| Option | Default | Description |
| ----------------------- | ------- | ----------- |
| `cache-size-mb` | 2048 | SQLite page cache size per connection in MB. Maps to `PRAGMA cache_size`. Total cache memory is up to `cache-size-mb × max-open-connections`. |
| `soft-heap-limit-mb` | 16384 | Process-wide SQLite soft heap limit in MB. SQLite will try to release cache pages to stay under this limit. Queries won't fail if exceeded, but cache eviction becomes more aggressive. |
| `max-open-connections` | 4 | Maximum number of open database connections. |
| `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. |
| `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. |
### Sizing Guidelines
SQLite's `cache_size` is a **per-connection** setting — each connection
maintains its own independent page cache. With multiple connections, the total
memory available for caching is the sum across all connections.
In practice, different connections tend to cache **different pages** (e.g., one
handles a job listing query while another runs a statistics aggregation), so
their caches naturally spread across the database. The formula
`DB_size / max-open-connections` gives enough per-connection cache that the
combined caches can cover the entire database.
However, this is a best-case estimate. Connections running similar queries will
cache the same pages redundantly. In the worst case (all connections caching
identical pages), only `cache-size-mb` worth of unique data is cached rather
than `cache-size-mb × max-open-connections`. For workloads with diverse
concurrent queries, cache overlap is typically low.
**Rules of thumb:**
- **cache-size-mb**: Set to `DB_size_in_MB / max-open-connections` to allow the
entire database to be cached in memory. For example, an 80GB database with 8
connections needs at least 10240 MB (10GB) per connection. If your workload
has many similar concurrent queries, consider setting it higher to account for
cache overlap between connections.
- **soft-heap-limit-mb**: Should be >= `cache-size-mb × max-open-connections` to
avoid cache thrashing. This is the total SQLite memory budget for the process.
- On small installations the defaults work well. On servers with large databases
(tens of GB) and plenty of RAM, increasing these values significantly improves
query performance by reducing disk I/O.
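The rules of thumb above reduce to simple arithmetic. A sketch (the function names are illustrative, not part of cc-backend):

```go
package main

import "fmt"

// perConnCacheMB applies the README rule: give each connection enough cache
// that the combined caches across all connections can span the database.
func perConnCacheMB(dbSizeMB, maxOpenConns int) int {
	return dbSizeMB / maxOpenConns
}

// minSoftHeapMB returns the lower bound for soft-heap-limit-mb: the total
// page-cache budget across all connections.
func minSoftHeapMB(cacheSizeMB, maxOpenConns int) int {
	return cacheSizeMB * maxOpenConns
}

func main() {
	// The example from this section: 80GB database, 8 connections.
	cache := perConnCacheMB(80*1024, 8)
	fmt.Println(cache, minSoftHeapMB(cache, 8)) // 10240 81920
}
```

Note that the large-server example below chooses values above these minimums (16384 MB cache, 131072 MB heap limit) to absorb cache overlap between connections.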
### Example: Large Server (512GB RAM, 80GB database)
```json
{
"main": {
"db-config": {
"cache-size-mb": 16384,
"soft-heap-limit-mb": 131072,
"max-open-connections": 8,
"max-idle-time-minutes": 30
}
}
}
```
This allows the entire 80GB database to be cached (8 × 16GB = 128GB page cache)
with a 128GB soft heap limit, using about 25% of available RAM.
The effective configuration is logged at startup for verification.
## Project file structure
- [`.github/`](https://github.com/ClusterCockpit/cc-backend/tree/master/.github)

View File

@@ -1,43 +1,11 @@
# `cc-backend` version 1.5.1
# `cc-backend` version 1.5.0
Supports job archive version 3 and database version 11.
Supports job archive version 3 and database version 10.
This is a bugfix release of `cc-backend`, the API backend and frontend
This is a feature release of `cc-backend`, the API backend and frontend
implementation of ClusterCockpit.
For release-specific notes visit the [ClusterCockpit Documentation](https://clustercockpit.org/docs/release/).
## Changes in 1.5.1
### Database
- **New migration (version 11)**: Optimized database index count for better performance
- **ANALYZE on startup**: Database statistics are now refreshed on startup for improved query planning
- **SQLite configuration hardening**: Sanitized SQLite configuration with new configurable options; fixes large heap allocations in the SQLite driver
- **Query cancellation**: Long-running database queries can now be cancelled
- **Resource leak fix**: Added missing `defer Close()` calls for all query result sets
### Bug fixes
- **Segfault when taggers misconfigured**: Fixed crash when `enable-job-taggers` is set but tagger rule directories are missing
- **GroupBy stats query complexity**: Reduced complexity for `groupBy` statistics queries
- **Ranged filter conditions**: Fixed GT and LT conditions in ranged filters
- **Energy filter preset**: Reduced energy filter preset to a more practical default
- **JSON validity check**: Fixed wrong field being checked for JSON validity
- **Tagger float rounding**: Fixed rounding of floats in tagger messages
- **Node view null safety**: Added null-safe checks in node view to prevent runtime errors
### Frontend
- **Bumped patch versions**: Updated frontend dependencies to latest patch versions
### Documentation
- **New DB config options**: Added new database configuration options to README
---
*The sections below document all features and changes introduced in the 1.5.0 major release, which 1.5.1 is based on.*
## Breaking changes
### Configuration changes
@@ -66,7 +34,7 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
### Dependency changes
- **cc-lib v2.8.0**: Switched to cc-lib version 2 with updated APIs
- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1)
- **cclib NATS client**: Now using the cclib NATS client implementation
- Removed obsolete `util.Float` usage from cclib

View File

@@ -11,7 +11,7 @@ import "flag"
var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
@@ -27,7 +27,6 @@ func cliInit() {
flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit")
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")

View File

@@ -108,26 +108,6 @@ func initConfiguration() error {
}
func initDatabase() error {
if config.Keys.DbConfig != nil {
cfg := repository.DefaultConfig()
dc := config.Keys.DbConfig
if dc.CacheSizeMB > 0 {
cfg.DbCacheSizeMB = dc.CacheSizeMB
}
if dc.SoftHeapLimitMB > 0 {
cfg.DbSoftHeapLimitMB = dc.SoftHeapLimitMB
}
if dc.MaxOpenConnections > 0 {
cfg.MaxOpenConnections = dc.MaxOpenConnections
}
if dc.MaxIdleConnections > 0 {
cfg.MaxIdleConnections = dc.MaxIdleConnections
}
if dc.ConnectionMaxIdleTimeMins > 0 {
cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
}
repository.SetConfig(cfg)
}
repository.Connect(config.Keys.DB)
return nil
}
@@ -509,20 +489,6 @@ func run() error {
return err
}
// Optimize database if requested
if flagOptimizeDB {
db := repository.GetConnection()
cclog.Print("Running VACUUM to reclaim space and defragment database...")
if _, err := db.DB.Exec("VACUUM"); err != nil {
return fmt.Errorf("VACUUM failed: %w", err)
}
cclog.Print("Running ANALYZE to update query planner statistics...")
if _, err := db.DB.Exec("ANALYZE"); err != nil {
return fmt.Errorf("ANALYZE failed: %w", err)
}
cclog.Exitf("OptimizeDB Success: Database '%s' optimized (VACUUM + ANALYZE).\n", config.Keys.DB)
}
// Handle user commands (add, delete, sync, JWT)
if err := handleUserCommands(); err != nil {
return err

View File

@@ -77,17 +77,6 @@ type ProgramConfig struct {
// Node state retention configuration
NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`
// Database tuning configuration
DbConfig *DbConfig `json:"db-config"`
}
type DbConfig struct {
CacheSizeMB int `json:"cache-size-mb"`
SoftHeapLimitMB int `json:"soft-heap-limit-mb"`
MaxOpenConnections int `json:"max-open-connections"`
MaxIdleConnections int `json:"max-idle-connections"`
ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
}
type NodeStateRetention struct {

View File

@@ -177,32 +177,6 @@ var configSchema = `
}
},
"required": ["policy"]
},
"db-config": {
"description": "SQLite database tuning configuration.",
"type": "object",
"properties": {
"cache-size-mb": {
"description": "SQLite page cache size per connection in MB (default: 2048).",
"type": "integer"
},
"soft-heap-limit-mb": {
"description": "Process-wide SQLite soft heap limit in MB (default: 16384).",
"type": "integer"
},
"max-open-connections": {
"description": "Maximum number of open database connections (default: 4).",
"type": "integer"
},
"max-idle-connections": {
"description": "Maximum number of idle database connections (default: 4).",
"type": "integer"
},
"max-idle-time-minutes": {
"description": "Maximum idle time for a connection in minutes (default: 10).",
"type": "integer"
}
}
}
}
}`

View File

@@ -645,7 +645,6 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate, numDurationBins *string, numMetricBins *int) ([]*model.JobsStatistics, error) {
startOverall := time.Now()
var err error
var stats []*model.JobsStatistics
@@ -653,50 +652,31 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
defaultDurationBins := "1h"
defaultMetricBins := 10
// Build requested fields map for selective column computation
statsFields := []string{"totalJobs", "totalUsers", "totalWalltime", "totalNodes", "totalCores",
"totalAccs", "totalNodeHours", "totalCoreHours", "totalAccHours", "runningJobs", "shortJobs"}
reqFields := make(map[string]bool, len(statsFields))
fetchedMainStats := false
for _, f := range statsFields {
if requireField(ctx, f) {
reqFields[f] = true
if f != "runningJobs" && f != "shortJobs" {
fetchedMainStats = true
}
}
}
if fetchedMainStats {
if requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
if groupBy == nil {
stats, err = r.Repo.JobsStats(ctx, filter, reqFields)
stats, err = r.Repo.JobsStats(ctx, filter)
} else {
startGrouped := time.Now()
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields)
cclog.Infof("Timer JobsStatsGrouped call: %s", time.Since(startGrouped))
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy)
}
} else {
stats = make([]*model.JobsStatistics, 0, 1)
stats = append(stats, &model.JobsStatistics{})
}
// runningJobs and shortJobs are already inlined in JobsStats/JobsStatsGrouped.
// Only run separate count queries if main stats were not fetched.
if !fetchedMainStats {
if groupBy != nil {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
}
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
}
} else {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
}
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
}
if groupBy != nil {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
}
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
}
} else {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
}
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
}
}
@@ -736,7 +716,6 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
}
}
cclog.Infof("Timer JobsStatistics overall: %s", time.Since(startOverall))
return stats, nil
}

View File

@@ -27,25 +27,13 @@ type RepositoryConfig struct {
ConnectionMaxLifetime time.Duration
// ConnectionMaxIdleTime is the maximum amount of time a connection may be idle.
// Default: 10 minutes
// Default: 1 hour
ConnectionMaxIdleTime time.Duration
// MinRunningJobDuration is the minimum duration in seconds for a job to be
// considered in "running jobs" queries. This filters out very short jobs.
// Default: 600 seconds (10 minutes)
MinRunningJobDuration int
// DbCacheSizeMB is the SQLite page cache size per connection in MB.
// Uses negative PRAGMA cache_size notation (KiB). With MaxOpenConnections=4
// and DbCacheSizeMB=2048, total page cache is up to 8GB.
// Default: 2048 (2GB)
DbCacheSizeMB int
// DbSoftHeapLimitMB is the process-wide SQLite soft heap limit in MB.
// SQLite will try to release cache pages to stay under this limit.
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
// Default: 16384 (16GB)
DbSoftHeapLimitMB int
}
// DefaultConfig returns the default repository configuration.
@@ -56,10 +44,8 @@ func DefaultConfig() *RepositoryConfig {
MaxOpenConnections: 4,
MaxIdleConnections: 4,
ConnectionMaxLifetime: time.Hour,
ConnectionMaxIdleTime: 10 * time.Minute,
ConnectionMaxIdleTime: time.Hour,
MinRunningJobDuration: 600, // 10 minutes
DbCacheSizeMB: 2048, // 2GB per connection
DbSoftHeapLimitMB: 16384, // 16GB process-wide
}
}

View File

@@ -36,10 +36,9 @@ type DatabaseOptions struct {
ConnectionMaxIdleTime time.Duration
}
func setupSqlite(db *sql.DB, cfg *RepositoryConfig) error {
func setupSqlite(db *sql.DB) error {
pragmas := []string{
"temp_store = memory",
fmt.Sprintf("soft_heap_limit = %d", int64(cfg.DbSoftHeapLimitMB)*1024*1024),
}
for _, pragma := range pragmas {
@@ -72,8 +71,7 @@ func Connect(db string) {
connectionURLParams.Add("_journal_mode", "WAL")
connectionURLParams.Add("_busy_timeout", "5000")
connectionURLParams.Add("_synchronous", "NORMAL")
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
connectionURLParams.Add("_cache_size", "1000000000")
connectionURLParams.Add("_foreign_keys", "true")
opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
@@ -88,14 +86,11 @@ func Connect(db string) {
cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error())
}
err = setupSqlite(dbHandle.DB, repoConfig)
err = setupSqlite(dbHandle.DB)
if err != nil {
cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
}
cclog.Infof("SQLite config: cache_size=%dMB/conn, soft_heap_limit=%dMB, max_conns=%d",
repoConfig.DbCacheSizeMB, repoConfig.DbSoftHeapLimitMB, repoConfig.MaxOpenConnections)
dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
dbHandle.SetMaxIdleConns(opts.MaxIdleConnections)
dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime)

View File

@@ -171,7 +171,7 @@ func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job,
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// FindByIDWithUser executes a SQL query to find a specific batch job.
@@ -217,7 +217,7 @@ func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime
return nil, qerr
}
return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
return scanJob(q.RunWith(r.stmtCache).QueryRow())
}
// IsJobOwner checks if the specified user owns the batch job identified by jobID,

View File

@@ -63,7 +63,7 @@ func (r *JobRepository) QueryJobs(
}
} else {
// Order by footprint JSON field values
query = query.Where("JSON_VALID(footprint)")
query = query.Where("JSON_VALID(meta_data)")
switch order.Order {
case model.SortDirectionEnumAsc:
query = query.OrderBy(fmt.Sprintf("JSON_EXTRACT(footprint, \"$.%s\") ASC", field))
@@ -84,7 +84,7 @@ func (r *JobRepository) QueryJobs(
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.stmtCache).QueryContext(ctx)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
queryString, queryVars, _ := query.ToSql()
return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
@@ -126,7 +126,7 @@ func (r *JobRepository) CountJobs(
}
var count int
if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&count); err != nil {
if err := query.RunWith(r.DB).Scan(&count); err != nil {
return 0, fmt.Errorf("failed to count jobs: %w", err)
}
@@ -276,26 +276,28 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
return query
}
// buildIntCondition creates clauses for integer range filters, using BETWEEN only if required.
// buildIntCondition creates a BETWEEN clause for integer range filters.
// Reminder: BETWEEN queries are slower and don't use indices as frequently; only use if both conditions are required
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != 1 && cond.To != 0 {
if cond.From != 0 && cond.To != 0 {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1 && cond.To == 0 {
} else if cond.From != 0 {
return query.Where(field+" >= ?", cond.From)
} else if cond.From == 1 && cond.To != 0 {
} else if cond.To != 0 {
return query.Where(field+" <= ?", cond.To)
} else {
return query
}
}
// buildFloatCondition creates clauses for float range filters, using BETWEEN only if required.
// buildFloatCondition creates a BETWEEN clause for float range filters.
// Reminder: BETWEEN queries are slower and don't use indices as frequently; only use if both conditions are required
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != 1.0 && cond.To != 0.0 {
if cond.From != 0.0 && cond.To != 0.0 {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1.0 && cond.To == 0.0 {
} else if cond.From != 0.0 {
return query.Where(field+" >= ?", cond.From)
} else if cond.From == 1.0 && cond.To != 0.0 {
} else if cond.To != 0.0 {
return query.Where(field+" <= ?", cond.To)
} else {
return query
@@ -334,15 +336,16 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui
}
}
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required.
func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column.
// Reminder: BETWEEN queries are slower and don't use indices as frequently; only use if both conditions are required
func buildFloatJSONCondition(condName string, condRange *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
query = query.Where("JSON_VALID(footprint)")
if cond.From != 1.0 && cond.To != 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1.0 && cond.To == 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From)
} else if cond.From == 1.0 && cond.To != 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To)
if condRange.From != 0.0 && condRange.To != 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To)
} else if condRange.From != 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") >= ?", condRange.From)
} else if condRange.To != 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") <= ?", condRange.To)
} else {
return query
}
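The range-filter branches in this diff (the `From != 0` variant) can be exercised on their own. `intRangeSQL` below is a hypothetical helper mirroring those branches, not a function in the repository:

```go
package main

import "fmt"

// intRangeSQL mirrors the range-filter branching: BETWEEN only when both
// bounds are set; a single-sided bound falls back to >= or <=, and an empty
// range adds no condition at all.
func intRangeSQL(field string, from, to int64) (string, []int64) {
	switch {
	case from != 0 && to != 0:
		return field + " BETWEEN ? AND ?", []int64{from, to}
	case from != 0:
		return field + " >= ?", []int64{from}
	case to != 0:
		return field + " <= ?", []int64{to}
	default:
		return "", nil
	}
}

func main() {
	sql, args := intRangeSQL("num_nodes", 1, 0)
	fmt.Println(sql, args) // num_nodes >= ? [1]
}
```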

View File

@@ -21,12 +21,10 @@ import (
// is added to internal/repository/migrations/sqlite3/.
//
// Version history:
// - Version 12: Add covering index for stats queries (cluster, start_time, hpc_user, ...)
// - Version 11: Optimize job table indexes (reduce from ~78 to 48)
// - Version 10: Node table
// - Version 10: Current version
//
// Migration files are embedded at build time from the migrations directory.
const Version uint = 12
const Version uint = 10
//go:embed migrations/*
var migrationFiles embed.FS

View File

@@ -139,6 +139,12 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_project ON job (cluster, clust
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, cluster_partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_shared ON job (cluster, cluster_partition, shared);
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
-- Cluster+Partition Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration_starttime ON job (cluster, cluster_partition, duration, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (cluster, cluster_partition, start_time, duration);
@@ -146,6 +152,11 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (clu
-- Cluster+JobState Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
-- Cluster+JobState Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime_duration ON job (cluster, job_state, start_time, duration);
@@ -154,18 +165,34 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration_starttime ON job (clus
-- Cluster+Shared Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_user ON job (cluster, shared, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_project ON job (cluster, shared, project);
-- Cluster+Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy);
-- Cluster+Shared Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime_duration ON job (cluster, shared, start_time, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration_starttime ON job (cluster, shared, duration, start_time);
-- User Filter
-- User Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
-- User Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_starttime_duration ON job (hpc_user, start_time, duration);
CREATE INDEX IF NOT EXISTS jobs_user_duration_starttime ON job (hpc_user, duration, start_time);
-- Project Filter
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
-- Project Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
-- Project Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_starttime_duration ON job (project, start_time, duration);
@@ -174,6 +201,11 @@ CREATE INDEX IF NOT EXISTS jobs_project_duration_starttime ON job (project, dura
-- JobState Filter
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
-- JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
-- JobState Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime_duration ON job (job_state, start_time, duration);
@@ -182,6 +214,11 @@ CREATE INDEX IF NOT EXISTS jobs_jobstate_duration_starttime ON job (job_state, d
-- Shared Filter
CREATE INDEX IF NOT EXISTS jobs_shared_user ON job (shared, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_shared_project ON job (shared, project);
-- Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy);
-- Shared Time Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_shared_starttime_duration ON job (shared, start_time, duration);
@@ -189,6 +226,7 @@ CREATE INDEX IF NOT EXISTS jobs_shared_duration_starttime ON job (shared, durati
-- ArrayJob Filter
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
-- Single filters with default starttime sorting
CREATE INDEX IF NOT EXISTS jobs_duration_starttime ON job (duration, start_time);
@@ -206,6 +244,7 @@ CREATE INDEX IF NOT EXISTS jobs_energy_duration ON job (energy, duration);
-- Backup Indices For High Variety Columns
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
-- Notes:
-- Cluster+Partition+Jobstate Filter: Tested -> Full Array Of Combinations non-required


@@ -1,56 +0,0 @@
-- Migration 11 DOWN: Restore indexes from migration 09
-- ============================================================
-- Recreate all removed indexes from migration 09
-- ============================================================
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
-- Cluster+Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy);
-- User Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
-- Project Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
-- JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
-- Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy);
-- ArrayJob Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
-- Backup Indices For High Variety Columns
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
-- Optimize DB index usage
PRAGMA optimize;


@@ -1,61 +0,0 @@
-- Migration 11: Remove overly specific table indexes formerly used in sorting
-- When filtering on one or two indexed columns, sorting is usually fast
-- Reduces from ~78 indexes to 48 for better write performance,
-- reduced disk usage, and more reliable query planner decisions.
-- Requires ANALYZE to be run after migration (done automatically on startup).
-- ============================================================
-- Drop SELECTED existing job indexes (from migrations 08/09)
-- sqlite_autoindex_job_1 (UNIQUE constraint) is kept automatically
-- ============================================================
-- Cluster+Partition Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_energy;
-- Cluster+JobState Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_jobstate_energy;
-- Cluster+Shared Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_shared_numnodes;
DROP INDEX IF EXISTS jobs_cluster_shared_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_shared_numacc;
DROP INDEX IF EXISTS jobs_cluster_shared_energy;
-- User Filter Sorting
DROP INDEX IF EXISTS jobs_user_numnodes;
DROP INDEX IF EXISTS jobs_user_numhwthreads;
DROP INDEX IF EXISTS jobs_user_numacc;
DROP INDEX IF EXISTS jobs_user_energy;
-- Project Filter Sorting
DROP INDEX IF EXISTS jobs_project_numnodes;
DROP INDEX IF EXISTS jobs_project_numhwthreads;
DROP INDEX IF EXISTS jobs_project_numacc;
DROP INDEX IF EXISTS jobs_project_energy;
-- JobState Filter Sorting
DROP INDEX IF EXISTS jobs_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_jobstate_numacc;
DROP INDEX IF EXISTS jobs_jobstate_energy;
-- Shared Filter Sorting
DROP INDEX IF EXISTS jobs_shared_numnodes;
DROP INDEX IF EXISTS jobs_shared_numhwthreads;
DROP INDEX IF EXISTS jobs_shared_numacc;
DROP INDEX IF EXISTS jobs_shared_energy;
-- ArrayJob Filter
DROP INDEX IF EXISTS jobs_cluster_arrayjobid_starttime;
-- Backup Indices For High Variety Columns
DROP INDEX IF EXISTS jobs_duration;
-- Optimize DB index usage
PRAGMA optimize;


@@ -1 +0,0 @@
DROP INDEX IF EXISTS jobs_cluster_user_starttime_stats;


@@ -1,11 +0,0 @@
-- Migration 12: Add covering index for grouped stats queries
-- Column order: cluster (equality), hpc_user (GROUP BY), start_time (range scan)
-- Includes aggregated columns to avoid main table lookups entirely.
CREATE INDEX IF NOT EXISTS jobs_cluster_user_starttime_stats
ON job (cluster, hpc_user, start_time, duration, job_state, num_nodes, num_hwthreads, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_project_starttime_stats
ON job (cluster, project, start_time, duration, job_state, num_nodes, num_hwthreads, num_acc);
PRAGMA optimize;


@@ -366,23 +366,19 @@ func (r *NodeRepository) QueryNodes(
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
nodes := make([]*schema.Node, 0)
for rows.Next() {
node := schema.Node{}
if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
&node.NodeState, &node.HealthState); err != nil {
rows.Close()
cclog.Warn("Error while scanning rows (QueryNodes)")
return nil, err
}
nodes = append(nodes, &node)
}
if err := rows.Err(); err != nil {
return nil, err
}
return nodes, nil
}
@@ -419,7 +415,6 @@ func (r *NodeRepository) QueryNodesWithMeta(
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
nodes := make([]*schema.Node, 0)
for rows.Next() {
@@ -429,6 +424,7 @@ func (r *NodeRepository) QueryNodesWithMeta(
if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
&node.NodeState, &node.HealthState, &RawMetaData, &RawMetricHealth); err != nil {
rows.Close()
cclog.Warn("Error while scanning rows (QueryNodesWithMeta)")
return nil, err
}
@@ -458,10 +454,6 @@ func (r *NodeRepository) QueryNodesWithMeta(
nodes = append(nodes, &node)
}
if err := rows.Err(); err != nil {
return nil, err
}
return nodes, nil
}
@@ -553,11 +545,10 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) {
func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error) {
query, qerr := AccessCheck(ctx,
sq.Select(column, "COUNT(*) as count").
sq.Select(column).
From("node").
Join("node_state ON node_state.node_id = node.id").
Where(latestStateCondition()).
GroupBy(column))
Where(latestStateCondition()))
if qerr != nil {
return nil, qerr
}
@@ -570,21 +561,23 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
nodes := make([]*model.NodeStates, 0)
stateMap := map[string]int{}
for rows.Next() {
var state string
var count int
if err := rows.Scan(&state, &count); err != nil {
if err := rows.Scan(&state); err != nil {
rows.Close()
cclog.Warn("Error while scanning rows (CountStates)")
return nil, err
}
nodes = append(nodes, &model.NodeStates{State: state, Count: count})
stateMap[state] += 1
}
if err := rows.Err(); err != nil {
return nil, err
nodes := make([]*model.NodeStates, 0)
for state, counts := range stateMap {
node := model.NodeStates{State: state, Count: counts}
nodes = append(nodes, &node)
}
return nodes, nil
@@ -630,7 +623,6 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return nil, err
}
defer rows.Close()
rawData := make(map[string][][]int)
for rows.Next() {
@@ -638,6 +630,7 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
var timestamp, count int
if err := rows.Scan(&state, &timestamp, &count); err != nil {
rows.Close()
cclog.Warnf("Error while scanning rows (CountStatesTimed) at time '%d'", timestamp)
return nil, err
}
@@ -650,10 +643,6 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
rawData[state][1] = append(rawData[state][1], count)
}
if err := rows.Err(); err != nil {
return nil, err
}
timedStates := make([]*model.NodeStatesTimed, 0)
for state, data := range rawData {
entry := model.NodeStatesTimed{State: state, Times: data[0], Counts: data[1]}


@@ -105,9 +105,9 @@ func (r *JobRepository) buildCountQuery(
var query sq.SelectBuilder
if col != "" {
query = sq.Select(col, "COUNT(*)").From("job").GroupBy(col)
query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
} else {
query = sq.Select("COUNT(*)").From("job")
query = sq.Select("COUNT(job.id)").From("job")
}
switch kind {
@@ -124,100 +124,59 @@ func (r *JobRepository) buildCountQuery(
return query
}
// buildStatsQuery constructs a SQL query to compute job statistics with optional grouping.
// Only requested columns are computed; unrequested columns select 0 as placeholder.
// buildStatsQuery constructs a SQL query to compute comprehensive job statistics with optional grouping.
//
// Parameters:
// - filter: Job filters to apply (cluster, user, time range, etc.)
// - col: Column name to GROUP BY; empty string for overall statistics without grouping
// - shortThreshold: Duration threshold in seconds for counting short-running jobs
// - reqFields: Set of requested field names; nil means compute all fields
//
// Returns a SelectBuilder that produces comprehensive statistics:
// - totalJobs: Count of jobs
// - totalUsers: Count of distinct users (always 0 when grouping by user)
// - totalWalltime: Sum of job durations in hours
// - totalNodes: Sum of nodes used across all jobs
// - totalNodeHours: Sum of (duration × num_nodes) in hours
// - totalCores: Sum of hardware threads used across all jobs
// - totalCoreHours: Sum of (duration × num_hwthreads) in hours
// - totalAccs: Sum of accelerators used across all jobs
// - totalAccHours: Sum of (duration × num_acc) in hours
//
// Special handling:
// - Running jobs: Duration calculated as (now - start_time) instead of stored duration
// - Grouped queries: Also select grouping column and user's display name from hpc_user table
// - All time values converted from seconds to hours (÷ 3600) and rounded
func (r *JobRepository) buildStatsQuery(
filter []*model.JobFilter,
col string,
shortThreshold int,
reqFields map[string]bool,
) sq.SelectBuilder {
now := time.Now().Unix()
// Helper: return real expression if field is requested (or reqFields is nil), else "0 as alias"
need := func(field string) bool {
return reqFields == nil || reqFields[field]
}
durationExpr := fmt.Sprintf(`(CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END)`, now)
// Build column list
columns := make([]string, 0, 14)
var query sq.SelectBuilder
if col != "" {
columns = append(columns, col)
}
columns = append(columns, "COUNT(*) as totalJobs")
if need("totalUsers") && col != "job.hpc_user" {
columns = append(columns, "COUNT(DISTINCT job.hpc_user) AS totalUsers")
query = sq.Select(
col,
"name",
"COUNT(job.id) as totalJobs",
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, time.Now().Unix()),
`CAST(SUM(job.num_nodes) as int) as totalNodes`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, time.Now().Unix()),
`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, time.Now().Unix()),
`CAST(SUM(job.num_acc) as int) as totalAccs`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, time.Now().Unix()),
).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
} else {
columns = append(columns, "0 AS totalUsers")
}
if need("totalWalltime") {
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s) / 3600) as int) as totalWalltime`, durationExpr))
} else {
columns = append(columns, "0 as totalWalltime")
}
if need("totalNodes") {
columns = append(columns, `CAST(SUM(job.num_nodes) as int) as totalNodes`)
} else {
columns = append(columns, "0 as totalNodes")
}
if need("totalNodeHours") {
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_nodes) / 3600) as int) as totalNodeHours`, durationExpr))
} else {
columns = append(columns, "0 as totalNodeHours")
}
if need("totalCores") {
columns = append(columns, `CAST(SUM(job.num_hwthreads) as int) as totalCores`)
} else {
columns = append(columns, "0 as totalCores")
}
if need("totalCoreHours") {
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_hwthreads) / 3600) as int) as totalCoreHours`, durationExpr))
} else {
columns = append(columns, "0 as totalCoreHours")
}
if need("totalAccs") {
columns = append(columns, `CAST(SUM(job.num_acc) as int) as totalAccs`)
} else {
columns = append(columns, "0 as totalAccs")
}
if need("totalAccHours") {
columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_acc) / 3600) as int) as totalAccHours`, durationExpr))
} else {
columns = append(columns, "0 as totalAccHours")
}
if need("runningJobs") {
columns = append(columns, `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`)
} else {
columns = append(columns, "0 as runningJobs")
}
if need("shortJobs") {
columns = append(columns, fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold))
} else {
columns = append(columns, "0 as shortJobs")
}
query := sq.Select(columns...).From("job")
if col != "" {
query = query.GroupBy(col)
query = sq.Select(
"COUNT(job.id) as totalJobs",
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, time.Now().Unix()),
`CAST(SUM(job.num_nodes) as int)`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, time.Now().Unix()),
`CAST(SUM(job.num_hwthreads) as int)`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, time.Now().Unix()),
`CAST(SUM(job.num_acc) as int)`,
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, time.Now().Unix()),
).From("job")
}
for _, f := range filter {
@@ -227,19 +186,35 @@ func (r *JobRepository) buildStatsQuery(
return query
}
// JobsStatsGrouped computes job statistics grouped by a dimension (user, project, cluster, or subcluster).
// Only columns listed in reqFields are computed; others return 0. User display names are looked up
// in a separate lightweight query to avoid JOIN overhead on the main aggregation.
// JobsStatsGrouped computes comprehensive job statistics grouped by a dimension (user, project, cluster, or subcluster).
//
// This is the primary method for generating aggregated statistics views in the UI, providing
// metrics like total jobs, walltime, and resource usage broken down by the specified grouping.
//
// Parameters:
// - ctx: Context for security checks and cancellation
// - filter: Filters to apply (time range, cluster, job state, etc.)
// - page: Optional pagination (ItemsPerPage: -1 disables pagination)
// - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
// - groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster)
//
// Returns a slice of JobsStatistics, one per group, with:
// - ID: The group identifier (username, project name, cluster name, etc.)
// - Name: Display name (for users, from hpc_user.name; empty for other groups)
// - Statistics: totalJobs, totalUsers, totalWalltime, resource usage metrics
//
// Security: Respects user roles via SecurityCheck - users see only their own data unless admin/support.
// Performance: Results are sorted in SQL and pagination applied before scanning rows.
func (r *JobRepository) JobsStatsGrouped(
ctx context.Context,
filter []*model.JobFilter,
page *model.PageRequest,
sortBy *model.SortByAggregate,
groupBy *model.Aggregate,
reqFields map[string]bool,
) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration, reqFields)
query := r.buildStatsQuery(filter, col)
query, err := SecurityCheck(ctx, query)
if err != nil {
@@ -255,75 +230,97 @@ func (r *JobRepository) JobsStatsGrouped(
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.DB).QueryContext(ctx)
rows, err := query.RunWith(r.DB).Query()
if err != nil {
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
stats := make([]*model.JobsStatistics, 0, 100)
for rows.Next() {
var id sql.NullString
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
if err := rows.Scan(&id, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
var name sql.NullString
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
cclog.Warnf("Error while scanning rows: %s", err.Error())
return nil, err
}
if id.Valid {
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalUsers: int(users.Int64),
TotalWalltime: int(walltime.Int64),
TotalNodes: int(nodes.Int64),
TotalNodeHours: int(nodeHours.Int64),
TotalCores: int(cores.Int64),
TotalCoreHours: int(coreHours.Int64),
TotalAccs: int(accs.Int64),
TotalAccHours: int(accHours.Int64),
RunningJobs: int(runningJobs.Int64),
ShortJobs: int(shortJobs.Int64),
})
}
}
var totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
var personName string
if err := rows.Err(); err != nil {
return nil, err
}
// Post-query name lookup for user grouping (avoids LEFT JOIN on aggregation query)
if col == "job.hpc_user" && len(stats) > 0 {
usernames := make([]any, len(stats))
for i, s := range stats {
usernames[i] = s.ID
}
nameQuery := sq.Select("username", "name").From("hpc_user").Where(sq.Eq{"username": usernames})
nameRows, err := nameQuery.RunWith(r.DB).QueryContext(ctx)
if err != nil {
cclog.Warnf("Error looking up user names: %s", err.Error())
// Non-fatal: stats are still valid without display names
} else {
defer nameRows.Close()
nameMap := make(map[string]string, len(stats))
for nameRows.Next() {
var username, name string
if err := nameRows.Scan(&username, &name); err == nil {
nameMap[username] = name
}
if name.Valid {
personName = name.String
}
for _, s := range stats {
if name, ok := nameMap[s.ID]; ok {
s.Name = name
}
if jobs.Valid {
totalJobs = int(jobs.Int64)
}
if users.Valid {
totalUsers = int(users.Int64)
}
if walltime.Valid {
totalWalltime = int(walltime.Int64)
}
if nodes.Valid {
totalNodes = int(nodes.Int64)
}
if cores.Valid {
totalCores = int(cores.Int64)
}
if accs.Valid {
totalAccs = int(accs.Int64)
}
if nodeHours.Valid {
totalNodeHours = int(nodeHours.Int64)
}
if coreHours.Valid {
totalCoreHours = int(coreHours.Int64)
}
if accHours.Valid {
totalAccHours = int(accHours.Int64)
}
if col == "job.hpc_user" {
// name := r.getUserName(ctx, id.String)
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
Name: personName,
TotalJobs: totalJobs,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours,
})
} else {
stats = append(stats,
&model.JobsStatistics{
ID: id.String,
TotalJobs: totalJobs,
TotalUsers: totalUsers,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours,
})
}
}
}
cclog.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
return stats, nil
}
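One variant above replaces the `LEFT JOIN hpc_user` on the aggregation query with a second lightweight query, then patches display names into the already-built stats. The patching step can be sketched in isolation; the `stat` struct here is a hypothetical stand-in for `model.JobsStatistics`:

```go
package main

import "fmt"

// stat is a minimal stand-in for the statistics model: ID is the group key
// (a username when grouping by user), Name the optional display name.
type stat struct {
	ID, Name string
}

// applyNames mirrors the two-step approach: aggregate first without a JOIN,
// then fold display names in from a small username->name map fetched
// separately. Users missing from the map keep an empty Name, which is
// non-fatal for the caller.
func applyNames(stats []*stat, nameMap map[string]string) {
	for _, s := range stats {
		if name, ok := nameMap[s.ID]; ok {
			s.Name = name
		}
	}
}

func main() {
	stats := []*stat{{ID: "alice"}, {ID: "bob"}}
	applyNames(stats, map[string]string{"alice": "Alice A."})
	fmt.Println(stats[0].Name, stats[1].Name)
}
```

The trade-off: two round trips instead of one, in exchange for keeping the grouped aggregation free of JOIN overhead on every row.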
@@ -345,20 +342,19 @@ func (r *JobRepository) JobsStatsGrouped(
func (r *JobRepository) JobsStats(
ctx context.Context,
filter []*model.JobFilter,
reqFields map[string]bool,
) ([]*model.JobsStatistics, error) {
start := time.Now()
query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration, reqFields)
query := r.buildStatsQuery(filter, "")
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
row := query.RunWith(r.DB).QueryRowContext(ctx)
row := query.RunWith(r.DB).QueryRow()
stats := make([]*model.JobsStatistics, 0, 1)
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
cclog.Warn("Error while scanning rows")
return nil, err
}
@@ -383,8 +379,6 @@ func (r *JobRepository) JobsStats(
TotalNodeHours: totalNodeHours,
TotalCoreHours: totalCoreHours,
TotalAccHours: totalAccHours,
RunningJobs: int(runningJobs.Int64),
ShortJobs: int(shortJobs.Int64),
})
}
@@ -441,12 +435,11 @@ func (r *JobRepository) JobCountGrouped(
if err != nil {
return nil, err
}
rows, err := query.RunWith(r.DB).QueryContext(ctx)
rows, err := query.RunWith(r.DB).Query()
if err != nil {
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
stats := make([]*model.JobsStatistics, 0, 100)
@@ -466,10 +459,6 @@ func (r *JobRepository) JobCountGrouped(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer JobCountGrouped %s", time.Since(start))
return stats, nil
}
@@ -502,12 +491,11 @@ func (r *JobRepository) AddJobCountGrouped(
if err != nil {
return nil, err
}
rows, err := query.RunWith(r.DB).QueryContext(ctx)
rows, err := query.RunWith(r.DB).Query()
if err != nil {
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
defer rows.Close()
counts := make(map[string]int)
@@ -523,10 +511,6 @@ func (r *JobRepository) AddJobCountGrouped(
}
}
if err := rows.Err(); err != nil {
return nil, err
}
switch kind {
case "running":
for _, s := range stats {
@@ -566,13 +550,23 @@ func (r *JobRepository) AddJobCount(
if err != nil {
return nil, err
}
var cnt sql.NullInt64
if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&cnt); err != nil {
cclog.Warn("Error while querying DB for job count")
rows, err := query.RunWith(r.DB).Query()
if err != nil {
cclog.Warn("Error while querying DB for job statistics")
return nil, err
}
count := int(cnt.Int64)
var count int
for rows.Next() {
var cnt sql.NullInt64
if err := rows.Scan(&cnt); err != nil {
cclog.Warn("Error while scanning rows")
return nil, err
}
count = int(cnt.Int64)
}
switch kind {
case "running":
@@ -642,7 +636,7 @@ func (r *JobRepository) AddHistograms(
var err error
// X-values are always returned as seconds; the frontend formats them into minutes and hours
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: job duration")
@@ -746,7 +740,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
) ([]*model.HistoPoint, error) {
start := time.Now()
query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(*) AS count").From("job"))
sq.Select(value, "COUNT(job.id) AS count").From("job"))
if qerr != nil {
return nil, qerr
@@ -756,12 +750,11 @@ func (r *JobRepository) jobsStatisticsHistogram(
query = BuildWhereClause(f, query)
}
rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
rows, err := query.GroupBy("value").RunWith(r.DB).Query()
if err != nil {
cclog.Error("Error while running query")
return nil, err
}
defer rows.Close()
points := make([]*model.HistoPoint, 0)
// is it possible to introduce zero values here? requires info about bincount
@@ -774,11 +767,6 @@ func (r *JobRepository) jobsStatisticsHistogram(
points = append(points, &point)
}
if err := rows.Err(); err != nil {
return nil, err
}
cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
@@ -812,7 +800,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
) ([]*model.HistoPoint, error) {
start := time.Now()
query, qerr := SecurityCheck(ctx,
sq.Select(value, "COUNT(*) AS count").From("job"))
sq.Select(value, "COUNT(job.id) AS count").From("job"))
if qerr != nil {
return nil, qerr
@@ -820,31 +808,24 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 	// Each bin represents a duration range: bin N = [N*binSizeSeconds, (N+1)*binSizeSeconds)
 	// Example: binSizeSeconds=3600 (1 hour), bin 1 = 0-1h, bin 2 = 1-2h, etc.
-	points := make([]*model.HistoPoint, 0, *targetBinCount)
+	points := make([]*model.HistoPoint, 0)
 	for i := 1; i <= *targetBinCount; i++ {
 		point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
 		points = append(points, &point)
 	}
-	// Build a map from bin value (seconds) to slice index for O(1) lookup.
-	binMap := make(map[int]int, len(points))
-	for i, p := range points {
-		binMap[p.Value] = i
-	}
 	for _, f := range filters {
 		query = BuildWhereClause(f, query)
 	}
-	rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
+	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
 	if err != nil {
 		cclog.Error("Error while running query")
 		return nil, err
 	}
-	defer rows.Close()
-	// Match query results to pre-initialized bins using map lookup.
-	// point.Value from query is the bin number; multiply by binSizeSeconds to get bin key.
+	// Match query results to pre-initialized bins.
+	// point.Value from query is the bin number; multiply by binSizeSeconds to match bin.Value.
 	for rows.Next() {
 		point := model.HistoPoint{}
 		if err := rows.Scan(&point.Value, &point.Count); err != nil {
@@ -852,15 +833,14 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 			return nil, err
 		}
-		if idx, ok := binMap[point.Value*binSizeSeconds]; ok {
-			points[idx].Count = point.Count
-		}
+		for _, e := range points {
+			if e.Value == (point.Value * binSizeSeconds) {
+				e.Count = point.Count
+				break
+			}
+		}
 	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
 	cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
 	return points, nil
 }
@@ -963,34 +943,24 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 	mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")
-	rows, err := mainQuery.RunWith(r.DB).QueryContext(ctx)
+	rows, err := mainQuery.RunWith(r.DB).Query()
 	if err != nil {
 		cclog.Errorf("Error while running mainQuery: %s", err)
 		return nil, err
 	}
-	defer rows.Close()
 	// Pre-initialize bins with calculated min/max ranges.
 	// Example: peak=1000, bins=10 -> bin 1=[0,100), bin 2=[100,200), ..., bin 10=[900,1000]
-	points := make([]*model.MetricHistoPoint, 0, *bins)
+	points := make([]*model.MetricHistoPoint, 0)
 	binStep := int(peak) / *bins
 	for i := 1; i <= *bins; i++ {
 		binMin := (binStep * (i - 1))
 		binMax := (binStep * i)
-		idx := i
-		epoint := model.MetricHistoPoint{Bin: &idx, Count: 0, Min: &binMin, Max: &binMax}
+		epoint := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax}
 		points = append(points, &epoint)
 	}
-	// Build a map from bin number to slice index for O(1) lookup.
-	binMap := make(map[int]int, len(points))
-	for i, p := range points {
-		if p.Bin != nil {
-			binMap[*p.Bin] = i
-		}
-	}
-	// Match query results to pre-initialized bins using map lookup.
+	// Match query results to pre-initialized bins.
 	for rows.Next() {
 		rpoint := model.MetricHistoPoint{}
 		if err := rows.Scan(&rpoint.Bin, &rpoint.Count); err != nil {
@@ -998,17 +968,14 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 			return nil, err
 		}
-		if rpoint.Bin != nil {
-			if idx, ok := binMap[*rpoint.Bin]; ok {
-				points[idx].Count = rpoint.Count
-			}
-		}
+		for _, e := range points {
+			if e.Bin != nil && rpoint.Bin != nil && *e.Bin == *rpoint.Bin {
+				e.Count = rpoint.Count
+				break
+			}
+		}
 	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
 	result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}
 	cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
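Both variants in the histogram hunks above fill a pre-initialized bin slice from grouped `(bin, count)` query results; one keeps a bin-to-index map for O(1) matching, the other scans linearly. A minimal standalone sketch of the map-based matching (the `histoPoint` type and the values below are illustrative stand-ins, not the repository's `model.HistoPoint`):

```go
package main

import "fmt"

type histoPoint struct {
	Value int // upper bound of the bin, in seconds
	Count int
}

// fillBins pre-initializes one bin per duration range and matches query
// results (bin number -> count) to bins via an O(1) map lookup.
func fillBins(binCount, binSizeSeconds int, results map[int]int) []*histoPoint {
	points := make([]*histoPoint, 0, binCount)
	for i := 1; i <= binCount; i++ {
		points = append(points, &histoPoint{Value: i * binSizeSeconds, Count: 0})
	}
	// Map from bin value (seconds) to slice index.
	binMap := make(map[int]int, len(points))
	for i, p := range points {
		binMap[p.Value] = i
	}
	// A result's key is the bin number; multiply by binSizeSeconds to get the map key.
	for bin, count := range results {
		if idx, ok := binMap[bin*binSizeSeconds]; ok {
			points[idx].Count = count
		}
	}
	return points
}

func main() {
	// Four one-hour bins; only bin 2 (1h-2h) has jobs.
	for _, p := range fillBins(4, 3600, map[int]int{2: 7}) {
		fmt.Println(p.Value, p.Count)
	}
}
```

With a handful of bins the linear scan is equally fine; the map only pays off when the bin count grows.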

View File

@@ -14,7 +14,7 @@ import (
 func TestBuildJobStatsQuery(t *testing.T) {
 	r := setup(t)
-	q := r.buildStatsQuery(nil, "USER", 300, nil)
+	q := r.buildStatsQuery(nil, "USER")
 	sql, _, err := q.ToSql()
 	noErr(t, err)
@@ -29,7 +29,7 @@ func TestJobStats(t *testing.T) {
 	err := r.DB.QueryRow(`SELECT COUNT(*) FROM job`).Scan(&expectedCount)
 	noErr(t, err)
-	stats, err := r.JobsStats(getContext(t), []*model.JobFilter{}, nil)
+	stats, err := r.JobsStats(getContext(t), []*model.JobFilter{})
 	noErr(t, err)
 	if stats[0].TotalJobs != expectedCount {

View File

@@ -283,7 +283,6 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 	if err != nil {
 		return nil, nil, err
 	}
-	defer xrows.Close()
 	for xrows.Next() {
 		var t schema.Tag
@@ -301,10 +300,6 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 		}
 	}
-	if err := xrows.Err(); err != nil {
-		return nil, nil, err
-	}
 	// Query and Count Jobs with attached Tags
 	q := sq.Select("t.tag_type, t.tag_name, t.id, count(jt.tag_id)").
 		From("tag t").
@@ -339,7 +334,6 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 	if err != nil {
 		return nil, nil, err
 	}
-	defer rows.Close()
 	counts = make(map[string]int)
 	for rows.Next() {
@@ -517,7 +511,6 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
-	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -536,10 +529,6 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
 		}
 	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
 	return tags, nil
 }
@@ -555,7 +544,6 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
-	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -567,10 +555,6 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
 		tags = append(tags, tag)
 	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
 	return tags, nil
 }
@@ -598,7 +582,6 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
-	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -610,10 +593,6 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
 		tags = append(tags, tag)
 	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
 	return tags, nil
 }
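The tags.go hunks all touch the same `database/sql` lifecycle: an open result set pins a connection (and, with SQLite, its statement memory) until `Close` runs, and `rows.Err()` reports iteration errors. A self-contained toy that makes the leak observable; the `fakeRows` type and connection counters are invented for illustration, real code operates on `*sql.Rows`:

```go
package main

import "fmt"

// Connection bookkeeping: query() "borrows" a connection, Close() returns it.
var openConns, closedConns int

// fakeRows mimics the sql.Rows lifecycle so a missing Close is observable.
type fakeRows struct {
	data []string
	i    int
}

func (r *fakeRows) Next() bool {
	if r.i < len(r.data) {
		r.i++
		return true
	}
	return false
}

func (r *fakeRows) Scan(dst *string) { *dst = r.data[r.i-1] }
func (r *fakeRows) Close()           { closedConns++ }

func query(data []string) *fakeRows {
	openConns++
	return &fakeRows{data: data}
}

// leaky iterates without Close: the borrowed connection is never returned.
func leaky() {
	rows := query([]string{"a", "b"})
	for rows.Next() {
		var s string
		rows.Scan(&s)
	}
}

// safe mirrors the database/sql idiom: defer rows.Close() immediately after
// the query succeeds, so every exit path releases the connection.
func safe() {
	rows := query([]string{"a", "b"})
	defer rows.Close()
	for rows.Next() {
		var s string
		rows.Scan(&s)
	}
}

func main() {
	leaky()
	safe()
	fmt.Printf("%d connection(s) leaked\n", openConns-closedConns)
}
```

Note that `database/sql` only returns the connection to the pool automatically when the loop drains every row without error; `defer rows.Close()` covers early returns and scan failures as well.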

View File

@@ -102,7 +102,6 @@ func (r *UserRepository) GetLdapUsernames() ([]string, error) {
 		cclog.Warn("Error while querying usernames")
 		return nil, err
 	}
-	defer rows.Close()
 	for rows.Next() {
 		var username string

View File

@@ -10,7 +10,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"maps"
-	"math"
 	"os"
 	"path/filepath"
 	"strings"
@@ -112,29 +111,6 @@ type JobClassTagger struct {
 	getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric
 }
-// roundEnv returns a copy of env with all float64 values rounded to 2 decimal places.
-// Nested map[string]any and map[string]float64 values are recursed into.
-func roundEnv(env map[string]any) map[string]any {
-	rounded := make(map[string]any, len(env))
-	for k, v := range env {
-		switch val := v.(type) {
-		case float64:
-			rounded[k] = math.Round(val*100) / 100
-		case map[string]any:
-			rounded[k] = roundEnv(val)
-		case map[string]float64:
-			rm := make(map[string]float64, len(val))
-			for mk, mv := range val {
-				rm[mk] = math.Round(mv*100) / 100
-			}
-			rounded[k] = rm
-		default:
-			rounded[k] = v
-		}
-	}
-	return rounded
-}
 func (t *JobClassTagger) prepareRule(b []byte, fns string) {
 	var rule RuleFormat
 	if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil {
@@ -432,7 +408,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
 			// process hint template
 			var msg bytes.Buffer
-			if err := ri.hint.Execute(&msg, roundEnv(env)); err != nil {
+			if err := ri.hint.Execute(&msg, env); err != nil {
 				cclog.Errorf("Template error: %s", err.Error())
 				continue
 			}
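The `roundEnv` helper in the hunks above rounds every float64 in a template environment to two decimals before hint templates render it, recursing into nested maps. A trimmed standalone sketch of the same idea (the `map[string]float64` case from the original is dropped here for brevity):

```go
package main

import (
	"fmt"
	"math"
)

// roundEnv returns a copy of env with float64 values rounded to two
// decimal places; nested map[string]any values are recursed into.
func roundEnv(env map[string]any) map[string]any {
	rounded := make(map[string]any, len(env))
	for k, v := range env {
		switch val := v.(type) {
		case float64:
			rounded[k] = math.Round(val*100) / 100
		case map[string]any:
			rounded[k] = roundEnv(val)
		default:
			rounded[k] = v
		}
	}
	return rounded
}

func main() {
	env := map[string]any{
		"load":   1.23456,
		"nested": map[string]any{"mem_used": 0.987654},
	}
	fmt.Println(roundEnv(env)["load"])
}
```

Without this pass, `text/template` prints floats with full precision, so hints end up with values like `1.2345678901234567`.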

View File

@@ -20,14 +20,14 @@
 				"wonka": "^6.3.5"
 			},
 			"devDependencies": {
-				"@rollup/plugin-commonjs": "^29.0.2",
+				"@rollup/plugin-commonjs": "^29.0.1",
 				"@rollup/plugin-node-resolve": "^16.0.3",
 				"@rollup/plugin-terser": "^1.0.0",
 				"@timohausmann/quadtree-js": "^1.2.6",
 				"rollup": "^4.59.0",
 				"rollup-plugin-css-only": "^4.5.5",
 				"rollup-plugin-svelte": "^7.2.3",
-				"svelte": "^5.53.9"
+				"svelte": "^5.53.7"
 			}
 		},
 		"node_modules/@0no-co/graphql.web": {
@@ -126,9 +126,9 @@
 			}
 		},
 		"node_modules/@rollup/plugin-commonjs": {
-			"version": "29.0.2",
-			"resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-29.0.2.tgz",
-			"integrity": "sha512-S/ggWH1LU7jTyi9DxZOKyxpVd4hF/OZ0JrEbeLjXk/DFXwRny0tjD2c992zOUYQobLrVkRVMDdmHP16HKP7GRg==",
+			"version": "29.0.1",
+			"resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-29.0.1.tgz",
+			"integrity": "sha512-VUEHINN2rQEWPfNUR3mzidRObM1XZKXMQsaG6qBlDqd6M1qyw91nDZvcSozgyjt3x/QKrgKBc5MdxfdxAy6tdg==",
 			"dev": true,
 			"license": "MIT",
 			"dependencies": {
@@ -328,9 +328,6 @@
 			"cpu": [
 				"arm"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -344,9 +341,6 @@
 			"cpu": [
 				"arm"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -360,9 +354,6 @@
 			"cpu": [
 				"arm64"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -376,9 +367,6 @@
 			"cpu": [
 				"arm64"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -392,9 +380,6 @@
 			"cpu": [
 				"loong64"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -408,9 +393,6 @@
 			"cpu": [
 				"loong64"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -424,9 +406,6 @@
 			"cpu": [
 				"ppc64"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -440,9 +419,6 @@
 			"cpu": [
 				"ppc64"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -456,9 +432,6 @@
 			"cpu": [
 				"riscv64"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -472,9 +445,6 @@
 			"cpu": [
 				"riscv64"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -488,9 +458,6 @@
 			"cpu": [
 				"s390x"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -504,9 +471,6 @@
 			"cpu": [
 				"x64"
 			],
-			"libc": [
-				"glibc"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -520,9 +484,6 @@
 			"cpu": [
 				"x64"
 			],
-			"libc": [
-				"musl"
-			],
 			"license": "MIT",
 			"optional": true,
 			"os": [
@@ -790,9 +751,9 @@
 			}
 		},
 		"node_modules/devalue": {
-			"version": "5.6.3",
-			"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.3.tgz",
-			"integrity": "sha512-nc7XjUU/2Lb+SvEFVGcWLiKkzfw8+qHI7zn8WYXKkLMgfGSHbgCEaR6bJpev8Cm6Rmrb19Gfd/tZvGqx9is3wg==",
+			"version": "5.6.4",
+			"resolved": "https://registry.npmjs.org/devalue/-/devalue-5.6.4.tgz",
+			"integrity": "sha512-Gp6rDldRsFh/7XuouDbxMH3Mx8GMCcgzIb1pDTvNyn8pZGQ22u+Wa+lGV9dQCltFQ7uVw0MhRyb8XDskNFOReA==",
 			"license": "MIT"
 		},
 		"node_modules/escape-latex": {
@@ -1193,9 +1154,9 @@
 			}
 		},
 		"node_modules/svelte": {
-			"version": "5.53.9",
-			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.9.tgz",
-			"integrity": "sha512-MwDfWsN8qZzeP0jlQsWF4k/4B3csb3IbzCRggF+L/QqY7T8bbKvnChEo1cPZztF51HJQhilDbevWYl2LvXbquA==",
+			"version": "5.53.7",
+			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.7.tgz",
+			"integrity": "sha512-uxck1KI7JWtlfP3H6HOWi/94soAl23jsGJkBzN2BAWcQng0+lTrRNhxActFqORgnO9BHVd1hKJhG+ljRuIUWfQ==",
 			"license": "MIT",
 			"dependencies": {
 				"@jridgewell/remapping": "^2.3.4",

View File

@@ -7,14 +7,14 @@
 		"dev": "rollup -c -w"
 	},
 	"devDependencies": {
-		"@rollup/plugin-commonjs": "^29.0.2",
+		"@rollup/plugin-commonjs": "^29.0.1",
 		"@rollup/plugin-node-resolve": "^16.0.3",
 		"@rollup/plugin-terser": "^1.0.0",
 		"@timohausmann/quadtree-js": "^1.2.6",
 		"rollup": "^4.59.0",
 		"rollup-plugin-css-only": "^4.5.5",
 		"rollup-plugin-svelte": "^7.2.3",
-		"svelte": "^5.53.9"
+		"svelte": "^5.53.7"
 	},
 	"dependencies": {
 		"@rollup/plugin-replace": "^6.0.3",

View File

@@ -302,11 +302,11 @@
 	if (subclusterData) {
 		for (let i = 0; i < subclusterData.length; i++) {
-			const flopsData = subclusterData[i]?.metrics?.find((s) => s.name == "flops_any")
-			const memBwData = subclusterData[i]?.metrics?.find((s) => s.name == "mem_bw")
+			const flopsData = subclusterData[i].metrics.find((s) => s.name == "flops_any")
+			const memBwData = subclusterData[i].metrics.find((s) => s.name == "mem_bw")
-			const f = flopsData?.metric?.series[0]?.statistics?.avg || 0;
-			const m = memBwData?.metric?.series[0]?.statistics?.avg || 0;
+			const f = flopsData.metric.series[0].statistics.avg
+			const m = memBwData.metric.series[0].statistics.avg
 			let intensity = f / m
 			if (Number.isNaN(intensity) || !Number.isFinite(intensity)) {

View File

@@ -34,7 +34,6 @@
 		formatDurationTime
 	} from "./generic/units.js";
 	import Filters from "./generic/Filters.svelte";
-	import Pagination from "./generic/joblist/Pagination.svelte";
 	/* Svelte 5 Props */
 	let {
@@ -52,8 +51,6 @@
 	let jobFilters = $state([]);
 	let nameFilter = $state("");
 	let sorting = $state({ field: "totalJobs", direction: "desc" });
-	let page = $state(1);
-	let itemsPerPage = $state(25);
 	/* Derived Vars */
 	const fetchRunning = $derived(jobFilters.some(jf => jf?.state?.length == 1 && jf?.state?.includes("running")));
@@ -67,12 +64,6 @@
 	const sortedRows = $derived(
 		$stats.data ? sort($stats.data.rows, sorting, nameFilter) : []
 	);
-	const paginatedRows = $derived(
-		sortedRows.slice((page - 1) * itemsPerPage, page * itemsPerPage)
-	);
-	/* Reset page when sorting or filter changes */
-	$effect(() => { sorting; nameFilter; page = 1; });
 	let stats = $derived(
 		queryStore({
@@ -369,7 +360,7 @@
 					>
 				</tr>
 			{:else if $stats.data}
-				{#each paginatedRows as row (row.id)}
+				{#each sort($stats.data.rows, sorting, nameFilter) as row (row.id)}
 					<tr>
 						<td>
 							{#if type == "USER"}
@@ -411,16 +402,3 @@
 		{/if}
 	</tbody>
 </Table>
-{#if sortedRows.length > 0}
-	<Pagination
-		{page}
-		{itemsPerPage}
-		totalItems={sortedRows.length}
-		itemText={type === 'USER' ? 'Users' : 'Projects'}
-		pageSizes={[25, 50, 100]}
-		updatePaging={(detail) => {
-			itemsPerPage = detail.itemsPerPage;
-			page = detail.page;
-		}}
-	/>
-{/if}
View File

@@ -167,7 +167,7 @@
 			<InputGroup>
 				<InputGroupText><Icon name="hdd" /></InputGroupText>
 				<InputGroupText>Selected Node</InputGroupText>
-				<Input style="background-color: white;" type="text" value="{hostname} [{cluster} {$nodeMetricsData?.data?.nodeMetrics[0] ? `(${$nodeMetricsData.data.nodeMetrics[0].subCluster})` : ''}]" disabled/>
+				<Input style="background-color: white;" type="text" value="{hostname} [{cluster} {$nodeMetricsData?.data ? `(${$nodeMetricsData.data.nodeMetrics[0].subCluster})` : ''}]" disabled/>
 			</InputGroup>
 		</Col>
 		<!-- State Col -->
@@ -259,7 +259,7 @@
 					</CardHeader>
 					<CardBody>
 						<p>No dataset(s) returned for <b>{item.name}</b></p>
-						<p class="mb-1">Metric has been disabled for subcluster <b>{$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster}</b>.</p>
+						<p class="mb-1">Metric has been disabled for subcluster <b>{$nodeMetricsData.data.nodeMetrics[0].subCluster}</b>.</p>
 					</CardBody>
 				</Card>
 			{:else if item?.metric}
@@ -267,7 +267,7 @@
 					metric={item.name}
 					timestep={item.metric.timestep}
 					cluster={clusterInfos.find((c) => c.name == cluster)}
-					subCluster={$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster}
+					subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster}
 					series={item.metric.series}
 					enableFlip
 					forNode
@@ -286,17 +286,17 @@
 		{/snippet}
 		<PlotGrid
-			items={$nodeMetricsData?.data?.nodeMetrics[0]?.metrics
+			items={$nodeMetricsData.data.nodeMetrics[0].metrics
 				.map((m) => ({
 					...m,
 					availability: checkMetricAvailability(
 						globalMetrics,
 						m.name,
 						cluster,
-						$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster,
+						$nodeMetricsData.data.nodeMetrics[0].subCluster,
 					),
 				}))
-				.sort((a, b) => a.name.localeCompare(b.name)) || []}
+				.sort((a, b) => a.name.localeCompare(b.name))}
 			itemsPerRow={ccconfig.plotConfiguration_plotsPerRow}
 			{gridContent}
 		/>

View File

@@ -29,7 +29,7 @@
 	/* Const */
 	const minEnergyPreset = 1;
-	const maxEnergyPreset = 100;
+	const maxEnergyPreset = 1000;
 	/* Derived */
 	// Pending