From 48538142288314a7de36b3cf7fea92e74845e30e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 07:17:17 +0100 Subject: [PATCH] Review and refactor metric-store client. Add documentation --- .../cc-metric-store-queries.go | 507 +++++++++++ internal/metricstoreclient/cc-metric-store.go | 818 ++++-------------- 2 files changed, 657 insertions(+), 668 deletions(-) create mode 100644 internal/metricstoreclient/cc-metric-store-queries.go diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go new file mode 100644 index 00000000..338d7028 --- /dev/null +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ -0,0 +1,507 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricstoreclient - Query Building +// +// This file contains the query construction and scope transformation logic for cc-metric-store queries. +// It handles the complex mapping between requested metric scopes and native hardware topology, +// automatically aggregating or filtering metrics as needed. +// +// # Scope Transformations +// +// The buildScopeQueries function implements the core scope transformation algorithm. +// It handles 25+ different transformation cases, mapping between: +// - Accelerator (GPU) scope +// - HWThread (hardware thread/SMT) scope +// - Core (CPU core) scope +// - Socket (CPU package) scope +// - MemoryDomain (NUMA domain) scope +// - Node (full system) scope +// +// Transformations follow these rules: +// - Same scope: Return data as-is (e.g., Core → Core) +// - Coarser scope: Aggregate data (e.g., Core → Socket with Aggregate=true) +// - Finer scope: Error - cannot increase granularity +// +// # Query Building +// +// buildQueries and buildNodeQueries are the main entry points, handling job-specific +// and node-specific query construction respectively. They: +// - Validate metric configurations +// - Handle subcluster-specific metric filtering +// - Detect and skip duplicate scope requests +// - Call buildScopeQueries for each metric/scope/host combination +package metricstoreclient + +import ( + "fmt" + "strconv" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// Scope string constants used in API queries. +// Pre-converted to avoid repeated allocations during query building. +var ( + hwthreadString = string(schema.MetricScopeHWThread) + coreString = string(schema.MetricScopeCore) + memoryDomainString = string(schema.MetricScopeMemoryDomain) + socketString = string(schema.MetricScopeSocket) + acceleratorString = string(schema.MetricScopeAccelerator) +) + +// buildQueries constructs API queries for job-specific metric data. +// It iterates through metrics, scopes, and job resources to build the complete query set. +// +// The function handles: +// - Metric configuration validation and subcluster filtering +// - Scope deduplication to avoid redundant queries +// - Hardware thread list resolution (job-allocated vs full node) +// - Delegation to buildScopeQueries for scope transformations +// +// Returns queries and their corresponding assigned scopes (which may differ from requested scopes). 
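//
// Illustrative call (metric names and resolution are placeholders, not values
// defined by this package):
//
//	queries, scopes, err := ccms.buildQueries(job,
//		[]string{"flops_any", "mem_bw"},
//		[]schema.MetricScope{schema.MetricScopeCore, schema.MetricScopeNode}, 60)
//	if err != nil {
//		return err
//	}
//	// len(queries) == len(scopes): queries[i] was assigned scopes[i].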
+func (ccms *CCMetricStore) buildQueries( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) + assignedScope := []schema.MetricScope{} + + subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) + if scerr != nil { + return nil, nil, scerr + } + topology := subcluster.Topology + + for _, metric := range metrics { + remoteName := metric + mc := archive.GetMetricConfig(job.Cluster, metric) + if mc == nil { + cclog.Warnf("metric '%s' is not specified for cluster '%s' - skipping", metric, job.Cluster) + continue + } + + // Skip if metric is removed for subcluster + if len(mc.SubClusters) != 0 { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == job.SubCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... + handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { + continue + } + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, host := range job.Resources { + hwthreads := host.HWThreads + if hwthreads == nil { + hwthreads = topology.Node + } + + hostQueries, hostScopes := buildScopeQueries( + nativeScope, requestedScope, + remoteName, host.Hostname, + &topology, hwthreads, host.Accelerators, + resolution, + ) + + if len(hostQueries) == 0 && len(hostScopes) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + queries = append(queries, hostQueries...) + assignedScope = append(assignedScope, hostScopes...) + } + } + } + + return queries, assignedScope, nil +} + +// buildNodeQueries constructs API queries for node-specific metric data (Systems View). +// Similar to buildQueries but uses full node topology instead of job-allocated resources. +// +// The function handles: +// - Subcluster topology resolution (either pre-loaded or per-node lookup) +// - Full node hardware thread lists (not job-specific subsets) +// - All accelerators on each node +// - Metric configuration validation with subcluster filtering +// +// Returns queries and their corresponding assigned scopes. 
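//
// Illustrative call (cluster, node and metric names are placeholders; an empty
// subCluster string triggers the per-node subcluster lookup described above):
//
//	queries, scopes, err := ccms.buildNodeQueries("testcluster", "",
//		[]string{"node001", "node002"}, []string{"cpu_load"},
//		[]schema.MetricScope{schema.MetricScopeNode}, 60)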
+func (ccms *CCMetricStore) buildNodeQueries( + cluster string, + subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) + assignedScope := []schema.MetricScope{} + + // Get Topol before loop if subCluster given + var subClusterTopol *schema.SubCluster + var scterr error + if subCluster != "" { + subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) + if scterr != nil { + cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) + return nil, nil, scterr + } + } + + for _, metric := range metrics { + remoteName := metric + mc := archive.GetMetricConfig(cluster, metric) + if mc == nil { + cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) + continue + } + + // Skip if metric is removed for subcluster + if mc.SubClusters != nil { + isRemoved := false + for _, scConfig := range mc.SubClusters { + if scConfig.Name == subCluster && scConfig.Remove { + isRemoved = true + break + } + } + if isRemoved { + continue + } + } + + // Avoid duplicates... + handledScopes := make([]schema.MetricScope, 0, 3) + + scopesLoop: + for _, requestedScope := range scopes { + nativeScope := mc.Scope + + scope := nativeScope.Max(requestedScope) + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } + } + handledScopes = append(handledScopes, scope) + + for _, hostname := range nodes { + + // If no subCluster given, get it by node + if subCluster == "" { + subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) + if scnerr != nil { + return nil, nil, scnerr + } + subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) + if scterr != nil { + return nil, nil, scterr + } + } + + // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable + // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable + topology := subClusterTopol.Topology + acceleratorIds := topology.GetAcceleratorIDs() + + // Moved check here if metric matches hardware specs + if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { + continue scopesLoop + } + + nodeQueries, nodeScopes := buildScopeQueries( + nativeScope, requestedScope, + remoteName, hostname, + &topology, topology.Node, acceleratorIds, + resolution, + ) + + if len(nodeQueries) == 0 && len(nodeScopes) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + queries = append(queries, nodeQueries...) + assignedScope = append(assignedScope, nodeScopes...) + } + } + } + + return queries, assignedScope, nil +} + +// buildScopeQueries generates API queries for a given scope transformation. +// It returns a slice of queries and corresponding assigned scopes. +// Some transformations (e.g., HWThread -> Core/Socket) may generate multiple queries. 
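//
// Illustrative call (hostname and metric are placeholders; topology and hwthreads
// would come from the job's subcluster):
//
//	qs, ss := buildScopeQueries(schema.MetricScopeCore, schema.MetricScopeSocket,
//		"mem_bw", "node001", &topology, hwthreads, nil, 60)
//	// qs contains one Aggregate=true query per socket touched by hwthreads;
//	// ss repeats schema.MetricScopeSocket once per query.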
+func buildScopeQueries( + nativeScope, requestedScope schema.MetricScope, + metric, hostname string, + topology *schema.Topology, + hwthreads []int, + accelerators []string, + resolution int, +) ([]APIQuery, []schema.MetricScope) { + scope := nativeScope.Max(requestedScope) + queries := []APIQuery{} + scopes := []schema.MetricScope{} + + hwthreadsStr := intToStringSlice(hwthreads) + + // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) + if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { + if scope != schema.MetricScopeAccelerator { + // Skip all other caught cases + return queries, scopes + } + + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: accelerators, + Resolution: resolution, + }) + scopes = append(scopes, schema.MetricScopeAccelerator) + return queries, scopes + } + + // Accelerator -> Node + if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { + if len(accelerators) == 0 { + return queries, scopes + } + + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: accelerators, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // HWThread -> HWThread + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: hwthreadsStr, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // HWThread -> Core + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + for _, core := range cores { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // HWThread -> Socket + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + for _, socket := range sockets { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // HWThread -> Node + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: hwthreadsStr, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Core -> Core + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Core -> Socket + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { + sockets, _ := 
topology.GetSocketsFromCores(hwthreads) + for _, socket := range sockets { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, + }) + scopes = append(scopes, scope) + } + return queries, scopes + } + + // Core -> Node + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // MemoryDomain -> MemoryDomain + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(memDomains), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // MemoryDomain -> Node + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(memDomains), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Socket -> Socket + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Socket -> Node + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &socketString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + queries = append(queries, APIQuery{ + Metric: metric, + Hostname: hostname, + Resolution: resolution, + }) + scopes = append(scopes, scope) + return queries, scopes + } + + // Unhandled case - return empty slices + return queries, scopes +} + +// intToStringSlice converts a slice of integers to a slice of strings. +// Used to convert hardware IDs (core IDs, socket IDs, etc.) to the string format required by the API. +func intToStringSlice(is []int) []string { + ss := make([]string, len(is)) + for i, x := range is { + ss[i] = strconv.Itoa(x) + } + return ss +} diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index 31847a6b..c1505fdc 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -3,6 +3,54 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
+// Package metricstoreclient provides a client for querying the cc-metric-store time series database. +// +// The cc-metric-store is a high-performance time series database optimized for HPC metric data. +// This client handles HTTP communication, query construction, scope transformations, and data retrieval +// for job and node metrics across different metric scopes (node, socket, core, hwthread, accelerator). +// +// # Architecture +// +// The package is split into two main components: +// - Client Operations (cc-metric-store.go): HTTP client, request handling, data loading methods +// - Query Building (cc-metric-store-queries.go): Query construction and scope transformation logic +// +// # Basic Usage +// +// store := &CCMetricStore{} +// store.Init("http://localhost:8080", "jwt-token") +// +// // Load job data +// jobData, err := store.LoadData(job, metrics, scopes, ctx, resolution) +// if err != nil { +// log.Fatal(err) +// } +// +// # Metric Scopes +// +// The client supports hierarchical metric scopes that map to HPC hardware topology: +// - MetricScopeAccelerator: GPU/accelerator level metrics +// - MetricScopeHWThread: Hardware thread (SMT) level metrics +// - MetricScopeCore: CPU core level metrics +// - MetricScopeSocket: CPU socket level metrics +// - MetricScopeMemoryDomain: NUMA domain level metrics +// - MetricScopeNode: Full node level metrics +// +// The client automatically handles scope transformations, aggregating finer-grained metrics +// to coarser scopes when needed (e.g., aggregating core metrics to socket level). +// +// # Error Handling +// +// The client supports partial errors - if some queries fail, it returns both the successful +// data and an error listing the failed queries. This allows processing partial results +// when some nodes or metrics are temporarily unavailable. +// +// # API Versioning +// +// The client uses cc-metric-store API v2, which includes support for: +// - Data resampling for bandwidth optimization +// - Multi-scope queries in a single request +// - Aggregation across hardware topology levels package metricstoreclient import ( @@ -13,7 +61,6 @@ import ( "fmt" "net/http" "sort" - "strconv" "strings" "time" @@ -23,50 +70,63 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) +// CCMetricStore is the HTTP client for communicating with cc-metric-store. +// It manages connection details, authentication, and provides methods for querying metrics. type CCMetricStore struct { - client http.Client - jwt string - url string - queryEndpoint string + client http.Client // HTTP client with 10-second timeout + jwt string // JWT Bearer token for authentication + url string // Base URL of cc-metric-store instance + queryEndpoint string // Full URL to query API endpoint } +// APIQueryRequest represents a request to the cc-metric-store query API. +// It supports both explicit queries and "for-all-nodes" bulk queries. 
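//
// A minimal explicit-query request (illustrative values, zero-valued optional
// fields trimmed for brevity) serializes to:
//
//	{
//	  "cluster": "testcluster",
//	  "from": 1700000000,
//	  "to": 1700003600,
//	  "queries": [{"metric": "cpu_load", "host": "node001", "resolution": 60, "aggreg": false}],
//	  "with-stats": true,
//	  "with-data": true
//	}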
type APIQueryRequest struct { - Cluster string `json:"cluster"` - Queries []APIQuery `json:"queries"` - ForAllNodes []string `json:"for-all-nodes"` - From int64 `json:"from"` - To int64 `json:"to"` - WithStats bool `json:"with-stats"` - WithData bool `json:"with-data"` + Cluster string `json:"cluster"` // Target cluster name + Queries []APIQuery `json:"queries"` // Explicit list of metric queries + ForAllNodes []string `json:"for-all-nodes"` // Metrics to query for all nodes + From int64 `json:"from"` // Start time (Unix timestamp) + To int64 `json:"to"` // End time (Unix timestamp) + WithStats bool `json:"with-stats"` // Include min/avg/max statistics + WithData bool `json:"with-data"` // Include time series data points } +// APIQuery specifies a single metric query with optional scope filtering. +// Type and TypeIds define the hardware scope (core, socket, accelerator, etc.). type APIQuery struct { - Type *string `json:"type,omitempty"` - SubType *string `json:"subtype,omitempty"` - Metric string `json:"metric"` - Hostname string `json:"host"` - Resolution int `json:"resolution"` - TypeIds []string `json:"type-ids,omitempty"` - SubTypeIds []string `json:"subtype-ids,omitempty"` - Aggregate bool `json:"aggreg"` + Type *string `json:"type,omitempty"` // Scope type (e.g., "core", "socket") + SubType *string `json:"subtype,omitempty"` // Sub-scope type (reserved for future use) + Metric string `json:"metric"` // Metric name + Hostname string `json:"host"` // Target hostname + Resolution int `json:"resolution"` // Data resolution in seconds (0 = native) + TypeIds []string `json:"type-ids,omitempty"` // IDs for the scope type (e.g., core IDs) + SubTypeIds []string `json:"subtype-ids,omitempty"` // IDs for sub-scope (reserved) + Aggregate bool `json:"aggreg"` // Aggregate across TypeIds } +// APIQueryResponse contains the results from a cc-metric-store query. +// Results align with the Queries slice by index. type APIQueryResponse struct { - Queries []APIQuery `json:"queries,omitempty"` - Results [][]APIMetricData `json:"results"` + Queries []APIQuery `json:"queries,omitempty"` // Echoed queries (for bulk requests) + Results [][]APIMetricData `json:"results"` // Result data, indexed by query } +// APIMetricData represents time series data and statistics for a single metric series. +// Error is set if this particular series failed to load. type APIMetricData struct { - Error *string `json:"error"` - Data []schema.Float `json:"data"` - From int64 `json:"from"` - To int64 `json:"to"` - Resolution int `json:"resolution"` - Avg schema.Float `json:"avg"` - Min schema.Float `json:"min"` - Max schema.Float `json:"max"` + Error *string `json:"error"` // Error message if query failed + Data []schema.Float `json:"data"` // Time series data points + From int64 `json:"from"` // Actual start time of data + To int64 `json:"to"` // Actual end time of data + Resolution int `json:"resolution"` // Actual resolution of data in seconds + Avg schema.Float `json:"avg"` // Average value across time range + Min schema.Float `json:"min"` // Minimum value in time range + Max schema.Float `json:"max"` // Maximum value in time range } +// Init initializes the CCMetricStore client with connection details. +// The url parameter should include the protocol and port (e.g., "http://localhost:8080"). +// The token parameter is a JWT used for Bearer authentication; pass empty string if auth is disabled. 
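//
// Illustrative setup (URL is a placeholder; an empty token disables the
// Authorization header as described above):
//
//	var ccms CCMetricStore
//	ccms.Init("http://ccms.example.com:8082", "")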
func (ccms *CCMetricStore) Init(url string, token string) { ccms.url = url ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url) @@ -76,6 +136,9 @@ func (ccms *CCMetricStore) Init(url string, token string) { } } +// doRequest executes an HTTP POST request to the cc-metric-store query API. +// It handles JSON encoding/decoding, authentication, and API versioning. +// The request body is automatically closed to prevent resource leaks. func (ccms *CCMetricStore) doRequest( ctx context.Context, body *APIQueryRequest, @@ -107,6 +170,7 @@ func (ccms *CCMetricStore) doRequest( cclog.Errorf("Error while performing request: %s", err.Error()) return nil, err } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) @@ -121,6 +185,18 @@ func (ccms *CCMetricStore) doRequest( return &resBody, nil } +// LoadData retrieves time series data and statistics for the specified job and metrics. +// It queries data for the job's time range and resources, handling scope transformations automatically. +// +// Parameters: +// - job: Job metadata including cluster, time range, and allocated resources +// - metrics: List of metric names to retrieve +// - scopes: Requested metric scopes (node, socket, core, etc.) +// - ctx: Context for cancellation and timeouts +// - resolution: Data resolution in seconds (0 for native resolution) +// +// Returns JobData organized as: metric -> scope -> series list. +// Supports partial errors: returns available data even if some queries fail. func (ccms *CCMetricStore) LoadData( job *schema.Job, metrics []string, @@ -145,7 +221,7 @@ func (ccms *CCMetricStore) LoadData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) return nil, err } @@ -188,12 +264,7 @@ func (ccms *CCMetricStore) LoadData( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res.Avg, &res.Min, &res.Max) jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, @@ -223,301 +294,10 @@ func (ccms *CCMetricStore) LoadData( return jobData, nil } -var ( - hwthreadString = string(schema.MetricScopeHWThread) - coreString = string(schema.MetricScopeCore) - memoryDomainString = string(schema.MetricScopeMemoryDomain) - socketString = string(schema.MetricScopeSocket) - acceleratorString = string(schema.MetricScopeAccelerator) -) - -func (ccms *CCMetricStore) buildQueries( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) ([]APIQuery, []schema.MetricScope, error) { - queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - assignedScope := []schema.MetricScope{} - - subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) - if scerr != nil { - return nil, nil, scerr - } - topology := subcluster.Topology - - for _, metric := range metrics { - remoteName := metric - mc := archive.GetMetricConfig(job.Cluster, metric) - if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) - cclog.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) - continue - } - - // Skip if metric is removed for subcluster - if len(mc.SubClusters) != 0 { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == job.SubCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } - } - - // Avoid duplicates... - handledScopes := make([]schema.MetricScope, 0, 3) - - scopesLoop: - for _, requestedScope := range scopes { - nativeScope := mc.Scope - if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { - continue - } - - scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } - } - handledScopes = append(handledScopes, scope) - - for _, host := range job.Resources { - hwthreads := host.HWThreads - if hwthreads == nil { - hwthreads = topology.Node - } - - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } - - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, - Resolution: resolution, - }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue - } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(host.Accelerators) == 0 { - continue - } - - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThead - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = 
append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - for _, core := range cores { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDoman -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = 
append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) - } - } - } - - return queries, assignedScope, nil -} - +// LoadStats retrieves min/avg/max statistics for job metrics at node scope. +// This is faster than LoadData when only statistical summaries are needed (no time series data). +// +// Returns statistics organized as: metric -> hostname -> statistics. func (ccms *CCMetricStore) LoadStats( job *schema.Job, metrics []string, @@ -540,7 +320,7 @@ func (ccms *CCMetricStore) LoadStats( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) return nil, err } @@ -560,7 +340,7 @@ func (ccms *CCMetricStore) LoadStats( stats[metric] = metricdata } - if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() { + if hasNaNStats(data.Avg, data.Min, data.Max) { cclog.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) continue } @@ -575,7 +355,11 @@ func (ccms *CCMetricStore) LoadStats( return stats, nil } -// Used for Job-View Statistics Table +// LoadScopedStats retrieves statistics for job metrics across multiple scopes. +// Used for the Job-View Statistics Table to display per-scope breakdowns. +// +// Returns statistics organized as: metric -> scope -> list of scoped statistics. +// Each scoped statistic includes hostname, hardware ID (if applicable), and min/avg/max values. func (ccms *CCMetricStore) LoadScopedStats( job *schema.Job, metrics []string, @@ -599,7 +383,7 @@ func (ccms *CCMetricStore) LoadScopedStats( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) + cclog.Errorf("Error while performing request for job %d: %s", job.JobID, err.Error()) return nil, err } @@ -632,12 +416,7 @@ func (ccms *CCMetricStore) LoadScopedStats( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res.Avg, &res.Min, &res.Max) scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, @@ -666,7 +445,11 @@ func (ccms *CCMetricStore) LoadScopedStats( return scopedJobStats, nil } -// Used for Systems-View Node-Overview +// LoadNodeData retrieves current metric data for specified nodes in a cluster. +// Used for the Systems-View Node-Overview to display real-time node status. +// +// If nodes is nil, queries all metrics for all nodes in the cluster (bulk query). +// Returns data organized as: hostname -> metric -> list of JobMetric (with time series and stats). func (ccms *CCMetricStore) LoadNodeData( cluster string, metrics, nodes []string, @@ -698,7 +481,7 @@ func (ccms *CCMetricStore) LoadNodeData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) + cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error()) return nil, err } @@ -719,10 +502,7 @@ func (ccms *CCMetricStore) LoadNodeData( errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) } - if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() { - // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") - qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0. - } + sanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) hostdata, ok := data[query.Hostname] if !ok { @@ -756,7 +536,16 @@ func (ccms *CCMetricStore) LoadNodeData( return data, nil } -// Used for Systems-View Node-List +// LoadNodeListData retrieves paginated node metrics for the Systems-View Node-List. +// +// Supports filtering by subcluster and node name pattern. The nodeFilter performs +// substring matching on hostnames. +// +// Returns: +// - Node data organized as: hostname -> JobData (metric -> scope -> series) +// - Total node count (before pagination) +// - HasNextPage flag indicating if more pages are available +// - Error (may be partial error with some data returned) func (ccms *CCMetricStore) LoadNodeListData( cluster, subCluster, nodeFilter string, metrics []string, @@ -829,7 +618,7 @@ func (ccms *CCMetricStore) LoadNodeListData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) + cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error()) return nil, totalNodes, hasNextPage, err } @@ -888,12 +677,7 @@ func (ccms *CCMetricStore) LoadNodeListData( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res.Avg, &res.Min, &res.Max) scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, @@ -916,319 +700,17 @@ func (ccms *CCMetricStore) LoadNodeListData( return data, totalNodes, hasNextPage, nil } -func (ccms *CCMetricStore) buildNodeQueries( - cluster string, - subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) ([]APIQuery, []schema.MetricScope, error) { - queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) - assignedScope := []schema.MetricScope{} - - // Get Topol before loop if subCluster given - var subClusterTopol *schema.SubCluster - var scterr error - if subCluster != "" { - subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) - if scterr != nil { - cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) - return nil, nil, scterr - } +// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. +// Regular float64 values cannot be JSONed when NaN. +func sanitizeStats(avg, min, max *schema.Float) { + if avg.IsNaN() || min.IsNaN() || max.IsNaN() { + *avg = schema.Float(0) + *min = schema.Float(0) + *max = schema.Float(0) } - - for _, metric := range metrics { - remoteName := metric - mc := archive.GetMetricConfig(cluster, metric) - if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) - cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) - continue - } - - // Skip if metric is removed for subcluster - if mc.SubClusters != nil { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == subCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } - } - - // Avoid duplicates... 
- handledScopes := make([]schema.MetricScope, 0, 3) - - scopesLoop: - for _, requestedScope := range scopes { - nativeScope := mc.Scope - - scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } - } - handledScopes = append(handledScopes, scope) - - for _, hostname := range nodes { - - // If no subCluster given, get it by node - if subCluster == "" { - subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) - if scnerr != nil { - return nil, nil, scnerr - } - subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) - if scterr != nil { - return nil, nil, scterr - } - } - - // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable - // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable - topology := subClusterTopol.Topology - acceleratorIds := topology.GetAcceleratorIDs() - - // Moved check here if metric matches hardware specs - if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { - continue scopesLoop - } - - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } - - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: acceleratorIds, - Resolution: resolution, - }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue - } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(acceleratorIds) == 0 { - continue - } - - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: acceleratorIds, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThead - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - for _, core := range cores { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == 
schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(topology.Node) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDoman -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: remoteName, - Hostname: hostname, - Resolution: resolution, - 
}) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) - } - } - } - - return queries, assignedScope, nil } -func intToStringSlice(is []int) []string { - ss := make([]string, len(is)) - for i, x := range is { - ss[i] = strconv.Itoa(x) - } - return ss +// hasNaNStats returns true if any of the statistics contain NaN values. +func hasNaNStats(avg, min, max schema.Float) bool { + return avg.IsNaN() || min.IsNaN() || max.IsNaN() }
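// Illustrative behavior of the NaN helpers (values are made up):
//
//	avg, min, max := schema.Float(math.NaN()), schema.Float(1.0), schema.Float(2.0)
//	hasNaNStats(avg, min, max) // true
//	sanitizeStats(&avg, &min, &max)
//	// avg, min and max are now all 0 and marshal to valid JSON.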