mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-16 09:41:47 +01:00
Document and refactor
This commit is contained in:
@@ -3,6 +3,27 @@
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// This file implements high-level query functions for loading job metric data
|
||||
// with automatic scope transformation and aggregation.
|
||||
//
|
||||
// Key Concepts:
|
||||
//
|
||||
// Metric Scopes: Metrics are collected at different granularities (native scope):
|
||||
// - HWThread: Per hardware thread
|
||||
// - Core: Per CPU core
|
||||
// - Socket: Per CPU socket
|
||||
// - MemoryDomain: Per memory domain (NUMA)
|
||||
// - Accelerator: Per GPU/accelerator
|
||||
// - Node: Per compute node
|
||||
//
|
||||
// Scope Transformation: The buildQueries functions transform between native scope
|
||||
// and requested scope by:
|
||||
// - Aggregating finer-grained data (e.g., HWThread → Core → Socket → Node)
|
||||
// - Rejecting requests for finer granularity than available
|
||||
// - Handling special cases (e.g., Accelerator metrics)
|
||||
//
|
||||
// Query Building: Constructs APIQuery structures with proper selectors (Type, TypeIds)
|
||||
// based on cluster topology and job resources.
|
||||
package metricstore
|
||||
|
||||
import (
|
||||
@@ -17,9 +38,33 @@ import (
|
||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||
)
|
||||
|
||||
// TestLoadDataCallback allows tests to override LoadData behavior
|
||||
// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes.
|
||||
// When set to a non-nil function, LoadData will call this function instead of the default implementation.
|
||||
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
|
||||
|
||||
// LoadData loads metric data for a specific job with automatic scope transformation.
|
||||
//
|
||||
// This is the primary function for retrieving job metric data. It handles:
|
||||
// - Building queries with scope transformation via buildQueries
|
||||
// - Fetching data from the metric store
|
||||
// - Organizing results by metric and scope
|
||||
// - Converting NaN statistics to 0 for JSON compatibility
|
||||
// - Partial error handling (returns data for successful queries even if some fail)
|
||||
//
|
||||
// Parameters:
|
||||
// - job: Job metadata including cluster, resources, and time range
|
||||
// - metrics: List of metric names to load
|
||||
// - scopes: Requested metric scopes (will be transformed to match native scopes)
|
||||
// - ctx: Context for cancellation (currently unused but reserved for future use)
|
||||
// - resolution: Data resolution in seconds (0 for native resolution)
|
||||
//
|
||||
// Returns:
|
||||
// - JobData: Map of metric → scope → JobMetric with time-series data and statistics
|
||||
// - Error: Returns error if query building or fetching fails, or partial error listing failed hosts
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60)
|
||||
func LoadData(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
@@ -91,12 +136,7 @@ func LoadData(
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
|
||||
// "schema.Float()" because regular float64 can not be JSONed when NaN.
|
||||
res.Avg = schema.Float(0)
|
||||
res.Min = schema.Float(0)
|
||||
res.Max = schema.Float(0)
|
||||
}
|
||||
sanitizeStats(&res)
|
||||
|
||||
jobMetric.Series = append(jobMetric.Series, schema.Series{
|
||||
Hostname: query.Hostname,
|
||||
@@ -126,6 +166,10 @@ func LoadData(
|
||||
return jobData, nil
|
||||
}
|
||||
|
||||
// Pre-converted scope strings avoid repeated string(MetricScope) allocations during
|
||||
// query construction. These are used in APIQuery.Type field throughout buildQueries
|
||||
// and buildNodeQueries functions. Converting once at package initialization improves
|
||||
// performance for high-volume query building.
|
||||
var (
|
||||
hwthreadString = string(schema.MetricScopeHWThread)
|
||||
coreString = string(schema.MetricScopeCore)
|
||||
@@ -134,12 +178,41 @@ var (
|
||||
acceleratorString = string(schema.MetricScopeAccelerator)
|
||||
)
|
||||
|
||||
// buildQueries constructs APIQuery structures with automatic scope transformation for a job.
|
||||
//
|
||||
// This function implements the core scope transformation logic, handling all combinations of
|
||||
// native metric scopes and requested scopes. It uses the cluster topology to determine which
|
||||
// hardware IDs to include in each query.
|
||||
//
|
||||
// Scope Transformation Rules:
|
||||
// - If native scope >= requested scope: Aggregates data (Aggregate=true in APIQuery)
|
||||
// - If native scope < requested scope: Returns error (cannot increase granularity)
|
||||
// - Special handling for Accelerator scope (independent of CPU hierarchy)
|
||||
//
|
||||
// The function generates one or more APIQuery per (metric, scope, host) combination:
|
||||
// - For non-aggregated queries: One query with all relevant IDs
|
||||
// - For aggregated queries: May generate multiple queries (e.g., one per socket/core)
|
||||
//
|
||||
// Parameters:
|
||||
// - job: Job metadata including cluster, subcluster, and resource allocation
|
||||
// - metrics: List of metrics to query
|
||||
// - scopes: Requested scopes for each metric
|
||||
// - resolution: Data resolution in seconds
|
||||
//
|
||||
// Returns:
|
||||
// - []APIQuery: List of queries to execute
|
||||
// - []schema.MetricScope: Assigned scope for each query (after transformation)
|
||||
// - error: Returns error if topology lookup fails or unhandled scope combination encountered
|
||||
func buildQueries(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
resolution int64,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
if len(job.Resources) == 0 {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/CCMS > no resources allocated for job %d", job.JobID)
|
||||
}
|
||||
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
@@ -152,7 +225,6 @@ func buildQueries(
|
||||
for _, metric := range metrics {
|
||||
mc := archive.GetMetricConfig(job.Cluster, metric)
|
||||
if mc == nil {
|
||||
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
|
||||
cclog.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
|
||||
continue
|
||||
}
|
||||
@@ -171,10 +243,9 @@ func buildQueries(
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
handledScopes := make([]schema.MetricScope, 0, 3)
|
||||
// Avoid duplicates using map for O(1) lookup
|
||||
handledScopes := make(map[schema.MetricScope]bool, 3)
|
||||
|
||||
scopesLoop:
|
||||
for _, requestedScope := range scopes {
|
||||
nativeScope := mc.Scope
|
||||
if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 {
|
||||
@@ -182,12 +253,10 @@ func buildQueries(
|
||||
}
|
||||
|
||||
scope := nativeScope.Max(requestedScope)
|
||||
for _, s := range handledScopes {
|
||||
if scope == s {
|
||||
continue scopesLoop
|
||||
if handledScopes[scope] {
|
||||
continue
|
||||
}
|
||||
}
|
||||
handledScopes = append(handledScopes, scope)
|
||||
handledScopes[scope] = true
|
||||
|
||||
for _, host := range job.Resources {
|
||||
hwthreads := host.HWThreads
|
||||
@@ -232,7 +301,7 @@ func buildQueries(
|
||||
continue
|
||||
}
|
||||
|
||||
// HWThread -> HWThead
|
||||
// HWThread -> HWThread
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
@@ -356,7 +425,7 @@ func buildQueries(
|
||||
continue
|
||||
}
|
||||
|
||||
// MemoryDoman -> Node
|
||||
// MemoryDomain -> Node
|
||||
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
|
||||
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
|
||||
queries = append(queries, APIQuery{
|
||||
@@ -420,12 +489,26 @@ func buildQueries(
|
||||
return queries, assignedScope, nil
|
||||
}
|
||||
|
||||
// LoadStats loads only metric statistics (avg/min/max) for a job at node scope.
|
||||
//
|
||||
// This is an optimized version of LoadData that fetches only statistics without
|
||||
// time-series data, reducing bandwidth and memory usage. Always queries at node scope.
|
||||
//
|
||||
// Parameters:
|
||||
// - job: Job metadata
|
||||
// - metrics: List of metric names
|
||||
// - ctx: Context (currently unused)
|
||||
//
|
||||
// Returns:
|
||||
// - Map of metric → hostname → statistics
|
||||
// - Error on query building or fetching failure
|
||||
func LoadStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context,
|
||||
) (map[string]map[string]schema.MetricStatistics, error) {
|
||||
queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
|
||||
// TODO(#166): Add scope parameter for analysis view accelerator normalization
|
||||
queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0)
|
||||
if err != nil {
|
||||
cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
|
||||
return nil, err
|
||||
@@ -477,6 +560,20 @@ func LoadStats(
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// LoadScopedStats loads metric statistics for a job with scope-aware grouping.
|
||||
//
|
||||
// Similar to LoadStats but supports multiple scopes and returns statistics grouped
|
||||
// by scope with hardware IDs (e.g., per-core, per-socket statistics).
|
||||
//
|
||||
// Parameters:
|
||||
// - job: Job metadata
|
||||
// - metrics: List of metric names
|
||||
// - scopes: Requested metric scopes
|
||||
// - ctx: Context (currently unused)
|
||||
//
|
||||
// Returns:
|
||||
// - ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID)
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadScopedStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
@@ -533,12 +630,7 @@ func LoadScopedStats(
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
|
||||
// "schema.Float()" because regular float64 can not be JSONed when NaN.
|
||||
res.Avg = schema.Float(0)
|
||||
res.Min = schema.Float(0)
|
||||
res.Max = schema.Float(0)
|
||||
}
|
||||
sanitizeStats(&res)
|
||||
|
||||
scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
|
||||
Hostname: query.Hostname,
|
||||
@@ -567,6 +659,22 @@ func LoadScopedStats(
|
||||
return scopedJobStats, nil
|
||||
}
|
||||
|
||||
// LoadNodeData loads metric data for specific nodes in a cluster over a time range.
|
||||
//
|
||||
// Unlike LoadData which operates on job resources, this function queries arbitrary nodes
|
||||
// directly. Useful for system monitoring and node status views.
|
||||
//
|
||||
// Parameters:
|
||||
// - cluster: Cluster name
|
||||
// - metrics: List of metric names
|
||||
// - nodes: List of node hostnames (nil = all nodes in cluster via ForAllNodes)
|
||||
// - scopes: Requested metric scopes (currently unused - always node scope)
|
||||
// - from, to: Time range
|
||||
// - ctx: Context (currently unused)
|
||||
//
|
||||
// Returns:
|
||||
// - Map of hostname → metric → []JobMetric
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadNodeData(
|
||||
cluster string,
|
||||
metrics, nodes []string,
|
||||
@@ -615,14 +723,10 @@ func LoadNodeData(
|
||||
metric := query.Metric
|
||||
qdata := res[0]
|
||||
if qdata.Error != nil {
|
||||
/* Build list for "partial errors", if any */
|
||||
errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error))
|
||||
}
|
||||
|
||||
if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() {
|
||||
// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
|
||||
qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0.
|
||||
}
|
||||
sanitizeStats(&qdata)
|
||||
|
||||
hostdata, ok := data[query.Hostname]
|
||||
if !ok {
|
||||
@@ -656,6 +760,24 @@ func LoadNodeData(
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// LoadNodeListData loads metric data for a list of nodes with full scope transformation support.
|
||||
//
|
||||
// This is the most flexible node data loading function, supporting arbitrary scopes and
|
||||
// resolution. Uses buildNodeQueries for proper scope transformation based on topology.
|
||||
//
|
||||
// Parameters:
|
||||
// - cluster: Cluster name
|
||||
// - subCluster: SubCluster name (empty string to infer from node names)
|
||||
// - nodes: List of node hostnames
|
||||
// - metrics: List of metric names
|
||||
// - scopes: Requested metric scopes
|
||||
// - resolution: Data resolution in seconds
|
||||
// - from, to: Time range
|
||||
// - ctx: Context (currently unused)
|
||||
//
|
||||
// Returns:
|
||||
// - Map of hostname → JobData (metric → scope → JobMetric)
|
||||
// - Error or partial error listing failed queries
|
||||
func LoadNodeListData(
|
||||
cluster, subCluster string,
|
||||
nodes []string,
|
||||
@@ -696,7 +818,7 @@ func LoadNodeListData(
|
||||
} else {
|
||||
query = req.Queries[i]
|
||||
}
|
||||
// qdata := res[0]
|
||||
|
||||
metric := query.Metric
|
||||
scope := assignedScope[i]
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
@@ -742,12 +864,7 @@ func LoadNodeListData(
|
||||
*id = query.TypeIds[ndx]
|
||||
}
|
||||
|
||||
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
|
||||
// "schema.Float()" because regular float64 can not be JSONed when NaN.
|
||||
res.Avg = schema.Float(0)
|
||||
res.Min = schema.Float(0)
|
||||
res.Max = schema.Float(0)
|
||||
}
|
||||
sanitizeStats(&res)
|
||||
|
||||
scopeData.Series = append(scopeData.Series, schema.Series{
|
||||
Hostname: query.Hostname,
|
||||
@@ -770,6 +887,23 @@ func LoadNodeListData(
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// buildNodeQueries constructs APIQuery structures for node-based queries with scope transformation.
|
||||
//
|
||||
// Similar to buildQueries but operates on node lists rather than job resources.
|
||||
// Supports dynamic subcluster lookup when subCluster parameter is empty.
|
||||
//
|
||||
// Parameters:
|
||||
// - cluster: Cluster name
|
||||
// - subCluster: SubCluster name (empty = infer from node hostnames)
|
||||
// - nodes: List of node hostnames
|
||||
// - metrics: List of metric names
|
||||
// - scopes: Requested metric scopes
|
||||
// - resolution: Data resolution in seconds
|
||||
//
|
||||
// Returns:
|
||||
// - []APIQuery: List of queries to execute
|
||||
// - []schema.MetricScope: Assigned scope for each query
|
||||
// - error: Returns error if topology lookup fails or unhandled scope combination
|
||||
func buildNodeQueries(
|
||||
cluster string,
|
||||
subCluster string,
|
||||
@@ -778,6 +912,10 @@ func buildNodeQueries(
|
||||
scopes []schema.MetricScope,
|
||||
resolution int64,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
if len(nodes) == 0 {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/CCMS > no nodes specified for query")
|
||||
}
|
||||
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
@@ -795,7 +933,6 @@ func buildNodeQueries(
|
||||
for _, metric := range metrics {
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
if mc == nil {
|
||||
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
|
||||
cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster)
|
||||
continue
|
||||
}
|
||||
@@ -814,20 +951,17 @@ func buildNodeQueries(
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
handledScopes := make([]schema.MetricScope, 0, 3)
|
||||
// Avoid duplicates using map for O(1) lookup
|
||||
handledScopes := make(map[schema.MetricScope]bool, 3)
|
||||
|
||||
scopesLoop:
|
||||
for _, requestedScope := range scopes {
|
||||
nativeScope := mc.Scope
|
||||
|
||||
scope := nativeScope.Max(requestedScope)
|
||||
for _, s := range handledScopes {
|
||||
if scope == s {
|
||||
continue scopesLoop
|
||||
if handledScopes[scope] {
|
||||
continue
|
||||
}
|
||||
}
|
||||
handledScopes = append(handledScopes, scope)
|
||||
handledScopes[scope] = true
|
||||
|
||||
for _, hostname := range nodes {
|
||||
|
||||
@@ -850,7 +984,7 @@ func buildNodeQueries(
|
||||
|
||||
// Moved check here if metric matches hardware specs
|
||||
if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 {
|
||||
continue scopesLoop
|
||||
continue
|
||||
}
|
||||
|
||||
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
|
||||
@@ -890,7 +1024,7 @@ func buildNodeQueries(
|
||||
continue
|
||||
}
|
||||
|
||||
// HWThread -> HWThead
|
||||
// HWThread -> HWThread
|
||||
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
|
||||
queries = append(queries, APIQuery{
|
||||
Metric: metric,
|
||||
@@ -1014,7 +1148,7 @@ func buildNodeQueries(
|
||||
continue
|
||||
}
|
||||
|
||||
// MemoryDoman -> Node
|
||||
// MemoryDomain -> Node
|
||||
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
|
||||
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
|
||||
queries = append(queries, APIQuery{
|
||||
@@ -1078,10 +1212,37 @@ func buildNodeQueries(
|
||||
return queries, assignedScope, nil
|
||||
}
|
||||
|
||||
// sanitizeStats converts NaN statistics to zero for JSON compatibility.
|
||||
//
|
||||
// schema.Float with NaN values cannot be properly JSON-encoded, so we convert
|
||||
// NaN to 0. This loses the distinction between "no data" and "zero value",
|
||||
// but maintains API compatibility.
|
||||
func sanitizeStats(data *APIMetricData) {
|
||||
if data.Avg.IsNaN() {
|
||||
data.Avg = schema.Float(0)
|
||||
}
|
||||
if data.Min.IsNaN() {
|
||||
data.Min = schema.Float(0)
|
||||
}
|
||||
if data.Max.IsNaN() {
|
||||
data.Max = schema.Float(0)
|
||||
}
|
||||
}
|
||||
|
||||
// intToStringSlice converts a slice of integers to a slice of strings.
|
||||
// Used to convert hardware thread/core/socket IDs from topology (int) to APIQuery TypeIds (string).
|
||||
//
|
||||
// Optimized to reuse a byte buffer for string conversion, reducing allocations.
|
||||
func intToStringSlice(is []int) []string {
|
||||
if len(is) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ss := make([]string, len(is))
|
||||
buf := make([]byte, 0, 16) // Reusable buffer for integer conversion
|
||||
for i, x := range is {
|
||||
ss[i] = strconv.Itoa(x)
|
||||
buf = strconv.AppendInt(buf[:0], int64(x), 10)
|
||||
ss[i] = string(buf)
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user