23 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| | `96fc44a649` | fix: Optimize project stat query | 2026-03-13 06:06:38 +01:00 |
| | `8e86e8720d` | Make stats query selective. Add stats index. Add paging to user list. (Entire-Checkpoint: d42431eee30d) | 2026-03-12 20:16:55 +01:00 |
| | `4555fb8a86` | Merge branch 'hotfix' of github.com:ClusterCockpit/cc-backend into hotfix | 2026-03-12 20:15:54 +01:00 |
| | `0e27624d73` | Add flag to optimize db. Remove ANALYZE on startup. (Entire-Checkpoint: d49917ff4b10) | 2026-03-12 20:12:49 +01:00 |
| Christoph Kluge | `8563ed5e08` | fix: remove indices from migration 9 (optimization migration 11 drops these indices, so rather not create them in the first place) | 2026-03-12 14:45:45 +01:00 |
| Christoph Kluge | `2d07bdf6b5` | fix: add missing nullsafe in publicDash | 2026-03-12 14:13:45 +01:00 |
| | `7f069f1ec1` | Prepare bugfix release 1.5.1 (Entire-Checkpoint: 15cc90a0347a) | 2026-03-12 06:40:36 +01:00 |
| | `2506a92cdf` | Remove entire log | 2026-03-12 06:14:11 +01:00 |
| Christoph Kluge | `972b14033a` | add db migration 11, optimizing index count | 2026-03-11 16:07:29 +01:00 |
| | `af78f06ced` | fix: Reduce complexity for groupBy stats queries (Entire-Checkpoint: fc899a70a751) | 2026-03-11 15:14:59 +01:00 |
| | `6e0fe62566` | Add new db config options to README | 2026-03-11 14:30:41 +01:00 |
| | `e70310dcbc` | fix: Segfault when taggers are enabled but rule directories missing | 2026-03-11 11:15:08 +01:00 |
| | `00d2f97c4c` | fix: Large heap allocations in sqlite driver. Sanitize sqlite config and make it configurable. Allow to cancel queries. | 2026-03-11 11:14:37 +01:00 |
| | `c8d8f7084a` | Merge branch 'hotfix' of github.com:ClusterCockpit/cc-backend into hotfix | 2026-03-11 07:50:55 +01:00 |
| | `dc7407d0f0` | fix: prevent segfault if enable-job-taggers option is true but tagger config directories are missing (Entire-Checkpoint: 9ec86e3669e1) | 2026-03-11 07:50:53 +01:00 |
| | `eba3995610` | Add ANALYZE on db startup (Entire-Checkpoint: ea70a955214d) | 2026-03-11 05:28:52 +01:00 |
| | `f8831e7040` | Fixed merge errors (Entire-Checkpoint: ddd4fa4a7bbb) | 2026-03-11 05:09:38 +01:00 |
| | `1cf99206a9` | Merge branch 'hotfix' of github.com:ClusterCockpit/cc-backend into hotfix | 2026-03-11 05:06:26 +01:00 |
| | `5d3d77620e` | fix: Add defer.close for all queries | 2026-03-11 05:04:20 +01:00 |
| Christoph Kluge | `5c72664162` | bump frontend patch versions | 2026-03-10 18:15:24 +01:00 |
| Christoph Kluge | `f3e796f3f5` | add nullsafes to node view | 2026-03-10 17:05:50 +01:00 |
| Christoph Kluge | `cc38b17472` | fix wrong field checked for json validity | 2026-03-10 17:02:09 +01:00 |
| | `282197ebef` | fix: Round floats in tagger message (Entire-Checkpoint: b68850c6fcff) | 2026-03-10 06:01:31 +01:00 |
37 changed files with 1356 additions and 291 deletions

View File

@@ -0,0 +1,29 @@
# Session Context
Session ID: 28147033-ddc8-4056-b064-e0558fbc614e
Commit Message: There are memory leaks in production on queries that trigger a full scan
## Prompts
### Prompt 1
There are memory leaks in production on queries that trigger a full scan. Review all queries for missing close and other bugs that could cause memory leaks in the sqlite-3 driver. Example for a query that triggers a massive allocation is a specific jobstate list with sorting on start_time on all jobs.
## Summary
All 31 `.Query()` calls now have proper `rows.Close()`. Build passes, tests pass, vet clean.
### Summary of changes
**`internal/repository/stats.go`** (7 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()` to `JobsStatsGrouped`, `JobCountGrouped`, `AddJobCountGrouped`, `jobsStatisticsHistogram`, `jobsDurationStatisticsHistogram`, `jobsMetricStatisticsHistogram`
- Rewrote `AddJobCount` to use `QueryRow()` instead of `Query()` (single-row result)
**`internal/repository/tags.go`** (5 fixes):
- Added `defer xrows.Close()` + `xrows.Err()` for `Queryx` in `CountTags`
- Added `defer rows.Close()` for second query in `CountTags`
- Added `defer rows.Close()` + `rows.Err()` to `GetTags`, `GetTagsDirect`, `getArchiveTags`
**`internal/repository/node.go`** (4 fixes + 1 rewrite):
- Added `defer rows.Close()` + `rows.Err()`, removed manual `rows.Close()` in `QueryNodes`, `QueryNodesWithMeta`, `CountStatesTimed`
- Rewrote `CountStates` to use `COUNT(*) ... GROUP BY` in SQL instead of fetching every row and counting in Go
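The `CountStates` rewrite moves the counting into SQL. A minimal sketch of the difference, using Python's bundled `sqlite3` module and a hypothetical `node` table (the behavior is SQLite's, independent of the Go driver):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE node (id INTEGER PRIMARY KEY, node_state TEXT)")
con.executemany("INSERT INTO node (node_state) VALUES (?)",
                [("allocated",)] * 3 + [("idle",)] * 2)

# Before: fetch every row and count in application code (O(rows) transferred)
counts_app = {}
for (state,) in con.execute("SELECT node_state FROM node"):
    counts_app[state] = counts_app.get(state, 0) + 1

# After: aggregate in SQL, returning one row per distinct state
counts_sql = dict(con.execute(
    "SELECT node_state, COUNT(*) FROM node GROUP BY node_state"))

assert counts_app == counts_sql == {"allocated": 3, "idle": 2}
```

The result is identical, but the aggregated form keeps the scanned rows inside SQLite instead of materializing them in the application.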

File diff suppressed because one or more lines are too long


View File

@@ -0,0 +1,40 @@
# Session Context
Session ID: 42401d2e-7d1c-4c0e-abe6-356cb2d48747
Commit Message: There also have to be bugs in jobQuery.go . Especially the following que
## Prompts
### Prompt 1
There also have to be bugs in jobQuery.go . Especially the following query triggers the memory leak: SELECT * FROM job WHERE job.job_state IN ("completed", "running", "failed") ORDER BY job.start_time DESC LIMIT 1 OFFSET 10; Dig deeper to find the cause. Also investigate why no existing index is used for this query.
## Summary
All checks pass. Here's a summary of findings and fixes:
---
## Findings
### Bug 1: Missing `defer rows.Close()` in `user.go:GetLdapUsernames()` (line 100)
**Fixed.** Every call leaked a database connection and its buffers.
### Bug 2: SQLite never runs `ANALYZE` — wrong index selection
**Fixed.** Added `ANALYZE` to `setupSqlite()` in `dbConnection.go`.
Without statistics in `sqlite_stat1`, SQLite's query planner **guessed** and chose `jobs_jobstate_energy` (a terrible index for this query) instead of scanning `jobs_starttime` in reverse. The result:
| | Before ANALYZE | After ANALYZE |
|---|---|---|
| Index used | `jobs_jobstate_energy` | `jobs_starttime` |
| Sort | **TEMP B-TREE** (materializes ALL rows) | None (index order) |
| Memory | Proportional to total matching rows | Constant |
| I/O | Full scan of all matching rows | Stops at OFFSET+LIMIT |
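The effect of missing statistics can be reproduced outside the Go code. A sketch with Python's bundled `sqlite3` module and a simplified, hypothetical `job` table: `sqlite_stat1` does not exist until `ANALYZE` runs, after which the paging query is served by `jobs_starttime` with no temp B-tree sort:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, job_state TEXT, start_time INTEGER)")
con.execute("CREATE INDEX jobs_starttime ON job(start_time)")
con.executemany("INSERT INTO job (job_state, start_time) VALUES (?, ?)",
                [("completed", i) for i in range(1000)])

# Without ANALYZE there are no planner statistics at all
has_stat1 = con.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE name = 'sqlite_stat1'").fetchone()[0]
assert has_stat1 == 0

con.execute("ANALYZE")  # populates sqlite_stat1 with per-index row estimates
stats = con.execute("SELECT idx FROM sqlite_stat1 WHERE tbl = 'job'").fetchall()
assert ("jobs_starttime",) in stats

# The paging query walks the index in reverse instead of sorting:
# no "USE TEMP B-TREE FOR ORDER BY" step appears in the plan
plan = con.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM job ORDER BY start_time DESC LIMIT 1 OFFSET 10").fetchall()
details = " ".join(row[-1] for row in plan)
assert "jobs_starttime" in details and "TEMP B-TREE" not in details
```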
### Bug 3: `IN` clause + `ORDER BY` is fundamentally incompatible with composite indexes
Even with the "correct" index `(job_state, start_time)`, SQLite **cannot** merge-sort across 3 separate index range scans for `IN ('completed','running','failed')`. It always falls back to a temp B-tree sort. The only efficient plan is to use the standalone `jobs_starttime` index — which SQLite does automatically **after ANALYZE** because it realizes the 3 states cover virtually all rows, making the WHERE clause nearly a no-op.
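This limitation is visible in `EXPLAIN QUERY PLAN` when only the composite index exists. A sketch with Python's bundled `sqlite3` module on a simplified, hypothetical schema:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, job_state TEXT, start_time INTEGER)")
# Only the composite index exists, so no index yields global start_time order
con.execute("CREATE INDEX jobs_jobstate_starttime ON job(job_state, start_time)")
con.executemany("INSERT INTO job (job_state, start_time) VALUES (?, ?)",
                [("completed", i) for i in range(500)]
                + [("running", i) for i in range(500)])
con.execute("ANALYZE")

plan = con.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM job "
    "WHERE job_state IN ('completed', 'running', 'failed') "
    "ORDER BY start_time DESC LIMIT 1 OFFSET 10").fetchall()
details = " ".join(row[-1] for row in plan)

# Each IN value is a separate index range scan; merging them requires a sort
assert "USE TEMP B-TREE FOR ORDER BY" in details
```

Even with fresh statistics, the planner must materialize and sort because the three range scans cannot be merge-sorted.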
### Observation: 79 indexes on the `job` table
This is excessive and actively harmful — it confuses the query planner (especially without ANALYZE) and slows writes. The `jobs_jobstate_starttime` index from migration 08 is also missing from the actual DB (only the 3-column `jobs_jobstate_starttime_duration` exists). This is worth investigating separately but is a schema/migration concern, not a code bug.

File diff suppressed because one or more lines are too long


View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend
FRONTEND = ./web/frontend
-VERSION = 1.5.0
+VERSION = 1.5.1
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -133,6 +133,92 @@ ln -s <your-existing-job-archive> ./var/job-archive
./cc-backend -help
```
## Database Configuration
cc-backend uses SQLite as its database. For large installations, SQLite memory
usage can be tuned via the optional `db-config` section in config.json under
`main`:
```json
{
  "main": {
    "db": "./var/job.db",
    "db-config": {
      "cache-size-mb": 2048,
      "soft-heap-limit-mb": 16384,
      "max-open-connections": 4,
      "max-idle-connections": 4,
      "max-idle-time-minutes": 10
    }
  }
}
```
All fields are optional. If `db-config` is omitted entirely, built-in defaults
are used.
### Options
| Option | Default | Description |
| ----------------------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `cache-size-mb` | 2048 | SQLite page cache size per connection in MB. Maps to `PRAGMA cache_size`. Total cache memory is up to `cache-size-mb × max-open-connections`. |
| `soft-heap-limit-mb` | 16384 | Process-wide SQLite soft heap limit in MB. SQLite will try to release cache pages to stay under this limit. Queries won't fail if exceeded, but cache eviction becomes more aggressive. |
| `max-open-connections` | 4 | Maximum number of open database connections. |
| `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. |
| `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. |
### Sizing Guidelines
SQLite's `cache_size` is a **per-connection** setting — each connection
maintains its own independent page cache. With multiple connections, the total
memory available for caching is the sum across all connections.
In practice, different connections tend to cache **different pages** (e.g., one
handles a job listing query while another runs a statistics aggregation), so
their caches naturally spread across the database. The formula
`DB_size / max-open-connections` gives enough per-connection cache that the
combined caches can cover the entire database.
However, this is a best-case estimate. Connections running similar queries will
cache the same pages redundantly. In the worst case (all connections caching
identical pages), only `cache-size-mb` worth of unique data is cached rather
than `cache-size-mb × max-open-connections`. For workloads with diverse
concurrent queries, cache overlap is typically low.
**Rules of thumb:**
- **cache-size-mb**: Set to `DB_size_in_MB / max-open-connections` to allow the
entire database to be cached in memory. For example, an 80GB database with 8
connections needs at least 10240 MB (10GB) per connection. If your workload
has many similar concurrent queries, consider setting it higher to account for
cache overlap between connections.
- **soft-heap-limit-mb**: Should be >= `cache-size-mb × max-open-connections` to
avoid cache thrashing. This is the total SQLite memory budget for the process.
- On small installations the defaults work well. On servers with large databases
(tens of GB) and plenty of RAM, increasing these values significantly improves
query performance by reducing disk I/O.
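The MB-to-`PRAGMA cache_size` mapping uses SQLite's negative-value convention: a negative N means N KiB, while a positive N means N pages. A quick check with Python's bundled `sqlite3` module (the value `64` here is an arbitrary illustration, not a recommended setting):

```python
import sqlite3

cache_size_mb = 64  # per-connection budget, analogous to cache-size-mb
con = sqlite3.connect(":memory:")

# Negative values are interpreted as KiB rather than page counts
con.execute(f"PRAGMA cache_size = {-cache_size_mb * 1024}")
assert con.execute("PRAGMA cache_size").fetchone()[0] == -cache_size_mb * 1024

# Each connection keeps its own cache, so the total budget is
# cache-size-mb multiplied by max-open-connections
max_open_connections = 4
assert cache_size_mb * max_open_connections == 256  # MB process-wide
```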
### Example: Large Server (512GB RAM, 80GB database)
```json
{
"main": {
"db-config": {
"cache-size-mb": 16384,
"soft-heap-limit-mb": 131072,
"max-open-connections": 8,
"max-idle-time-minutes": 30
}
}
}
```
This allows the entire 80GB database to be cached (8 × 16GB = 128GB page cache)
with a 128GB soft heap limit, using about 25% of available RAM.
The effective configuration is logged at startup for verification.
## Project file structure

- [`.github/`](https://github.com/ClusterCockpit/cc-backend/tree/master/.github)

View File

@@ -1,11 +1,43 @@
-# `cc-backend` version 1.5.0
+# `cc-backend` version 1.5.1
-Supports job archive version 3 and database version 10.
+Supports job archive version 3 and database version 11.
-This is a feature release of `cc-backend`, the API backend and frontend
+This is a bugfix release of `cc-backend`, the API backend and frontend
implementation of ClusterCockpit.
For release specific notes visit the [ClusterCockpit Documentation](https://clusterockpit.org/docs/release/).
## Changes in 1.5.1
### Database
- **New migration (version 11)**: Optimized database index count for better performance
- **ANALYZE on startup**: Database statistics are now refreshed on startup for improved query planning
- **SQLite configuration hardening**: Sanitized SQLite configuration with new configurable options; fixes large heap allocations in the SQLite driver
- **Query cancellation**: Long-running database queries can now be cancelled
- **Resource leak fix**: Added missing `defer Close()` calls for all query result sets
### Bug fixes
- **Segfault when taggers misconfigured**: Fixed crash when `enable-job-taggers` is set but tagger rule directories are missing
- **GroupBy stats query complexity**: Reduced complexity for `groupBy` statistics queries
- **Ranged filter conditions**: Fixed GT and LT conditions in ranged filters
- **Energy filter preset**: Reduced energy filter preset to a more practical default
- **JSON validity check**: Fixed wrong field being checked for JSON validity
- **Tagger float rounding**: Fixed rounding of floats in tagger messages
- **Node view null safety**: Added null-safe checks in node view to prevent runtime errors
### Frontend
- **Bumped patch versions**: Updated frontend dependencies to latest patch versions
### Documentation
- **New DB config options**: Added new database configuration options to README
---
*The sections below document all features and changes introduced in the 1.5.0 major release, which 1.5.1 is based on.*
## Breaking changes
### Configuration changes
@@ -34,7 +66,7 @@ For release specific notes visit the [ClusterCockpit Documentation](https://clus
### Dependency changes
-- **cc-lib v2.5.1**: Switched to cc-lib version 2 with updated APIs (currently at v2.5.1)
+- **cc-lib v2.8.0**: Switched to cc-lib version 2 with updated APIs
- **cclib NATS client**: Now using the cclib NATS client implementation
- Removed obsolete `util.Float` usage from cclib

View File

@@ -11,7 +11,7 @@ import "flag"
var (
	flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
-	flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool
+	flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool
	flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
@@ -27,6 +27,7 @@ func cliInit() {
	flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit")
	flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
	flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
+	flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
	flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
	flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
	flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")

View File

@@ -108,6 +108,26 @@ func initConfiguration() error {
}

func initDatabase() error {
+	if config.Keys.DbConfig != nil {
+		cfg := repository.DefaultConfig()
+		dc := config.Keys.DbConfig
+		if dc.CacheSizeMB > 0 {
+			cfg.DbCacheSizeMB = dc.CacheSizeMB
+		}
+		if dc.SoftHeapLimitMB > 0 {
+			cfg.DbSoftHeapLimitMB = dc.SoftHeapLimitMB
+		}
+		if dc.MaxOpenConnections > 0 {
+			cfg.MaxOpenConnections = dc.MaxOpenConnections
+		}
+		if dc.MaxIdleConnections > 0 {
+			cfg.MaxIdleConnections = dc.MaxIdleConnections
+		}
+		if dc.ConnectionMaxIdleTimeMins > 0 {
+			cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
+		}
+		repository.SetConfig(cfg)
+	}
	repository.Connect(config.Keys.DB)
	return nil
}
@@ -489,6 +509,20 @@ func run() error {
		return err
	}

+	// Optimize database if requested
+	if flagOptimizeDB {
+		db := repository.GetConnection()
+		cclog.Print("Running VACUUM to reclaim space and defragment database...")
+		if _, err := db.DB.Exec("VACUUM"); err != nil {
+			return fmt.Errorf("VACUUM failed: %w", err)
+		}
+		cclog.Print("Running ANALYZE to update query planner statistics...")
+		if _, err := db.DB.Exec("ANALYZE"); err != nil {
+			return fmt.Errorf("ANALYZE failed: %w", err)
+		}
+		cclog.Exitf("OptimizeDB Success: Database '%s' optimized (VACUUM + ANALYZE).\n", config.Keys.DB)
+	}

	// Handle user commands (add, delete, sync, JWT)
	if err := handleUserCommands(); err != nil {
		return err
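What the new `-optimize-db` flag does can be sketched with Python's bundled `sqlite3` module on a scaled-down, hypothetical table: after a mass delete the file keeps its pages until `VACUUM` rewrites it, and `ANALYZE` then refreshes the planner statistics:

```python
import os
import sqlite3
import tempfile

# VACUUM needs a file-backed database to show the effect (page_count shrinks)
path = os.path.join(tempfile.mkdtemp(), "demo.db")
con = sqlite3.connect(path)
con.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, payload TEXT)")
con.executemany("INSERT INTO job (payload) VALUES (?)", [("x" * 1000,)] * 1000)
con.commit()

con.execute("DELETE FROM job")  # frees pages internally; file keeps its size
con.commit()
before = con.execute("PRAGMA page_count").fetchone()[0]

con.execute("VACUUM")   # rewrites the file, returning free pages to the OS
con.execute("ANALYZE")  # then refresh planner statistics
after = con.execute("PRAGMA page_count").fetchone()[0]
assert after < before
```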

View File

@@ -77,6 +77,17 @@ type ProgramConfig struct {
	// Node state retention configuration
	NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`

+	// Database tuning configuration
+	DbConfig *DbConfig `json:"db-config"`
}

+type DbConfig struct {
+	CacheSizeMB               int `json:"cache-size-mb"`
+	SoftHeapLimitMB           int `json:"soft-heap-limit-mb"`
+	MaxOpenConnections        int `json:"max-open-connections"`
+	MaxIdleConnections        int `json:"max-idle-connections"`
+	ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
+}

type NodeStateRetention struct {

View File

@@ -177,6 +177,32 @@ var configSchema = `
}
},
"required": ["policy"]
},
"db-config": {
"description": "SQLite database tuning configuration.",
"type": "object",
"properties": {
"cache-size-mb": {
"description": "SQLite page cache size per connection in MB (default: 2048).",
"type": "integer"
},
"soft-heap-limit-mb": {
"description": "Process-wide SQLite soft heap limit in MB (default: 16384).",
"type": "integer"
},
"max-open-connections": {
"description": "Maximum number of open database connections (default: 4).",
"type": "integer"
},
"max-idle-connections": {
"description": "Maximum number of idle database connections (default: 4).",
"type": "integer"
},
"max-idle-time-minutes": {
"description": "Maximum idle time for a connection in minutes (default: 10).",
"type": "integer"
}
}
}
}
}`

View File

@@ -645,6 +645,7 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate, numDurationBins *string, numMetricBins *int) ([]*model.JobsStatistics, error) {
+	startOverall := time.Now()
	var err error
	var stats []*model.JobsStatistics
@@ -652,31 +653,50 @@
	defaultDurationBins := "1h"
	defaultMetricBins := 10

-	if requireField(ctx, "totalJobs") || requireField(ctx, "totalUsers") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
-		requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
+	// Build requested fields map for selective column computation
+	statsFields := []string{"totalJobs", "totalUsers", "totalWalltime", "totalNodes", "totalCores",
+		"totalAccs", "totalNodeHours", "totalCoreHours", "totalAccHours", "runningJobs", "shortJobs"}
+	reqFields := make(map[string]bool, len(statsFields))
+	fetchedMainStats := false
+	for _, f := range statsFields {
+		if requireField(ctx, f) {
+			reqFields[f] = true
+			if f != "runningJobs" && f != "shortJobs" {
+				fetchedMainStats = true
+			}
+		}
+	}
+
+	if fetchedMainStats {
		if groupBy == nil {
-			stats, err = r.Repo.JobsStats(ctx, filter)
+			stats, err = r.Repo.JobsStats(ctx, filter, reqFields)
		} else {
-			stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy)
+			startGrouped := time.Now()
+			stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields)
+			cclog.Infof("Timer JobsStatsGrouped call: %s", time.Since(startGrouped))
		}
	} else {
		stats = make([]*model.JobsStatistics, 0, 1)
		stats = append(stats, &model.JobsStatistics{})
	}

-	if groupBy != nil {
-		if requireField(ctx, "shortJobs") {
-			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
-		}
-		if requireField(ctx, "runningJobs") {
-			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
-		}
-	} else {
-		if requireField(ctx, "shortJobs") {
-			stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
-		}
-		if requireField(ctx, "runningJobs") {
-			stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
-		}
-	}
+	// runningJobs and shortJobs are already inlined in JobsStats/JobsStatsGrouped.
+	// Only run separate count queries if main stats were not fetched.
+	if !fetchedMainStats {
+		if groupBy != nil {
+			if requireField(ctx, "shortJobs") {
+				stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
+			}
+			if requireField(ctx, "runningJobs") {
+				stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
+			}
+		} else {
+			if requireField(ctx, "shortJobs") {
+				stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
+			}
+			if requireField(ctx, "runningJobs") {
+				stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
+			}
+		}
+	}
@@ -716,6 +736,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
		}
	}

+	cclog.Infof("Timer JobsStatistics overall: %s", time.Since(startOverall))
	return stats, nil
}
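The idea behind the `reqFields` map is that only requested GraphQL fields should cost an aggregate column in SQL. A scaled-down sketch in Python with a hypothetical `job` table (the field names and `req_fields` mapping are illustrative, mirroring the Go code only loosely):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, num_nodes INTEGER, duration INTEGER)")
con.executemany("INSERT INTO job (num_nodes, duration) VALUES (?, ?)",
                [(2, 100), (4, 200), (8, 300)])

# Map each field to the SQL aggregate that computes it
columns = {
    "totalJobs": "COUNT(*)",
    "totalNodeHours": "SUM(num_nodes * duration)",
    "totalWalltime": "SUM(duration)",
}
req_fields = {"totalJobs": True, "totalWalltime": True}  # e.g. from requireField

# Only requested aggregates end up in the query; the rest cost nothing
selected = [sql for name, sql in columns.items() if req_fields.get(name)]
row = con.execute("SELECT " + ", ".join(selected) + " FROM job").fetchone()
assert row == (3, 600)
```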

View File

@@ -27,13 +27,25 @@ type RepositoryConfig struct {
	ConnectionMaxLifetime time.Duration

	// ConnectionMaxIdleTime is the maximum amount of time a connection may be idle.
-	// Default: 1 hour
+	// Default: 10 minutes
	ConnectionMaxIdleTime time.Duration

	// MinRunningJobDuration is the minimum duration in seconds for a job to be
	// considered in "running jobs" queries. This filters out very short jobs.
	// Default: 600 seconds (10 minutes)
	MinRunningJobDuration int

+	// DbCacheSizeMB is the SQLite page cache size per connection in MB.
+	// Uses negative PRAGMA cache_size notation (KiB). With MaxOpenConnections=4
+	// and DbCacheSizeMB=2048, total page cache is up to 8GB.
+	// Default: 2048 (2GB)
+	DbCacheSizeMB int

+	// DbSoftHeapLimitMB is the process-wide SQLite soft heap limit in MB.
+	// SQLite will try to release cache pages to stay under this limit.
+	// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
+	// Default: 16384 (16GB)
+	DbSoftHeapLimitMB int
}

// DefaultConfig returns the default repository configuration.
@@ -44,8 +56,10 @@ func DefaultConfig() *RepositoryConfig {
		MaxOpenConnections:    4,
		MaxIdleConnections:    4,
		ConnectionMaxLifetime: time.Hour,
-		ConnectionMaxIdleTime: time.Hour,
+		ConnectionMaxIdleTime: 10 * time.Minute,
		MinRunningJobDuration: 600, // 10 minutes
+		DbCacheSizeMB:         2048,  // 2GB per connection
+		DbSoftHeapLimitMB:     16384, // 16GB process-wide
	}
}

View File

@@ -36,9 +36,10 @@ type DatabaseOptions struct {
	ConnectionMaxIdleTime time.Duration
}

-func setupSqlite(db *sql.DB) error {
+func setupSqlite(db *sql.DB, cfg *RepositoryConfig) error {
	pragmas := []string{
		"temp_store = memory",
+		fmt.Sprintf("soft_heap_limit = %d", int64(cfg.DbSoftHeapLimitMB)*1024*1024),
	}

	for _, pragma := range pragmas {
@@ -71,7 +72,8 @@ func Connect(db string) {
	connectionURLParams.Add("_journal_mode", "WAL")
	connectionURLParams.Add("_busy_timeout", "5000")
	connectionURLParams.Add("_synchronous", "NORMAL")
-	connectionURLParams.Add("_cache_size", "1000000000")
+	cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
+	connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
	connectionURLParams.Add("_foreign_keys", "true")

	opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
@@ -86,11 +88,14 @@
		cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error())
	}

-	err = setupSqlite(dbHandle.DB)
+	err = setupSqlite(dbHandle.DB, repoConfig)
	if err != nil {
		cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
	}

+	cclog.Infof("SQLite config: cache_size=%dMB/conn, soft_heap_limit=%dMB, max_conns=%d",
+		repoConfig.DbCacheSizeMB, repoConfig.DbSoftHeapLimitMB, repoConfig.MaxOpenConnections)
	dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
	dbHandle.SetMaxIdleConns(opts.MaxIdleConnections)
	dbHandle.SetConnMaxLifetime(opts.ConnectionMaxLifetime)
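The `soft_heap_limit` conversion above (MB to bytes) can be checked with Python's bundled `sqlite3` module; unlike `cache_size`, this limit is process-wide rather than per-connection (the `16` here is an arbitrary illustrative value):

```python
import sqlite3

soft_heap_limit_mb = 16  # analogous to DbSoftHeapLimitMB, scaled down
limit_bytes = soft_heap_limit_mb * 1024 * 1024

con = sqlite3.connect(":memory:")
# The pragma takes bytes, hence the MB * 1024 * 1024 conversion
con.execute(f"PRAGMA soft_heap_limit = {limit_bytes}")
assert con.execute("PRAGMA soft_heap_limit").fetchone()[0] == limit_bytes
```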

View File

@@ -171,7 +171,7 @@ func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job,
		return nil, qerr
	}

-	return scanJob(q.RunWith(r.stmtCache).QueryRow())
+	return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
}

// FindByIDWithUser executes a SQL query to find a specific batch job.
@@ -217,7 +217,7 @@ func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime
		return nil, qerr
	}

-	return scanJob(q.RunWith(r.stmtCache).QueryRow())
+	return scanJob(q.RunWith(r.stmtCache).QueryRowContext(ctx))
}

// IsJobOwner checks if the specified user owns the batch job identified by jobID,

View File

@@ -63,7 +63,7 @@ func (r *JobRepository) QueryJobs(
 		}
 	} else {
 		// Order by footprint JSON field values
-		query = query.Where("JSON_VALID(meta_data)")
+		query = query.Where("JSON_VALID(footprint)")
 		switch order.Order {
 		case model.SortDirectionEnumAsc:
 			query = query.OrderBy(fmt.Sprintf("JSON_EXTRACT(footprint, \"$.%s\") ASC", field))
@@ -84,7 +84,7 @@ func (r *JobRepository) QueryJobs(
 		query = BuildWhereClause(f, query)
 	}
-	rows, err := query.RunWith(r.stmtCache).Query()
+	rows, err := query.RunWith(r.stmtCache).QueryContext(ctx)
 	if err != nil {
 		queryString, queryVars, _ := query.ToSql()
 		return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
@@ -126,7 +126,7 @@ func (r *JobRepository) CountJobs(
 	}
 	var count int
-	if err := query.RunWith(r.DB).Scan(&count); err != nil {
+	if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&count); err != nil {
 		return 0, fmt.Errorf("failed to count jobs: %w", err)
 	}
@@ -335,14 +335,14 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui
 }
 // buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required.
-func buildFloatJSONCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
+func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
 	query = query.Where("JSON_VALID(footprint)")
 	if cond.From != 1.0 && cond.To != 0.0 {
-		return query.Where("JSON_EXTRACT(footprint, \"$."+field+"\") BETWEEN ? AND ?", cond.From, cond.To)
+		return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To)
 	} else if cond.From != 1.0 && cond.To == 0.0 {
-		return query.Where("JSON_EXTRACT(footprint, \"$."+field+"\") >= ?", cond.From)
+		return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From)
 	} else if cond.From == 1.0 && cond.To != 0.0 {
-		return query.Where("JSON_EXTRACT(footprint, \"$."+field+"\") <= ?", cond.To)
+		return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To)
 	} else {
 		return query
 	}

View File

@@ -21,10 +21,12 @@ import (
 // is added to internal/repository/migrations/sqlite3/.
 //
 // Version history:
-// - Version 10: Current version
+// - Version 12: Add covering index for stats queries (cluster, hpc_user, start_time, ...)
+// - Version 11: Optimize job table indexes (reduce from ~78 to 48)
+// - Version 10: Node table
 //
 // Migration files are embedded at build time from the migrations directory.
-const Version uint = 10
+const Version uint = 12
 //go:embed migrations/*
 var migrationFiles embed.FS

View File

@@ -139,12 +139,6 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_project ON job (cluster, clust
 CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, cluster_partition, job_state);
 CREATE INDEX IF NOT EXISTS jobs_cluster_partition_shared ON job (cluster, cluster_partition, shared);
--- Cluster+Partition Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
 -- Cluster+Partition Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration_starttime ON job (cluster, cluster_partition, duration, start_time);
 CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (cluster, cluster_partition, start_time, duration);
@@ -152,11 +146,6 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (clu
 -- Cluster+JobState Filter
 CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
 CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
--- Cluster+JobState Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
 -- Cluster+JobState Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime_duration ON job (cluster, job_state, start_time, duration);
@@ -165,34 +154,18 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration_starttime ON job (clus
 -- Cluster+Shared Filter
 CREATE INDEX IF NOT EXISTS jobs_cluster_shared_user ON job (cluster, shared, hpc_user);
 CREATE INDEX IF NOT EXISTS jobs_cluster_shared_project ON job (cluster, shared, project);
--- Cluster+Shared Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy);
 -- Cluster+Shared Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime_duration ON job (cluster, shared, start_time, duration);
 CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration_starttime ON job (cluster, shared, duration, start_time);
 -- User Filter
--- User Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
 -- Cluster+Shared Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_user_starttime_duration ON job (hpc_user, start_time, duration);
 CREATE INDEX IF NOT EXISTS jobs_user_duration_starttime ON job (hpc_user, duration, start_time);
 -- Project Filter
 CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
--- Project Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
 -- Cluster+Shared Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_project_starttime_duration ON job (project, start_time, duration);
@@ -201,11 +174,6 @@ CREATE INDEX IF NOT EXISTS jobs_project_duration_starttime ON job (project, dura
 -- JobState Filter
 CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
 CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
--- JobState Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
 -- Cluster+Shared Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime_duration ON job (job_state, start_time, duration);
@@ -214,11 +182,6 @@ CREATE INDEX IF NOT EXISTS jobs_jobstate_duration_starttime ON job (job_state, d
 -- Shared Filter
 CREATE INDEX IF NOT EXISTS jobs_shared_user ON job (shared, hpc_user);
 CREATE INDEX IF NOT EXISTS jobs_shared_project ON job (shared, project);
--- Shared Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy);
 -- Cluster+Shared Time Filter Sorting
 CREATE INDEX IF NOT EXISTS jobs_shared_starttime_duration ON job (shared, start_time, duration);
@@ -226,7 +189,6 @@ CREATE INDEX IF NOT EXISTS jobs_shared_duration_starttime ON job (shared, durati
 -- ArrayJob Filter
 CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
 -- Single filters with default starttime sorting
 CREATE INDEX IF NOT EXISTS jobs_duration_starttime ON job (duration, start_time);
@@ -244,7 +206,6 @@ CREATE INDEX IF NOT EXISTS jobs_energy_duration ON job (energy, duration);
 -- Backup Indices For High Variety Columns
 CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
-CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
 -- Notes:
 -- Cluster+Partition+Jobstate Filter: Tested -> Full Array Of Combinations non-required

View File

@@ -0,0 +1,56 @@
-- Migration 11 DOWN: Restore indexes from migration 09
-- ============================================================
-- Recreate all removed indexes from migration 09
-- ============================================================
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
-- Cluster+Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy);
-- User Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
-- Project Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
-- JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
-- Shared Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc);
CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy);
-- ArrayJob Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
-- Backup Indices For High Variety Columns
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
-- Optimize DB index usage
PRAGMA optimize;

View File

@@ -0,0 +1,61 @@
-- Migration 11: Remove overly specific table indexes formerly used for sorting
-- Sorting on one or two indexed columns is usually fast without a dedicated index.
-- Reduces the index count from ~78 to 48 for better write performance,
-- reduced disk usage, and more reliable query planner decisions.
-- Requires ANALYZE to be run after migration (done automatically on startup).
-- ============================================================
-- Drop SELECTED existing job indexes (from migrations 08/09)
-- sqlite_autoindex_job_1 (UNIQUE constraint) is kept automatically
-- ============================================================
-- Cluster+Partition Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_energy;
-- Cluster+JobState Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_jobstate_energy;
-- Cluster+Shared Filter Sorting
DROP INDEX IF EXISTS jobs_cluster_shared_numnodes;
DROP INDEX IF EXISTS jobs_cluster_shared_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_shared_numacc;
DROP INDEX IF EXISTS jobs_cluster_shared_energy;
-- User Filter Sorting
DROP INDEX IF EXISTS jobs_user_numnodes;
DROP INDEX IF EXISTS jobs_user_numhwthreads;
DROP INDEX IF EXISTS jobs_user_numacc;
DROP INDEX IF EXISTS jobs_user_energy;
-- Project Filter Sorting
DROP INDEX IF EXISTS jobs_project_numnodes;
DROP INDEX IF EXISTS jobs_project_numhwthreads;
DROP INDEX IF EXISTS jobs_project_numacc;
DROP INDEX IF EXISTS jobs_project_energy;
-- JobState Filter Sorting
DROP INDEX IF EXISTS jobs_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_jobstate_numacc;
DROP INDEX IF EXISTS jobs_jobstate_energy;
-- Shared Filter Sorting
DROP INDEX IF EXISTS jobs_shared_numnodes;
DROP INDEX IF EXISTS jobs_shared_numhwthreads;
DROP INDEX IF EXISTS jobs_shared_numacc;
DROP INDEX IF EXISTS jobs_shared_energy;
-- ArrayJob Filter
DROP INDEX IF EXISTS jobs_cluster_arrayjobid_starttime;
-- Backup Indices For High Variety Columns
DROP INDEX IF EXISTS jobs_duration;
-- Optimize DB index usage
PRAGMA optimize;

View File

@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS jobs_cluster_user_starttime_stats;
DROP INDEX IF EXISTS jobs_cluster_project_starttime_stats;

View File

@@ -0,0 +1,11 @@
-- Migration 12: Add covering index for grouped stats queries
-- Column order: cluster (equality), hpc_user (GROUP BY), start_time (range scan)
-- Includes aggregated columns to avoid main table lookups entirely.
CREATE INDEX IF NOT EXISTS jobs_cluster_user_starttime_stats
ON job (cluster, hpc_user, start_time, duration, job_state, num_nodes, num_hwthreads, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_project_starttime_stats
ON job (cluster, project, start_time, duration, job_state, num_nodes, num_hwthreads, num_acc);
PRAGMA optimize;

View File

@@ -366,19 +366,23 @@ func (r *NodeRepository) QueryNodes(
 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
 		return nil, err
 	}
+	defer rows.Close()
 	nodes := make([]*schema.Node, 0)
 	for rows.Next() {
 		node := schema.Node{}
 		if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
 			&node.NodeState, &node.HealthState); err != nil {
-			rows.Close()
 			cclog.Warn("Error while scanning rows (QueryNodes)")
 			return nil, err
 		}
 		nodes = append(nodes, &node)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	return nodes, nil
 }
@@ -415,6 +419,7 @@ func (r *NodeRepository) QueryNodesWithMeta(
 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
 		return nil, err
 	}
+	defer rows.Close()
 	nodes := make([]*schema.Node, 0)
 	for rows.Next() {
@@ -424,7 +429,6 @@ func (r *NodeRepository) QueryNodesWithMeta(
 		if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster,
 			&node.NodeState, &node.HealthState, &RawMetaData, &RawMetricHealth); err != nil {
-			rows.Close()
 			cclog.Warn("Error while scanning rows (QueryNodes)")
 			return nil, err
 		}
@@ -454,6 +458,10 @@ func (r *NodeRepository) QueryNodesWithMeta(
 		nodes = append(nodes, &node)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	return nodes, nil
 }
@@ -545,10 +553,11 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) {
 func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error) {
 	query, qerr := AccessCheck(ctx,
-		sq.Select(column).
+		sq.Select(column, "COUNT(*) as count").
 			From("node").
 			Join("node_state ON node_state.node_id = node.id").
-			Where(latestStateCondition()))
+			Where(latestStateCondition()).
+			GroupBy(column))
 	if qerr != nil {
 		return nil, qerr
 	}
@@ -561,23 +570,21 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF
 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
 		return nil, err
 	}
+	defer rows.Close()
-	stateMap := map[string]int{}
+	nodes := make([]*model.NodeStates, 0)
 	for rows.Next() {
 		var state string
-		if err := rows.Scan(&state); err != nil {
-			rows.Close()
+		var count int
+		if err := rows.Scan(&state, &count); err != nil {
 			cclog.Warn("Error while scanning rows (CountStates)")
 			return nil, err
 		}
-		stateMap[state] += 1
+		nodes = append(nodes, &model.NodeStates{State: state, Count: count})
 	}
-	nodes := make([]*model.NodeStates, 0)
-	for state, counts := range stateMap {
-		node := model.NodeStates{State: state, Count: counts}
-		nodes = append(nodes, &node)
+	if err := rows.Err(); err != nil {
+		return nil, err
 	}
 	return nodes, nil
@@ -623,6 +630,7 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
 		return nil, err
 	}
+	defer rows.Close()
 	rawData := make(map[string][][]int)
 	for rows.Next() {
@@ -630,7 +638,6 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
 		var timestamp, count int
 		if err := rows.Scan(&state, &timestamp, &count); err != nil {
-			rows.Close()
 			cclog.Warnf("Error while scanning rows (CountStatesTimed) at time '%d'", timestamp)
 			return nil, err
 		}
@@ -643,6 +650,10 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
 		rawData[state][1] = append(rawData[state][1], count)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	timedStates := make([]*model.NodeStatesTimed, 0)
 	for state, data := range rawData {
 		entry := model.NodeStatesTimed{State: state, Times: data[0], Counts: data[1]}

View File

@@ -105,9 +105,9 @@ func (r *JobRepository) buildCountQuery(
 	var query sq.SelectBuilder
 	if col != "" {
-		query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
+		query = sq.Select(col, "COUNT(*)").From("job").GroupBy(col)
 	} else {
-		query = sq.Select("COUNT(job.id)").From("job")
+		query = sq.Select("COUNT(*)").From("job")
 	}
 	switch kind {
@@ -124,59 +124,100 @@ func (r *JobRepository) buildCountQuery(
 	return query
 }
-// buildStatsQuery constructs a SQL query to compute comprehensive job statistics with optional grouping.
+// buildStatsQuery constructs a SQL query to compute job statistics with optional grouping.
+// Only requested columns are computed; unrequested columns select 0 as placeholder.
 //
 // Parameters:
 // - filter: Job filters to apply (cluster, user, time range, etc.)
 // - col: Column name to GROUP BY; empty string for overall statistics without grouping
-//
-// Returns a SelectBuilder that produces comprehensive statistics:
-// - totalJobs: Count of jobs
-// - totalUsers: Count of distinct users (always 0 when grouping by user)
-// - totalWalltime: Sum of job durations in hours
-// - totalNodes: Sum of nodes used across all jobs
-// - totalNodeHours: Sum of (duration × num_nodes) in hours
-// - totalCores: Sum of hardware threads used across all jobs
-// - totalCoreHours: Sum of (duration × num_hwthreads) in hours
-// - totalAccs: Sum of accelerators used across all jobs
-// - totalAccHours: Sum of (duration × num_acc) in hours
-//
-// Special handling:
-// - Running jobs: Duration calculated as (now - start_time) instead of stored duration
-// - Grouped queries: Also select grouping column and user's display name from hpc_user table
-// - All time values converted from seconds to hours (÷ 3600) and rounded
+// - shortThreshold: Duration threshold in seconds for counting short-running jobs
+// - reqFields: Set of requested field names; nil means compute all fields
 func (r *JobRepository) buildStatsQuery(
 	filter []*model.JobFilter,
 	col string,
+	shortThreshold int,
+	reqFields map[string]bool,
 ) sq.SelectBuilder {
-	var query sq.SelectBuilder
-	if col != "" {
-		query = sq.Select(
-			col,
-			"name",
-			"COUNT(job.id) as totalJobs",
-			"COUNT(DISTINCT job.hpc_user) AS totalUsers",
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, time.Now().Unix()),
-			`CAST(SUM(job.num_nodes) as int) as totalNodes`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, time.Now().Unix()),
-			`CAST(SUM(job.num_hwthreads) as int) as totalCores`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, time.Now().Unix()),
-			`CAST(SUM(job.num_acc) as int) as totalAccs`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, time.Now().Unix()),
-		).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
-	} else {
-		query = sq.Select(
-			"COUNT(job.id) as totalJobs",
-			"COUNT(DISTINCT job.hpc_user) AS totalUsers",
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_nodes) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_hwthreads) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, time.Now().Unix()),
-			`CAST(SUM(job.num_acc) as int)`,
-			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, time.Now().Unix()),
-		).From("job")
-	}
+	now := time.Now().Unix()
+	// Helper: return real expression if field is requested (or reqFields is nil), else "0 as alias"
+	need := func(field string) bool {
+		return reqFields == nil || reqFields[field]
+	}
+	durationExpr := fmt.Sprintf(`(CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END)`, now)
+	// Build column list
+	columns := make([]string, 0, 14)
+	if col != "" {
+		columns = append(columns, col)
+	}
+	columns = append(columns, "COUNT(*) as totalJobs")
+	if need("totalUsers") && col != "job.hpc_user" {
+		columns = append(columns, "COUNT(DISTINCT job.hpc_user) AS totalUsers")
+	} else {
+		columns = append(columns, "0 AS totalUsers")
+	}
+	if need("totalWalltime") {
+		columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s) / 3600) as int) as totalWalltime`, durationExpr))
+	} else {
+		columns = append(columns, "0 as totalWalltime")
+	}
+	if need("totalNodes") {
+		columns = append(columns, `CAST(SUM(job.num_nodes) as int) as totalNodes`)
+	} else {
+		columns = append(columns, "0 as totalNodes")
+	}
+	if need("totalNodeHours") {
+		columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_nodes) / 3600) as int) as totalNodeHours`, durationExpr))
+	} else {
+		columns = append(columns, "0 as totalNodeHours")
+	}
+	if need("totalCores") {
+		columns = append(columns, `CAST(SUM(job.num_hwthreads) as int) as totalCores`)
+	} else {
+		columns = append(columns, "0 as totalCores")
+	}
+	if need("totalCoreHours") {
+		columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_hwthreads) / 3600) as int) as totalCoreHours`, durationExpr))
+	} else {
+		columns = append(columns, "0 as totalCoreHours")
+	}
+	if need("totalAccs") {
+		columns = append(columns, `CAST(SUM(job.num_acc) as int) as totalAccs`)
+	} else {
+		columns = append(columns, "0 as totalAccs")
+	}
+	if need("totalAccHours") {
+		columns = append(columns, fmt.Sprintf(`CAST(ROUND(SUM(%s * job.num_acc) / 3600) as int) as totalAccHours`, durationExpr))
+	} else {
+		columns = append(columns, "0 as totalAccHours")
+	}
+	if need("runningJobs") {
+		columns = append(columns, `COUNT(CASE WHEN job.job_state = 'running' THEN 1 END) as runningJobs`)
+	} else {
+		columns = append(columns, "0 as runningJobs")
+	}
+	if need("shortJobs") {
+		columns = append(columns, fmt.Sprintf(`COUNT(CASE WHEN job.duration < %d THEN 1 END) as shortJobs`, shortThreshold))
+	} else {
+		columns = append(columns, "0 as shortJobs")
+	}
+	query := sq.Select(columns...).From("job")
+	if col != "" {
+		query = query.GroupBy(col)
+	}
 	for _, f := range filter {
@@ -186,35 +227,19 @@ func (r *JobRepository) buildStatsQuery(
 	return query
 }
-// JobsStatsGrouped computes comprehensive job statistics grouped by a dimension (user, project, cluster, or subcluster).
-//
-// This is the primary method for generating aggregated statistics views in the UI, providing
-// metrics like total jobs, walltime, and resource usage broken down by the specified grouping.
-//
-// Parameters:
-// - ctx: Context for security checks and cancellation
-// - filter: Filters to apply (time range, cluster, job state, etc.)
-// - page: Optional pagination (ItemsPerPage: -1 disables pagination)
-// - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
-// - groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster)
-//
-// Returns a slice of JobsStatistics, one per group, with:
-// - ID: The group identifier (username, project name, cluster name, etc.)
-// - Name: Display name (for users, from hpc_user.name; empty for other groups)
-// - Statistics: totalJobs, totalUsers, totalWalltime, resource usage metrics
-//
-// Security: Respects user roles via SecurityCheck - users see only their own data unless admin/support.
-// Performance: Results are sorted in SQL and pagination applied before scanning rows.
+// JobsStatsGrouped computes job statistics grouped by a dimension (user, project, cluster, or subcluster).
+// Only columns listed in reqFields are computed; others return 0. User display names are looked up
+// in a separate lightweight query to avoid JOIN overhead on the main aggregation.
 func (r *JobRepository) JobsStatsGrouped(
 	ctx context.Context,
 	filter []*model.JobFilter,
 	page *model.PageRequest,
 	sortBy *model.SortByAggregate,
 	groupBy *model.Aggregate,
+	reqFields map[string]bool,
 ) ([]*model.JobsStatistics, error) {
-	start := time.Now()
 	col := groupBy2column[*groupBy]
-	query := r.buildStatsQuery(filter, col)
+	query := r.buildStatsQuery(filter, col, config.Keys.ShortRunningJobsDuration, reqFields)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
@@ -230,97 +255,75 @@ func (r *JobRepository) JobsStatsGrouped(
 		query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
 	}
-	rows, err := query.RunWith(r.DB).Query()
+	rows, err := query.RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Warn("Error while querying DB for job statistics")
 		return nil, err
 	}
+	defer rows.Close()
 	stats := make([]*model.JobsStatistics, 0, 100)
 	for rows.Next() {
 		var id sql.NullString
-		var name sql.NullString
-		var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
-		if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
+		var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
+		if err := rows.Scan(&id, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
 			cclog.Warnf("Error while scanning rows: %s", err.Error())
 			return nil, err
 		}
 		if id.Valid {
-			var totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
-			var personName string
-			if name.Valid {
-				personName = name.String
-			}
-			if jobs.Valid {
-				totalJobs = int(jobs.Int64)
-			}
-			if users.Valid {
-				totalUsers = int(users.Int64)
-			}
-			if walltime.Valid {
-				totalWalltime = int(walltime.Int64)
-			}
-			if nodes.Valid {
-				totalNodes = int(nodes.Int64)
-			}
-			if cores.Valid {
-				totalCores = int(cores.Int64)
-			}
-			if accs.Valid {
-				totalAccs = int(accs.Int64)
-			}
-			if nodeHours.Valid {
-				totalNodeHours = int(nodeHours.Int64)
-			}
-			if coreHours.Valid {
-				totalCoreHours = int(coreHours.Int64)
-			}
-			if accHours.Valid {
-				totalAccHours = int(accHours.Int64)
-			}
-			if col == "job.hpc_user" {
-				// name := r.getUserName(ctx, id.String)
-				stats = append(stats,
-					&model.JobsStatistics{
-						ID:             id.String,
-						Name:           personName,
-						TotalJobs:      totalJobs,
-						TotalWalltime:  totalWalltime,
-						TotalNodes:     totalNodes,
-						TotalNodeHours: totalNodeHours,
-						TotalCores:     totalCores,
-						TotalCoreHours: totalCoreHours,
-						TotalAccs:      totalAccs,
-						TotalAccHours:  totalAccHours,
-					})
-			} else {
-				stats = append(stats,
-					&model.JobsStatistics{
-						ID:             id.String,
-						TotalJobs:      totalJobs,
-						TotalUsers:     totalUsers,
-						TotalWalltime:  totalWalltime,
-						TotalNodes:     totalNodes,
-						TotalNodeHours: totalNodeHours,
-						TotalCores:     totalCores,
-						TotalCoreHours: totalCoreHours,
-						TotalAccs:      totalAccs,
-						TotalAccHours:  totalAccHours,
-					})
-			}
+			stats = append(stats,
+				&model.JobsStatistics{
+					ID:             id.String,
+					TotalJobs:      int(jobs.Int64),
+					TotalUsers:     int(users.Int64),
+					TotalWalltime:  int(walltime.Int64),
+					TotalNodes:     int(nodes.Int64),
+					TotalNodeHours: int(nodeHours.Int64),
+					TotalCores:     int(cores.Int64),
+					TotalCoreHours: int(coreHours.Int64),
+					TotalAccs:      int(accs.Int64),
+					TotalAccHours:  int(accHours.Int64),
+					RunningJobs:    int(runningJobs.Int64),
+					ShortJobs:      int(shortJobs.Int64),
+				})
 		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	// Post-query name lookup for user grouping (avoids LEFT JOIN on aggregation query)
+	if col == "job.hpc_user" && len(stats) > 0 {
+		usernames := make([]any, len(stats))
+		for i, s := range stats {
+			usernames[i] = s.ID
+		}
+		nameQuery := sq.Select("username", "name").From("hpc_user").Where(sq.Eq{"username": usernames})
+		nameRows, err := nameQuery.RunWith(r.DB).QueryContext(ctx)
+		if err != nil {
+			cclog.Warnf("Error looking up user names: %s", err.Error())
+			// Non-fatal: stats are still valid without display names
+		} else {
+			defer nameRows.Close()
+			nameMap := make(map[string]string, len(stats))
+			for nameRows.Next() {
+				var username, name string
+				if err := nameRows.Scan(&username, &name); err == nil {
+					nameMap[username] = name
+				}
+			}
+			for _, s := range stats {
+				if name, ok := nameMap[s.ID]; ok {
+					s.Name = name
+				}
+			}
+		}
+	}
-	cclog.Debugf("Timer JobsStatsGrouped %s", time.Since(start))
 	return stats, nil
 }
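The post-query name lookup replaces a LEFT JOIN on the aggregation with one batched query and an in-memory merge. The merge step can be sketched on its own; `Stat` below is a cut-down stand-in for `model.JobsStatistics`, and `nameMap` stands in for the result of the separate `hpc_user` query:

```go
package main

import "fmt"

// Stat mirrors just the fields of model.JobsStatistics used here.
type Stat struct {
	ID   string // username, the GROUP BY key
	Name string // display name, filled in after the aggregation
}

// applyNames merges the batched username -> display-name lookup into the
// already-aggregated rows. Usernames without an entry keep an empty Name,
// matching the non-fatal behavior of the lookup above.
func applyNames(stats []*Stat, nameMap map[string]string) {
	for _, s := range stats {
		if name, ok := nameMap[s.ID]; ok {
			s.Name = name
		}
	}
}

func main() {
	stats := []*Stat{{ID: "alice"}, {ID: "bob"}}
	applyNames(stats, map[string]string{"alice": "Alice A."})
	fmt.Println(stats[0].Name, stats[1].Name)
}
```

Keeping the aggregation query JOIN-free lets the database answer it from the stats index alone; the second query touches only the handful of usernames on the returned page.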
@@ -342,19 +345,20 @@ func (r *JobRepository) JobsStatsGrouped(
 func (r *JobRepository) JobsStats(
 	ctx context.Context,
 	filter []*model.JobFilter,
+	reqFields map[string]bool,
 ) ([]*model.JobsStatistics, error) {
 	start := time.Now()
-	query := r.buildStatsQuery(filter, "")
+	query := r.buildStatsQuery(filter, "", config.Keys.ShortRunningJobsDuration, reqFields)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
 	}
-	row := query.RunWith(r.DB).QueryRow()
+	row := query.RunWith(r.DB).QueryRowContext(ctx)
 	stats := make([]*model.JobsStatistics, 0, 1)
-	var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
-	if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
+	var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64
+	if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil {
 		cclog.Warn("Error while scanning rows")
 		return nil, err
 	}
@@ -379,6 +383,8 @@ func (r *JobRepository) JobsStats(
 			TotalNodeHours: totalNodeHours,
 			TotalCoreHours: totalCoreHours,
 			TotalAccHours:  totalAccHours,
+			RunningJobs:    int(runningJobs.Int64),
+			ShortJobs:      int(shortJobs.Int64),
 		})
 	}
@@ -435,11 +441,12 @@ func (r *JobRepository) JobCountGrouped(
 	if err != nil {
 		return nil, err
 	}
-	rows, err := query.RunWith(r.DB).Query()
+	rows, err := query.RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Warn("Error while querying DB for job statistics")
 		return nil, err
 	}
+	defer rows.Close()
 	stats := make([]*model.JobsStatistics, 0, 100)
@@ -459,6 +466,10 @@ func (r *JobRepository) JobCountGrouped(
 		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	cclog.Debugf("Timer JobCountGrouped %s", time.Since(start))
 	return stats, nil
 }
@@ -491,11 +502,12 @@ func (r *JobRepository) AddJobCountGrouped(
 	if err != nil {
 		return nil, err
 	}
-	rows, err := query.RunWith(r.DB).Query()
+	rows, err := query.RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Warn("Error while querying DB for job statistics")
 		return nil, err
 	}
+	defer rows.Close()
 	counts := make(map[string]int)
@@ -511,6 +523,10 @@ func (r *JobRepository) AddJobCountGrouped(
 		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	switch kind {
 	case "running":
 		for _, s := range stats {
@@ -550,23 +566,13 @@ func (r *JobRepository) AddJobCount(
 	if err != nil {
 		return nil, err
 	}
-	rows, err := query.RunWith(r.DB).Query()
-	if err != nil {
-		cclog.Warn("Error while querying DB for job statistics")
-		return nil, err
-	}
-	var count int
-	for rows.Next() {
-		var cnt sql.NullInt64
-		if err := rows.Scan(&cnt); err != nil {
-			cclog.Warn("Error while scanning rows")
-			return nil, err
-		}
-		count = int(cnt.Int64)
-	}
+	var cnt sql.NullInt64
+	if err := query.RunWith(r.DB).QueryRowContext(ctx).Scan(&cnt); err != nil {
+		cclog.Warn("Error while querying DB for job count")
+		return nil, err
+	}
+	count := int(cnt.Int64)
 	switch kind {
 	case "running":
@@ -636,7 +642,7 @@ func (r *JobRepository) AddHistograms(
 	var err error
 	// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
-	value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
+	value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
 	stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
 	if err != nil {
 		cclog.Warn("Error while loading job statistics histogram: job duration")
@@ -740,7 +746,7 @@ func (r *JobRepository) jobsStatisticsHistogram(
 ) ([]*model.HistoPoint, error) {
 	start := time.Now()
 	query, qerr := SecurityCheck(ctx,
-		sq.Select(value, "COUNT(job.id) AS count").From("job"))
+		sq.Select(value, "COUNT(*) AS count").From("job"))
 	if qerr != nil {
 		return nil, qerr
@@ -750,11 +756,12 @@ func (r *JobRepository) jobsStatisticsHistogram(
 		query = BuildWhereClause(f, query)
 	}
-	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
+	rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Error("Error while running query")
 		return nil, err
 	}
+	defer rows.Close()
 	points := make([]*model.HistoPoint, 0)
 	// is it possible to introduce zero values here? requires info about bincount
@@ -767,6 +774,11 @@ func (r *JobRepository) jobsStatisticsHistogram(
 		points = append(points, &point)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
 	return points, nil
 }
@@ -800,7 +812,7 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 ) ([]*model.HistoPoint, error) {
 	start := time.Now()
 	query, qerr := SecurityCheck(ctx,
-		sq.Select(value, "COUNT(job.id) AS count").From("job"))
+		sq.Select(value, "COUNT(*) AS count").From("job"))
 	if qerr != nil {
 		return nil, qerr
@@ -808,24 +820,31 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 	// Each bin represents a duration range: bin N = [N*binSizeSeconds, (N+1)*binSizeSeconds)
 	// Example: binSizeSeconds=3600 (1 hour), bin 1 = 0-1h, bin 2 = 1-2h, etc.
-	points := make([]*model.HistoPoint, 0)
+	points := make([]*model.HistoPoint, 0, *targetBinCount)
 	for i := 1; i <= *targetBinCount; i++ {
 		point := model.HistoPoint{Value: i * binSizeSeconds, Count: 0}
 		points = append(points, &point)
 	}
+	// Build a map from bin value (seconds) to slice index for O(1) lookup.
+	binMap := make(map[int]int, len(points))
+	for i, p := range points {
+		binMap[p.Value] = i
+	}
 	for _, f := range filters {
 		query = BuildWhereClause(f, query)
 	}
-	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
+	rows, err := query.GroupBy("value").RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Error("Error while running query")
 		return nil, err
 	}
+	defer rows.Close()
-	// Match query results to pre-initialized bins.
-	// point.Value from query is the bin number; multiply by binSizeSeconds to match bin.Value.
+	// Match query results to pre-initialized bins using map lookup.
+	// point.Value from query is the bin number; multiply by binSizeSeconds to get bin key.
 	for rows.Next() {
 		point := model.HistoPoint{}
 		if err := rows.Scan(&point.Value, &point.Count); err != nil {
@@ -833,14 +852,15 @@ func (r *JobRepository) jobsDurationStatisticsHistogram(
 			return nil, err
 		}
-		for _, e := range points {
-			if e.Value == (point.Value * binSizeSeconds) {
-				e.Count = point.Count
-				break
-			}
-		}
+		if idx, ok := binMap[point.Value*binSizeSeconds]; ok {
+			points[idx].Count = point.Count
+		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
 	return points, nil
 }
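The histogram change combines two ideas: bins are pre-initialized so empty ranges still appear, and a map from bin value to slice index replaces the old per-row linear scan. A self-contained sketch (with `HistoPoint` mirroring `model.HistoPoint`, and a `counts` map standing in for the scanned query rows):

```go
package main

import "fmt"

// HistoPoint mirrors model.HistoPoint: Value is the bin's upper edge
// in seconds, Count the number of jobs that landed in the bin.
type HistoPoint struct {
	Value int
	Count int
}

// durationBins pre-initializes one point per bin, builds a bin-value ->
// index map, then fills counts in O(1) per result row. counts maps the
// bin number emitted by the SQL ((duration / binSizeSeconds) + 1, roughly)
// to a job count.
func durationBins(binSizeSeconds, binCount int, counts map[int]int) []*HistoPoint {
	points := make([]*HistoPoint, 0, binCount)
	binMap := make(map[int]int, binCount)
	for i := 1; i <= binCount; i++ {
		points = append(points, &HistoPoint{Value: i * binSizeSeconds})
		binMap[i*binSizeSeconds] = i - 1
	}
	for bin, count := range counts {
		// The query returns a bin number; scale it to the bin key.
		if idx, ok := binMap[bin*binSizeSeconds]; ok {
			points[idx].Count = count
		}
	}
	return points
}

func main() {
	// 1-hour bins, 4 bins, jobs only in bins 1 and 3.
	for _, p := range durationBins(3600, 4, map[int]int{1: 10, 3: 2}) {
		fmt.Println(p.Value, p.Count)
	}
}
```

With B bins and R result rows the old scan was O(B·R); the map makes it O(B+R), which matters once metric histograms use hundreds of bins.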
@@ -943,24 +963,34 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 	mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")
-	rows, err := mainQuery.RunWith(r.DB).Query()
+	rows, err := mainQuery.RunWith(r.DB).QueryContext(ctx)
 	if err != nil {
 		cclog.Errorf("Error while running mainQuery: %s", err)
 		return nil, err
 	}
+	defer rows.Close()
 	// Pre-initialize bins with calculated min/max ranges.
 	// Example: peak=1000, bins=10 -> bin 1=[0,100), bin 2=[100,200), ..., bin 10=[900,1000]
-	points := make([]*model.MetricHistoPoint, 0)
+	points := make([]*model.MetricHistoPoint, 0, *bins)
 	binStep := int(peak) / *bins
 	for i := 1; i <= *bins; i++ {
 		binMin := (binStep * (i - 1))
 		binMax := (binStep * i)
-		epoint := model.MetricHistoPoint{Bin: &i, Count: 0, Min: &binMin, Max: &binMax}
+		idx := i
+		epoint := model.MetricHistoPoint{Bin: &idx, Count: 0, Min: &binMin, Max: &binMax}
 		points = append(points, &epoint)
 	}
-	// Match query results to pre-initialized bins.
+	// Build a map from bin number to slice index for O(1) lookup.
+	binMap := make(map[int]int, len(points))
+	for i, p := range points {
+		if p.Bin != nil {
+			binMap[*p.Bin] = i
+		}
+	}
+	// Match query results to pre-initialized bins using map lookup.
 	for rows.Next() {
 		rpoint := model.MetricHistoPoint{}
 		if err := rows.Scan(&rpoint.Bin, &rpoint.Count); err != nil {
@@ -968,14 +998,17 @@ func (r *JobRepository) jobsMetricStatisticsHistogram(
 			return nil, err
 		}
-		for _, e := range points {
-			if e.Bin != nil && rpoint.Bin != nil && *e.Bin == *rpoint.Bin {
-				e.Count = rpoint.Count
-				break
-			}
-		}
+		if rpoint.Bin != nil {
+			if idx, ok := binMap[*rpoint.Bin]; ok {
+				points[idx].Count = rpoint.Count
+			}
+		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	result := model.MetricHistoPoints{Metric: metric, Unit: unit, Stat: &footprintStat, Data: points}
 	cclog.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))


@@ -14,7 +14,7 @@ import (
 func TestBuildJobStatsQuery(t *testing.T) {
 	r := setup(t)
-	q := r.buildStatsQuery(nil, "USER")
+	q := r.buildStatsQuery(nil, "USER", 300, nil)
 	sql, _, err := q.ToSql()
 	noErr(t, err)
@@ -29,7 +29,7 @@ func TestJobStats(t *testing.T) {
 	err := r.DB.QueryRow(`SELECT COUNT(*) FROM job`).Scan(&expectedCount)
 	noErr(t, err)
-	stats, err := r.JobsStats(getContext(t), []*model.JobFilter{})
+	stats, err := r.JobsStats(getContext(t), []*model.JobFilter{}, nil)
 	noErr(t, err)
 	if stats[0].TotalJobs != expectedCount {


@@ -283,6 +283,7 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 	if err != nil {
 		return nil, nil, err
 	}
+	defer xrows.Close()
 	for xrows.Next() {
 		var t schema.Tag
@@ -300,6 +301,10 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 		}
 	}
+	if err := xrows.Err(); err != nil {
+		return nil, nil, err
+	}
 	// Query and Count Jobs with attached Tags
 	q := sq.Select("t.tag_type, t.tag_name, t.id, count(jt.tag_id)").
 		From("tag t").
@@ -334,6 +339,7 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 	if err != nil {
 		return nil, nil, err
 	}
+	defer rows.Close()
 	counts = make(map[string]int)
 	for rows.Next() {
@@ -511,6 +517,7 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
+	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -529,6 +536,10 @@ func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, e
 		}
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	return tags, nil
 }
@@ -544,6 +555,7 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
+	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -555,6 +567,10 @@ func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error) {
 		tags = append(tags, tag)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	return tags, nil
 }
@@ -582,6 +598,7 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
 		cclog.Errorf("Error get tags with %s: %v", s, err)
 		return nil, err
 	}
+	defer rows.Close()
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
@@ -593,6 +610,10 @@ func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
 		tags = append(tags, tag)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
 	return tags, nil
 }


@@ -102,6 +102,7 @@ func (r *UserRepository) GetLdapUsernames() ([]string, error) {
 		cclog.Warn("Error while querying usernames")
 		return nil, err
 	}
+	defer rows.Close()
 	for rows.Next() {
 		var username string


@@ -10,6 +10,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"maps"
+	"math"
 	"os"
 	"path/filepath"
 	"strings"
@@ -111,6 +112,29 @@ type JobClassTagger struct {
 	getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric
 }
+// roundEnv returns a copy of env with all float64 values rounded to 2 decimal places.
+// Nested map[string]any and map[string]float64 values are recursed into.
+func roundEnv(env map[string]any) map[string]any {
+	rounded := make(map[string]any, len(env))
+	for k, v := range env {
+		switch val := v.(type) {
+		case float64:
+			rounded[k] = math.Round(val*100) / 100
+		case map[string]any:
+			rounded[k] = roundEnv(val)
+		case map[string]float64:
+			rm := make(map[string]float64, len(val))
+			for mk, mv := range val {
+				rm[mk] = math.Round(mv*100) / 100
+			}
+			rounded[k] = rm
+		default:
+			rounded[k] = v
+		}
+	}
+	return rounded
+}
 func (t *JobClassTagger) prepareRule(b []byte, fns string) {
 	var rule RuleFormat
 	if err := json.NewDecoder(bytes.NewReader(b)).Decode(&rule); err != nil {
@@ -408,7 +432,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
 			// process hint template
 			var msg bytes.Buffer
-			if err := ri.hint.Execute(&msg, env); err != nil {
+			if err := ri.hint.Execute(&msg, roundEnv(env)); err != nil {
 				cclog.Errorf("Template error: %s", err.Error())
 				continue
 			}
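`roundEnv` is reproduced below together with a small usage sketch, so the rounding behavior can be checked in isolation; the sample `env` map (keys like `load`, `metrics`, `name`) is illustrative, not a real tagger environment. The point of the change is that hint templates render "2.34" rather than a full-precision float like "2.3399999999999999".

```go
package main

import (
	"fmt"
	"math"
)

// roundEnv (copied from the tagger change above) returns a copy of env
// with all float64 values rounded to 2 decimal places; nested
// map[string]any and map[string]float64 values are recursed into.
func roundEnv(env map[string]any) map[string]any {
	rounded := make(map[string]any, len(env))
	for k, v := range env {
		switch val := v.(type) {
		case float64:
			rounded[k] = math.Round(val*100) / 100
		case map[string]any:
			rounded[k] = roundEnv(val)
		case map[string]float64:
			rm := make(map[string]float64, len(val))
			for mk, mv := range val {
				rm[mk] = math.Round(mv*100) / 100
			}
			rounded[k] = rm
		default:
			rounded[k] = v // strings, ints, etc. pass through untouched
		}
	}
	return rounded
}

func main() {
	env := map[string]any{
		"load":    2.3399999999999999,
		"metrics": map[string]float64{"mem_bw": 81.4567},
		"name":    "job-123",
	}
	fmt.Println(roundEnv(env)["load"])
}
```

Rounding a copy (rather than mutating `env`) keeps the full-precision values available to the rule expressions themselves; only the rendered hint text is affected.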


@@ -20,14 +20,14 @@
 				"wonka": "^6.3.5"
 			},
 			"devDependencies": {
-				"@rollup/plugin-commonjs": "^29.0.1",
+				"@rollup/plugin-commonjs": "^29.0.2",
 				"@rollup/plugin-node-resolve": "^16.0.3",
 				"@rollup/plugin-terser": "^1.0.0",
 				"@timohausmann/quadtree-js": "^1.2.6",
 				"rollup": "^4.59.0",
 				"rollup-plugin-css-only": "^4.5.5",
 				"rollup-plugin-svelte": "^7.2.3",
-				"svelte": "^5.53.7"
+				"svelte": "^5.53.9"
 			}
 		},
 		"node_modules/@0no-co/graphql.web": {
@@ -126,9 +126,9 @@
 			}
 		},
 		"node_modules/@rollup/plugin-commonjs": {
-			"version": "29.0.1",
-			"resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-29.0.1.tgz",
-			"integrity": "sha512-VUEHINN2rQEWPfNUR3mzidRObM1XZKXMQsaG6qBlDqd6M1qyw91nDZvcSozgyjt3x/QKrgKBc5MdxfdxAy6tdg==",
+			"version": "29.0.2",
+			"resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-29.0.2.tgz",
+			"integrity": "sha512-S/ggWH1LU7jTyi9DxZOKyxpVd4hF/OZ0JrEbeLjXk/DFXwRny0tjD2c992zOUYQobLrVkRVMDdmHP16HKP7GRg==",
 			"dev": true,
 			"license": "MIT",
 			"dependencies": {
@@ -1193,9 +1193,9 @@
 			}
 		},
 		"node_modules/svelte": {
-			"version": "5.53.7",
-			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.7.tgz",
-			"integrity": "sha512-uxck1KI7JWtlfP3H6HOWi/94soAl23jsGJkBzN2BAWcQng0+lTrRNhxActFqORgnO9BHVd1hKJhG+ljRuIUWfQ==",
+			"version": "5.53.9",
+			"resolved": "https://registry.npmjs.org/svelte/-/svelte-5.53.9.tgz",
+			"integrity": "sha512-MwDfWsN8qZzeP0jlQsWF4k/4B3csb3IbzCRggF+L/QqY7T8bbKvnChEo1cPZztF51HJQhilDbevWYl2LvXbquA==",
 			"license": "MIT",
 			"dependencies": {
 				"@jridgewell/remapping": "^2.3.4",


@@ -7,14 +7,14 @@
 		"dev": "rollup -c -w"
 	},
 	"devDependencies": {
-		"@rollup/plugin-commonjs": "^29.0.1",
+		"@rollup/plugin-commonjs": "^29.0.2",
 		"@rollup/plugin-node-resolve": "^16.0.3",
 		"@rollup/plugin-terser": "^1.0.0",
 		"@timohausmann/quadtree-js": "^1.2.6",
 		"rollup": "^4.59.0",
 		"rollup-plugin-css-only": "^4.5.5",
 		"rollup-plugin-svelte": "^7.2.3",
-		"svelte": "^5.53.7"
+		"svelte": "^5.53.9"
 	},
 	"dependencies": {
 		"@rollup/plugin-replace": "^6.0.3",


@@ -302,11 +302,11 @@
 		if (subclusterData) {
 			for (let i = 0; i < subclusterData.length; i++) {
-				const flopsData = subclusterData[i].metrics.find((s) => s.name == "flops_any")
-				const memBwData = subclusterData[i].metrics.find((s) => s.name == "mem_bw")
-				const f = flopsData.metric.series[0].statistics.avg
-				const m = memBwData.metric.series[0].statistics.avg
+				const flopsData = subclusterData[i]?.metrics?.find((s) => s.name == "flops_any")
+				const memBwData = subclusterData[i]?.metrics?.find((s) => s.name == "mem_bw")
+				const f = flopsData?.metric?.series[0]?.statistics?.avg || 0;
+				const m = memBwData?.metric?.series[0]?.statistics?.avg || 0;
 				let intensity = f / m
 				if (Number.isNaN(intensity) || !Number.isFinite(intensity)) {


@@ -34,6 +34,7 @@
 		formatDurationTime
 	} from "./generic/units.js";
 	import Filters from "./generic/Filters.svelte";
+	import Pagination from "./generic/joblist/Pagination.svelte";
 	/* Svelte 5 Props */
 	let {
@@ -51,6 +52,8 @@
 	let jobFilters = $state([]);
 	let nameFilter = $state("");
 	let sorting = $state({ field: "totalJobs", direction: "desc" });
+	let page = $state(1);
+	let itemsPerPage = $state(25);
 	/* Derived Vars */
 	const fetchRunning = $derived(jobFilters.some(jf => jf?.state?.length == 1 && jf?.state?.includes("running")));
@@ -64,6 +67,12 @@
 	const sortedRows = $derived(
 		$stats.data ? sort($stats.data.rows, sorting, nameFilter) : []
 	);
+	const paginatedRows = $derived(
+		sortedRows.slice((page - 1) * itemsPerPage, page * itemsPerPage)
+	);
+	/* Reset page when sorting or filter changes */
+	$effect(() => { sorting; nameFilter; page = 1; });
 	let stats = $derived(
 		queryStore({
@@ -360,7 +369,7 @@
 				>
 			</tr>
 		{:else if $stats.data}
-			{#each sort($stats.data.rows, sorting, nameFilter) as row (row.id)}
+			{#each paginatedRows as row (row.id)}
 				<tr>
 					<td>
 						{#if type == "USER"}
@@ -402,3 +411,16 @@
 		{/if}
 	</tbody>
 </Table>
+{#if sortedRows.length > 0}
+	<Pagination
+		{page}
+		{itemsPerPage}
+		totalItems={sortedRows.length}
+		itemText={type === 'USER' ? 'Users' : 'Projects'}
+		pageSizes={[25, 50, 100]}
+		updatePaging={(detail) => {
+			itemsPerPage = detail.itemsPerPage;
+			page = detail.page;
+		}}
+	/>
+{/if}
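The paging added here is purely client-side: the sorted rows stay in memory and only the current page's slice, `rows[(page - 1) * itemsPerPage : page * itemsPerPage]`, is rendered. The same clamping logic, expressed in Go for illustration (the actual code is the Svelte `$derived` slice above; `paginate` is a hypothetical helper, not part of the repository):

```go
package main

import "fmt"

// paginate returns the rows for one page, mirroring the Svelte
// sortedRows.slice((page - 1) * itemsPerPage, page * itemsPerPage).
// The end index is clamped so a partial last page works, and pages
// past the end come back empty.
func paginate[T any](rows []T, page, itemsPerPage int) []T {
	start := (page - 1) * itemsPerPage
	if start >= len(rows) {
		return nil
	}
	end := page * itemsPerPage
	if end > len(rows) {
		end = len(rows)
	}
	return rows[start:end]
}

func main() {
	rows := []int{1, 2, 3, 4, 5}
	fmt.Println(paginate(rows, 2, 2)) // second page of size 2
}
```

Client-side paging is a reasonable fit here because the full (already filtered) row set is fetched once for sorting anyway; resetting `page` to 1 when sorting or the name filter changes keeps the view from pointing past the end of a shrunken result set.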


@@ -167,7 +167,7 @@
 				<InputGroup>
 					<InputGroupText><Icon name="hdd" /></InputGroupText>
 					<InputGroupText>Selected Node</InputGroupText>
-					<Input style="background-color: white;" type="text" value="{hostname} [{cluster} {$nodeMetricsData?.data ? `(${$nodeMetricsData.data.nodeMetrics[0].subCluster})` : ''}]" disabled/>
+					<Input style="background-color: white;" type="text" value="{hostname} [{cluster} {$nodeMetricsData?.data?.nodeMetrics[0] ? `(${$nodeMetricsData.data.nodeMetrics[0].subCluster})` : ''}]" disabled/>
 				</InputGroup>
 			</Col>
 			<!-- State Col -->
@@ -259,7 +259,7 @@
 					</CardHeader>
 					<CardBody>
 						<p>No dataset(s) returned for <b>{item.name}</b></p>
-						<p class="mb-1">Metric has been disabled for subcluster <b>{$nodeMetricsData.data.nodeMetrics[0].subCluster}</b>.</p>
+						<p class="mb-1">Metric has been disabled for subcluster <b>{$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster}</b>.</p>
 					</CardBody>
 				</Card>
 			{:else if item?.metric}
@@ -267,7 +267,7 @@
 					metric={item.name}
 					timestep={item.metric.timestep}
 					cluster={clusterInfos.find((c) => c.name == cluster)}
-					subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster}
+					subCluster={$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster}
 					series={item.metric.series}
 					enableFlip
 					forNode
@@ -286,17 +286,17 @@
 			{/snippet}
 			<PlotGrid
-				items={$nodeMetricsData.data.nodeMetrics[0].metrics
+				items={$nodeMetricsData?.data?.nodeMetrics[0]?.metrics
 					.map((m) => ({
 						...m,
 						availability: checkMetricAvailability(
 							globalMetrics,
 							m.name,
 							cluster,
-							$nodeMetricsData.data.nodeMetrics[0].subCluster,
+							$nodeMetricsData?.data?.nodeMetrics[0]?.subCluster,
 						),
 					}))
-					.sort((a, b) => a.name.localeCompare(b.name))}
+					.sort((a, b) => a.name.localeCompare(b.name)) || []}
 				itemsPerRow={ccconfig.plotConfiguration_plotsPerRow}
 				{gridContent}
 			/>