Optimize usage dashboard: partial indexes, request cache, parallel histograms

- Add migration 14: partial covering indexes WHERE job_state='running'
  for user/project/subcluster groupings (tiny B-tree vs full table)
- Inline literal state value in BuildWhereClause so SQLite matches
  partial indexes instead of parameterized placeholders
- Add per-request statsGroupCache (sync.Once per filter+groupBy key)
  so identical grouped stats queries execute only once per GQL operation
- Parallelize 4 histogram queries in AddHistograms using errgroup
- Consolidate frontend from 6 GQL aliases to 2, sort+slice top-10
  client-side via $derived

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: 5b26a6e5ff10
This commit is contained in:
2026-03-13 14:31:37 +01:00
parent cbe46c3524
commit d586fe4b43
10 changed files with 277 additions and 87 deletions

View File

@@ -673,7 +673,20 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
stats, err = r.Repo.JobsStats(ctx, filter, reqFields)
} else {
startGrouped := time.Now()
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields)
// Use request-scoped cache: multiple aliases with same (filter, groupBy)
// but different sortBy/page hit the DB only once.
if cache := getStatsGroupCache(ctx); cache != nil {
key := statsCacheKey(filter, groupBy)
var allStats []*model.JobsStatistics
allStats, err = cache.getOrCompute(key, func() ([]*model.JobsStatistics, error) {
return r.Repo.JobsStatsGrouped(ctx, filter, nil, nil, groupBy, nil)
})
if err == nil {
stats = sortAndPageStats(allStats, sortBy, page)
}
} else {
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields)
}
cclog.Infof("Timer JobsStatsGrouped call: %s", time.Since(startGrouped))
}
} else {

View File

@@ -0,0 +1,135 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package graph
import (
	"cmp"
	"context"
	"encoding/json"
	"fmt"
	"slices"
	"sync"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
)
// statsGroupCache is a per-request cache for grouped JobsStatistics results.
// It deduplicates identical (filter+groupBy) SQL queries that arise when the
// frontend requests multiple sort/page slices of the same underlying data
// (e.g. topUserJobs, topUserNodes, topUserAccs all group by USER).
//
// The zero value is not usable; construct via newStatsGroupCache so the
// entries map is initialized.
type statsGroupCache struct {
	mu      sync.Mutex            // guards entries; held only for map access, never during compute
	entries map[string]*cacheEntry // one entry per (filter, groupBy) key
}

// cacheEntry holds the memoized result for one cache key. The sync.Once
// guarantees the compute function runs exactly once even when several
// resolvers race on the same key; result/err are written inside Do and are
// therefore safe to read after Do returns.
type cacheEntry struct {
	once   sync.Once
	result []*model.JobsStatistics
	err    error
}

// ctxKey is an unexported context-key type so values stored by this package
// cannot collide with keys from other packages.
type ctxKey int

// statsGroupCacheKey is the context key under which the per-request cache is stored.
const statsGroupCacheKey ctxKey = iota
// newStatsGroupCache returns an empty, ready-to-use cache with its
// entries map initialized.
func newStatsGroupCache() *statsGroupCache {
	return &statsGroupCache{entries: map[string]*cacheEntry{}}
}
// WithStatsGroupCache derives a context carrying a fresh, empty stats cache.
// Call once per incoming GraphQL operation so the cache is request-scoped.
func WithStatsGroupCache(ctx context.Context) context.Context {
	cache := newStatsGroupCache()
	return context.WithValue(ctx, statsGroupCacheKey, cache)
}
// getStatsGroupCache extracts the request-scoped cache from ctx.
// It returns nil when no cache was injected (e.g. non-HTTP callers),
// in which case callers should fall back to an uncached query.
func getStatsGroupCache(ctx context.Context) *statsGroupCache {
	cache, _ := ctx.Value(statsGroupCacheKey).(*statsGroupCache)
	return cache
}
// statsCacheKey builds a deterministic string key from filter + groupBy.
//
// Note: the previous implementation used fmt's %v on []*model.JobFilter,
// which formats the POINTER ADDRESSES of the slice elements. Two
// semantically identical filters built for different GraphQL aliases then
// produced different keys, so the cache never actually deduplicated
// queries. JSON-encoding the filters yields a stable, value-based key
// (struct fields marshal in declaration order, so output is deterministic).
func statsCacheKey(filter []*model.JobFilter, groupBy *model.Aggregate) string {
	var gb string
	if groupBy != nil { // guard: a nil groupBy must not panic the resolver
		gb = fmt.Sprintf("%v", *groupBy)
	}
	b, err := json.Marshal(filter)
	if err != nil {
		// Fallback: pointer-formatted key. Worst case is a cache miss and a
		// redundant DB query — never a wrong cache hit.
		return fmt.Sprintf("%v|%s", filter, gb)
	}
	return string(b) + "|" + gb
}
// getOrCompute returns the cached result for key, running compute exactly
// once on first access. Concurrent callers for the same key block on the
// entry's sync.Once and all observe the single computed (result, err) pair.
// The mutex protects only the map lookup/insert, so unrelated keys can
// compute in parallel.
func (c *statsGroupCache) getOrCompute(
	key string,
	compute func() ([]*model.JobsStatistics, error),
) ([]*model.JobsStatistics, error) {
	c.mu.Lock()
	e := c.entries[key]
	if e == nil {
		e = &cacheEntry{}
		c.entries[key] = e
	}
	c.mu.Unlock()

	e.once.Do(func() { e.result, e.err = compute() })
	return e.result, e.err
}
// sortAndPageStats sorts a copy of allStats by the given sortBy field
// (descending) and returns the requested page slice.
//
// The cached slice is never mutated: sorting operates on a shallow copy so
// other aliases reading the same cache entry see a stable order. A nil page
// or ItemsPerPage == -1 means "return everything".
func sortAndPageStats(allStats []*model.JobsStatistics, sortBy *model.SortByAggregate, page *model.PageRequest) []*model.JobsStatistics {
	sorted := make([]*model.JobsStatistics, len(allStats))
	copy(sorted, allStats)

	if sortBy != nil {
		getter := statsFieldGetter(*sortBy)
		slices.SortFunc(sorted, func(a, b *model.JobsStatistics) int {
			// b compared against a: descending order. cmp.Compare avoids the
			// signed overflow a plain getter(b)-getter(a) subtraction could
			// hit on very large totals (e.g. core-hour sums).
			return cmp.Compare(getter(b), getter(a))
		})
	}

	if page != nil && page.ItemsPerPage != -1 {
		start := (page.Page - 1) * page.ItemsPerPage
		if start < 0 {
			// Guard against Page <= 0 from the client; slicing with a
			// negative index would panic the resolver.
			start = 0
		}
		if start >= len(sorted) {
			return nil
		}
		end := min(start+page.ItemsPerPage, len(sorted))
		sorted = sorted[start:end]
	}
	return sorted
}
// statsFieldGetter maps a SortByAggregate key to an accessor returning the
// corresponding sortable integer field of a JobsStatistics entry. Unknown
// keys (and TOTALJOBS itself) fall back to the TotalJobs field.
func statsFieldGetter(sortBy model.SortByAggregate) func(*model.JobsStatistics) int {
	switch sortBy {
	case model.SortByAggregateTotalusers:
		return func(s *model.JobsStatistics) int { return s.TotalUsers }
	case model.SortByAggregateTotalwalltime:
		return func(s *model.JobsStatistics) int { return s.TotalWalltime }
	case model.SortByAggregateTotalnodes:
		return func(s *model.JobsStatistics) int { return s.TotalNodes }
	case model.SortByAggregateTotalnodehours:
		return func(s *model.JobsStatistics) int { return s.TotalNodeHours }
	case model.SortByAggregateTotalcores:
		return func(s *model.JobsStatistics) int { return s.TotalCores }
	case model.SortByAggregateTotalcorehours:
		return func(s *model.JobsStatistics) int { return s.TotalCoreHours }
	case model.SortByAggregateTotalaccs:
		return func(s *model.JobsStatistics) int { return s.TotalAccs }
	case model.SortByAggregateTotalacchours:
		return func(s *model.JobsStatistics) int { return s.TotalAccHours }
	default:
		// Covers SortByAggregateTotaljobs and any future, unhandled key.
		return func(s *model.JobsStatistics) int { return s.TotalJobs }
	}
}

View File

@@ -198,8 +198,10 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
}
if filter.State != nil {
if len(filter.State) == 1 {
// Inline literal value so SQLite can match partial indexes (WHERE job_state = 'running').
// Safe: values come from validated GraphQL enum (model.JobState).
singleStat := string(filter.State[0])
query = query.Where("job.job_state = ?", singleStat)
query = query.Where(fmt.Sprintf("job.job_state = '%s'", singleStat))
} else {
states := make([]string, len(filter.State))
for i, val := range filter.State {

View File

@@ -21,13 +21,14 @@ import (
// is added to internal/repository/migrations/sqlite3/.
//
// Version history:
// - Version 14: Partial covering indexes for running jobs (tiny B-tree vs millions)
// - Version 13: Add covering indexes for status/dashboard queries (cluster, job_state, ...)
// - Version 12: Add covering index for stats queries (cluster, start_time, hpc_user, ...)
// - Version 11: Optimize job table indexes (reduce from ~78 to 48)
// - Version 10: Node table
//
// Migration files are embedded at build time from the migrations directory.
const Version uint = 13
const Version uint = 14
//go:embed migrations/*
var migrationFiles embed.FS

View File

@@ -0,0 +1,5 @@
-- Reverse migration 14: Drop partial indexes for running jobs
-- Safe to re-run: IF EXISTS makes each statement a no-op when the index
-- has already been removed. Dropping indexes never touches table data.
DROP INDEX IF EXISTS jobs_running_user_stats;
DROP INDEX IF EXISTS jobs_running_project_stats;
DROP INDEX IF EXISTS jobs_running_subcluster_stats;

View File

@@ -0,0 +1,18 @@
-- Migration 14: Partial covering indexes for running jobs
-- Only running jobs are in the B-tree, so these indexes are tiny compared to
-- the full-table indexes from migration 13. SQLite uses them when the query
-- contains the literal `job_state = 'running'` (not a parameter placeholder).

-- Per-user grouping for the "top users with running jobs" dashboard query.
CREATE INDEX IF NOT EXISTS jobs_running_user_stats
ON job (cluster, hpc_user, num_nodes, num_hwthreads, num_acc, duration, start_time)
WHERE job_state = 'running';

-- Per-project grouping, same column set with project instead of hpc_user.
CREATE INDEX IF NOT EXISTS jobs_running_project_stats
ON job (cluster, project, num_nodes, num_hwthreads, num_acc, duration, start_time)
WHERE job_state = 'running';

-- Per-subcluster grouping for the subcluster breakdown.
CREATE INDEX IF NOT EXISTS jobs_running_subcluster_stats
ON job (cluster, subcluster, num_nodes, num_hwthreads, num_acc, duration, start_time)
WHERE job_state = 'running';

-- Refresh planner statistics so the new indexes are considered immediately.
PRAGMA optimize;

View File

@@ -55,6 +55,7 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
sq "github.com/Masterminds/squirrel"
"golang.org/x/sync/errgroup"
)
// groupBy2column maps GraphQL Aggregate enum values to their corresponding database column names.
@@ -640,30 +641,45 @@ func (r *JobRepository) AddHistograms(
targetBinSize = 3600
}
var err error
// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
// Run all 4 histogram queries in parallel — each writes a distinct struct field.
g, gctx := errgroup.WithContext(ctx)
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: job duration")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
g.Go(func() error {
var err error
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(gctx, value, filter, targetBinSize, &targetBinCount)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: job duration")
}
return err
})
g.Go(func() error {
var err error
stat.HistNumNodes, err = r.jobsStatisticsHistogram(gctx, "job.num_nodes as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num nodes")
}
return err
})
g.Go(func() error {
var err error
stat.HistNumCores, err = r.jobsStatisticsHistogram(gctx, "job.num_hwthreads as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num hwthreads")
}
return err
})
g.Go(func() error {
var err error
stat.HistNumAccs, err = r.jobsStatisticsHistogram(gctx, "job.num_acc as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num acc")
}
return err
})
stat.HistNumCores, err = r.jobsStatisticsHistogram(ctx, "job.num_hwthreads as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num hwthreads")
return nil, err
}
stat.HistNumAccs, err = r.jobsStatisticsHistogram(ctx, "job.num_acc as value", filter)
if err != nil {
cclog.Warn("Error while loading job statistics histogram: num acc")
if err := g.Wait(); err != nil {
return nil, err
}