mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-27 22:41:46 +01:00
4
Makefile
4
Makefile
@@ -1,6 +1,6 @@
|
||||
TARGET = ./cc-backend
|
||||
FRONTEND = ./web/frontend
|
||||
VERSION = 1.4.4
|
||||
VERSION = 1.5.0
|
||||
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
|
||||
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
|
||||
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'
|
||||
@@ -46,7 +46,7 @@ $(TARGET): $(SVELTE_TARGETS)
|
||||
|
||||
frontend:
|
||||
$(info ===> BUILD frontend)
|
||||
cd web/frontend && npm install && npm run build
|
||||
cd web/frontend && npm ci && npm run build
|
||||
|
||||
swagger:
|
||||
$(info ===> GENERATE swagger)
|
||||
|
||||
193
ReleaseNotes.md
193
ReleaseNotes.md
@@ -1,42 +1,183 @@
|
||||
# `cc-backend` version 1.4.4
|
||||
# `cc-backend` version 1.5.0
|
||||
|
||||
Supports job archive version 2 and database version 8.
|
||||
Supports job archive version 3 and database version 10.
|
||||
|
||||
This is a bug fix release of `cc-backend`, the API backend and frontend
|
||||
This is a feature release of `cc-backend`, the API backend and frontend
|
||||
implementation of ClusterCockpit.
|
||||
For release specific notes visit the [ClusterCockpit Documentation](https://clustercockpit.org/docs/release/).
|
||||
|
||||
## Breaking changes
|
||||
|
||||
The option `apiAllowedIPs` is now a required configuration attribute in
|
||||
`config.json`. This option restricts access to the admin API.
|
||||
### Configuration changes
|
||||
|
||||
To retain the previous behavior that the API is per default accessible from
|
||||
everywhere set:
|
||||
- **JSON attribute naming**: All JSON configuration attributes now use `kebab-case`
|
||||
style consistently (e.g., `api-allowed-ips` instead of `apiAllowedIPs`).
|
||||
Update your `config.json` accordingly.
|
||||
- **Removed `disable-archive` option**: This obsolete configuration option has been removed.
|
||||
- **Removed `clusters` config section**: The separate clusters configuration section
|
||||
has been removed. Cluster information is now derived from the job archive.
|
||||
- **`api-allowed-ips` is now optional**: If not specified, defaults to secure settings.
|
||||
|
||||
### Architecture changes
|
||||
|
||||
- **MetricStore moved**: The `metricstore` package has been moved from `internal/`
|
||||
to `pkg/` as it is now part of the public API.
|
||||
- **MySQL/MariaDB support removed**: Only SQLite is now supported as the database backend.
|
||||
- **Archive to Cleanup renaming**: Archive-related functions have been refactored
|
||||
and renamed to "Cleanup" for clarity.
|
||||
|
||||
### Dependency changes
|
||||
|
||||
- **cc-lib v2**: Switched to cc-lib version 2 with updated APIs
|
||||
- **cclib NATS client**: Now using the cclib NATS client implementation
|
||||
- Removed obsolete `util.Float` usage from cclib
|
||||
|
||||
## Major new features
|
||||
|
||||
### NATS API Integration
|
||||
|
||||
- **Real-time job events**: Subscribe to job start/stop events via NATS
|
||||
- **Node state updates**: Receive real-time node state changes via NATS
|
||||
- **Configurable subjects**: NATS API subjects are now configurable via `api-subjects`
|
||||
- **Deadlock fixes**: Improved NATS client stability and graceful shutdown
|
||||
|
||||
### Public Dashboard
|
||||
|
||||
- **Public-facing interface**: New public dashboard route for external users
|
||||
- **DoubleMetricPlot component**: New visualization component for comparing metrics
|
||||
- **Improved layout**: Reviewed and optimized dashboard layouts for better readability
|
||||
|
||||
### Enhanced Node Management
|
||||
|
||||
- **Node state tracking**: New node table in database with timestamp tracking
|
||||
- **Node state filtering**: Filter jobs by node state in systems view
|
||||
- **Node metrics improvements**: Better handling of node-level metrics and data
|
||||
- **Node list enhancements**: Improved paging, filtering, and continuous scroll support
|
||||
|
||||
### MetricStore Improvements
|
||||
|
||||
- **Memory tracking worker**: New worker for CCMS memory usage tracking
|
||||
- **Dynamic retention**: Support for cluster/subcluster-specific retention times
|
||||
- **Improved compression**: Transparent compression for job archive imports
|
||||
- **Parallel processing**: Parallelized Iter function in all archive backends
|
||||
|
||||
### Job Tagging System
|
||||
|
||||
- **Job tagger option**: Enable automatic job tagging via configuration flag
|
||||
- **Application detection**: Automatic detection of applications (MATLAB, GROMACS, etc.)
|
||||
- **Job classification**: Automatic detection of pathological jobs
|
||||
- **omitTagged flag**: Option to exclude tagged jobs from retention/cleanup operations
|
||||
|
||||
### Archive Backends
|
||||
|
||||
- **S3 backend**: Full support for S3-compatible object storage
|
||||
- **SQLite backend**: Full support for SQLite backend using blobs
|
||||
- **Performance improvements**: Fixed performance bugs in archive backends
|
||||
- **Better error handling**: Improved error messages and fallback handling
|
||||
|
||||
## New features and improvements
|
||||
|
||||
### Frontend
|
||||
|
||||
- **Loading indicators**: Added loading indicators to status detail and job lists
|
||||
- **Job info layout**: Reviewed and improved job info row layout
|
||||
- **Metric selection**: Enhanced metric selection with drag-and-drop fixes
|
||||
- **Filter presets**: Move list filter preset to URL for easy sharing
|
||||
- **Job comparison**: Improved job comparison views and plots
|
||||
- **Subcluster reactivity**: Job list now reacts to subcluster filter changes
|
||||
- **Frontend dependencies**: Bumped frontend dependencies to latest versions
|
||||
- **Svelte 5 compatibility**: Fixed Svelte state warnings and compatibility issues
|
||||
|
||||
### Backend
|
||||
|
||||
- **Progress bars**: Import function now shows progress during long operations
|
||||
- **Better logging**: Improved logging with appropriate log levels throughout
|
||||
- **Graceful shutdown**: Fixed shutdown timeout bugs and hanging issues
|
||||
- **Configuration defaults**: Sensible defaults for most configuration options
|
||||
- **Documentation**: Extensive documentation improvements across packages
|
||||
|
||||
### API improvements
|
||||
|
||||
- **Role-based metric visibility**: Metrics can now have role-based access control
|
||||
- **Job exclusivity filter**: New filter for exclusive vs. shared jobs
|
||||
- **Improved error messages**: Better error messages and documentation in REST API
|
||||
- **GraphQL enhancements**: Improved GraphQL queries and resolvers
|
||||
|
||||
### Performance
|
||||
|
||||
- **Database indices**: Optimized SQLite indices for better query performance
|
||||
- **Job cache**: Introduced caching table for faster job inserts
|
||||
- **Parallel imports**: Archive imports now run in parallel where possible
|
||||
- **External tool integration**: Optimized use of external tools (fd) for better performance
|
||||
|
||||
### Developer experience
|
||||
|
||||
- **AI agent guidelines**: Added documentation for AI coding agents (AGENTS.md, CLAUDE.md)
|
||||
- **Example API payloads**: Added example JSON API payloads for testing
|
||||
- **Unit tests**: Added more unit tests for NATS API and other components
|
||||
- **Test improvements**: Better test coverage and test data
|
||||
|
||||
## Bug fixes
|
||||
|
||||
- Fixed nodelist paging issues
|
||||
- Fixed metric select drag and drop functionality
|
||||
- Fixed render race conditions in nodeList
|
||||
- Fixed tag count grouping including type
|
||||
- Fixed wrong metricstore schema (missing comma)
|
||||
- Fixed configuration issues causing shutdown hangs
|
||||
- Fixed deadlock when NATS is not configured
|
||||
- Fixed archive backend performance bugs
|
||||
- Fixed continuous scroll buildup on refresh
|
||||
- Improved footprint calculation logic
|
||||
- Fixed polar plot data query decoupling
|
||||
- Fixed missing resolution parameter handling
|
||||
- Fixed node table initialization fallback
|
||||
|
||||
## Configuration changes
|
||||
|
||||
### New configuration options
|
||||
|
||||
```json
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
]
|
||||
{
|
||||
"main": {
|
||||
"enable-job-taggers": true,
|
||||
"resampling": {
|
||||
"minimum-points": 600,
|
||||
"trigger": 180,
|
||||
"resolutions": [240, 60]
|
||||
},
|
||||
"api-subjects": {
|
||||
"subject-job-event": "cc.job.event",
|
||||
"subject-node-state": "cc.node.state"
|
||||
}
|
||||
},
|
||||
"nats": {
|
||||
"address": "nats://0.0.0.0:4222",
|
||||
"username": "root",
|
||||
"password": "root"
|
||||
},
|
||||
"cron": {
|
||||
"commit-job-worker": "1m",
|
||||
"duration-worker": "5m",
|
||||
"footprint-worker": "10m"
|
||||
},
|
||||
"metric-store": {
|
||||
"cleanup": {
|
||||
"mode": "archive",
|
||||
"interval": "48h",
|
||||
"directory": "./var/archive"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Breaking changes for minor release 1.4.x
|
||||
## Migration notes
|
||||
|
||||
- You need to perform a database migration. Depending on your database size the
|
||||
migration might require several hours!
|
||||
- You need to adapt the `cluster.json` configuration files in the job-archive,
|
||||
add new required attributes to the metric list and after that edit
|
||||
`./job-archive/version.txt` to version 2. Only metrics that have the footprint
|
||||
attribute set can be filtered and show up in the footprint UI and polar plot.
|
||||
- Continuous scrolling is default now in all job lists. You can change this back
|
||||
to paging globally, also every user can configure to use paging or continuous
|
||||
scrolling individually.
|
||||
- Tags have a scope now. Existing tags will get global scope in the database
|
||||
migration.
|
||||
|
||||
## New features
|
||||
|
||||
- Enable to delete tags from the web interface
|
||||
- Review and update your `config.json` to use kebab-case attribute names
|
||||
- If using NATS, configure the new `nats` and `api-subjects` sections
|
||||
- If using S3 archive backend, configure the new `archive` section options
|
||||
- Test the new public dashboard at `/public` route
|
||||
- Review cron worker configuration if you need different frequencies
|
||||
|
||||
## Known issues
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/tagger"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/taskmanager"
|
||||
@@ -317,6 +318,7 @@ func runServer(ctx context.Context) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Initialize metric store if configuration is provided
|
||||
haveMetricstore := false
|
||||
mscfg := ccconf.GetPackageConfig("metric-store")
|
||||
if mscfg != nil {
|
||||
metricstore.Init(mscfg, &wg)
|
||||
@@ -325,7 +327,26 @@ func runServer(ctx context.Context) error {
|
||||
ms := metricstore.GetMemoryStore()
|
||||
jobRepo := repository.GetJobRepository()
|
||||
ms.SetNodeProvider(jobRepo)
|
||||
metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{}
|
||||
haveMetricstore = true
|
||||
} else {
|
||||
metricstore.MetricStoreHandle = nil
|
||||
cclog.Debug("missing internal metricstore configuration")
|
||||
}
|
||||
|
||||
// Initialize external metric stores if configuration is provided
|
||||
mscfg = ccconf.GetPackageConfig("metric-store-external")
|
||||
if mscfg != nil {
|
||||
err := metricdispatch.Init(mscfg)
|
||||
|
||||
if err != nil {
|
||||
cclog.Debugf("initializing metricdispatch: %v", err)
|
||||
} else {
|
||||
haveMetricstore = true
|
||||
}
|
||||
}
|
||||
|
||||
if !haveMetricstore {
|
||||
return fmt.Errorf("missing metricstore configuration")
|
||||
}
|
||||
|
||||
|
||||
@@ -5,18 +5,13 @@
|
||||
"https-key-file": "/etc/letsencrypt/live/url/privkey.pem",
|
||||
"user": "clustercockpit",
|
||||
"group": "clustercockpit",
|
||||
"api-allowed-ips": [
|
||||
"*"
|
||||
],
|
||||
"api-allowed-ips": ["*"],
|
||||
"short-running-jobs-duration": 300,
|
||||
"enable-job-taggers": true,
|
||||
"resampling": {
|
||||
"minimum-points": 600,
|
||||
"trigger": 180,
|
||||
"resolutions": [
|
||||
240,
|
||||
60
|
||||
]
|
||||
"resolutions": [240, 60]
|
||||
},
|
||||
"api-subjects": {
|
||||
"subject-job-event": "cc.job.event",
|
||||
@@ -50,6 +45,28 @@
|
||||
"location": "./var/archive"
|
||||
}
|
||||
},
|
||||
"metric-store-external": [
|
||||
{
|
||||
"scope": "*",
|
||||
"url": "http://x.x.x.x:8082",
|
||||
"token": "MySecret"
|
||||
},
|
||||
{
|
||||
"scope": "fritz",
|
||||
"url": "http://x.x.x.x:8084",
|
||||
"token": "MySecret"
|
||||
},
|
||||
{
|
||||
"scope": "fritz-spr1tb",
|
||||
"url": "http://x.x.x.x:8083",
|
||||
"token": "MySecret"
|
||||
},
|
||||
{
|
||||
"scope": "alex",
|
||||
"url": "http://x.x.x.x:8084",
|
||||
"token": "MySecret"
|
||||
}
|
||||
],
|
||||
"metric-store": {
|
||||
"checkpoints": {
|
||||
"interval": "12h",
|
||||
@@ -74,4 +91,5 @@
|
||||
]
|
||||
},
|
||||
"ui-file": "ui-config.json"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
2
go.mod
2
go.mod
@@ -11,7 +11,7 @@ tool (
|
||||
|
||||
require (
|
||||
github.com/99designs/gqlgen v0.17.85
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.0
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.1
|
||||
github.com/Masterminds/squirrel v1.5.4
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1
|
||||
github.com/aws/aws-sdk-go-v2/config v1.32.6
|
||||
|
||||
2
go.sum
2
go.sum
@@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.0 h1:gqMsh7zsJMUhaXviXzaZ3gqXcLVgerjRJHzIcwX4FmQ=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
|
||||
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
||||
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
|
||||
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
|
||||
|
||||
@@ -45,6 +45,13 @@ func setup(t *testing.T) *api.RestAPI {
|
||||
"api-allowed-ips": [
|
||||
"*"
|
||||
]
|
||||
},
|
||||
"metric-store": {
|
||||
"checkpoints": {
|
||||
"interval": "12h"
|
||||
},
|
||||
"retention-in-memory": "48h",
|
||||
"memory-cap": 100
|
||||
},
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
@@ -143,6 +150,7 @@ func setup(t *testing.T) *api.RestAPI {
|
||||
}
|
||||
|
||||
ccconf.Init(cfgFilePath)
|
||||
metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{}
|
||||
|
||||
// Load and check main configuration
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
|
||||
29
internal/metricdispatch/configSchema.go
Normal file
29
internal/metricdispatch/configSchema.go
Normal file
@@ -0,0 +1,29 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package metricdispatch
|
||||
|
||||
// configSchema is the JSON schema for the external metric store configuration.
// It describes an array of objects, each with the required string properties
// "scope", "url" and "token".
const configSchema = `{
|
||||
"type": "array",
|
||||
"description": "Array of metric store configurations with scope-based routing.",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"scope": {
|
||||
"description": "Scope identifier for routing metrics (e.g., cluster name, '*' for default)",
|
||||
"type": "string"
|
||||
},
|
||||
"url": {
|
||||
"description": "URL of the metric store endpoint",
|
||||
"type": "string"
|
||||
},
|
||||
"token": {
|
||||
"description": "Authentication token for the metric store",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["scope", "url", "token"]
|
||||
}
|
||||
}`
|
||||
@@ -44,7 +44,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/lrucache"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/resampler"
|
||||
@@ -96,6 +95,13 @@ func LoadData(job *schema.Job,
|
||||
if job.State == schema.JobStateRunning ||
|
||||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving {
|
||||
|
||||
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
if scopes == nil {
|
||||
scopes = append(scopes, schema.MetricScopeNode)
|
||||
}
|
||||
@@ -107,7 +113,7 @@ func LoadData(job *schema.Job,
|
||||
}
|
||||
}
|
||||
|
||||
jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution)
|
||||
jd, err = ms.LoadData(job, metrics, scopes, ctx, resolution)
|
||||
if err != nil {
|
||||
if len(jd) != 0 {
|
||||
cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s",
|
||||
@@ -236,7 +242,14 @@ func LoadAverages(
|
||||
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
|
||||
}
|
||||
|
||||
stats, err := metricstore.LoadStats(job, metrics, ctx)
|
||||
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
stats, err := ms.LoadStats(job, metrics, ctx)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
@@ -273,13 +286,23 @@ func LoadScopedJobStats(
|
||||
return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
|
||||
}
|
||||
|
||||
scopedStats, err := metricstore.LoadScopedStats(job, metrics, scopes, ctx)
|
||||
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
scopedStats, err := ms.LoadScopedStats(job, metrics, scopes, ctx)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Round Resulting Stat Values
|
||||
scopedStats.RoundScopedMetricStats()
|
||||
|
||||
return scopedStats, nil
|
||||
}
|
||||
|
||||
@@ -295,9 +318,16 @@ func LoadJobStats(
|
||||
return archive.LoadStatsFromArchive(job, metrics)
|
||||
}
|
||||
|
||||
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data := make(map[string]schema.MetricStatistics, len(metrics))
|
||||
|
||||
stats, err := metricstore.LoadStats(job, metrics, ctx)
|
||||
stats, err := ms.LoadStats(job, metrics, ctx)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
@@ -333,6 +363,7 @@ func LoadJobStats(
|
||||
// the metric store (not the archive) since it's for current/recent node status monitoring.
|
||||
//
|
||||
// Returns a nested map structure: node -> metric -> scoped data.
|
||||
// FIXME: Add support for subcluster specific cc-metric-stores
|
||||
func LoadNodeData(
|
||||
cluster string,
|
||||
metrics, nodes []string,
|
||||
@@ -346,7 +377,14 @@ func LoadNodeData(
|
||||
}
|
||||
}
|
||||
|
||||
data, err := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||
ms, err := GetMetricDataRepo(cluster, "")
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load node data from metric store: %s",
|
||||
err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := ms.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
|
||||
if err != nil {
|
||||
if len(data) != 0 {
|
||||
cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error())
|
||||
@@ -383,7 +421,14 @@ func LoadNodeListData(
|
||||
}
|
||||
}
|
||||
|
||||
data, err := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
|
||||
ms, err := GetMetricDataRepo(cluster, subCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load node data from metric store: %s",
|
||||
err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
|
||||
if err != nil {
|
||||
if len(data) != 0 {
|
||||
cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",
|
||||
|
||||
112
internal/metricdispatch/metricdata.go
Executable file
112
internal/metricdispatch/metricdata.go
Executable file
@@ -0,0 +1,112 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package metricdispatch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
ccms "github.com/ClusterCockpit/cc-backend/internal/metricstoreclient"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
)
|
||||
|
||||
type MetricDataRepository interface {
|
||||
// Return the JobData for the given job, only with the requested metrics.
|
||||
LoadData(job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context,
|
||||
resolution int) (schema.JobData, error)
|
||||
|
||||
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only.
|
||||
LoadStats(job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
|
||||
|
||||
// Return a map of metrics to a map of scopes to the scoped metric statistics of the job.
|
||||
LoadScopedStats(job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context) (schema.ScopedJobStats, error)
|
||||
|
||||
// Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node.
|
||||
LoadNodeData(cluster string,
|
||||
metrics, nodes []string,
|
||||
scopes []schema.MetricScope,
|
||||
from, to time.Time,
|
||||
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
|
||||
|
||||
// Return a map of hosts to a map of metrics to a map of scopes for multiple nodes.
|
||||
LoadNodeListData(cluster, subCluster string,
|
||||
nodes []string,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
resolution int,
|
||||
from, to time.Time,
|
||||
ctx context.Context) (map[string]schema.JobData, error)
|
||||
}
|
||||
|
||||
// CCMetricStoreConfig is one entry of the external metric store configuration
// array decoded by Init.
type CCMetricStoreConfig struct {
	// Scope is the key under which the client is registered and later looked up.
	Scope string `json:"scope"`
	// URL is passed to the metric store client constructor.
	URL string `json:"url"`
	// Token is passed to the metric store client constructor.
	Token string `json:"token"`
}
|
||||
|
||||
// metricDataRepos maps a configured scope to its metric data repository.
// It is filled by Init and read by GetMetricDataRepo.
var metricDataRepos = make(map[string]MetricDataRepository)
|
||||
|
||||
func Init(rawConfig json.RawMessage) error {
|
||||
if rawConfig != nil {
|
||||
var configs []CCMetricStoreConfig
|
||||
config.Validate(configSchema, rawConfig)
|
||||
dec := json.NewDecoder(bytes.NewReader(rawConfig))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&configs); err != nil {
|
||||
return fmt.Errorf("[METRICDISPATCH]> Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error())
|
||||
}
|
||||
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("[METRICDISPATCH]> No metric store configurations found in config file")
|
||||
}
|
||||
|
||||
for _, config := range configs {
|
||||
metricDataRepos[config.Scope] = ccms.NewCCMetricStore(config.URL, config.Token)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository, error) {
|
||||
var repo MetricDataRepository
|
||||
var ok bool
|
||||
|
||||
key := cluster + "-" + subcluster
|
||||
repo, ok = metricDataRepos[key]
|
||||
|
||||
if !ok {
|
||||
repo, ok = metricDataRepos[cluster]
|
||||
|
||||
if !ok {
|
||||
repo, ok = metricDataRepos["*"]
|
||||
|
||||
if !ok {
|
||||
if metricstore.MetricStoreHandle == nil {
|
||||
return nil, fmt.Errorf("[METRICDISPATCH]> no metric data repository configured '%s'", key)
|
||||
}
|
||||
|
||||
repo = metricstore.MetricStoreHandle
|
||||
cclog.Debugf("[METRICDISPATCH]> Using internal metric data repository for '%s'", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
507
internal/metricstoreclient/cc-metric-store-queries.go
Normal file
507
internal/metricstoreclient/cc-metric-store-queries.go
Normal file
@@ -0,0 +1,507 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package metricstoreclient - Query Building
|
||||
//
|
||||
// This file contains the query construction and scope transformation logic for cc-metric-store queries.
|
||||
// It handles the complex mapping between requested metric scopes and native hardware topology,
|
||||
// automatically aggregating or filtering metrics as needed.
|
||||
//
|
||||
// # Scope Transformations
|
||||
//
|
||||
// The buildScopeQueries function implements the core scope transformation algorithm.
|
||||
// It handles 25+ different transformation cases, mapping between:
|
||||
// - Accelerator (GPU) scope
|
||||
// - HWThread (hardware thread/SMT) scope
|
||||
// - Core (CPU core) scope
|
||||
// - Socket (CPU package) scope
|
||||
// - MemoryDomain (NUMA domain) scope
|
||||
// - Node (full system) scope
|
||||
//
|
||||
// Transformations follow these rules:
|
||||
// - Same scope: Return data as-is (e.g., Core → Core)
|
||||
// - Coarser scope: Aggregate data (e.g., Core → Socket with Aggregate=true)
|
||||
// - Finer scope: Error - cannot increase granularity
|
||||
//
|
||||
// # Query Building
|
||||
//
|
||||
// buildQueries and buildNodeQueries are the main entry points, handling job-specific
|
||||
// and node-specific query construction respectively. They:
|
||||
// - Validate metric configurations
|
||||
// - Handle subcluster-specific metric filtering
|
||||
// - Detect and skip duplicate scope requests
|
||||
// - Call buildScopeQueries for each metric/scope/host combination
|
||||
package metricstoreclient
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
)
|
||||
|
||||
// Scope string constants used in API queries.
|
||||
// Pre-converted to avoid repeated allocations during query building.
|
||||
var (
|
||||
hwthreadString = string(schema.MetricScopeHWThread)
|
||||
coreString = string(schema.MetricScopeCore)
|
||||
memoryDomainString = string(schema.MetricScopeMemoryDomain)
|
||||
socketString = string(schema.MetricScopeSocket)
|
||||
acceleratorString = string(schema.MetricScopeAccelerator)
|
||||
)
|
||||
|
||||
// buildQueries constructs API queries for job-specific metric data.
|
||||
// It iterates through metrics, scopes, and job resources to build the complete query set.
|
||||
//
|
||||
// The function handles:
|
||||
// - Metric configuration validation and subcluster filtering
|
||||
// - Scope deduplication to avoid redundant queries
|
||||
// - Hardware thread list resolution (job-allocated vs full node)
|
||||
// - Delegation to buildScopeQueries for scope transformations
|
||||
//
|
||||
// Returns queries and their corresponding assigned scopes (which may differ from requested scopes).
|
||||
func (ccms *CCMetricStore) buildQueries(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
resolution int,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
|
||||
if scerr != nil {
|
||||
return nil, nil, scerr
|
||||
}
|
||||
topology := subcluster.Topology
|
||||
|
||||
for _, metric := range metrics {
|
||||
remoteName := metric
|
||||
mc := archive.GetMetricConfig(job.Cluster, metric)
|
||||
if mc == nil {
|
||||
cclog.Warnf("metric '%s' is not specified for cluster '%s' - skipping", metric, job.Cluster)
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip if metric is removed for subcluster
|
||||
if len(mc.SubClusters) != 0 {
|
||||
isRemoved := false
|
||||
for _, scConfig := range mc.SubClusters {
|
||||
if scConfig.Name == job.SubCluster && scConfig.Remove {
|
||||
isRemoved = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isRemoved {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
handledScopes := make([]schema.MetricScope, 0, 3)
|
||||
|
||||
scopesLoop:
|
||||
for _, requestedScope := range scopes {
|
||||
nativeScope := mc.Scope
|
||||
if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
scope := nativeScope.Max(requestedScope)
|
||||
for _, s := range handledScopes {
|
||||
if scope == s {
|
||||
continue scopesLoop
|
||||
}
|
||||
}
|
||||
handledScopes = append(handledScopes, scope)
|
||||
|
||||
for _, host := range job.Resources {
|
||||
hwthreads := host.HWThreads
|
||||
if hwthreads == nil {
|
||||
hwthreads = topology.Node
|
||||
}
|
||||
|
||||
hostQueries, hostScopes := buildScopeQueries(
|
||||
nativeScope, requestedScope,
|
||||
remoteName, host.Hostname,
|
||||
&topology, hwthreads, host.Accelerators,
|
||||
resolution,
|
||||
)
|
||||
|
||||
if len(hostQueries) == 0 && len(hostScopes) == 0 {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
}
|
||||
|
||||
queries = append(queries, hostQueries...)
|
||||
assignedScope = append(assignedScope, hostScopes...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queries, assignedScope, nil
|
||||
}
|
||||
|
||||
// buildNodeQueries constructs API queries for node-specific metric data (Systems View).
|
||||
// Similar to buildQueries but uses full node topology instead of job-allocated resources.
|
||||
//
|
||||
// The function handles:
|
||||
// - Subcluster topology resolution (either pre-loaded or per-node lookup)
|
||||
// - Full node hardware thread lists (not job-specific subsets)
|
||||
// - All accelerators on each node
|
||||
// - Metric configuration validation with subcluster filtering
|
||||
//
|
||||
// Returns queries and their corresponding assigned scopes.
|
||||
func (ccms *CCMetricStore) buildNodeQueries(
|
||||
cluster string,
|
||||
subCluster string,
|
||||
nodes []string,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
resolution int,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
// Get Topol before loop if subCluster given
|
||||
var subClusterTopol *schema.SubCluster
|
||||
var scterr error
|
||||
if subCluster != "" {
|
||||
subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster)
|
||||
if scterr != nil {
|
||||
cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error())
|
||||
return nil, nil, scterr
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
remoteName := metric
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
if mc == nil {
|
||||
cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster)
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip if metric is removed for subcluster
|
||||
if mc.SubClusters != nil {
|
||||
isRemoved := false
|
||||
for _, scConfig := range mc.SubClusters {
|
||||
if scConfig.Name == subCluster && scConfig.Remove {
|
||||
isRemoved = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isRemoved {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
handledScopes := make([]schema.MetricScope, 0, 3)
|
||||
|
||||
scopesLoop:
|
||||
for _, requestedScope := range scopes {
|
||||
nativeScope := mc.Scope
|
||||
|
||||
scope := nativeScope.Max(requestedScope)
|
||||
for _, s := range handledScopes {
|
||||
if scope == s {
|
||||
continue scopesLoop
|
||||
}
|
||||
}
|
||||
handledScopes = append(handledScopes, scope)
|
||||
|
||||
for _, hostname := range nodes {
|
||||
|
||||
// If no subCluster given, get it by node
|
||||
if subCluster == "" {
|
||||
subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname)
|
||||
if scnerr != nil {
|
||||
return nil, nil, scnerr
|
||||
}
|
||||
subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName)
|
||||
if scterr != nil {
|
||||
return nil, nil, scterr
|
||||
}
|
||||
}
|
||||
|
||||
// Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable
|
||||
// Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable
|
||||
topology := subClusterTopol.Topology
|
||||
acceleratorIds := topology.GetAcceleratorIDs()
|
||||
|
||||
// Moved check here if metric matches hardware specs
|
||||
if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 {
|
||||
continue scopesLoop
|
||||
}
|
||||
|
||||
nodeQueries, nodeScopes := buildScopeQueries(
|
||||
nativeScope, requestedScope,
|
||||
remoteName, hostname,
|
||||
&topology, topology.Node, acceleratorIds,
|
||||
resolution,
|
||||
)
|
||||
|
||||
if len(nodeQueries) == 0 && len(nodeScopes) == 0 {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
}
|
||||
|
||||
queries = append(queries, nodeQueries...)
|
||||
assignedScope = append(assignedScope, nodeScopes...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queries, assignedScope, nil
|
||||
}
|
||||
|
||||
// buildScopeQueries generates API queries for a given scope transformation.
|
||||
// It returns a slice of queries and corresponding assigned scopes.
|
||||
// Some transformations (e.g., HWThread -> Core/Socket) may generate multiple queries.
|
||||
func buildScopeQueries(
|
||||
nativeScope, requestedScope schema.MetricScope,
|
||||
metric, hostname string,
|
||||
topology *schema.Topology,
|
||||
hwthreads []int,
|
||||
accelerators []string,
|
||||
resolution int,
|
||||
) ([]APIQuery, []schema.MetricScope) {
|
||||
scope := nativeScope.Max(requestedScope)
|
||||
queries := []APIQuery{}
|
||||
scopes := []schema.MetricScope{}
|
||||
|
||||
hwthreadsStr := intToStringSlice(hwthreads)
|
||||
|
||||
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
|
||||
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
|
||||
if scope != schema.MetricScopeAccelerator {
|
||||
// Skip all other caught cases
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: false,
|
||||
Type: &acceleratorString,
|
||||
TypeIds: accelerators,
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, schema.MetricScopeAccelerator)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Accelerator -> Node
|
||||
if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode {
|
||||
if len(accelerators) == 0 {
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &acceleratorString,
|
||||
TypeIds: accelerators,
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// HWThread -> HWThread
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: false,
|
||||
Type: &hwthreadString,
|
||||
TypeIds: hwthreadsStr,
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// HWThread -> Core
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
|
||||
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
|
||||
for _, core := range cores {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &hwthreadString,
|
||||
TypeIds: intToStringSlice(topology.Core[core]),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
}
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// HWThread -> Socket
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
|
||||
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
|
||||
for _, socket := range sockets {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &hwthreadString,
|
||||
TypeIds: intToStringSlice(topology.Socket[socket]),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
}
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// HWThread -> Node
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &hwthreadString,
|
||||
TypeIds: hwthreadsStr,
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Core -> Core
|
||||
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
|
||||
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: false,
|
||||
Type: &coreString,
|
||||
TypeIds: intToStringSlice(cores),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Core -> Socket
|
||||
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
|
||||
sockets, _ := topology.GetSocketsFromCores(hwthreads)
|
||||
for _, socket := range sockets {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &coreString,
|
||||
TypeIds: intToStringSlice(topology.Socket[socket]),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
}
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Core -> Node
|
||||
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
|
||||
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &coreString,
|
||||
TypeIds: intToStringSlice(cores),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// MemoryDomain -> MemoryDomain
|
||||
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
|
||||
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: false,
|
||||
Type: &memoryDomainString,
|
||||
TypeIds: intToStringSlice(memDomains),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// MemoryDomain -> Node
|
||||
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
|
||||
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &memoryDomainString,
|
||||
TypeIds: intToStringSlice(memDomains),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Socket -> Socket
|
||||
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
|
||||
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: false,
|
||||
Type: &socketString,
|
||||
TypeIds: intToStringSlice(sockets),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Socket -> Node
|
||||
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
|
||||
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Aggregate: true,
|
||||
Type: &socketString,
|
||||
TypeIds: intToStringSlice(sockets),
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Node -> Node
|
||||
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
Hostname: hostname,
|
||||
Resolution: resolution,
|
||||
})
|
||||
scopes = append(scopes, scope)
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// Unhandled case - return empty slices
|
||||
return queries, scopes
|
||||
}
|
||||
|
||||
// intToStringSlice renders every integer of the input as its decimal string,
// keeping the order. It is used to turn hardware IDs (core IDs, socket IDs, ...)
// into the string form the query API takes. The result is never nil.
func intToStringSlice(is []int) []string {
	out := make([]string, 0, len(is))
	for idx := 0; idx < len(is); idx++ {
		out = append(out, strconv.Itoa(is[idx]))
	}
	return out
}
|
||||
669
internal/metricstoreclient/cc-metric-store.go
Normal file
669
internal/metricstoreclient/cc-metric-store.go
Normal file
@@ -0,0 +1,669 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package metricstoreclient provides a client for querying the cc-metric-store time series database.
|
||||
//
|
||||
// The cc-metric-store is a high-performance time series database optimized for HPC metric data.
|
||||
// This client handles HTTP communication, query construction, scope transformations, and data retrieval
|
||||
// for job and node metrics across different metric scopes (node, socket, core, hwthread, accelerator).
|
||||
//
|
||||
// # Architecture
|
||||
//
|
||||
// The package is split into two main components:
|
||||
// - Client Operations (cc-metric-store.go): HTTP client, request handling, data loading methods
|
||||
// - Query Building (cc-metric-store-queries.go): Query construction and scope transformation logic
|
||||
//
|
||||
// # Basic Usage
|
||||
//
|
||||
// store := NewCCMetricStore("http://localhost:8080", "jwt-token")
|
||||
//
|
||||
// // Load job data
|
||||
// jobData, err := store.LoadData(job, metrics, scopes, ctx, resolution)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// # Metric Scopes
|
||||
//
|
||||
// The client supports hierarchical metric scopes that map to HPC hardware topology:
|
||||
// - MetricScopeAccelerator: GPU/accelerator level metrics
|
||||
// - MetricScopeHWThread: Hardware thread (SMT) level metrics
|
||||
// - MetricScopeCore: CPU core level metrics
|
||||
// - MetricScopeSocket: CPU socket level metrics
|
||||
// - MetricScopeMemoryDomain: NUMA domain level metrics
|
||||
// - MetricScopeNode: Full node level metrics
|
||||
//
|
||||
// The client automatically handles scope transformations, aggregating finer-grained metrics
|
||||
// to coarser scopes when needed (e.g., aggregating core metrics to socket level).
|
||||
//
|
||||
// # Error Handling
|
||||
//
|
||||
// The client supports partial errors - if some queries fail, it returns both the successful
|
||||
// data and an error listing the failed queries. This allows processing partial results
|
||||
// when some nodes or metrics are temporarily unavailable.
|
||||
//
|
||||
// # API Versioning
|
||||
//
|
||||
// The client uses cc-metric-store API v2, which includes support for:
|
||||
// - Data resampling for bandwidth optimization
|
||||
// - Multi-scope queries in a single request
|
||||
// - Aggregation across hardware topology levels
|
||||
package metricstoreclient
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
)
|
||||
|
||||
// CCMetricStore is the HTTP client used to talk to a cc-metric-store instance.
// It bundles the connection details and credentials needed by the query methods.
type CCMetricStore struct {
	// client performs the HTTP requests (NewCCMetricStore sets a 10-second timeout).
	client http.Client
	// jwt is sent as Bearer token; requests carry no Authorization header if it is empty.
	jwt string
	// url is the base URL of the cc-metric-store instance.
	url string
	// queryEndpoint is the full URL of the query API endpoint.
	queryEndpoint string
}
|
||||
|
||||
// APIQueryRequest represents a request to the cc-metric-store query API.
|
||||
// It supports both explicit queries and "for-all-nodes" bulk queries.
|
||||
type APIQueryRequest struct {
|
||||
Cluster string `json:"cluster"` // Target cluster name
|
||||
Queries []APIQuery `json:"queries"` // Explicit list of metric queries
|
||||
ForAllNodes []string `json:"for-all-nodes"` // Metrics to query for all nodes
|
||||
From int64 `json:"from"` // Start time (Unix timestamp)
|
||||
To int64 `json:"to"` // End time (Unix timestamp)
|
||||
WithStats bool `json:"with-stats"` // Include min/avg/max statistics
|
||||
WithData bool `json:"with-data"` // Include time series data points
|
||||
}
|
||||
|
||||
// APIQuery describes one metric query, optionally narrowed to a hardware scope.
// Type together with TypeIds selects the scope (core, socket, accelerator, ...).
type APIQuery struct {
	// Type is the scope type (e.g. "core", "socket"); nil for a plain host query.
	Type *string `json:"type,omitempty"`
	// SubType is the sub-scope type (reserved for future use).
	SubType *string `json:"subtype,omitempty"`
	// Metric is the metric name.
	Metric string `json:"metric"`
	// Hostname is the target host.
	Hostname string `json:"host"`
	// Resolution is the data resolution in seconds (0 = native).
	Resolution int `json:"resolution"`
	// TypeIds holds the IDs for the scope type (e.g. core IDs).
	TypeIds []string `json:"type-ids,omitempty"`
	// SubTypeIds holds the IDs for the sub-scope (reserved).
	SubTypeIds []string `json:"subtype-ids,omitempty"`
	// Aggregate asks for aggregation across TypeIds.
	Aggregate bool `json:"aggreg"`
}
|
||||
|
||||
// APIQueryResponse contains the results from a cc-metric-store query.
|
||||
// Results align with the Queries slice by index.
|
||||
type APIQueryResponse struct {
|
||||
Queries []APIQuery `json:"queries,omitempty"` // Echoed queries (for bulk requests)
|
||||
Results [][]APIMetricData `json:"results"` // Result data, indexed by query
|
||||
}
|
||||
|
||||
// APIMetricData represents time series data and statistics for a single metric series.
|
||||
// Error is set if this particular series failed to load.
|
||||
type APIMetricData struct {
|
||||
Error *string `json:"error"` // Error message if query failed
|
||||
Data []schema.Float `json:"data"` // Time series data points
|
||||
From int64 `json:"from"` // Actual start time of data
|
||||
To int64 `json:"to"` // Actual end time of data
|
||||
Resolution int `json:"resolution"` // Actual resolution of data in seconds
|
||||
Avg schema.Float `json:"avg"` // Average value across time range
|
||||
Min schema.Float `json:"min"` // Minimum value in time range
|
||||
Max schema.Float `json:"max"` // Maximum value in time range
|
||||
}
|
||||
|
||||
// NewCCMetricStore creates and initializes a new CCMetricStore client.
|
||||
// The url parameter should include the protocol and port (e.g., "http://localhost:8080").
|
||||
// The token parameter is a JWT used for Bearer authentication; pass empty string if auth is disabled.
|
||||
func NewCCMetricStore(url string, token string) *CCMetricStore {
|
||||
return &CCMetricStore{
|
||||
url: url,
|
||||
queryEndpoint: fmt.Sprintf("%s/api/query", url),
|
||||
jwt: token,
|
||||
client: http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// doRequest executes an HTTP POST request to the cc-metric-store query API.
|
||||
// It handles JSON encoding/decoding, authentication, and API versioning.
|
||||
// The request body is automatically closed to prevent resource leaks.
|
||||
func (ccms *CCMetricStore) doRequest(
|
||||
ctx context.Context,
|
||||
body *APIQueryRequest,
|
||||
) (*APIQueryResponse, error) {
|
||||
buf := &bytes.Buffer{}
|
||||
if err := json.NewEncoder(buf).Encode(body); err != nil {
|
||||
cclog.Errorf("Error while encoding request body: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building request body: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
if ccms.jwt != "" {
|
||||
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
|
||||
}
|
||||
|
||||
// versioning the cc-metric-store query API.
|
||||
// v2 = data with resampling
|
||||
// v1 = data without resampling
|
||||
q := req.URL.Query()
|
||||
q.Add("version", "v2")
|
||||
req.URL.RawQuery = q.Encode()
|
||||
|
||||
res, err := ccms.client.Do(req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status)
|
||||
}
|
||||
|
||||
var resBody APIQueryResponse
|
||||
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
|
||||
cclog.Errorf("Error while decoding result body: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resBody, nil
|
||||
}
|
||||
|
||||
// LoadData retrieves time series data and statistics for the specified job and metrics.
|
||||
// It queries data for the job's time range and resources, handling scope transformations automatically.
|
||||
//
|
||||
// Parameters:
|
||||
// - job: Job metadata including cluster, time range, and allocated resources
|
||||
// - metrics: List of metric names to retrieve
|
||||
// - scopes: Requested metric scopes (node, socket, core, etc.)
|
||||
// - ctx: Context for cancellation and timeouts
|
||||
// - resolution: Data resolution in seconds (0 for native resolution)
|
||||
//
|
||||
// Returns JobData organized as: metric -> scope -> series list.
|
||||
// Supports partial errors: returns available data even if some queries fail.
|
||||
func (ccms *CCMetricStore) LoadData(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context,
|
||||
resolution int,
|
||||
) (schema.JobData, error) {
|
||||
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := APIQueryRequest{
|
||||
Cluster: job.Cluster,
|
||||
From: job.StartTime,
|
||||
To: job.StartTime + int64(job.Duration),
|
||||
Queries: queries,
|
||||
WithStats: true,
|
||||
WithData: true,
|
||||
}
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var errors []string
|
||||
jobData := make(schema.JobData)
|
||||
for i, row := range resBody.Results {
|
||||
query := req.Queries[i]
|
||||
metric := query.Metric
|
||||
scope := assignedScope[i]
|
||||
mc := archive.GetMetricConfig(job.Cluster, metric)
|
||||
if _, ok := jobData[metric]; !ok {
|
||||
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
|
||||
}
|
||||
|
||||
res := mc.Timestep
|
||||
if len(row) > 0 {
|
||||
res = row[0].Resolution
|
||||
}
|
||||
|
||||
jobMetric, ok := jobData[metric][scope]
|
||||
if !ok {
|
||||
jobMetric = &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: res,
|
||||
Series: make([]schema.Series, 0),
|
||||
}
|
||||
jobData[metric][scope] = jobMetric
|
||||
}
|
||||
|
||||
for ndx, res := range row {
|
||||
if res.Error != nil {
|
||||
/* Build list for "partial errors", if any */
|
||||
errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
jobMetric.Series = append(jobMetric.Series, schema.Series{
|
||||
Hostname: query.Hostname,
|
||||
Id: id,
|
||||
Statistics: schema.MetricStatistics{
|
||||
Avg: float64(res.Avg),
|
||||
Min: float64(res.Min),
|
||||
Max: float64(res.Max),
|
||||
},
|
||||
Data: res.Data,
|
||||
})
|
||||
}
|
||||
|
||||
// So that one can later check len(jobData):
|
||||
if len(jobMetric.Series) == 0 {
|
||||
delete(jobData[metric], scope)
|
||||
if len(jobData[metric]) == 0 {
|
||||
delete(jobData, metric)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) != 0 {
|
||||
/* Returns list for "partial errors" */
|
||||
return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
||||
}
|
||||
return jobData, nil
|
||||
}
|
||||
|
||||
// LoadStats retrieves min/avg/max statistics for job metrics at node scope.
|
||||
// This is faster than LoadData when only statistical summaries are needed (no time series data).
|
||||
//
|
||||
// Returns statistics organized as: metric -> hostname -> statistics.
|
||||
func (ccms *CCMetricStore) LoadStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context,
|
||||
) (map[string]map[string]schema.MetricStatistics, error) {
|
||||
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := APIQueryRequest{
|
||||
Cluster: job.Cluster,
|
||||
From: job.StartTime,
|
||||
To: job.StartTime + int64(job.Duration),
|
||||
Queries: queries,
|
||||
WithStats: true,
|
||||
WithData: false,
|
||||
}
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
|
||||
for i, res := range resBody.Results {
|
||||
query := req.Queries[i]
|
||||
metric := query.Metric
|
||||
data := res[0]
|
||||
if data.Error != nil {
|
||||
cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
|
||||
continue
|
||||
}
|
||||
|
||||
metricdata, ok := stats[metric]
|
||||
if !ok {
|
||||
metricdata = make(map[string]schema.MetricStatistics, job.NumNodes)
|
||||
stats[metric] = metricdata
|
||||
}
|
||||
|
||||
if hasNaNStats(data.Avg, data.Min, data.Max) {
|
||||
cclog.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname)
|
||||
continue
|
||||
}
|
||||
|
||||
metricdata[query.Hostname] = schema.MetricStatistics{
|
||||
Avg: float64(data.Avg),
|
||||
Min: float64(data.Min),
|
||||
Max: float64(data.Max),
|
||||
}
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// LoadScopedStats retrieves statistics for job metrics across multiple scopes.
|
||||
// Used for the Job-View Statistics Table to display per-scope breakdowns.
|
||||
//
|
||||
// Returns statistics organized as: metric -> scope -> list of scoped statistics.
|
||||
// Each scoped statistic includes hostname, hardware ID (if applicable), and min/avg/max values.
|
||||
func (ccms *CCMetricStore) LoadScopedStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context,
|
||||
) (schema.ScopedJobStats, error) {
|
||||
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := APIQueryRequest{
|
||||
Cluster: job.Cluster,
|
||||
From: job.StartTime,
|
||||
To: job.StartTime + int64(job.Duration),
|
||||
Queries: queries,
|
||||
WithStats: true,
|
||||
WithData: false,
|
||||
}
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var errors []string
|
||||
scopedJobStats := make(schema.ScopedJobStats)
|
||||
|
||||
for i, row := range resBody.Results {
|
||||
query := req.Queries[i]
|
||||
metric := query.Metric
|
||||
scope := assignedScope[i]
|
||||
|
||||
if _, ok := scopedJobStats[metric]; !ok {
|
||||
scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
|
||||
}
|
||||
|
||||
if _, ok := scopedJobStats[metric][scope]; !ok {
|
||||
scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
|
||||
}
|
||||
|
||||
for ndx, res := range row {
|
||||
if res.Error != nil {
|
||||
/* Build list for "partial errors", if any */
|
||||
errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
|
||||
Hostname: query.Hostname,
|
||||
Id: id,
|
||||
Data: &schema.MetricStatistics{
|
||||
Avg: float64(res.Avg),
|
||||
Min: float64(res.Min),
|
||||
Max: float64(res.Max),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty
|
||||
if len(scopedJobStats[metric][scope]) == 0 {
|
||||
delete(scopedJobStats[metric], scope)
|
||||
if len(scopedJobStats[metric]) == 0 {
|
||||
delete(scopedJobStats, metric)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) != 0 {
|
||||
/* Returns list for "partial errors" */
|
||||
return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
||||
}
|
||||
return scopedJobStats, nil
|
||||
}
|
||||
|
||||
// LoadNodeData retrieves current metric data for specified nodes in a cluster.
|
||||
// Used for the Systems-View Node-Overview to display real-time node status.
|
||||
//
|
||||
// If nodes is nil, queries all metrics for all nodes in the cluster (bulk query).
|
||||
// Returns data organized as: hostname -> metric -> list of JobMetric (with time series and stats).
|
||||
func (ccms *CCMetricStore) LoadNodeData(
|
||||
cluster string,
|
||||
metrics, nodes []string,
|
||||
scopes []schema.MetricScope,
|
||||
from, to time.Time,
|
||||
ctx context.Context,
|
||||
) (map[string]map[string][]*schema.JobMetric, error) {
|
||||
req := APIQueryRequest{
|
||||
Cluster: cluster,
|
||||
From: from.Unix(),
|
||||
To: to.Unix(),
|
||||
WithStats: true,
|
||||
WithData: true,
|
||||
}
|
||||
|
||||
if nodes == nil {
|
||||
req.ForAllNodes = append(req.ForAllNodes, metrics...)
|
||||
} else {
|
||||
for _, node := range nodes {
|
||||
for _, metric := range metrics {
|
||||
req.Queries = append(req.Queries, APIQuery{
|
||||
Hostname: node,
|
||||
Metric: metric,
|
||||
Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var errors []string
|
||||
data := make(map[string]map[string][]*schema.JobMetric)
|
||||
for i, res := range resBody.Results {
|
||||
var query APIQuery
|
||||
if resBody.Queries != nil {
|
||||
query = resBody.Queries[i]
|
||||
} else {
|
||||
query = req.Queries[i]
|
||||
}
|
||||
|
||||
metric := query.Metric
|
||||
qdata := res[0]
|
||||
if qdata.Error != nil {
|
||||
/* Build list for "partial errors", if any */
|
||||
errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error))
|
||||
}
|
||||
|
||||
sanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max)
|
||||
|
||||
hostdata, ok := data[query.Hostname]
|
||||
if !ok {
|
||||
hostdata = make(map[string][]*schema.JobMetric)
|
||||
data[query.Hostname] = hostdata
|
||||
}
|
||||
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: mc.Timestep,
|
||||
Series: []schema.Series{
|
||||
{
|
||||
Hostname: query.Hostname,
|
||||
Data: qdata.Data,
|
||||
Statistics: schema.MetricStatistics{
|
||||
Avg: float64(qdata.Avg),
|
||||
Min: float64(qdata.Min),
|
||||
Max: float64(qdata.Max),
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if len(errors) != 0 {
|
||||
/* Returns list of "partial errors" */
|
||||
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// LoadNodeListData retrieves paginated node metrics for the Systems-View Node-List.
|
||||
//
|
||||
// Supports filtering by subcluster and node name pattern. The nodeFilter performs
|
||||
// substring matching on hostnames.
|
||||
//
|
||||
// Returns:
|
||||
// - Node data organized as: hostname -> JobData (metric -> scope -> series)
|
||||
// - Total node count (before pagination)
|
||||
// - HasNextPage flag indicating if more pages are available
|
||||
// - Error (may be partial error with some data returned)
|
||||
func (ccms *CCMetricStore) LoadNodeListData(
|
||||
cluster, subCluster string,
|
||||
nodes []string,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
resolution int,
|
||||
from, to time.Time,
|
||||
ctx context.Context,
|
||||
) (map[string]schema.JobData, error) {
|
||||
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := APIQueryRequest{
|
||||
Cluster: cluster,
|
||||
Queries: queries,
|
||||
From: from.Unix(),
|
||||
To: to.Unix(),
|
||||
WithStats: true,
|
||||
WithData: true,
|
||||
}
|
||||
|
||||
resBody, err := ccms.doRequest(ctx, &req)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var errors []string
|
||||
data := make(map[string]schema.JobData)
|
||||
for i, row := range resBody.Results {
|
||||
var query APIQuery
|
||||
if resBody.Queries != nil {
|
||||
query = resBody.Queries[i]
|
||||
} else {
|
||||
query = req.Queries[i]
|
||||
}
|
||||
// qdata := res[0]
|
||||
metric := query.Metric
|
||||
scope := assignedScope[i]
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
|
||||
res := mc.Timestep
|
||||
if len(row) > 0 {
|
||||
res = row[0].Resolution
|
||||
}
|
||||
|
||||
// Init Nested Map Data Structures If Not Found
|
||||
hostData, ok := data[query.Hostname]
|
||||
if !ok {
|
||||
hostData = make(schema.JobData)
|
||||
data[query.Hostname] = hostData
|
||||
}
|
||||
|
||||
metricData, ok := hostData[metric]
|
||||
if !ok {
|
||||
metricData = make(map[schema.MetricScope]*schema.JobMetric)
|
||||
data[query.Hostname][metric] = metricData
|
||||
}
|
||||
|
||||
scopeData, ok := metricData[scope]
|
||||
if !ok {
|
||||
scopeData = &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: res,
|
||||
Series: make([]schema.Series, 0),
|
||||
}
|
||||
data[query.Hostname][metric][scope] = scopeData
|
||||
}
|
||||
|
||||
for ndx, res := range row {
|
||||
if res.Error != nil {
|
||||
/* Build list for "partial errors", if any */
|
||||
errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
sanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
scopeData.Series = append(scopeData.Series, schema.Series{
|
||||
Hostname: query.Hostname,
|
||||
Id: id,
|
||||
Statistics: schema.MetricStatistics{
|
||||
Avg: float64(res.Avg),
|
||||
Min: float64(res.Min),
|
||||
Max: float64(res.Max),
|
||||
},
|
||||
Data: res.Data,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) != 0 {
|
||||
/* Returns list of "partial errors" */
|
||||
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
|
||||
// Regular float64 values cannot be JSONed when NaN.
|
||||
func sanitizeStats(avg, min, max *schema.Float) {
|
||||
if avg.IsNaN() || min.IsNaN() || max.IsNaN() {
|
||||
*avg = schema.Float(0)
|
||||
*min = schema.Float(0)
|
||||
*max = schema.Float(0)
|
||||
}
|
||||
}
|
||||
|
||||
// hasNaNStats returns true if any of the statistics contain NaN values.
|
||||
func hasNaNStats(avg, min, max schema.Float) bool {
|
||||
return avg.IsNaN() || min.IsNaN() || max.IsNaN()
|
||||
}
|
||||
@@ -280,6 +280,7 @@ func (r *JobRepository) FindConcurrentJobs(
|
||||
stopTimeTail := stopTime - overlapBufferEnd
|
||||
startTimeFront := startTime + overlapBufferEnd
|
||||
|
||||
// Reminder: BETWEEN queries are slower and don't use indices as frequently: Can this be optimized?
|
||||
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
|
||||
"running", startTimeTail, stopTimeTail, startTime)
|
||||
// Get At Least One Exact Hostname Match from JSON Resources Array in Database
|
||||
|
||||
@@ -274,17 +274,36 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
||||
}
|
||||
|
||||
// buildIntCondition creates a BETWEEN clause for integer range filters.
|
||||
// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required
|
||||
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
if cond.From != 0 && cond.To != 0 {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
} else if cond.From != 0 {
|
||||
return query.Where("? <= "+field, cond.From)
|
||||
} else if cond.To != 0 {
|
||||
return query.Where(field+" <= ?", cond.To)
|
||||
} else {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
// buildFloatCondition creates a BETWEEN clause for float range filters.
|
||||
// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required
|
||||
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
if cond.From != 0.0 && cond.To != 0.0 {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
} else if cond.From != 0.0 {
|
||||
return query.Where("? <= "+field, cond.From)
|
||||
} else if cond.To != 0.0 {
|
||||
return query.Where(field+" <= ?", cond.To)
|
||||
} else {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
// buildTimeCondition creates time range filters supporting absolute timestamps,
|
||||
// relative time ranges (last6h, last24h, last7d, last30d), or open-ended ranges.
|
||||
// Reminder: BETWEEN queries are slower and don't use indices as frequently: Only use if both conditions required
|
||||
func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
if cond.From != nil && cond.To != nil {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
|
||||
@@ -308,16 +327,25 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui
|
||||
cclog.Debugf("No known named timeRange: startTime.range = %s", cond.Range)
|
||||
return query
|
||||
}
|
||||
return query.Where(field+" BETWEEN ? AND ?", then, now)
|
||||
return query.Where("? <= "+field, then)
|
||||
} else {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column.
|
||||
// Reminder: BETWEEN Queries are slower and dont use indices as frequently: Only use if both conditions required
|
||||
func buildFloatJSONCondition(condName string, condRange *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
query = query.Where("JSON_VALID(footprint)")
|
||||
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To)
|
||||
if condRange.From != 0.0 && condRange.To != 0.0 {
|
||||
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") BETWEEN ? AND ?", condRange.From, condRange.To)
|
||||
} else if condRange.From != 0.0 {
|
||||
return query.Where("? <= JSON_EXTRACT(footprint, \"$."+condName+"\")", condRange.From)
|
||||
} else if condRange.To != 0.0 {
|
||||
return query.Where("JSON_EXTRACT(footprint, \"$."+condName+"\") <= ?", condRange.To)
|
||||
} else {
|
||||
return query
|
||||
}
|
||||
}
|
||||
|
||||
// buildStringCondition creates filters for string fields supporting equality,
|
||||
|
||||
@@ -124,13 +124,15 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);
|
||||
-- Cluster Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (cluster, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (cluster, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_numhwthreads ON job (cluster, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_numacc ON job (cluster, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_energy ON job (cluster, energy);
|
||||
|
||||
-- Cluster Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_duration_starttime ON job (cluster, duration, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime_duration ON job (cluster, start_time, duration);
|
||||
|
||||
-- Cluster+Partition Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_user ON job (cluster, cluster_partition, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_project ON job (cluster, cluster_partition, project);
|
||||
@@ -138,76 +140,90 @@ CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, clus
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_shared ON job (cluster, cluster_partition, shared);
|
||||
|
||||
-- Cluster+Partition Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, cluster_partition, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (cluster, cluster_partition, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (cluster, cluster_partition, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (cluster, cluster_partition, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (cluster, cluster_partition, energy);
|
||||
|
||||
-- Cluster+Partition Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration_starttime ON job (cluster, cluster_partition, duration, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime_duration ON job (cluster, cluster_partition, start_time, duration);
|
||||
|
||||
-- Cluster+JobState Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
|
||||
-- Cluster+JobState Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (cluster, job_state, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (cluster, job_state, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (cluster, job_state, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (cluster, job_state, energy);
|
||||
|
||||
-- Cluster+JobState Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime_duration ON job (cluster, job_state, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration_starttime ON job (cluster, job_state, duration, start_time);
|
||||
|
||||
-- Cluster+Shared Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_user ON job (cluster, shared, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_project ON job (cluster, shared, project);
|
||||
-- Cluster+Shared Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime ON job (cluster, shared, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration ON job (cluster, shared, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numnodes ON job (cluster, shared, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numhwthreads ON job (cluster, shared, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_numacc ON job (cluster, shared, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_energy ON job (cluster, shared, energy);
|
||||
|
||||
-- Cluster+Shared Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_starttime_duration ON job (cluster, shared, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_shared_duration_starttime ON job (cluster, shared, duration, start_time);
|
||||
|
||||
-- User Filter
|
||||
-- User Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (hpc_user, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_duration ON job (hpc_user, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_numhwthreads ON job (hpc_user, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_numacc ON job (hpc_user, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_energy ON job (hpc_user, energy);
|
||||
|
||||
-- User Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_starttime_duration ON job (hpc_user, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_duration_starttime ON job (hpc_user, duration, start_time);
|
||||
|
||||
-- Project Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
|
||||
-- Project Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_duration ON job (project, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_numhwthreads ON job (project, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_numacc ON job (project, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_energy ON job (project, energy);
|
||||
|
||||
-- Project Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_starttime_duration ON job (project, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_duration_starttime ON job (project, duration, start_time);
|
||||
|
||||
-- JobState Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
|
||||
-- JobState Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_duration ON job (job_state, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_numhwthreads ON job (job_state, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_numacc ON job (job_state, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_energy ON job (job_state, energy);
|
||||
|
||||
-- JobState Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime_duration ON job (job_state, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_duration_starttime ON job (job_state, duration, start_time);
|
||||
|
||||
-- Shared Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_user ON job (shared, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_project ON job (shared, project);
|
||||
-- Shared Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_starttime ON job (shared, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_duration ON job (shared, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_numnodes ON job (shared, num_nodes);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_numhwthreads ON job (shared, num_hwthreads);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_numacc ON job (shared, num_acc);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_energy ON job (shared, energy);
|
||||
|
||||
-- Shared Time Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_starttime_duration ON job (shared, start_time, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_shared_duration_starttime ON job (shared, duration, start_time);
|
||||
|
||||
-- ArrayJob Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
|
||||
@@ -226,6 +242,10 @@ CREATE INDEX IF NOT EXISTS jobs_numhwthreads_duration ON job (num_hwthreads, dur
|
||||
CREATE INDEX IF NOT EXISTS jobs_numacc_duration ON job (num_acc, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_energy_duration ON job (energy, duration);
|
||||
|
||||
-- Backup Indices For High Variety Columns
|
||||
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
|
||||
|
||||
-- Notes:
|
||||
-- Cluster+Partition+Jobstate Filter: Tested -> Full Array Of Combinations non-required
|
||||
-- Cluster+JobState+Shared Filter: Tested -> No further timing improvement
|
||||
|
||||
@@ -197,7 +197,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
return err
|
||||
}
|
||||
|
||||
cclog.Infof("Added node '%s' to database", hostname)
|
||||
cclog.Debugf("Added node '%s' to database", hostname)
|
||||
return nil
|
||||
} else {
|
||||
cclog.Warnf("Error while querying node '%v' from database", id)
|
||||
@@ -212,7 +212,7 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
||||
cclog.Errorf("Error while adding node state for '%v' to database", hostname)
|
||||
return err
|
||||
}
|
||||
cclog.Infof("Updated node state for '%s' in database", hostname)
|
||||
cclog.Debugf("Updated node state for '%s' in database", hostname)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -716,8 +716,8 @@ func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBu
|
||||
|
||||
func getNodesFromTopol(cluster string, subCluster string, nodeFilter string, page *model.PageRequest) ([]string, int, bool) {
|
||||
// 0) Init additional vars
|
||||
var hasNextPage bool = false
|
||||
var totalNodes int = 0
|
||||
hasNextPage := false
|
||||
totalNodes := 0
|
||||
|
||||
// 1) Get list of all nodes
|
||||
var topolNodes []string
|
||||
|
||||
@@ -277,10 +277,22 @@ func buildFilterPresets(query url.Values) map[string]interface{} {
|
||||
if query.Get("duration") != "" {
|
||||
parts := strings.Split(query.Get("duration"), "-")
|
||||
if len(parts) == 2 {
|
||||
a, e1 := strconv.Atoi(parts[0])
|
||||
b, e2 := strconv.Atoi(parts[1])
|
||||
if e1 == nil && e2 == nil {
|
||||
filterPresets["duration"] = map[string]int{"from": a, "to": b}
|
||||
if parts[0] == "lessthan" {
|
||||
lt, lte := strconv.Atoi(parts[1])
|
||||
if lte == nil {
|
||||
filterPresets["duration"] = map[string]int{"lessThan": lt, "from": 0, "to": 0}
|
||||
}
|
||||
} else if parts[0] == "morethan" {
|
||||
mt, mte := strconv.Atoi(parts[1])
|
||||
if mte == nil {
|
||||
filterPresets["duration"] = map[string]int{"moreThan": mt, "from": 0, "to": 0}
|
||||
}
|
||||
} else {
|
||||
a, e1 := strconv.Atoi(parts[0])
|
||||
b, e2 := strconv.Atoi(parts[1])
|
||||
if e1 == nil && e2 == nil {
|
||||
filterPresets["duration"] = map[string]int{"from": a, "to": b}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
@@ -66,7 +66,14 @@ func RegisterFootprintWorker() {
|
||||
|
||||
sJob := time.Now()
|
||||
|
||||
jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background())
|
||||
ms, err := metricdispatch.GetMetricDataRepo(job.Cluster, job.SubCluster)
|
||||
if err != nil {
|
||||
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
|
||||
job.JobID, job.User, job.Project, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
jobStats, err := ms.LoadStats(job, allMetrics, context.Background())
|
||||
if err != nil {
|
||||
cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
|
||||
ce++
|
||||
|
||||
@@ -38,6 +38,10 @@ import (
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
)
|
||||
|
||||
// InternalMetricStore is a field-less type that serves as the receiver for this
// package's data loading methods (LoadData, LoadStats, LoadScopedStats,
// LoadNodeData and LoadNodeListData).
type InternalMetricStore struct{}
|
||||
|
||||
// MetricStoreHandle is a package-level handle of type *InternalMetricStore.
// NOTE(review): it is not initialized at its declaration; confirm where it is
// assigned before relying on it being non-nil.
var MetricStoreHandle *InternalMetricStore
|
||||
|
||||
// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes.
|
||||
// When set to a non-nil function, LoadData will call this function instead of the default implementation.
|
||||
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
|
||||
@@ -65,7 +69,7 @@ var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema
|
||||
// Example:
|
||||
//
|
||||
// jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60)
|
||||
func LoadData(
|
||||
func (ccms *InternalMetricStore) LoadData(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
@@ -502,7 +506,7 @@ func buildQueries(
|
||||
// Returns:
|
||||
// - Map of metric → hostname → statistics
|
||||
// - Error on query building or fetching failure
|
||||
func LoadStats(
|
||||
func (ccms *InternalMetricStore) LoadStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context,
|
||||
@@ -574,7 +578,7 @@ func LoadStats(
|
||||
// Returns:
|
||||
// - ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID)
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadScopedStats(
|
||||
func (ccms *InternalMetricStore) LoadScopedStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
@@ -675,7 +679,7 @@ func LoadScopedStats(
|
||||
// Returns:
|
||||
// - Map of hostname → metric → []JobMetric
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadNodeData(
|
||||
func (ccms *InternalMetricStore) LoadNodeData(
|
||||
cluster string,
|
||||
metrics, nodes []string,
|
||||
scopes []schema.MetricScope,
|
||||
@@ -778,7 +782,7 @@ func LoadNodeData(
|
||||
// Returns:
|
||||
// - Map of hostname → JobData (metric → scope → JobMetric)
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadNodeListData(
|
||||
func (ccms *InternalMetricStore) LoadNodeListData(
|
||||
cluster, subCluster string,
|
||||
nodes []string,
|
||||
metrics []string,
|
||||
@@ -912,7 +916,6 @@ func buildNodeQueries(
|
||||
scopes []schema.MetricScope,
|
||||
resolution int64,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
|
||||
@@ -192,14 +192,14 @@
|
||||
items.push({
|
||||
startTime: { range: filters.startTime.range },
|
||||
});
|
||||
if (filters.duration.from || filters.duration.to)
|
||||
if (filters.duration.from && filters.duration.to)
|
||||
items.push({
|
||||
duration: { from: filters.duration.from, to: filters.duration.to },
|
||||
});
|
||||
if (filters.duration.lessThan)
|
||||
items.push({ duration: { from: 0, to: filters.duration.lessThan } });
|
||||
items.push({ duration: { to: filters.duration.lessThan, from: 0 } });
|
||||
if (filters.duration.moreThan)
|
||||
items.push({ duration: { from: filters.duration.moreThan, to: 604800 } }); // 7 days to include special jobs with long runtimes
|
||||
items.push({ duration: { to: 0, from: filters.duration.moreThan } });
|
||||
if (filters.energy.from || filters.energy.to)
|
||||
items.push({
|
||||
energy: { from: filters.energy.from, to: filters.energy.to },
|
||||
@@ -266,9 +266,9 @@
|
||||
if (filters.duration.from && filters.duration.to)
|
||||
opts.push(`duration=${filters.duration.from}-${filters.duration.to}`);
|
||||
if (filters.duration.lessThan)
|
||||
opts.push(`duration=0-${filters.duration.lessThan}`);
|
||||
opts.push(`duration=lessthan-${filters.duration.lessThan}`);
|
||||
if (filters.duration.moreThan)
|
||||
opts.push(`duration=${filters.duration.moreThan}-604800`);
|
||||
opts.push(`duration=morethan-${filters.duration.moreThan}`);
|
||||
if (filters.tags.length != 0)
|
||||
for (let tag of filters.tags) opts.push(`tag=${tag}`);
|
||||
if (filters.numNodes.from && filters.numNodes.to)
|
||||
|
||||
@@ -254,6 +254,9 @@
|
||||
style="width: {jobInfoColumnWidth}px; padding-top: {headerPaddingTop}px"
|
||||
>
|
||||
Job Info
|
||||
{#if $jobsStore.fetching}
|
||||
<Spinner size="sm" style="margin-left:10px;" secondary />
|
||||
{/if}
|
||||
</th>
|
||||
{#if showFootprint}
|
||||
<th
|
||||
|
||||
@@ -31,14 +31,16 @@
|
||||
setFilter
|
||||
} = $props();
|
||||
|
||||
/* States */
|
||||
let lessState = $state({ hours:0, mins:0 });
|
||||
let moreState = $state({ hours:0, mins:0 });
|
||||
let fromState = $state({ hours:0, mins:0 });
|
||||
let toState = $state({ hours:0, mins:0 });
|
||||
|
||||
/* Derived */
|
||||
let pendingDuration = $derived(presetDuration);
|
||||
let lessState = $derived(secsToHoursAndMins(presetDuration?.lessThan));
|
||||
let moreState = $derived(secsToHoursAndMins(presetDuration?.moreThan));
|
||||
let fromState = $derived(secsToHoursAndMins(presetDuration?.from));
|
||||
let toState = $derived(secsToHoursAndMins(presetDuration?.to));
|
||||
|
||||
const lessDisabled = $derived(
|
||||
let lessDisabled = $derived(
|
||||
moreState.hours !== 0 ||
|
||||
moreState.mins !== 0 ||
|
||||
fromState.hours !== 0 ||
|
||||
@@ -47,7 +49,7 @@
|
||||
toState.mins !== 0
|
||||
);
|
||||
|
||||
const moreDisabled = $derived(
|
||||
let moreDisabled = $derived(
|
||||
lessState.hours !== 0 ||
|
||||
lessState.mins !== 0 ||
|
||||
fromState.hours !== 0 ||
|
||||
@@ -56,13 +58,27 @@
|
||||
toState.mins !== 0
|
||||
);
|
||||
|
||||
const betweenDisabled = $derived(
|
||||
let betweenDisabled = $derived(
|
||||
moreState.hours !== 0 ||
|
||||
moreState.mins !== 0 ||
|
||||
lessState.hours !== 0 ||
|
||||
lessState.mins !== 0
|
||||
)
|
||||
|
||||
/* Effects */
|
||||
$effect(() => {
|
||||
lessState = secsToHoursAndMins(pendingDuration?.lessThan);
|
||||
});
|
||||
$effect(() => {
|
||||
moreState = secsToHoursAndMins(pendingDuration?.moreThan);
|
||||
});
|
||||
$effect(() => {
|
||||
fromState = secsToHoursAndMins(pendingDuration?.from);
|
||||
});
|
||||
$effect(() => {
|
||||
toState = secsToHoursAndMins(pendingDuration?.to);
|
||||
});
|
||||
|
||||
/* Functions */
|
||||
function resetPending() {
|
||||
pendingDuration = {
|
||||
|
||||
@@ -116,7 +116,7 @@
|
||||
|
||||
pendingExtendedLegendData = {};
|
||||
for (const accId of accSet) {
|
||||
const matchJob = $nodeJobsData.data.jobs.items.find((i) => i.resources.find((r) => r.accelerators.includes(accId)))
|
||||
const matchJob = $nodeJobsData?.data?.jobs?.items?.find((i) => i?.resources?.find((r) => r?.accelerators?.includes(accId))) || null
|
||||
const matchUser = matchJob?.user ? matchJob.user : null
|
||||
pendingExtendedLegendData[accId] = {
|
||||
user: (scrambleNames && matchUser)
|
||||
|
||||
Reference in New Issue
Block a user