From 26982088c33ce8efdd8b5acf0b6e99f11cba3656 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 4 Mar 2026 16:43:05 +0100 Subject: [PATCH] Consolidate code for external and internal ccms buildQueries function Entire-Checkpoint: fc3be444ef4c --- .../cc-metric-store-queries.go | 324 +---------- internal/metricstoreclient/cc-metric-store.go | 26 +- pkg/metricstore/metricstore.go | 2 +- pkg/metricstore/query.go | 543 ++---------------- pkg/metricstore/scopequery.go | 314 ++++++++++ pkg/metricstore/scopequery_test.go | 273 +++++++++ 6 files changed, 676 insertions(+), 806 deletions(-) create mode 100644 pkg/metricstore/scopequery.go create mode 100644 pkg/metricstore/scopequery_test.go diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go index 7a04efc4..b8e3a94a 100644 --- a/internal/metricstoreclient/cc-metric-store-queries.go +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ -37,23 +37,13 @@ package metricstoreclient import ( "fmt" - "strconv" "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" ) -// Scope string constants used in API queries. -// Pre-converted to avoid repeated allocations during query building. -var ( - hwthreadString = string(schema.MetricScopeHWThread) - coreString = string(schema.MetricScopeCore) - memoryDomainString = string(schema.MetricScopeMemoryDomain) - socketString = string(schema.MetricScopeSocket) - acceleratorString = string(schema.MetricScopeAccelerator) -) - // buildQueries constructs API queries for job-specific metric data. // It iterates through metrics, scopes, and job resources to build the complete query set. // @@ -126,21 +116,27 @@ func (ccms *CCMetricStore) buildQueries( hwthreads = topology.Node } - // Note: Expected exceptions will return as empty slices -> Continue - hostQueries, hostScopes := buildScopeQueries( + scopeResults, ok := metricstore.BuildScopeQueries( nativeScope, requestedScope, remoteName, host.Hostname, topology, hwthreads, host.Accelerators, - resolution, ) - // Note: Unexpected errors, such as unhandled cases, will return as nils -> Error - if hostQueries == nil && hostScopes == nil { + if !ok { return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } - queries = append(queries, hostQueries...) - assignedScope = append(assignedScope, hostScopes...) + for _, sr := range scopeResults { + queries = append(queries, APIQuery{ + Metric: sr.Metric, + Hostname: sr.Hostname, + Aggregate: sr.Aggregate, + Type: sr.Type, + TypeIds: sr.TypeIds, + Resolution: resolution, + }) + assignedScope = append(assignedScope, sr.Scope) + } } } } @@ -231,19 +227,27 @@ func (ccms *CCMetricStore) buildNodeQueries( continue scopesLoop } - nodeQueries, nodeScopes := buildScopeQueries( + scopeResults, ok := metricstore.BuildScopeQueries( nativeScope, requestedScope, remoteName, hostname, topology, topology.Node, acceleratorIds, - resolution, ) - if len(nodeQueries) == 0 && len(nodeScopes) == 0 { + if !ok { return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } - queries = append(queries, nodeQueries...) - assignedScope = append(assignedScope, nodeScopes...) + for _, sr := range scopeResults { + queries = append(queries, APIQuery{ + Metric: sr.Metric, + Hostname: sr.Hostname, + Aggregate: sr.Aggregate, + Type: sr.Type, + TypeIds: sr.TypeIds, + Resolution: resolution, + }) + assignedScope = append(assignedScope, sr.Scope) + } } } } @@ -251,277 +255,3 @@ func (ccms *CCMetricStore) buildNodeQueries( return queries, assignedScope, nil } -// buildScopeQueries generates API queries for a given scope transformation. -// It returns a slice of queries and corresponding assigned scopes. -// Some transformations (e.g., HWThread -> Core/Socket) may generate multiple queries. -func buildScopeQueries( - nativeScope, requestedScope schema.MetricScope, - metric, hostname string, - topology *schema.Topology, - hwthreads []int, - accelerators []string, - resolution int, -) ([]APIQuery, []schema.MetricScope) { - scope := nativeScope.Max(requestedScope) - queries := []APIQuery{} - scopes := []schema.MetricScope{} - - hwthreadsStr := intToStringSlice(hwthreads) - - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Expected Exception -> Continue -> Return Empty Slices - return queries, scopes - } - - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: accelerators, - Resolution: resolution, - }) - scopes = append(scopes, schema.MetricScopeAccelerator) - return queries, scopes - } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(accelerators) == 0 { - // Expected Exception -> Continue -> Return Empty Slices - return queries, scopes - } - - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: accelerators, - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // HWThread -> HWThread - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: hwthreadsStr, - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - for _, core := range cores { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - scopes = append(scopes, scope) - } - return queries, scopes - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - scopes = append(scopes, scope) - } - return queries, scopes - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: hwthreadsStr, - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - scopes = append(scopes, scope) - } - return queries, scopes - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(memDomains), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // MemoryDomain -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(memDomains), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // MemoryDomain -> Socket - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket { - memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains) - if err != nil { - cclog.Errorf("Error mapping memory domains to sockets, return unchanged: %v", err) - // Rare Error Case -> Still Continue -> Return Empty Slices - return queries, scopes - } - - // Create a query for each socket - for _, domains := range socketToDomains { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(domains), - Resolution: resolution, - }) - // Add scope for each query, not just once - scopes = append(scopes, scope) - } - return queries, scopes - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Resolution: resolution, - }) - scopes = append(scopes, scope) - return queries, scopes - } - - // Unhandled Case -> Error -> Return nils - return nil, nil -} - -// intToStringSlice converts a slice of integers to a slice of strings. -// Used to convert hardware IDs (core IDs, socket IDs, etc.) to the string format required by the API. -func intToStringSlice(is []int) []string { - ss := make([]string, len(is)) - for i, x := range is { - ss[i] = strconv.Itoa(x) - } - return ss -} diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index e2a84466..2f13ade6 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -63,7 +63,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/metricstore" + ms "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -331,7 +331,7 @@ func (ccms *CCMetricStore) LoadData( } } - sanitizeStats(&res.Avg, &res.Min, &res.Max) + ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, @@ -494,7 +494,7 @@ func (ccms *CCMetricStore) LoadScopedStats( } } - sanitizeStats(&res.Avg, &res.Min, &res.Max) + ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, @@ -584,7 +584,7 @@ func (ccms *CCMetricStore) LoadNodeData( errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) } - sanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) + ms.SanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) hostdata, ok := data[query.Hostname] if !ok { @@ -756,7 +756,7 @@ func (ccms *CCMetricStore) LoadNodeListData( } } - sanitizeStats(&res.Avg, &res.Min, &res.Max) + ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, @@ -784,8 +784,8 @@ func (ccms *CCMetricStore) LoadNodeListData( // returns the per-node health check results. func (ccms *CCMetricStore) HealthCheck(cluster string, nodes []string, metrics []string, -) (map[string]metricstore.HealthCheckResult, error) { - req := metricstore.HealthCheckReq{ +) (map[string]ms.HealthCheckResult, error) { + req := ms.HealthCheckReq{ Cluster: cluster, Nodes: nodes, MetricNames: metrics, @@ -818,7 +818,7 @@ func (ccms *CCMetricStore) HealthCheck(cluster string, return nil, fmt.Errorf("'%s': HTTP Status: %s", endpoint, res.Status) } - var results map[string]metricstore.HealthCheckResult + var results map[string]ms.HealthCheckResult if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&results); err != nil { cclog.Errorf("Error while decoding health check response: %s", err.Error()) return nil, err @@ -827,16 +827,6 @@ func (ccms *CCMetricStore) HealthCheck(cluster string, return results, nil } -// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. -// Regular float64 values cannot be JSONed when NaN. -func sanitizeStats(avg, min, max *schema.Float) { - if avg.IsNaN() || min.IsNaN() || max.IsNaN() { - *avg = schema.Float(0) - *min = schema.Float(0) - *max = schema.Float(0) - } -} - // hasNaNStats returns true if any of the statistics contain NaN values. func hasNaNStats(avg, min, max schema.Float) bool { return avg.IsNaN() || min.IsNaN() || max.IsNaN() diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 6d49624a..b6fbb51a 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -235,7 +235,7 @@ func InitMetrics(metrics map[string]MetricConfig) { // This function is safe for concurrent use after initialization. func GetMemoryStore() *MemoryStore { if msInstance == nil { - cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!") + cclog.Warnf("[METRICSTORE]> MemoryStore not initialized!") } return msInstance diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index 735c45d6..ed55521f 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -29,7 +29,6 @@ package metricstore import ( "context" "fmt" - "strconv" "strings" "time" @@ -186,7 +185,7 @@ func (ccms *InternalMetricStore) LoadData( } } - sanitizeStats(&res) + SanitizeStats(&res.Avg, &res.Min, &res.Max) jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, @@ -216,18 +215,6 @@ func (ccms *InternalMetricStore) LoadData( return jobData, nil } -// Pre-converted scope strings avoid repeated string(MetricScope) allocations during -// query construction. These are used in APIQuery.Type field throughout buildQueries -// and buildNodeQueries functions. Converting once at package initialization improves -// performance for high-volume query building. -var ( - hwthreadString = string(schema.MetricScopeHWThread) - coreString = string(schema.MetricScopeCore) - memoryDomainString = string(schema.MetricScopeMemoryDomain) - socketString = string(schema.MetricScopeSocket) - acceleratorString = string(schema.MetricScopeAccelerator) -) - // buildQueries constructs APIQuery structures with automatic scope transformation for a job. // // This function implements the core scope transformation logic, handling all combinations of @@ -293,9 +280,10 @@ func buildQueries( } } - // Avoid duplicates using map for O(1) lookup - handledScopes := make(map[schema.MetricScope]bool, 3) + // Avoid duplicates... + handledScopes := make([]schema.MetricScope, 0, 3) + scopesLoop: for _, requestedScope := range scopes { nativeScope := mc.Scope if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { @@ -303,10 +291,12 @@ func buildQueries( } scope := nativeScope.Max(requestedScope) - if handledScopes[scope] { - continue + for _, s := range handledScopes { + if scope == s { + continue scopesLoop + } } - handledScopes[scope] = true + handledScopes = append(handledScopes, scope) for _, host := range job.Resources { hwthreads := host.HWThreads @@ -314,224 +304,27 @@ func buildQueries( hwthreads = topology.Node } - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } + scopeResults, ok := BuildScopeQueries( + nativeScope, requestedScope, + metric, host.Hostname, + &topology, hwthreads, host.Accelerators, + ) + if !ok { + return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + for _, sr := range scopeResults { queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: sr.Metric, + Hostname: sr.Hostname, + Aggregate: sr.Aggregate, + Type: sr.Type, + TypeIds: sr.TypeIds, Resolution: resolution, }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue + assignedScope = append(assignedScope, sr.Scope) } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(host.Accelerators) == 0 { - continue - } - - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThread - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - for _, core := range cores { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(hwthreads) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: host.Hostname, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } } } @@ -695,7 +488,7 @@ func (ccms *InternalMetricStore) LoadScopedStats( } } - sanitizeStats(&res) + SanitizeStats(&res.Avg, &res.Min, &res.Max) scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, @@ -796,7 +589,7 @@ func (ccms *InternalMetricStore) LoadNodeData( errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) } - sanitizeStats(&qdata) + SanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) hostdata, ok := data[query.Hostname] if !ok { @@ -977,7 +770,7 @@ func (ccms *InternalMetricStore) LoadNodeListData( } } - sanitizeStats(&res) + SanitizeStats(&res.Avg, &res.Min, &res.Max) scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, @@ -1060,17 +853,20 @@ func buildNodeQueries( } } - // Avoid duplicates using map for O(1) lookup - handledScopes := make(map[schema.MetricScope]bool, 3) + // Avoid duplicates... + handledScopes := make([]schema.MetricScope, 0, 3) + nodeScopesLoop: for _, requestedScope := range scopes { nativeScope := mc.Scope scope := nativeScope.Max(requestedScope) - if handledScopes[scope] { - continue + for _, s := range handledScopes { + if scope == s { + continue nodeScopesLoop + } } - handledScopes[scope] = true + handledScopes = append(handledScopes, scope) for _, hostname := range nodes { @@ -1086,8 +882,7 @@ func buildNodeQueries( } } - // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable - // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable + // Always full node hwthread id list, no partial queries expected topology := subClusterTopol.Topology acceleratorIds := topology.GetAcceleratorIDs() @@ -1096,262 +891,30 @@ func buildNodeQueries( continue } - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } + scopeResults, ok := BuildScopeQueries( + nativeScope, requestedScope, + metric, hostname, + &topology, topology.Node, acceleratorIds, + ) + if !ok { + return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + } + + for _, sr := range scopeResults { queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: acceleratorIds, + Metric: sr.Metric, + Hostname: sr.Hostname, + Aggregate: sr.Aggregate, + Type: sr.Type, + TypeIds: sr.TypeIds, Resolution: resolution, }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue + assignedScope = append(assignedScope, sr.Scope) } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(acceleratorIds) == 0 { - continue - } - - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: acceleratorIds, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThread - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - for _, core := range cores { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(topology.Node) - for _, socket := range sockets { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, APIQuery{ - Metric: metric, - Hostname: hostname, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } } } return queries, assignedScope, nil } - -// sanitizeStats converts NaN statistics to zero for JSON compatibility. -// -// schema.Float with NaN values cannot be properly JSON-encoded, so we convert -// NaN to 0. This loses the distinction between "no data" and "zero value", -// but maintains API compatibility. -func sanitizeStats(data *APIMetricData) { - if data.Avg.IsNaN() { - data.Avg = schema.Float(0) - } - if data.Min.IsNaN() { - data.Min = schema.Float(0) - } - if data.Max.IsNaN() { - data.Max = schema.Float(0) - } -} - -// intToStringSlice converts a slice of integers to a slice of strings. -// Used to convert hardware thread/core/socket IDs from topology (int) to APIQuery TypeIds (string). -// -// Optimized to reuse a byte buffer for string conversion, reducing allocations. -func intToStringSlice(is []int) []string { - if len(is) == 0 { - return nil - } - - ss := make([]string, len(is)) - buf := make([]byte, 0, 16) // Reusable buffer for integer conversion - for i, x := range is { - buf = strconv.AppendInt(buf[:0], int64(x), 10) - ss[i] = string(buf) - } - return ss -} diff --git a/pkg/metricstore/scopequery.go b/pkg/metricstore/scopequery.go new file mode 100644 index 00000000..a414b794 --- /dev/null +++ b/pkg/metricstore/scopequery.go @@ -0,0 +1,314 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// This file contains shared scope transformation logic used by both the internal +// metric store (pkg/metricstore) and the external cc-metric-store client +// (internal/metricstoreclient). It extracts the common algorithm for mapping +// between native metric scopes and requested scopes based on cluster topology. +package metricstore + +import ( + "strconv" + + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// Pre-converted scope strings avoid repeated string(MetricScope) allocations +// during query construction. Used in ScopeQueryResult.Type field. +var ( + HWThreadString = string(schema.MetricScopeHWThread) + CoreString = string(schema.MetricScopeCore) + MemoryDomainString = string(schema.MetricScopeMemoryDomain) + SocketString = string(schema.MetricScopeSocket) + AcceleratorString = string(schema.MetricScopeAccelerator) +) + +// ScopeQueryResult is a package-independent intermediate type returned by +// BuildScopeQueries. Each consumer converts it to their own APIQuery type +// (adding Resolution and any other package-specific fields). +type ScopeQueryResult struct { + Type *string + Metric string + Hostname string + TypeIds []string + Scope schema.MetricScope + Aggregate bool +} + +// BuildScopeQueries generates scope query results for a given scope transformation. +// It returns a slice of results and a boolean indicating success. +// An empty slice means an expected exception (skip this combination). +// ok=false means an unhandled case (caller should return an error). +func BuildScopeQueries( + nativeScope, requestedScope schema.MetricScope, + metric, hostname string, + topology *schema.Topology, + hwthreads []int, + accelerators []string, +) ([]ScopeQueryResult, bool) { + scope := nativeScope.Max(requestedScope) + results := []ScopeQueryResult{} + + hwthreadsStr := IntToStringSlice(hwthreads) + + // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) + if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { + if scope != schema.MetricScopeAccelerator { + // Expected Exception -> Return Empty Slice + return results, true + } + + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &AcceleratorString, + TypeIds: accelerators, + Scope: schema.MetricScopeAccelerator, + }) + return results, true + } + + // Accelerator -> Node + if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { + if len(accelerators) == 0 { + // Expected Exception -> Return Empty Slice + return results, true + } + + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &AcceleratorString, + TypeIds: accelerators, + Scope: scope, + }) + return results, true + } + + // HWThread -> HWThread + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &HWThreadString, + TypeIds: hwthreadsStr, + Scope: scope, + }) + return results, true + } + + // HWThread -> Core + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + for _, core := range cores { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &HWThreadString, + TypeIds: IntToStringSlice(topology.Core[core]), + Scope: scope, + }) + } + return results, true + } + + // HWThread -> Socket + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + for _, socket := range sockets { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &HWThreadString, + TypeIds: IntToStringSlice(topology.Socket[socket]), + Scope: scope, + }) + } + return results, true + } + + // HWThread -> Node + if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &HWThreadString, + TypeIds: hwthreadsStr, + Scope: scope, + }) + return results, true + } + + // Core -> Core + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &CoreString, + TypeIds: IntToStringSlice(cores), + Scope: scope, + }) + return results, true + } + + // Core -> Socket + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromCores(hwthreads) + for _, socket := range sockets { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &CoreString, + TypeIds: IntToStringSlice(topology.Socket[socket]), + Scope: scope, + }) + } + return results, true + } + + // Core -> Node + if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { + cores, _ := topology.GetCoresFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &CoreString, + TypeIds: IntToStringSlice(cores), + Scope: scope, + }) + return results, true + } + + // MemoryDomain -> MemoryDomain + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &MemoryDomainString, + TypeIds: IntToStringSlice(memDomains), + Scope: scope, + }) + return results, true + } + + // MemoryDomain -> Socket + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains) + if err != nil { + cclog.Errorf("Error mapping memory domains to sockets, return unchanged: %v", err) + // Rare Error Case -> Still Continue -> Return Empty Slice + return results, true + } + + // Create a query for each socket + for _, domains := range socketToDomains { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &MemoryDomainString, + TypeIds: IntToStringSlice(domains), + Scope: scope, + }) + } + return results, true + } + + // MemoryDomain -> Node + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &MemoryDomainString, + TypeIds: IntToStringSlice(memDomains), + Scope: scope, + }) + return results, true + } + + // Socket -> Socket + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: false, + Type: &SocketString, + TypeIds: IntToStringSlice(sockets), + Scope: scope, + }) + return results, true + } + + // Socket -> Node + if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { + sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Aggregate: true, + Type: &SocketString, + TypeIds: IntToStringSlice(sockets), + Scope: scope, + }) + return results, true + } + + // Node -> Node + if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { + results = append(results, ScopeQueryResult{ + Metric: metric, + Hostname: hostname, + Scope: scope, + }) + return results, true + } + + // Unhandled Case + return nil, false +} + +// IntToStringSlice converts a slice of integers to a slice of strings. +// Used to convert hardware thread/core/socket IDs from topology (int) to query TypeIds (string). +// Optimized to reuse a byte buffer for string conversion, reducing allocations. +func IntToStringSlice(is []int) []string { + if len(is) == 0 { + return nil + } + + ss := make([]string, len(is)) + buf := make([]byte, 0, 16) // Reusable buffer for integer conversion + for i, x := range is { + buf = strconv.AppendInt(buf[:0], int64(x), 10) + ss[i] = string(buf) + } + return ss +} + +// SanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. +// If ANY of avg/min/max is NaN, ALL three are zeroed for consistency. +func SanitizeStats(avg, min, max *schema.Float) { + if avg.IsNaN() || min.IsNaN() || max.IsNaN() { + *avg = schema.Float(0) + *min = schema.Float(0) + *max = schema.Float(0) + } +} diff --git a/pkg/metricstore/scopequery_test.go b/pkg/metricstore/scopequery_test.go new file mode 100644 index 00000000..4cdfca78 --- /dev/null +++ b/pkg/metricstore/scopequery_test.go @@ -0,0 +1,273 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricstore + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// makeTopology creates a simple 2-socket, 4-core, 8-hwthread topology for testing. +// Socket 0: cores 0,1 with hwthreads 0,1,2,3 +// Socket 1: cores 2,3 with hwthreads 4,5,6,7 +// MemoryDomain 0: hwthreads 0,1,2,3 (socket 0) +// MemoryDomain 1: hwthreads 4,5,6,7 (socket 1) +func makeTopology() schema.Topology { + topo := schema.Topology{ + Node: []int{0, 1, 2, 3, 4, 5, 6, 7}, + Socket: [][]int{{0, 1, 2, 3}, {4, 5, 6, 7}}, + MemoryDomain: [][]int{{0, 1, 2, 3}, {4, 5, 6, 7}}, + Core: [][]int{{0, 1}, {2, 3}, {4, 5}, {6, 7}}, + Accelerators: []*schema.Accelerator{ + {ID: "gpu0"}, + {ID: "gpu1"}, + }, + } + return topo +} + +func TestBuildScopeQueries(t *testing.T) { + topo := makeTopology() + topo.InitTopologyMaps() + accIds := topo.GetAcceleratorIDs() + + tests := []struct { + name string + nativeScope schema.MetricScope + requestedScope schema.MetricScope + expectOk bool + expectLen int // expected number of results + expectAgg bool + expectScope schema.MetricScope + }{ + // Same-scope cases + { + name: "HWThread->HWThread", nativeScope: schema.MetricScopeHWThread, + requestedScope: schema.MetricScopeHWThread, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeHWThread, + }, + { + name: "Core->Core", nativeScope: schema.MetricScopeCore, + requestedScope: schema.MetricScopeCore, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeCore, + }, + { + name: "Socket->Socket", nativeScope: schema.MetricScopeSocket, + requestedScope: schema.MetricScopeSocket, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeSocket, + }, + { + name: "MemoryDomain->MemoryDomain", nativeScope: schema.MetricScopeMemoryDomain, + requestedScope: schema.MetricScopeMemoryDomain, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeMemoryDomain, + }, + { + name: "Node->Node", nativeScope: schema.MetricScopeNode, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeNode, + }, + { + name: "Accelerator->Accelerator", nativeScope: schema.MetricScopeAccelerator, + requestedScope: schema.MetricScopeAccelerator, expectOk: true, expectLen: 1, + expectAgg: false, expectScope: schema.MetricScopeAccelerator, + }, + // Aggregation cases + { + name: "HWThread->Core", nativeScope: schema.MetricScopeHWThread, + requestedScope: schema.MetricScopeCore, expectOk: true, expectLen: 4, // 4 cores + expectAgg: true, expectScope: schema.MetricScopeCore, + }, + { + name: "HWThread->Socket", nativeScope: schema.MetricScopeHWThread, + requestedScope: schema.MetricScopeSocket, expectOk: true, expectLen: 2, // 2 sockets + expectAgg: true, expectScope: schema.MetricScopeSocket, + }, + { + name: "HWThread->Node", nativeScope: schema.MetricScopeHWThread, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: true, expectScope: schema.MetricScopeNode, + }, + { + name: "Core->Socket", nativeScope: schema.MetricScopeCore, + requestedScope: schema.MetricScopeSocket, expectOk: true, expectLen: 2, // 2 sockets + expectAgg: true, expectScope: schema.MetricScopeSocket, + }, + { + name: "Core->Node", nativeScope: schema.MetricScopeCore, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: true, expectScope: schema.MetricScopeNode, + }, + { + name: "Socket->Node", nativeScope: schema.MetricScopeSocket, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: true, expectScope: schema.MetricScopeNode, + }, + { + name: "MemoryDomain->Node", nativeScope: schema.MetricScopeMemoryDomain, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: true, expectScope: schema.MetricScopeNode, + }, + { + name: "MemoryDomain->Socket", nativeScope: schema.MetricScopeMemoryDomain, + requestedScope: schema.MetricScopeSocket, expectOk: true, expectLen: 2, // 2 sockets + expectAgg: true, expectScope: schema.MetricScopeSocket, + }, + { + name: "Accelerator->Node", nativeScope: schema.MetricScopeAccelerator, + requestedScope: schema.MetricScopeNode, expectOk: true, expectLen: 1, + expectAgg: true, expectScope: schema.MetricScopeNode, + }, + // Expected exception: Accelerator scope requested but non-accelerator scope in between + { + name: "Accelerator->Core (exception)", nativeScope: schema.MetricScopeAccelerator, + requestedScope: schema.MetricScopeCore, expectOk: true, expectLen: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + results, ok := BuildScopeQueries( + tt.nativeScope, tt.requestedScope, + "test_metric", "node001", + &topo, topo.Node, accIds, + ) + + if ok != tt.expectOk { + t.Fatalf("expected ok=%v, got ok=%v", tt.expectOk, ok) + } + + if len(results) != tt.expectLen { + t.Fatalf("expected %d results, got %d", tt.expectLen, len(results)) + } + + if tt.expectLen > 0 { + for _, r := range results { + if r.Scope != tt.expectScope { + t.Errorf("expected scope %s, got %s", tt.expectScope, r.Scope) + } + if r.Aggregate != tt.expectAgg { + t.Errorf("expected aggregate=%v, got %v", tt.expectAgg, r.Aggregate) + } + if r.Metric != "test_metric" { + t.Errorf("expected metric 'test_metric', got '%s'", r.Metric) + } + if r.Hostname != "node001" { + t.Errorf("expected hostname 'node001', got '%s'", r.Hostname) + } + } + } + }) + } +} + +func TestBuildScopeQueries_UnhandledCase(t *testing.T) { + topo := makeTopology() + topo.InitTopologyMaps() + + // Node native with HWThread requested => scope.Max = Node, but let's try an invalid combination + // Actually all valid combinations are handled. An unhandled case would be something like + // a scope that doesn't exist in the if-chain. Since all real scopes are covered, + // we test with a synthetic unhandled combination by checking the bool return. + // The function should return ok=false for truly unhandled cases. + + // For now, verify all known combinations return ok=true + scopes := []schema.MetricScope{ + schema.MetricScopeHWThread, schema.MetricScopeCore, + schema.MetricScopeSocket, schema.MetricScopeNode, + } + + for _, native := range scopes { + for _, requested := range scopes { + results, ok := BuildScopeQueries( + native, requested, + "m", "h", &topo, topo.Node, nil, + ) + if !ok { + t.Errorf("unexpected unhandled case: native=%s, requested=%s", native, requested) + } + if results == nil { + t.Errorf("results should not be nil for native=%s, requested=%s", native, requested) + } + } + } +} + +func TestIntToStringSlice(t *testing.T) { + tests := []struct { + input []int + expected []string + }{ + {nil, nil}, + {[]int{}, nil}, + {[]int{0}, []string{"0"}}, + {[]int{1, 2, 3}, []string{"1", "2", "3"}}, + {[]int{10, 100, 1000}, []string{"10", "100", "1000"}}, + } + + for _, tt := range tests { + result := IntToStringSlice(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("IntToStringSlice(%v): expected len %d, got %d", tt.input, len(tt.expected), len(result)) + continue + } + for i := range result { + if result[i] != tt.expected[i] { + t.Errorf("IntToStringSlice(%v)[%d]: expected %s, got %s", tt.input, i, tt.expected[i], result[i]) + } + } + } +} + +func TestSanitizeStats(t *testing.T) { + // Test: all valid - should remain unchanged + avg, min, max := schema.Float(1.0), schema.Float(0.5), schema.Float(2.0) + SanitizeStats(&avg, &min, &max) + if avg != 1.0 || min != 0.5 || max != 2.0 { + t.Errorf("SanitizeStats should not change valid values") + } + + // Test: one NaN - all should be zeroed + avg, min, max = schema.Float(1.0), schema.Float(0.5), schema.NaN + SanitizeStats(&avg, &min, &max) + if avg != 0 || min != 0 || max != 0 { + t.Errorf("SanitizeStats should zero all when any is NaN, got avg=%v min=%v max=%v", avg, min, max) + } + + // Test: all NaN + avg, min, max = schema.NaN, schema.NaN, schema.NaN + SanitizeStats(&avg, &min, &max) + if avg != 0 || min != 0 || max != 0 { + t.Errorf("SanitizeStats should zero all NaN values") + } +} + +func TestNodeToNodeQuery(t *testing.T) { + topo := makeTopology() + topo.InitTopologyMaps() + + results, ok := BuildScopeQueries( + schema.MetricScopeNode, schema.MetricScopeNode, + "cpu_load", "node001", + &topo, topo.Node, nil, + ) + + if !ok { + t.Fatal("expected ok=true for Node->Node") + } + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + r := results[0] + if r.Type != nil { + t.Error("Node->Node should have nil Type") + } + if r.TypeIds != nil { + t.Error("Node->Node should have nil TypeIds") + } + if r.Aggregate { + t.Error("Node->Node should not aggregate") + } +}