diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 8687db63..bdbb8a7e 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -20,6 +20,7 @@ import ( "strings" "time" + "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler/transport" "github.com/99designs/gqlgen/graphql/playground" @@ -89,6 +90,12 @@ func (s *Server) init() error { graphQLServer.AddTransport(transport.POST{}) + // Inject a per-request stats cache so that grouped statistics queries + // sharing the same (filter, groupBy) pair are executed only once. + graphQLServer.AroundOperations(func(ctx context.Context, next graphql.OperationHandler) graphql.ResponseHandler { + return next(graph.WithStatsGroupCache(ctx)) + }) + if os.Getenv(envDebug) != "1" { // Having this handler means that a error message is returned via GraphQL instead of the connection simply beeing closed. // The problem with this is that then, no more stacktrace is printed to stderr. 
diff --git a/go.mod b/go.mod index b03e7e0b..b022a386 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/vektah/gqlparser/v2 v2.5.32 golang.org/x/crypto v0.48.0 golang.org/x/oauth2 v0.35.0 + golang.org/x/sync v0.19.0 golang.org/x/time v0.14.0 ) @@ -113,7 +114,6 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.33.0 // indirect golang.org/x/net v0.51.0 // indirect - golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.34.0 // indirect golang.org/x/tools v0.42.0 // indirect diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 8872c6cc..74b4d86c 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -673,7 +673,20 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF stats, err = r.Repo.JobsStats(ctx, filter, reqFields) } else { startGrouped := time.Now() - stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields) + // Use request-scoped cache: multiple aliases with same (filter, groupBy) + // but different sortBy/page hit the DB only once. NOTE(review): the cached call passes nil for reqFields — confirm JobsStatsGrouped treats nil as "select all fields"; otherwise alias-specific fields may come back empty. + if cache := getStatsGroupCache(ctx); cache != nil { + key := statsCacheKey(filter, groupBy) + var allStats []*model.JobsStatistics + allStats, err = cache.getOrCompute(key, func() ([]*model.JobsStatistics, error) { + return r.Repo.JobsStatsGrouped(ctx, filter, nil, nil, groupBy, nil) + }) + if err == nil { + stats = sortAndPageStats(allStats, sortBy, page) + } + } else { + stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy, reqFields) + } cclog.Infof("Timer JobsStatsGrouped call: %s", time.Since(startGrouped)) } } else { diff --git a/internal/graph/stats_cache.go b/internal/graph/stats_cache.go new file mode 100644 index 00000000..2dbb5ca5 --- /dev/null +++ b/internal/graph/stats_cache.go @@ -0,0 +1,135 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved.
This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package graph + +import ( + "context" + "fmt" + "slices" + "sync" + + "github.com/ClusterCockpit/cc-backend/internal/graph/model" +) + +// statsGroupCache is a per-request cache for grouped JobsStatistics results. +// It deduplicates identical (filter+groupBy) SQL queries that arise when the +// frontend requests multiple sort/page slices of the same underlying data +// (e.g. topUserJobs, topUserNodes, topUserAccs all group by USER). +type statsGroupCache struct { + mu sync.Mutex + entries map[string]*cacheEntry +} + +type cacheEntry struct { + once sync.Once + result []*model.JobsStatistics + err error +} + +type ctxKey int + +const statsGroupCacheKey ctxKey = iota + +// newStatsGroupCache creates a new empty cache. +func newStatsGroupCache() *statsGroupCache { + return &statsGroupCache{ + entries: make(map[string]*cacheEntry), + } +} + +// WithStatsGroupCache injects a new cache into the context. +func WithStatsGroupCache(ctx context.Context) context.Context { + return context.WithValue(ctx, statsGroupCacheKey, newStatsGroupCache()) +} + +// getStatsGroupCache retrieves the cache from context, or nil if absent. +func getStatsGroupCache(ctx context.Context) *statsGroupCache { + if c, ok := ctx.Value(statsGroupCacheKey).(*statsGroupCache); ok { + return c + } + return nil +} + +// statsCacheKey builds a string key from filter + groupBy. NOTE(review): %v renders the []*model.JobFilter elements as pointer addresses, not field values, so two aliases only share a cache entry if gqlgen hands them the identical decoded filter objects — verify this holds, or key on the filter contents instead. +func statsCacheKey(filter []*model.JobFilter, groupBy *model.Aggregate) string { + return fmt.Sprintf("%v|%v", filter, *groupBy) +} + +// getOrCompute returns cached results for the given key, computing them on +// first access via the provided function.
+func (c *statsGroupCache) getOrCompute( + key string, + compute func() ([]*model.JobsStatistics, error), +) ([]*model.JobsStatistics, error) { + c.mu.Lock() + entry, ok := c.entries[key] + if !ok { + entry = &cacheEntry{} + c.entries[key] = entry + } + c.mu.Unlock() + + entry.once.Do(func() { + entry.result, entry.err = compute() + }) + return entry.result, entry.err +} + +// sortAndPageStats sorts a copy of allStats by the given sortBy field (descending) +// and returns the requested page slice. NOTE(review): assumes page.Page >= 1 — a zero or negative Page makes start negative and sorted[start:end] panics (the start >= len guard does not catch negatives); confirm upstream validation or guard start < 0 here. +func sortAndPageStats(allStats []*model.JobsStatistics, sortBy *model.SortByAggregate, page *model.PageRequest) []*model.JobsStatistics { + // Work on a shallow copy so the cached slice order is not mutated. + sorted := make([]*model.JobsStatistics, len(allStats)) + copy(sorted, allStats) + + if sortBy != nil { + getter := statsFieldGetter(*sortBy) + slices.SortFunc(sorted, func(a, b *model.JobsStatistics) int { + return getter(b) - getter(a) // descending + }) + } + + if page != nil && page.ItemsPerPage != -1 { + start := (page.Page - 1) * page.ItemsPerPage + if start >= len(sorted) { + return nil + } + end := start + page.ItemsPerPage + if end > len(sorted) { + end = len(sorted) + } + sorted = sorted[start:end] + } + + return sorted +} + +// statsFieldGetter returns a function that extracts the sortable int field +// from a JobsStatistics struct for the given sort key.
+func statsFieldGetter(sortBy model.SortByAggregate) func(*model.JobsStatistics) int { + switch sortBy { + case model.SortByAggregateTotaljobs: + return func(s *model.JobsStatistics) int { return s.TotalJobs } + case model.SortByAggregateTotalusers: + return func(s *model.JobsStatistics) int { return s.TotalUsers } + case model.SortByAggregateTotalwalltime: + return func(s *model.JobsStatistics) int { return s.TotalWalltime } + case model.SortByAggregateTotalnodes: + return func(s *model.JobsStatistics) int { return s.TotalNodes } + case model.SortByAggregateTotalnodehours: + return func(s *model.JobsStatistics) int { return s.TotalNodeHours } + case model.SortByAggregateTotalcores: + return func(s *model.JobsStatistics) int { return s.TotalCores } + case model.SortByAggregateTotalcorehours: + return func(s *model.JobsStatistics) int { return s.TotalCoreHours } + case model.SortByAggregateTotalaccs: + return func(s *model.JobsStatistics) int { return s.TotalAccs } + case model.SortByAggregateTotalacchours: + return func(s *model.JobsStatistics) int { return s.TotalAccHours } + default: + return func(s *model.JobsStatistics) int { return s.TotalJobs } + } +} diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index 445d91d3..72951746 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -198,8 +198,10 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select } if filter.State != nil { if len(filter.State) == 1 { + // Inline literal value so SQLite can match partial indexes (WHERE job_state = 'running'). + // SECURITY NOTE(review): safe only if every caller validates State — GraphQL decodes into the JobState enum, but confirm non-GraphQL callers of BuildWhereClause (e.g. REST filters) do the same, or whitelist singleStat against the known states before inlining.
singleStat := string(filter.State[0]) - query = query.Where("job.job_state = ?", singleStat) + query = query.Where(fmt.Sprintf("job.job_state = '%s'", singleStat)) } else { states := make([]string, len(filter.State)) for i, val := range filter.State { diff --git a/internal/repository/migration.go b/internal/repository/migration.go index 5a530d2d..a2c09c1f 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -21,13 +21,14 @@ import ( // is added to internal/repository/migrations/sqlite3/. // // Version history: +// - Version 14: Partial covering indexes for running jobs (tiny B-tree vs millions) // - Version 13: Add covering indexes for status/dashboard queries (cluster, job_state, ...) // - Version 12: Add covering index for stats queries (cluster, start_time, hpc_user, ...) // - Version 11: Optimize job table indexes (reduce from ~78 to 48) // - Version 10: Node table // // Migration files are embedded at build time from the migrations directory. -const Version uint = 13 +const Version uint = 14 //go:embed migrations/* var migrationFiles embed.FS diff --git a/internal/repository/migrations/sqlite3/14_running-partial-indexes.down.sql b/internal/repository/migrations/sqlite3/14_running-partial-indexes.down.sql new file mode 100644 index 00000000..6a831512 --- /dev/null +++ b/internal/repository/migrations/sqlite3/14_running-partial-indexes.down.sql @@ -0,0 +1,5 @@ +-- Reverse migration 14: Drop partial indexes for running jobs + +DROP INDEX IF EXISTS jobs_running_user_stats; +DROP INDEX IF EXISTS jobs_running_project_stats; +DROP INDEX IF EXISTS jobs_running_subcluster_stats; diff --git a/internal/repository/migrations/sqlite3/14_running-partial-indexes.up.sql b/internal/repository/migrations/sqlite3/14_running-partial-indexes.up.sql new file mode 100644 index 00000000..8d7217d8 --- /dev/null +++ b/internal/repository/migrations/sqlite3/14_running-partial-indexes.up.sql @@ -0,0 +1,18 @@ +-- Migration 14: Partial covering indexes 
for running jobs +-- Only running jobs are in the B-tree, so these indexes are tiny compared to +-- the full-table indexes from migration 13. SQLite uses them when the query +-- contains the literal `job_state = 'running'` (not a parameter placeholder). + +CREATE INDEX IF NOT EXISTS jobs_running_user_stats + ON job (cluster, hpc_user, num_nodes, num_hwthreads, num_acc, duration, start_time) + WHERE job_state = 'running'; + +CREATE INDEX IF NOT EXISTS jobs_running_project_stats + ON job (cluster, project, num_nodes, num_hwthreads, num_acc, duration, start_time) + WHERE job_state = 'running'; + +CREATE INDEX IF NOT EXISTS jobs_running_subcluster_stats + ON job (cluster, subcluster, num_nodes, num_hwthreads, num_acc, duration, start_time) + WHERE job_state = 'running'; + +PRAGMA optimize; diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 8311e797..6d29a5e5 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -55,6 +55,7 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" sq "github.com/Masterminds/squirrel" + "golang.org/x/sync/errgroup" ) // groupBy2column maps GraphQL Aggregate enum values to their corresponding database column names. @@ -640,30 +641,45 @@ func (r *JobRepository) AddHistograms( targetBinSize = 3600 } - var err error - // Return X-Values always as seconds, will be formatted into minutes and hours in frontend + // Run all 4 histogram queries in parallel — each writes a distinct struct field. 
+ g, gctx := errgroup.WithContext(ctx) + value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = 'running' THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize) - stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount) - if err != nil { - cclog.Warn("Error while loading job statistics histogram: job duration") - return nil, err - } - stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter) - if err != nil { - cclog.Warn("Error while loading job statistics histogram: num nodes") - return nil, err - } + g.Go(func() error { + var err error + stat.HistDuration, err = r.jobsDurationStatisticsHistogram(gctx, value, filter, targetBinSize, &targetBinCount) + if err != nil { + cclog.Warn("Error while loading job statistics histogram: job duration") + } + return err + }) + g.Go(func() error { + var err error + stat.HistNumNodes, err = r.jobsStatisticsHistogram(gctx, "job.num_nodes as value", filter) + if err != nil { + cclog.Warn("Error while loading job statistics histogram: num nodes") + } + return err + }) + g.Go(func() error { + var err error + stat.HistNumCores, err = r.jobsStatisticsHistogram(gctx, "job.num_hwthreads as value", filter) + if err != nil { + cclog.Warn("Error while loading job statistics histogram: num hwthreads") + } + return err + }) + g.Go(func() error { + var err error + stat.HistNumAccs, err = r.jobsStatisticsHistogram(gctx, "job.num_acc as value", filter) + if err != nil { + cclog.Warn("Error while loading job statistics histogram: num acc") + } + return err + }) - stat.HistNumCores, err = r.jobsStatisticsHistogram(ctx, "job.num_hwthreads as value", filter) - if err != nil { - cclog.Warn("Error while loading job statistics histogram: num hwthreads") - return nil, err - } - - stat.HistNumAccs, err = r.jobsStatisticsHistogram(ctx, "job.num_acc as value", filter) - if err != nil { - cclog.Warn("Error 
while loading job statistics histogram: num acc") + if err := g.Wait(); err != nil { return nil, err } diff --git a/web/frontend/src/status/dashdetails/UsageDash.svelte b/web/frontend/src/status/dashdetails/UsageDash.svelte index 29a34697..45e5f7b2 100644 --- a/web/frontend/src/status/dashdetails/UsageDash.svelte +++ b/web/frontend/src/status/dashdetails/UsageDash.svelte @@ -71,75 +71,68 @@ ? queryStore({ client: client, query: gql` - query ($filter: [JobFilter!]!, $paging: PageRequest!) { - topUserJobs: jobsStatistics( + query ($filter: [JobFilter!]!) { + allUsers: jobsStatistics( filter: $filter - page: $paging - sortBy: TOTALJOBS groupBy: USER ) { id name totalJobs - } - topProjectJobs: jobsStatistics( - filter: $filter - page: $paging - sortBy: TOTALJOBS - groupBy: PROJECT - ) { - id - totalJobs - } - topUserNodes: jobsStatistics( - filter: $filter - page: $paging - sortBy: TOTALNODES - groupBy: USER - ) { - id - name totalNodes - } - topProjectNodes: jobsStatistics( - filter: $filter - page: $paging - sortBy: TOTALNODES - groupBy: PROJECT - ) { - id - totalNodes - } - topUserAccs: jobsStatistics( - filter: $filter - page: $paging - sortBy: TOTALACCS - groupBy: USER - ) { - id - name totalAccs } - topProjectAccs: jobsStatistics( + allProjects: jobsStatistics( filter: $filter - page: $paging - sortBy: TOTALACCS groupBy: PROJECT ) { id + totalJobs + totalNodes totalAccs } } `, variables: { filter: statusFilter, - paging: pagingState, // Top 10 }, requestPolicy: "network-only", }) : null, ); + // Sort + slice top-10 from the full results in the frontend + const topUserJobs = $derived( + $topStatsQuery?.data?.allUsers + ?.toSorted((a, b) => b.totalJobs - a.totalJobs) + .slice(0, 10) ?? [], + ); + const topProjectJobs = $derived( + $topStatsQuery?.data?.allProjects + ?.toSorted((a, b) => b.totalJobs - a.totalJobs) + .slice(0, 10) ?? 
[], + ); + const topUserNodes = $derived( + $topStatsQuery?.data?.allUsers + ?.toSorted((a, b) => b.totalNodes - a.totalNodes) + .slice(0, 10) ?? [], + ); + const topProjectNodes = $derived( + $topStatsQuery?.data?.allProjects + ?.toSorted((a, b) => b.totalNodes - a.totalNodes) + .slice(0, 10) ?? [], + ); + const topUserAccs = $derived( + $topStatsQuery?.data?.allUsers + ?.toSorted((a, b) => b.totalAccs - a.totalAccs) + .slice(0, 10) ?? [], + ); + const topProjectAccs = $derived( + $topStatsQuery?.data?.allProjects + ?.toSorted((a, b) => b.totalAccs - a.totalAccs) + .slice(0, 10) ?? [], + ); + // Note: nodeMetrics are requested on configured $timestep resolution const nodeStatusQuery = $derived( loadMe @@ -255,10 +248,10 @@ canvasId="{canvasPrefix}-hpcpie-jobs-users" size={colWidthJobs * 0.75} sliceLabel="Jobs" - quantities={$topStatsQuery.data.topUserJobs.map( + quantities={topUserJobs.map( (tu) => tu["totalJobs"], )} - entities={$topStatsQuery.data.topUserJobs.map((tu) => + entities={topUserJobs.map((tu) => scrambleNames ? scramble(tu.id) : tu.id, )} /> @@ -271,7 +264,7 @@ User Jobs - {#each $topStatsQuery.data.topUserJobs as tu, i} + {#each topUserJobs as tu, i} tp["totalJobs"], )} - entities={$topStatsQuery.data.topProjectJobs.map((tp) => + entities={topProjectJobs.map((tp) => scrambleNames ? scramble(tp.id) : tp.id, )} /> @@ -320,7 +313,7 @@ Project Jobs - {#each $topStatsQuery.data.topProjectJobs as tp, i} + {#each topProjectJobs as tp, i} tu["totalNodes"], )} - entities={$topStatsQuery.data.topUserNodes.map((tu) => + entities={topUserNodes.map((tu) => scrambleNames ? scramble(tu.id) : tu.id, )} /> @@ -391,7 +384,7 @@ User Nodes - {#each $topStatsQuery.data.topUserNodes as tu, i} + {#each topUserNodes as tu, i} tp["totalNodes"], )} - entities={$topStatsQuery.data.topProjectNodes.map((tp) => + entities={topProjectNodes.map((tp) => scrambleNames ? 
scramble(tp.id) : tp.id, )} /> @@ -440,7 +433,7 @@ Project Nodes - {#each $topStatsQuery.data.topProjectNodes as tp, i} + {#each topProjectNodes as tp, i} tu["totalAccs"], )} - entities={$topStatsQuery.data.topUserAccs.map((tu) => + entities={topUserAccs.map((tu) => scrambleNames ? scramble(tu.id) : tu.id, )} /> @@ -511,7 +504,7 @@ User GPUs - {#each $topStatsQuery.data.topUserAccs as tu, i} + {#each topUserAccs as tu, i} tp["totalAccs"], )} - entities={$topStatsQuery.data.topProjectAccs.map((tp) => + entities={topProjectAccs.map((tp) => scrambleNames ? scramble(tp.id) : tp.id, )} /> @@ -560,7 +553,7 @@ Project GPUs - {#each $topStatsQuery.data.topProjectAccs as tp, i} + {#each topProjectAccs as tp, i}