diff --git a/Makefile b/Makefile index 0378b700..3246538a 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = ./cc-backend FRONTEND = ./web/frontend -VERSION = 1.4.4 +VERSION = 1.5.0 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' @@ -46,7 +46,7 @@ $(TARGET): $(SVELTE_TARGETS) frontend: $(info ===> BUILD frontend) - cd web/frontend && npm install && npm run build + cd web/frontend && npm ci && npm run build swagger: $(info ===> GENERATE swagger) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 860f62a4..7ea43620 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,42 +1,183 @@ -# `cc-backend` version 1.4.4 +# `cc-backend` version 1.5.0 -Supports job archive version 2 and database version 8. +Supports job archive version 3 and database version 10. -This is a bug fix release of `cc-backend`, the API backend and frontend +This is a feature release of `cc-backend`, the API backend and frontend implementation of ClusterCockpit. For release specific notes visit the [ClusterCockpit Documentation](https://clusterockpit.org/docs/release/). ## Breaking changes -The option `apiAllowedIPs` is now a required configuration attribute in -`config.json`. This option restricts access to the admin API. +### Configuration changes -To retain the previous behavior that the API is per default accessible from -everywhere set: +- **JSON attribute naming**: All JSON configuration attributes now use `kebab-case` + style consistently (e.g., `api-allowed-ips` instead of `apiAllowedIPs`). + Update your `config.json` accordingly. +- **Removed `disable-archive` option**: This obsolete configuration option has been removed. +- **Removed `clusters` config section**: The separate clusters configuration section + has been removed. Cluster information is now derived from the job archive. 
+- **`api-allowed-ips` (formerly `apiAllowedIPs`) is now optional**: If not specified, defaults to secure settings. + +### Architecture changes + +- **MetricStore moved**: The `metricstore` package has been moved from `internal/` + to `pkg/` as it is now part of the public API. +- **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend. +- **Archive to Cleanup renaming**: Archive-related functions have been refactored + and renamed to "Cleanup" for clarity. + +### Dependency changes + +- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs +- **cclib NATS client**: Now using the cclib NATS client implementation +- Removed obsolete `util.Float` usage from cclib + +## Major new features + +### NATS API Integration + +- **Real-time job events**: Subscribe to job start/stop events via NATS +- **Node state updates**: Receive real-time node state changes via NATS +- **Configurable subjects**: NATS API subjects are now configurable via `api-subjects` +- **Deadlock fixes**: Improved NATS client stability and graceful shutdown + +### Public Dashboard + +- **Public-facing interface**: New public dashboard route for external users +- **DoubleMetricPlot component**: New visualization component for comparing metrics +- **Improved layout**: Reviewed and optimized dashboard layouts for better readability + +### Enhanced Node Management + +- **Node state tracking**: New node table in database with timestamp tracking +- **Node state filtering**: Filter jobs by node state in systems view +- **Node metrics improvements**: Better handling of node-level metrics and data +- **Node list enhancements**: Improved paging, filtering, and continuous scroll support + +### MetricStore Improvements + +- **Memory tracking worker**: New worker for CCMS memory usage tracking +- **Dynamic retention**: Support for cluster/subcluster-specific retention times +- **Improved compression**: Transparent compression for job archive imports +- **Parallel processing**: Parallelized Iter function 
in all archive backends + +### Job Tagging System + +- **Job tagger option**: Enable automatic job tagging via configuration flag +- **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.) +- **Job classification**: Automatic detection of pathological jobs +- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations + +### Archive Backends + +- **S3 backend**: Full support for S3-compatible object storage +- **SQLite backend**: Full support for SQLite backend using blobs +- **Performance improvements**: Fixed performance bugs in archive backends +- **Better error handling**: Improved error messages and fallback handling + +## New features and improvements + +### Frontend + +- **Loading indicators**: Added loading indicators to status detail and job lists +- **Job info layout**: Reviewed and improved job info row layout +- **Metric selection**: Enhanced metric selection with drag-and-drop fixes +- **Filter presets**: Move list filter preset to URL for easy sharing +- **Job comparison**: Improved job comparison views and plots +- **Subcluster reactivity**: Job list now reacts to subcluster filter changes +- **Frontend dependencies**: Bumped frontend dependencies to latest versions +- **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues + +### Backend + +- **Progress bars**: Import function now shows progress during long operations +- **Better logging**: Improved logging with appropriate log levels throughout +- **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues +- **Configuration defaults**: Sensible defaults for most configuration options +- **Documentation**: Extensive documentation improvements across packages + +### API improvements + +- **Role-based metric visibility**: Metrics can now have role-based access control +- **Job exclusivity filter**: New filter for exclusive vs. 
shared jobs +- **Improved error messages**: Better error messages and documentation in REST API +- **GraphQL enhancements**: Improved GraphQL queries and resolvers + +### Performance + +- **Database indices**: Optimized SQLite indices for better query performance +- **Job cache**: Introduced caching table for faster job inserts +- **Parallel imports**: Archive imports now run in parallel where possible +- **External tool integration**: Optimized use of external tools (fd) for better performance + +### Developer experience + +- **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md) +- **Example API payloads**: Added example JSON API payloads for testing +- **Unit tests**: Added more unit tests for NATS API and other components +- **Test improvements**: Better test coverage and test data + +## Bug fixes + +- Fixed nodelist paging issues +- Fixed metric select drag and drop functionality +- Fixed render race conditions in nodeList +- Fixed tag count grouping including type +- Fixed wrong metricstore schema (missing comma) +- Fixed configuration issues causing shutdown hangs +- Fixed deadlock when NATS is not configured +- Fixed archive backend performance bugs +- Fixed continuous scroll buildup on refresh +- Improved footprint calculation logic +- Fixed polar plot data query decoupling +- Fixed missing resolution parameter handling +- Fixed node table initialization fallback + +## Configuration changes + +### New configuration options ```json - "apiAllowedIPs": [ - "*" - ] +{ + "main": { + "enable-job-taggers": true, + "resampling": { + "minimum-points": 600, + "trigger": 180, + "resolutions": [240, 60] + }, + "api-subjects": { + "subject-job-event": "cc.job.event", + "subject-node-state": "cc.node.state" + } + }, + "nats": { + "address": "nats://0.0.0.0:4222", + "username": "root", + "password": "root" + }, + "cron": { + "commit-job-worker": "1m", + "duration-worker": "5m", + "footprint-worker": "10m" + }, + "metric-store": { + 
"cleanup": { + "mode": "archive", + "interval": "48h", + "directory": "./var/archive" + } + } +} ``` -## Breaking changes for minor release 1.4.x +## Migration notes -- You need to perform a database migration. Depending on your database size the - migration might require several hours! -- You need to adapt the `cluster.json` configuration files in the job-archive, - add new required attributes to the metric list and after that edit - `./job-archive/version.txt` to version 2. Only metrics that have the footprint - attribute set can be filtered and show up in the footprint UI and polar plot. -- Continuous scrolling is default now in all job lists. You can change this back - to paging globally, also every user can configure to use paging or continuous - scrolling individually. -- Tags have a scope now. Existing tags will get global scope in the database - migration. - -## New features - -- Enable to delete tags from the web interface +- Review and update your `config.json` to use kebab-case attribute names +- If using NATS, configure the new `nats` and `api-subjects` sections +- If using S3 archive backend, configure the new `archive` section options +- Test the new public dashboard at `/public` route +- Review cron worker configuration if you need different frequencies ## Known issues diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0e93944c..9ded95ba 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -24,6 +24,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/tagger" "github.com/ClusterCockpit/cc-backend/internal/taskmanager" @@ -317,6 +318,7 @@ func runServer(ctx context.Context) error { var wg sync.WaitGroup // Initialize metric store 
if configuration is provided + haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { metricstore.Init(mscfg, &wg) @@ -325,7 +327,26 @@ func runServer(ctx context.Context) error { ms := metricstore.GetMemoryStore() jobRepo := repository.GetJobRepository() ms.SetNodeProvider(jobRepo) + metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{} + haveMetricstore = true } else { + metricstore.MetricStoreHandle = nil + cclog.Debug("missing internal metricstore configuration") + } + + // Initialize external metric stores if configuration is provided + mscfg = ccconf.GetPackageConfig("metric-store-external") + if mscfg != nil { + err := metricdispatch.Init(mscfg) + + if err != nil { + cclog.Debugf("initializing metricdispatch: %v", err) + } else { + haveMetricstore = true + } + } + + if !haveMetricstore { return fmt.Errorf("missing metricstore configuration") } diff --git a/configs/config.json b/configs/config.json index 00cca0af..584baed3 100644 --- a/configs/config.json +++ b/configs/config.json @@ -5,18 +5,13 @@ "https-key-file": "/etc/letsencrypt/live/url/privkey.pem", "user": "clustercockpit", "group": "clustercockpit", - "api-allowed-ips": [ - "*" - ], + "api-allowed-ips": ["*"], "short-running-jobs-duration": 300, "enable-job-taggers": true, "resampling": { "minimum-points": 600, "trigger": 180, - "resolutions": [ - 240, - 60 - ] + "resolutions": [240, 60] }, "api-subjects": { "subject-job-event": "cc.job.event", @@ -50,6 +45,28 @@ "location": "./var/archive" } }, + "metric-store-external": [ + { + "scope": "*", + "url": "http://x.x.x.x:8082", + "token": "MySecret" + }, + { + "scope": "fritz", + "url": "http://x.x.x.x:8084", + "token": "MySecret" + }, + { + "scope": "fritz-spr1tb", + "url": "http://x.x.x.x:8083", + "token": "MySecret" + }, + { + "scope": "alex", + "url": "http://x.x.x.x:8084", + "token": "MySecret" + } + ], "metric-store": { "checkpoints": { "interval": "12h", @@ -74,4 +91,5 @@ ] }, "ui-file": 
"ui-config.json" -} \ No newline at end of file +} + diff --git a/go.mod b/go.mod index 9e1a5453..da712da9 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.85 - github.com/ClusterCockpit/cc-lib/v2 v2.2.0 + github.com/ClusterCockpit/cc-lib/v2 v2.2.1 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.32.6 diff --git a/go.sum b/go.sum index 0cb7e925..898520b5 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/ClusterCockpit/cc-lib/v2 v2.2.0 h1:gqMsh7zsJMUhaXviXzaZ3gqXcLVgerjRJHzIcwX4FmQ= github.com/ClusterCockpit/cc-lib/v2 v2.2.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= +github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims= +github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 7153cd1d..4a7fc07c 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -45,6 +45,13 @@ func setup(t *testing.T) *api.RestAPI { "api-allowed-ips": [ "*" ] + }, + "metric-store": { + "checkpoints": { + "interval": "12h" + }, + "retention-in-memory": "48h", + "memory-cap": 100 }, "archive": { "kind": "file", @@ -143,6 +150,7 @@ func setup(t *testing.T) *api.RestAPI { } ccconf.Init(cfgFilePath) + metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{} // Load and check main configuration if cfg := ccconf.GetPackageConfig("main"); cfg 
!= nil { diff --git a/internal/metricdispatch/configSchema.go b/internal/metricdispatch/configSchema.go new file mode 100644 index 00000000..6dec69bf --- /dev/null +++ b/internal/metricdispatch/configSchema.go @@ -0,0 +1,29 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricdispatch + +const configSchema = `{ + "type": "array", + "description": "Array of metric store configurations with scope-based routing.", + "items": { + "type": "object", + "properties": { + "scope": { + "description": "Scope identifier for routing metrics (e.g., cluster name, '*' for default)", + "type": "string" + }, + "url": { + "description": "URL of the metric store endpoint", + "type": "string" + }, + "token": { + "description": "Authentication token for the metric store", + "type": "string" + } + }, + "required": ["scope", "url", "token"] + } +}` diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go index 91f5ecde..43a6d92b 100644 --- a/internal/metricdispatch/dataLoader.go +++ b/internal/metricdispatch/dataLoader.go @@ -44,7 +44,6 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/lrucache" "github.com/ClusterCockpit/cc-lib/v2/resampler" @@ -96,6 +95,13 @@ func LoadData(job *schema.Job, if job.State == schema.JobStateRunning || job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving { + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + if scopes == nil { scopes = append(scopes, 
schema.MetricScopeNode) } @@ -107,7 +113,7 @@ func LoadData(job *schema.Job, } } - jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution) + jd, err = ms.LoadData(job, metrics, scopes, ctx, resolution) if err != nil { if len(jd) != 0 { cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s", @@ -236,7 +242,14 @@ func LoadAverages( return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? } - stats, err := metricstore.LoadStats(job, metrics, ctx) + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err + } + + stats, err := ms.LoadStats(job, metrics, ctx) if err != nil { cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) @@ -273,13 +286,23 @@ func LoadScopedJobStats( return archive.LoadScopedStatsFromArchive(job, metrics, scopes) } - scopedStats, err := metricstore.LoadScopedStats(job, metrics, scopes, ctx) + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + + scopedStats, err := ms.LoadScopedStats(job, metrics, scopes, ctx) if err != nil { cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) return nil, err } + // Round Resulting Stat Values + scopedStats.RoundScopedMetricStats() + return scopedStats, nil } @@ -295,9 +318,16 @@ func LoadJobStats( return archive.LoadStatsFromArchive(job, metrics) } + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from 
metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + data := make(map[string]schema.MetricStatistics, len(metrics)) - stats, err := metricstore.LoadStats(job, metrics, ctx) + stats, err := ms.LoadStats(job, metrics, ctx) if err != nil { cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) @@ -333,6 +363,7 @@ func LoadJobStats( // the metric store (not the archive) since it's for current/recent node status monitoring. // // Returns a nested map structure: node -> metric -> scoped data. +// FIXME: Add support for subcluster specific cc-metric-stores func LoadNodeData( cluster string, metrics, nodes []string, @@ -346,7 +377,14 @@ func LoadNodeData( } } - data, err := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + ms, err := GetMetricDataRepo(cluster, "") + if err != nil { + cclog.Errorf("failed to load node data from metric store: %s", + err.Error()) + return nil, err + } + + data, err := ms.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { if len(data) != 0 { cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error()) @@ -383,7 +421,14 @@ func LoadNodeListData( } } - data, err := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) + ms, err := GetMetricDataRepo(cluster, subCluster) + if err != nil { + cclog.Errorf("failed to load node data from metric store: %s", + err.Error()) + return nil, err + } + + data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) if err != nil { if len(data) != 0 { cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s", diff --git a/internal/metricdispatch/metricdata.go b/internal/metricdispatch/metricdata.go new file mode 100755 
index 00000000..9626ac86 --- /dev/null +++ b/internal/metricdispatch/metricdata.go @@ -0,0 +1,112 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricdispatch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + ccms "github.com/ClusterCockpit/cc-backend/internal/metricstoreclient" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +type MetricDataRepository interface { + // Return the JobData for the given job, only with the requested metrics. + LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int) (schema.JobData, error) + + // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only. + LoadStats(job *schema.Job, + metrics []string, + ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) + + // Return a map of metrics to a map of scopes to the scoped metric statistics of the job. + LoadScopedStats(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.ScopedJobStats, error) + + // Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node. + LoadNodeData(cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) + + // Return a map of hosts to a map of metrics to a map of scopes for multiple nodes. 
+ LoadNodeListData(cluster, subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, + from, to time.Time, + ctx context.Context) (map[string]schema.JobData, error) +} + +type CCMetricStoreConfig struct { + Scope string `json:"scope"` + URL string `json:"url"` + Token string `json:"token"` +} + +var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} + +func Init(rawConfig json.RawMessage) error { + if rawConfig != nil { + var configs []CCMetricStoreConfig + config.Validate(configSchema, rawConfig) + dec := json.NewDecoder(bytes.NewReader(rawConfig)) + dec.DisallowUnknownFields() + if err := dec.Decode(&configs); err != nil { + return fmt.Errorf("[METRICDISPATCH]> Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error()) + } + + if len(configs) == 0 { + return fmt.Errorf("[METRICDISPATCH]> No metric store configurations found in config file") + } + + for _, config := range configs { + metricDataRepos[config.Scope] = ccms.NewCCMetricStore(config.URL, config.Token) + } + } + + return nil +} + +func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository, error) { + var repo MetricDataRepository + var ok bool + + key := cluster + "-" + subcluster + repo, ok = metricDataRepos[key] + + if !ok { + repo, ok = metricDataRepos[cluster] + + if !ok { + repo, ok = metricDataRepos["*"] + + if !ok { + if metricstore.MetricStoreHandle == nil { + return nil, fmt.Errorf("[METRICDISPATCH]> no metric data repository configured '%s'", key) + } + + repo = metricstore.MetricStoreHandle + cclog.Debugf("[METRICDISPATCH]> Using internal metric data repository for '%s'", key) + } + } + } + + return repo, nil +} diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go new file mode 100644 index 00000000..338d7028 --- /dev/null +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ 
-0,0 +1,507 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricstoreclient - Query Building +// +// This file contains the query construction and scope transformation logic for cc-metric-store queries. +// It handles the complex mapping between requested metric scopes and native hardware topology, +// automatically aggregating or filtering metrics as needed. +// +// # Scope Transformations +// +// The buildScopeQueries function implements the core scope transformation algorithm. +// It handles 25+ different transformation cases, mapping between: +// - Accelerator (GPU) scope +// - HWThread (hardware thread/SMT) scope +// - Core (CPU core) scope +// - Socket (CPU package) scope +// - MemoryDomain (NUMA domain) scope +// - Node (full system) scope +// +// Transformations follow these rules: +// - Same scope: Return data as-is (e.g., Core → Core) +// - Coarser scope: Aggregate data (e.g., Core → Socket with Aggregate=true) +// - Finer scope: Error - cannot increase granularity +// +// # Query Building +// +// buildQueries and buildNodeQueries are the main entry points, handling job-specific +// and node-specific query construction respectively. They: +// - Validate metric configurations +// - Handle subcluster-specific metric filtering +// - Detect and skip duplicate scope requests +// - Call buildScopeQueries for each metric/scope/host combination +package metricstoreclient + +import ( + "fmt" + "strconv" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// Scope string constants used in API queries. +// Pre-converted to avoid repeated allocations during query building. 
+var ( + hwthreadString = string(schema.MetricScopeHWThread) + coreString = string(schema.MetricScopeCore) + memoryDomainString = string(schema.MetricScopeMemoryDomain) + socketString = string(schema.MetricScopeSocket) + acceleratorString = string(schema.MetricScopeAccelerator) +) + +// buildQueries constructs API queries for job-specific metric data. +// It iterates through metrics, scopes, and job resources to build the complete query set. +// +// The function handles: +// - Metric configuration validation and subcluster filtering +// - Scope deduplication to avoid redundant queries +// - Hardware thread list resolution (job-allocated vs full node) +// - Delegation to buildScopeQueries for scope transformations +// +// Returns queries and their corresponding assigned scopes (which may differ from requested scopes). +func (ccms *CCMetricStore) buildQueries( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) + assignedScope := []schema.MetricScope{} + + subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) + if scerr != nil { + return nil, nil, scerr + } + topology := subcluster.Topology + + for _, metric := range metrics { + remoteName := metric + mc := archive.GetMetricConfig(job.Cluster, metric) + if mc == nil { + cclog.Warnf("metric '%s' is not specified for cluster '%s' - skipping", metric, job.Cluster) + continue + } + + // Skip if metric is removed for subcluster + if len(mc.SubClusters) != 0 { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == job.SubCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... 
+ handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { + continue + } + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, host := range job.Resources { + hwthreads := host.HWThreads + if hwthreads == nil { + hwthreads = topology.Node + } + + hostQueries, hostScopes := buildScopeQueries( + nativeScope, requestedScope, + remoteName, host.Hostname, + &topology, hwthreads, host.Accelerators, + resolution, + ) + + if len(hostQueries) == 0 && len(hostScopes) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + queries = append(queries, hostQueries...) + assignedScope = append(assignedScope, hostScopes...) + } + } + } + + return queries, assignedScope, nil +} + +// buildNodeQueries constructs API queries for node-specific metric data (Systems View). +// Similar to buildQueries but uses full node topology instead of job-allocated resources. +// +// The function handles: +// - Subcluster topology resolution (either pre-loaded or per-node lookup) +// - Full node hardware thread lists (not job-specific subsets) +// - All accelerators on each node +// - Metric configuration validation with subcluster filtering +// +// Returns queries and their corresponding assigned scopes. 
+func (ccms *CCMetricStore) buildNodeQueries( + cluster string, + subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) + assignedScope := []schema.MetricScope{} + + // Get Topol before loop if subCluster given + var subClusterTopol *schema.SubCluster + var scterr error + if subCluster != "" { + subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) + if scterr != nil { + cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) + return nil, nil, scterr + } + } + + for _, metric := range metrics { + remoteName := metric + mc := archive.GetMetricConfig(cluster, metric) + if mc == nil { + cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) + continue + } + + // Skip if metric is removed for subcluster + if mc.SubClusters != nil { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == subCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... 
+ handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, hostname := range nodes { + + // If no subCluster given, get it by node + if subCluster == "" { + subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) + if scnerr != nil { + return nil, nil, scnerr + } + subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) + if scterr != nil { + return nil, nil, scterr + } + } + + // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable + // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable + topology := subClusterTopol.Topology + acceleratorIds := topology.GetAcceleratorIDs() + + // Moved check here if metric matches hardware specs + if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { + continue scopesLoop + } + + nodeQueries, nodeScopes := buildScopeQueries( + nativeScope, requestedScope, + remoteName, hostname, + &topology, topology.Node, acceleratorIds, + resolution, + ) + + if len(nodeQueries) == 0 && len(nodeScopes) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + queries = append(queries, nodeQueries...) + assignedScope = append(assignedScope, nodeScopes...) + } + } + } + + return queries, assignedScope, nil +} + +// buildScopeQueries generates API queries for a given scope transformation. +// It returns a slice of queries and corresponding assigned scopes. +// Some transformations (e.g., HWThread -> Core/Socket) may generate multiple queries. 
+func buildScopeQueries( + nativeScope, requestedScope schema.MetricScope, + metric, hostname string, + topology *schema.Topology, + hwthreads []int, + accelerators []string, + resolution int, +) ([]APIQuery, []schema.MetricScope) { + scope := nativeScope.Max(requestedScope) + queries := []APIQuery{} + scopes := []schema.MetricScope{} + + hwthreadsStr := intToStringSlice(hwthreads) + + // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) + if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { + if scope != schema.MetricScopeAccelerator { + // Skip all other caught cases + return queries, scopes + } + + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: accelerators, + Resolution: resolution, + }) + scopes = append(scopes, schema.MetricScopeAccelerator) + return queries, scopes + } + + // Accelerator -> Node + if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { + if len(accelerators) == 0 { + return queries, scopes + } + + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: accelerators, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // HWThread -> HWThread + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: hwthreadsStr, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // HWThread -> Core + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + for _, core := range cores { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + 
Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // HWThread -> Socket + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + for _, socket := range sockets { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // HWThread -> Node + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: hwthreadsStr, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Core -> Core + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Core -> Socket + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromCores(hwthreads) + for _, socket := range sockets { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // Core -> Node + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { + cores, _ := 
topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // MemoryDomain -> MemoryDomain + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(memDomains), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // MemoryDomain -> Node + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(memDomains), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Socket -> Socket + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Socket -> Node + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + scopes = append(scopes, 
scope) + return queries, scopes + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Unhandled case - return empty slices + return queries, scopes +} + +// intToStringSlice converts a slice of integers to a slice of strings. +// Used to convert hardware IDs (core IDs, socket IDs, etc.) to the string format required by the API. +func intToStringSlice(is []int) []string { + ss := make([]string, len(is)) + for i, x := range is { + ss[i] = strconv.Itoa(x) + } + return ss +} diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go new file mode 100644 index 00000000..ded644ea --- /dev/null +++ b/internal/metricstoreclient/cc-metric-store.go @@ -0,0 +1,669 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricstoreclient provides a client for querying the cc-metric-store time series database. +// +// The cc-metric-store is a high-performance time series database optimized for HPC metric data. +// This client handles HTTP communication, query construction, scope transformations, and data retrieval +// for job and node metrics across different metric scopes (node, socket, core, hwthread, accelerator). 
+// +// # Architecture +// +// The package is split into two main components: +// - Client Operations (cc-metric-store.go): HTTP client, request handling, data loading methods +// - Query Building (cc-metric-store-queries.go): Query construction and scope transformation logic +// +// # Basic Usage +// +// store := NewCCMetricStore("http://localhost:8080", "jwt-token") +// +// // Load job data +// jobData, err := store.LoadData(job, metrics, scopes, ctx, resolution) +// if err != nil { +// log.Fatal(err) +// } +// +// # Metric Scopes +// +// The client supports hierarchical metric scopes that map to HPC hardware topology: +// - MetricScopeAccelerator: GPU/accelerator level metrics +// - MetricScopeHWThread: Hardware thread (SMT) level metrics +// - MetricScopeCore: CPU core level metrics +// - MetricScopeSocket: CPU socket level metrics +// - MetricScopeMemoryDomain: NUMA domain level metrics +// - MetricScopeNode: Full node level metrics +// +// The client automatically handles scope transformations, aggregating finer-grained metrics +// to coarser scopes when needed (e.g., aggregating core metrics to socket level). +// +// # Error Handling +// +// The client supports partial errors - if some queries fail, it returns both the successful +// data and an error listing the failed queries. This allows processing partial results +// when some nodes or metrics are temporarily unavailable. 
//
// # API Versioning
//
// The client uses cc-metric-store API v2, which includes support for:
//   - Data resampling for bandwidth optimization
//   - Multi-scope queries in a single request
//   - Aggregation across hardware topology levels
package metricstoreclient

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
	"github.com/ClusterCockpit/cc-lib/v2/schema"
)

// CCMetricStore is the HTTP client for communicating with cc-metric-store.
// It holds the connection details and credentials; all fields are set once by
// NewCCMetricStore and only read by the request and load methods afterwards.
type CCMetricStore struct {
	client        http.Client // HTTP client used for all requests (timeout is configured in NewCCMetricStore)
	jwt           string      // JWT sent as Bearer token; no Authorization header is sent if empty
	url           string      // Base URL of the cc-metric-store instance as passed to NewCCMetricStore
	queryEndpoint string      // Full URL of the query API endpoint, derived from url
}

// APIQueryRequest is the JSON body sent to the cc-metric-store query API.
// Callers in this package either fill Queries with explicit per-host queries or
// fill ForAllNodes with metric names for a bulk query over all nodes.
// ForAllNodes has no omitempty tag, so it is always serialized (as null when unset).
type APIQueryRequest struct {
	Cluster     string     `json:"cluster"`       // Target cluster name
	Queries     []APIQuery `json:"queries"`       // Explicit list of metric queries
	ForAllNodes []string   `json:"for-all-nodes"` // Metrics to query for all nodes
	From        int64      `json:"from"`          // Start time (Unix timestamp)
	To          int64      `json:"to"`            // End time (Unix timestamp)
	WithStats   bool       `json:"with-stats"`    // Include min/avg/max statistics
	WithData    bool       `json:"with-data"`     // Include time series data points
}
// APIQuery specifies a single metric query for one host.
// Type and TypeIds select the hardware scope (core, socket, accelerator, etc.);
// a query with Type == nil addresses the node as a whole. When Aggregate is set,
// the series of all TypeIds are requested as one aggregated series, otherwise
// one series per type id is expected in the response.
type APIQuery struct {
	Type       *string  `json:"type,omitempty"`        // Scope type (e.g., "core", "socket"); nil for node-level queries
	SubType    *string  `json:"subtype,omitempty"`     // Sub-scope type (reserved for future use)
	Metric     string   `json:"metric"`                // Metric name
	Hostname   string   `json:"host"`                  // Target hostname
	Resolution int      `json:"resolution"`            // Data resolution in seconds (0 = native)
	TypeIds    []string `json:"type-ids,omitempty"`    // IDs for the scope type (e.g., core IDs)
	SubTypeIds []string `json:"subtype-ids,omitempty"` // IDs for sub-scope (reserved)
	Aggregate  bool     `json:"aggreg"`                // Aggregate across TypeIds
}

// APIQueryResponse contains the results from a cc-metric-store query.
// Results[i] belongs to the i-th query: the request's Queries for explicit
// requests, or the echoed Queries of the response when those are present.
// Each Results[i] holds the series returned for that query.
type APIQueryResponse struct {
	Queries []APIQuery        `json:"queries,omitempty"` // Echoed queries (for bulk requests)
	Results [][]APIMetricData `json:"results"`           // Result data, indexed by query
}

// APIMetricData represents time series data and statistics for a single metric series.
// Error is set if this particular series failed to load; callers in this package
// check it before using the remaining fields.
type APIMetricData struct {
	Error      *string        `json:"error"`      // Error message if query failed
	Data       []schema.Float `json:"data"`       // Time series data points
	From       int64          `json:"from"`       // Actual start time of data
	To         int64          `json:"to"`         // Actual end time of data
	Resolution int            `json:"resolution"` // Actual resolution of data in seconds
	Avg        schema.Float   `json:"avg"`        // Average value across time range
	Min        schema.Float   `json:"min"`        // Minimum value in time range
	Max        schema.Float   `json:"max"`        // Maximum value in time range
}
+func NewCCMetricStore(url string, token string) *CCMetricStore { + return &CCMetricStore{ + url: url, + queryEndpoint: fmt.Sprintf("%s/api/query", url), + jwt: token, + client: http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +// doRequest executes an HTTP POST request to the cc-metric-store query API. +// It handles JSON encoding/decoding, authentication, and API versioning. +// The request body is automatically closed to prevent resource leaks. +func (ccms *CCMetricStore) doRequest( + ctx context.Context, + body *APIQueryRequest, +) (*APIQueryResponse, error) { + buf := &bytes.Buffer{} + if err := json.NewEncoder(buf).Encode(body); err != nil { + cclog.Errorf("Error while encoding request body: %s", err.Error()) + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) + if err != nil { + cclog.Errorf("Error while building request body: %s", err.Error()) + return nil, err + } + if ccms.jwt != "" { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) + } + + // versioning the cc-metric-store query API. + // v2 = data with resampling + // v1 = data without resampling + q := req.URL.Query() + q.Add("version", "v2") + req.URL.RawQuery = q.Encode() + + res, err := ccms.client.Do(req) + if err != nil { + cclog.Errorf("Error while performing request: %s", err.Error()) + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) + } + + var resBody APIQueryResponse + if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { + cclog.Errorf("Error while decoding result body: %s", err.Error()) + return nil, err + } + + return &resBody, nil +} + +// LoadData retrieves time series data and statistics for the specified job and metrics. +// It queries data for the job's time range and resources, handling scope transformations automatically. 
+// +// Parameters: +// - job: Job metadata including cluster, time range, and allocated resources +// - metrics: List of metric names to retrieve +// - scopes: Requested metric scopes (node, socket, core, etc.) +// - ctx: Context for cancellation and timeouts +// - resolution: Data resolution in seconds (0 for native resolution) +// +// Returns JobData organized as: metric -> scope -> series list. +// Supports partial errors: returns available data even if some queries fail. +func (ccms *CCMetricStore) LoadData( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int, +) (schema.JobData, error) { + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) + if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) + return nil, err + } + + req := APIQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: true, + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) + return nil, err + } + + var errors []string + jobData := make(schema.JobData) + for i, row := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + scope := assignedScope[i] + mc := archive.GetMetricConfig(job.Cluster, metric) + if _, ok := jobData[metric]; !ok { + jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) + } + + res := mc.Timestep + if len(row) > 0 { + res = row[0].Resolution + } + + jobMetric, ok := jobData[metric][scope] + if !ok { + jobMetric = &schema.JobMetric{ + Unit: mc.Unit, + Timestep: res, + Series: make([]schema.Series, 0), + } + jobData[metric][scope] = jobMetric + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = 
append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + sanitizeStats(&res.Avg, &res.Min, &res.Max) + + jobMetric.Series = append(jobMetric.Series, schema.Series{ + Hostname: query.Hostname, + Id: id, + Statistics: schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + Data: res.Data, + }) + } + + // So that one can later check len(jobData): + if len(jobMetric.Series) == 0 { + delete(jobData[metric], scope) + if len(jobData[metric]) == 0 { + delete(jobData, metric) + } + } + } + + if len(errors) != 0 { + /* Returns list for "partial errors" */ + return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + return jobData, nil +} + +// LoadStats retrieves min/avg/max statistics for job metrics at node scope. +// This is faster than LoadData when only statistical summaries are needed (no time series data). +// +// Returns statistics organized as: metric -> hostname -> statistics. +func (ccms *CCMetricStore) LoadStats( + job *schema.Job, + metrics []string, + ctx context.Context, +) (map[string]map[string]schema.MetricStatistics, error) { + queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? 
+ if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) + return nil, err + } + + req := APIQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: false, + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) + return nil, err + } + + stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) + for i, res := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + data := res[0] + if data.Error != nil { + cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) + continue + } + + metricdata, ok := stats[metric] + if !ok { + metricdata = make(map[string]schema.MetricStatistics, job.NumNodes) + stats[metric] = metricdata + } + + if hasNaNStats(data.Avg, data.Min, data.Max) { + cclog.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) + continue + } + + metricdata[query.Hostname] = schema.MetricStatistics{ + Avg: float64(data.Avg), + Min: float64(data.Min), + Max: float64(data.Max), + } + } + + return stats, nil +} + +// LoadScopedStats retrieves statistics for job metrics across multiple scopes. +// Used for the Job-View Statistics Table to display per-scope breakdowns. +// +// Returns statistics organized as: metric -> scope -> list of scoped statistics. +// Each scoped statistic includes hostname, hardware ID (if applicable), and min/avg/max values. 
+func (ccms *CCMetricStore) LoadScopedStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) + if err != nil { + cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) + return nil, err + } + + req := APIQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime, + To: job.StartTime + int64(job.Duration), + Queries: queries, + WithStats: true, + WithData: false, + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) + return nil, err + } + + var errors []string + scopedJobStats := make(schema.ScopedJobStats) + + for i, row := range resBody.Results { + query := req.Queries[i] + metric := query.Metric + scope := assignedScope[i] + + if _, ok := scopedJobStats[metric]; !ok { + scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) + } + + if _, ok := scopedJobStats[metric][scope]; !ok { + scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + sanitizeStats(&res.Avg, &res.Min, &res.Max) + + scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ + Hostname: query.Hostname, + Id: id, + Data: &schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + }) + } + + // So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty + if len(scopedJobStats[metric][scope]) 
== 0 { + delete(scopedJobStats[metric], scope) + if len(scopedJobStats[metric]) == 0 { + delete(scopedJobStats, metric) + } + } + } + + if len(errors) != 0 { + /* Returns list for "partial errors" */ + return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + return scopedJobStats, nil +} + +// LoadNodeData retrieves current metric data for specified nodes in a cluster. +// Used for the Systems-View Node-Overview to display real-time node status. +// +// If nodes is nil, queries all metrics for all nodes in the cluster (bulk query). +// Returns data organized as: hostname -> metric -> list of JobMetric (with time series and stats). +func (ccms *CCMetricStore) LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + req := APIQueryRequest{ + Cluster: cluster, + From: from.Unix(), + To: to.Unix(), + WithStats: true, + WithData: true, + } + + if nodes == nil { + req.ForAllNodes = append(req.ForAllNodes, metrics...) 
+ } else { + for _, node := range nodes { + for _, metric := range metrics { + req.Queries = append(req.Queries, APIQuery{ + Hostname: node, + Metric: metric, + Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution + }) + } + } + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error()) + return nil, err + } + + var errors []string + data := make(map[string]map[string][]*schema.JobMetric) + for i, res := range resBody.Results { + var query APIQuery + if resBody.Queries != nil { + query = resBody.Queries[i] + } else { + query = req.Queries[i] + } + + metric := query.Metric + qdata := res[0] + if qdata.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) + } + + sanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) + + hostdata, ok := data[query.Hostname] + if !ok { + hostdata = make(map[string][]*schema.JobMetric) + data[query.Hostname] = hostdata + } + + mc := archive.GetMetricConfig(cluster, metric) + hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ + Unit: mc.Unit, + Timestep: mc.Timestep, + Series: []schema.Series{ + { + Hostname: query.Hostname, + Data: qdata.Data, + Statistics: schema.MetricStatistics{ + Avg: float64(qdata.Avg), + Min: float64(qdata.Min), + Max: float64(qdata.Max), + }, + }, + }, + }) + } + + if len(errors) != 0 { + /* Returns list of "partial errors" */ + return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + + return data, nil +} + +// LoadNodeListData retrieves paginated node metrics for the Systems-View Node-List. +// +// Supports filtering by subcluster and node name pattern. The nodeFilter performs +// substring matching on hostnames. 
+// +// Returns: +// - Node data organized as: hostname -> JobData (metric -> scope -> series) +// - Total node count (before pagination) +// - HasNextPage flag indicating if more pages are available +// - Error (may be partial error with some data returned) +func (ccms *CCMetricStore) LoadNodeListData( + cluster, subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, + from, to time.Time, + ctx context.Context, +) (map[string]schema.JobData, error) { + queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) + if err != nil { + cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) + return nil, err + } + + req := APIQueryRequest{ + Cluster: cluster, + Queries: queries, + From: from.Unix(), + To: to.Unix(), + WithStats: true, + WithData: true, + } + + resBody, err := ccms.doRequest(ctx, &req) + if err != nil { + cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error()) + return nil, err + } + + var errors []string + data := make(map[string]schema.JobData) + for i, row := range resBody.Results { + var query APIQuery + if resBody.Queries != nil { + query = resBody.Queries[i] + } else { + query = req.Queries[i] + } + // qdata := res[0] + metric := query.Metric + scope := assignedScope[i] + mc := archive.GetMetricConfig(cluster, metric) + + res := mc.Timestep + if len(row) > 0 { + res = row[0].Resolution + } + + // Init Nested Map Data Structures If Not Found + hostData, ok := data[query.Hostname] + if !ok { + hostData = make(schema.JobData) + data[query.Hostname] = hostData + } + + metricData, ok := hostData[metric] + if !ok { + metricData = make(map[schema.MetricScope]*schema.JobMetric) + data[query.Hostname][metric] = metricData + } + + scopeData, ok := metricData[scope] + if !ok { + scopeData = &schema.JobMetric{ + Unit: mc.Unit, + 
Timestep: res, + Series: make([]schema.Series, 0), + } + data[query.Hostname][metric][scope] = scopeData + } + + for ndx, res := range row { + if res.Error != nil { + /* Build list for "partial errors", if any */ + errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) + continue + } + + id := (*string)(nil) + if query.Type != nil { + id = new(string) + *id = query.TypeIds[ndx] + } + + sanitizeStats(&res.Avg, &res.Min, &res.Max) + + scopeData.Series = append(scopeData.Series, schema.Series{ + Hostname: query.Hostname, + Id: id, + Statistics: schema.MetricStatistics{ + Avg: float64(res.Avg), + Min: float64(res.Min), + Max: float64(res.Max), + }, + Data: res.Data, + }) + } + } + + if len(errors) != 0 { + /* Returns list of "partial errors" */ + return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + } + + return data, nil +} + +// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. +// Regular float64 values cannot be JSONed when NaN. +func sanitizeStats(avg, min, max *schema.Float) { + if avg.IsNaN() || min.IsNaN() || max.IsNaN() { + *avg = schema.Float(0) + *min = schema.Float(0) + *max = schema.Float(0) + } +} + +// hasNaNStats returns true if any of the statistics contain NaN values. +func hasNaNStats(avg, min, max schema.Float) bool { + return avg.IsNaN() || min.IsNaN() || max.IsNaN() +} diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index 4386be2d..d79847a0 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -280,6 +280,7 @@ func (r *JobRepository) FindConcurrentJobs( stopTimeTail := stopTime - overlapBufferEnd startTimeFront := startTime + overlapBufferEnd + // Reminder: BETWEEN Queries are slower and dont use indices as frequently: Can this be optimized? queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? 
OR job.start_time < ?)", "running", startTimeTail, stopTimeTail, startTime) // Get At Least One Exact Hostname Match from JSON Resources Array in Database diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index 745fa32d..cf7010ee 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -274,17 +274,36 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select } // buildIntCondition creates a BETWEEN clause for integer range filters. +// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder { - return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) + if cond.From != 0 && cond.To != 0 { + return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) + } else if cond.From != 0 { + return query.Where("? <= "+field, cond.From) + } else if cond.To != 0 { + return query.Where(field+" <= ?", cond.To) + } else { + return query + } } // buildFloatCondition creates a BETWEEN clause for float range filters. +// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder { - return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) + if cond.From != 0.0 && cond.To != 0.0 { + return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) + } else if cond.From != 0.0 { + return query.Where("? <= "+field, cond.From) + } else if cond.To != 0.0 { + return query.Where(field+" <= ?", cond.To) + } else { + return query + } } // buildTimeCondition creates time range filters supporting absolute timestamps, // relative time ranges (last6h, last24h, last7d, last30d), or open-ended ranges. 
+// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBuilder) sq.SelectBuilder { if cond.From != nil && cond.To != nil { return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix()) @@ -308,16 +327,25 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui cclog.Debugf("No known named timeRange: startTime.range = %s", cond.Range) return query } - return query.Where(field+" BETWEEN ? AND ?", then, now) + return query.Where("? <= "+field, then) } else { return query } } // buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column. +// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required func buildFloatJSONCondition(condName string, condRange *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder { query = query.Where("JSON_VALID(footprint)") - return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To) + if condRange.From != 0.0 && condRange.To != 0.0 { + return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To) + } else if condRange.From != 0.0 { + return query.Where("? 
<= JSON_EXTRACT(footprint, \"$."+condName+"\")", condRange.From) + } else if condRange.To != 0.0 { + return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") <= ?", condRange.To) + } else { + return query + } } // buildStringCondition creates filters for string fields supporting equality, diff --git a/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql index bd465bcb..6e1ac009 100644 --- a/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql +++ b/internal/repository/migrations/sqlite3/09_add-job-cache.up.sql @@ -124,13 +124,15 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, hpc_user); CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project); CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster); -- Cluster Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time); -CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (cluster, duration); CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (cluster, num_nodes); CREATE INDEX IF NOT EXISTS jobs_cluster_numhwthreads ON job (cluster, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_cluster_numacc ON job (cluster, num_acc); CREATE INDEX IF NOT EXISTS jobs_cluster_energy ON job (cluster, energy); +-- Cluster Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_cluster_duration_starttime ON job (cluster, duration, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_starttime_duration ON job (cluster, start_time, duration); + -- Cluster+Partition Filter CREATE INDEX IF NOT EXISTS jobs_cluster_partition_user ON job (cluster, cluster_partition, hpc_user); CREATE INDEX IF NOT EXISTS jobs_cluster_partition_project ON job (cluster, cluster_partition, project); @@ -138,76 +140,90 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, clus CREATE INDEX IF NOT EXISTS jobs_cluster_partition_shared ON job (cluster, 
cluster_partition, shared); -- Cluster+Partition Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, cluster_partition, start_time); -CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (cluster, cluster_partition, duration); CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes); CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc); CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy); +-- Cluster+Partition Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration_starttime ON job (cluster, cluster_partition, duration, start_time); +CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (cluster, cluster_partition, start_time, duration); + -- Cluster+JobState Filter CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user); CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project); -- Cluster+JobState Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time); -CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (cluster, job_state, duration); CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes); CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc); CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy); +-- Cluster+JobState Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime_duration ON job (cluster, job_state, start_time, 
duration); +CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration_starttime ON job (cluster, job_state, duration, start_time); + -- Cluster+Shared Filter CREATE INDEX IF NOT EXISTS jobs_cluster_shared_user ON job (cluster, shared, hpc_user); CREATE INDEX IF NOT EXISTS jobs_cluster_shared_project ON job (cluster, shared, project); -- Cluster+Shared Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime ON job (cluster, shared, start_time); -CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration ON job (cluster, shared, duration); CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes); CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc); CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy); +-- Cluster+Shared Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime_duration ON job (cluster, shared, start_time, duration); +CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration_starttime ON job (cluster, shared, duration, start_time); + -- User Filter -- User Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (hpc_user, start_time); -CREATE INDEX IF NOT EXISTS jobs_user_duration ON job (hpc_user, duration); CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes); CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc); CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy); +-- User Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_user_starttime_duration ON job (hpc_user, start_time, duration); +CREATE INDEX IF NOT EXISTS jobs_user_duration_starttime ON job (hpc_user, duration, start_time); + -- Project Filter CREATE INDEX IF NOT EXISTS jobs_project_user ON
job (project, hpc_user); -- Project Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time); -CREATE INDEX IF NOT EXISTS jobs_project_duration ON job (project, duration); CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes); CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc); CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy); +-- Project Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_project_starttime_duration ON job (project, start_time, duration); +CREATE INDEX IF NOT EXISTS jobs_project_duration_starttime ON job (project, duration, start_time); + -- JobState Filter CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user); CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project); -- JobState Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time); -CREATE INDEX IF NOT EXISTS jobs_jobstate_duration ON job (job_state, duration); CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes); CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc); CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy); +-- JobState Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime_duration ON job (job_state, start_time, duration); +CREATE INDEX IF NOT EXISTS jobs_jobstate_duration_starttime ON job (job_state, duration, start_time); + -- Shared Filter CREATE INDEX IF NOT EXISTS jobs_shared_user ON job (shared, hpc_user); CREATE INDEX IF NOT EXISTS jobs_shared_project ON job (shared, project); -- Shared Filter Sorting -CREATE INDEX IF NOT EXISTS jobs_shared_starttime ON job (shared, start_time); -CREATE INDEX IF NOT EXISTS
jobs_shared_duration ON job (shared, duration); CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes); CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads); CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc); CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy); +-- Shared Time Filter Sorting +CREATE INDEX IF NOT EXISTS jobs_shared_starttime_duration ON job (shared, start_time, duration); +CREATE INDEX IF NOT EXISTS jobs_shared_duration_starttime ON job (shared, duration, start_time); + -- ArrayJob Filter CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time); CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time); @@ -226,6 +242,10 @@ CREATE INDEX IF NOT EXISTS jobs_numhwthreads_duration ON job (num_hwthreads, dur CREATE INDEX IF NOT EXISTS jobs_numacc_duration ON job (num_acc, duration); CREATE INDEX IF NOT EXISTS jobs_energy_duration ON job (energy, duration); +-- Backup Indices For High Variety Columns +CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time); +CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration); + -- Notes: -- Cluster+Partition+Jobstate Filter: Tested -> Full Array Of Combinations non-required -- Cluster+JobState+Shared Filter: Tested -> No further timing improvement diff --git a/internal/repository/node.go b/internal/repository/node.go index 0d94e46f..3fa041f6 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -197,7 +197,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt return err } - cclog.Infof("Added node '%s' to database", hostname) + cclog.Debugf("Added node '%s' to database", hostname) return nil } else { cclog.Warnf("Error while querying node '%v' from database", id) @@ -212,7 +212,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt cclog.Errorf("Error
while adding node state for '%v' to database", hostname) return err } - cclog.Infof("Updated node state for '%s' in database", hostname) + cclog.Debugf("Updated node state for '%s' in database", hostname) return nil } @@ -716,8 +716,8 @@ func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBu func getNodesFromTopol(cluster string, subCluster string, nodeFilter string, page *model.PageRequest) ([]string, int, bool) { // 0) Init additional vars - var hasNextPage bool = false - var totalNodes int = 0 + hasNextPage := false + totalNodes := 0 // 1) Get list of all nodes var topolNodes []string diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 88c38eb1..b8f6de95 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -277,10 +277,22 @@ func buildFilterPresets(query url.Values) map[string]interface{} { if query.Get("duration") != "" { parts := strings.Split(query.Get("duration"), "-") if len(parts) == 2 { - a, e1 := strconv.Atoi(parts[0]) - b, e2 := strconv.Atoi(parts[1]) - if e1 == nil && e2 == nil { - filterPresets["duration"] = map[string]int{"from": a, "to": b} + if parts[0] == "lessthan" { + lt, lte := strconv.Atoi(parts[1]) + if lte == nil { + filterPresets["duration"] = map[string]int{"lessThan": lt, "from": 0, "to": 0} + } + } else if parts[0] == "morethan" { + mt, mte := strconv.Atoi(parts[1]) + if mte == nil { + filterPresets["duration"] = map[string]int{"moreThan": mt, "from": 0, "to": 0} + } + } else { + a, e1 := strconv.Atoi(parts[0]) + b, e2 := strconv.Atoi(parts[1]) + if e1 == nil && e2 == nil { + filterPresets["duration"] = map[string]int{"from": a, "to": b} + } } } } diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index fad278e2..71bf4089 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -10,8 +10,8 @@ import ( "math" "time" + 
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" sq "github.com/Masterminds/squirrel" @@ -66,7 +66,14 @@ func RegisterFootprintWorker() { sJob := time.Now() - jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background()) + ms, err := metricdispatch.GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + continue + } + + jobStats, err := ms.LoadStats(job, allMetrics, context.Background()) if err != nil { cclog.Errorf("error wile loading job data stats for footprint update: %v", err) ce++ diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index a031cb1e..62216e59 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -38,6 +38,10 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) +type InternalMetricStore struct{} + +var MetricStoreHandle *InternalMetricStore + // TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. // When set to a non-nil function, LoadData will call this function instead of the default implementation. 
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) @@ -65,7 +69,7 @@ var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema // Example: // // jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60) -func LoadData( +func (ccms *InternalMetricStore) LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, @@ -502,7 +506,7 @@ func buildQueries( // Returns: // - Map of metric → hostname → statistics // - Error on query building or fetching failure -func LoadStats( +func (ccms *InternalMetricStore) LoadStats( job *schema.Job, metrics []string, ctx context.Context, @@ -574,7 +578,7 @@ func LoadStats( // Returns: // - ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID) // - Error or partial error listing failed queries -func LoadScopedStats( +func (ccms *InternalMetricStore) LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, @@ -675,7 +679,7 @@ func LoadScopedStats( // Returns: // - Map of hostname → metric → []JobMetric // - Error or partial error listing failed queries -func LoadNodeData( +func (ccms *InternalMetricStore) LoadNodeData( cluster string, metrics, nodes []string, scopes []schema.MetricScope, @@ -778,7 +782,7 @@ func LoadNodeData( // Returns: // - Map of hostname → JobData (metric → scope → JobMetric) // - Error or partial error listing failed queries -func LoadNodeListData( +func (ccms *InternalMetricStore) LoadNodeListData( cluster, subCluster string, nodes []string, metrics []string, @@ -912,7 +916,6 @@ func buildNodeQueries( scopes []schema.MetricScope, resolution int64, ) ([]APIQuery, []schema.MetricScope, error) { - queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} diff --git a/web/frontend/src/generic/Filters.svelte 
b/web/frontend/src/generic/Filters.svelte index adb865f3..c79a56e4 100644 --- a/web/frontend/src/generic/Filters.svelte +++ b/web/frontend/src/generic/Filters.svelte @@ -192,14 +192,14 @@ items.push({ startTime: { range: filters.startTime.range }, }); - if (filters.duration.from || filters.duration.to) + if (filters.duration.from && filters.duration.to) items.push({ duration: { from: filters.duration.from, to: filters.duration.to }, }); if (filters.duration.lessThan) - items.push({ duration: { from: 0, to: filters.duration.lessThan } }); + items.push({ duration: { to: filters.duration.lessThan, from: 0 } }); if (filters.duration.moreThan) - items.push({ duration: { from: filters.duration.moreThan, to: 604800 } }); // 7 days to include special jobs with long runtimes + items.push({ duration: { to: 0, from: filters.duration.moreThan } }); if (filters.energy.from || filters.energy.to) items.push({ energy: { from: filters.energy.from, to: filters.energy.to }, @@ -266,9 +266,9 @@ if (filters.duration.from && filters.duration.to) opts.push(`duration=${filters.duration.from}-${filters.duration.to}`); if (filters.duration.lessThan) - opts.push(`duration=0-${filters.duration.lessThan}`); + opts.push(`duration=lessthan-${filters.duration.lessThan}`); if (filters.duration.moreThan) - opts.push(`duration=${filters.duration.moreThan}-604800`); + opts.push(`duration=morethan-${filters.duration.moreThan}`); if (filters.tags.length != 0) for (let tag of filters.tags) opts.push(`tag=${tag}`); if (filters.numNodes.from && filters.numNodes.to) diff --git a/web/frontend/src/generic/JobList.svelte b/web/frontend/src/generic/JobList.svelte index c08a5400..17c8811f 100644 --- a/web/frontend/src/generic/JobList.svelte +++ b/web/frontend/src/generic/JobList.svelte @@ -254,6 +254,9 @@ style="width: {jobInfoColumnWidth}px; padding-top: {headerPaddingTop}px" > Job Info + {#if $jobsStore.fetching} + + {/if} {#if showFootprint} { + lessState = secsToHoursAndMins(pendingDuration?.lessThan); + 
}); + $effect(() => { + moreState = secsToHoursAndMins(pendingDuration?.moreThan); + }); + $effect(() => { + fromState = secsToHoursAndMins(pendingDuration?.from); + }); + $effect(() => { + toState = secsToHoursAndMins(pendingDuration?.to); + }); + /* Functions */ function resetPending() { pendingDuration = { diff --git a/web/frontend/src/systems/nodelist/NodeListRow.svelte b/web/frontend/src/systems/nodelist/NodeListRow.svelte index 0b34fbad..0e2aeb18 100644 --- a/web/frontend/src/systems/nodelist/NodeListRow.svelte +++ b/web/frontend/src/systems/nodelist/NodeListRow.svelte @@ -116,7 +116,7 @@ pendingExtendedLegendData = {}; for (const accId of accSet) { - const matchJob = $nodeJobsData.data.jobs.items.find((i) => i.resources.find((r) => r.accelerators.includes(accId))) + const matchJob = $nodeJobsData?.data?.jobs?.items?.find((i) => i?.resources?.find((r) => r?.accelerators?.includes(accId))) || null const matchUser = matchJob?.user ? matchJob.user : null pendingExtendedLegendData[accId] = { user: (scrambleNames && matchUser)