Consolidate code for external and internal ccms buildQueries function

Entire-Checkpoint: fc3be444ef4c
This commit is contained in:
2026-03-04 16:43:05 +01:00
parent 9672903d41
commit 26982088c3
6 changed files with 676 additions and 806 deletions

View File

@@ -37,23 +37,13 @@ package metricstoreclient
import (
"fmt"
"strconv"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// Scope string constants used in API queries.
// Pre-converted to avoid repeated allocations during query building.
var (
hwthreadString = string(schema.MetricScopeHWThread)
coreString = string(schema.MetricScopeCore)
memoryDomainString = string(schema.MetricScopeMemoryDomain)
socketString = string(schema.MetricScopeSocket)
acceleratorString = string(schema.MetricScopeAccelerator)
)
// buildQueries constructs API queries for job-specific metric data.
// It iterates through metrics, scopes, and job resources to build the complete query set.
//
@@ -126,21 +116,27 @@ func (ccms *CCMetricStore) buildQueries(
hwthreads = topology.Node
}
// Note: Expected exceptions will return as empty slices -> Continue
hostQueries, hostScopes := buildScopeQueries(
scopeResults, ok := metricstore.BuildScopeQueries(
nativeScope, requestedScope,
remoteName, host.Hostname,
topology, hwthreads, host.Accelerators,
resolution,
)
// Note: Unexpected errors, such as unhandled cases, will return as nils -> Error
if hostQueries == nil && hostScopes == nil {
if !ok {
return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
queries = append(queries, hostQueries...)
assignedScope = append(assignedScope, hostScopes...)
for _, sr := range scopeResults {
queries = append(queries, APIQuery{
Metric: sr.Metric,
Hostname: sr.Hostname,
Aggregate: sr.Aggregate,
Type: sr.Type,
TypeIds: sr.TypeIds,
Resolution: resolution,
})
assignedScope = append(assignedScope, sr.Scope)
}
}
}
}
@@ -231,19 +227,27 @@ func (ccms *CCMetricStore) buildNodeQueries(
continue scopesLoop
}
nodeQueries, nodeScopes := buildScopeQueries(
scopeResults, ok := metricstore.BuildScopeQueries(
nativeScope, requestedScope,
remoteName, hostname,
topology, topology.Node, acceleratorIds,
resolution,
)
if len(nodeQueries) == 0 && len(nodeScopes) == 0 {
if !ok {
return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
}
queries = append(queries, nodeQueries...)
assignedScope = append(assignedScope, nodeScopes...)
for _, sr := range scopeResults {
queries = append(queries, APIQuery{
Metric: sr.Metric,
Hostname: sr.Hostname,
Aggregate: sr.Aggregate,
Type: sr.Type,
TypeIds: sr.TypeIds,
Resolution: resolution,
})
assignedScope = append(assignedScope, sr.Scope)
}
}
}
}
@@ -251,277 +255,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
return queries, assignedScope, nil
}
// buildScopeQueries generates API queries for a given scope transformation.
// It returns a slice of queries and corresponding assigned scopes.
// Some transformations (e.g., HWThread -> Core/Socket) may generate multiple queries.
func buildScopeQueries(
nativeScope, requestedScope schema.MetricScope,
metric, hostname string,
topology *schema.Topology,
hwthreads []int,
accelerators []string,
resolution int,
) ([]APIQuery, []schema.MetricScope) {
scope := nativeScope.Max(requestedScope)
queries := []APIQuery{}
scopes := []schema.MetricScope{}
hwthreadsStr := intToStringSlice(hwthreads)
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
if scope != schema.MetricScopeAccelerator {
// Expected Exception -> Continue -> Return Empty Slices
return queries, scopes
}
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
Type: &acceleratorString,
TypeIds: accelerators,
Resolution: resolution,
})
scopes = append(scopes, schema.MetricScopeAccelerator)
return queries, scopes
}
// Accelerator -> Node
if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode {
if len(accelerators) == 0 {
// Expected Exception -> Continue -> Return Empty Slices
return queries, scopes
}
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &acceleratorString,
TypeIds: accelerators,
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// HWThread -> HWThread
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
Type: &hwthreadString,
TypeIds: hwthreadsStr,
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// HWThread -> Core
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Core[core]),
Resolution: resolution,
})
scopes = append(scopes, scope)
}
return queries, scopes
}
// HWThread -> Socket
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: intToStringSlice(topology.Socket[socket]),
Resolution: resolution,
})
scopes = append(scopes, scope)
}
return queries, scopes
}
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &hwthreadString,
TypeIds: hwthreadsStr,
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// Core -> Socket
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(hwthreads)
for _, socket := range sockets {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &coreString,
TypeIds: intToStringSlice(topology.Socket[socket]),
Resolution: resolution,
})
scopes = append(scopes, scope)
}
return queries, scopes
}
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &coreString,
TypeIds: intToStringSlice(cores),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
Type: &memoryDomainString,
TypeIds: intToStringSlice(memDomains),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// MemoryDomain -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(memDomains),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// MemoryDomain -> Socket
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
if err != nil {
cclog.Errorf("Error mapping memory domains to sockets, return unchanged: %v", err)
// Rare Error Case -> Still Continue -> Return Empty Slices
return queries, scopes
}
// Create a query for each socket
for _, domains := range socketToDomains {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(domains),
Resolution: resolution,
})
// Add scope for each query, not just once
scopes = append(scopes, scope)
}
return queries, scopes
}
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
Type: &socketString,
TypeIds: intToStringSlice(sockets),
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Resolution: resolution,
})
scopes = append(scopes, scope)
return queries, scopes
}
// Unhandled Case -> Error -> Return nils
return nil, nil
}
// intToStringSlice converts a slice of integers to a slice of strings.
// Used to convert hardware IDs (core IDs, socket IDs, etc.) to the string format required by the API.
func intToStringSlice(is []int) []string {
ss := make([]string, len(is))
for i, x := range is {
ss[i] = strconv.Itoa(x)
}
return ss
}