From 845d0111af29376086aeb93fee082c4fbb6e9efc Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 4 Mar 2026 17:31:36 +0100 Subject: [PATCH] Further consolidate and improve ccms query builder Entire-Checkpoint: d10e6221ee4f --- .../cc-metric-store-queries.go | 30 ++---- internal/metricstoreclient/cc-metric-store.go | 65 +++--------- pkg/metricstore/query.go | 99 +++++-------------- pkg/metricstore/scopequery.go | 27 +++++ 4 files changed, 69 insertions(+), 152 deletions(-) diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go index b8e3a94a..1119d70c 100644 --- a/internal/metricstoreclient/cc-metric-store-queries.go +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ -79,17 +79,8 @@ func (ccms *CCMetricStore) buildQueries( } // Skip if metric is removed for subcluster - if len(mc.SubClusters) != 0 { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == job.SubCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } + if len(mc.SubClusters) != 0 && metricstore.IsMetricRemovedForSubCluster(mc, job.SubCluster) { + continue } // Avoid duplicates... @@ -123,7 +114,7 @@ func (ccms *CCMetricStore) buildQueries( ) if !ok { - return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } for _, sr := range scopeResults { @@ -175,17 +166,8 @@ func (ccms *CCMetricStore) buildNodeQueries( } // Skip if metric is removed for subcluster - if mc.SubClusters != nil { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == subCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } + if mc.SubClusters != nil && metricstore.IsMetricRemovedForSubCluster(mc, subCluster) { + continue } // Avoid duplicates... @@ -234,7 +216,7 @@ func (ccms *CCMetricStore) buildNodeQueries( ) if !ok { - return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + return nil, nil, fmt.Errorf("METRICDATA/EXTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } for _, sr := range scopeResults { diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index 2f13ade6..55dc7fb5 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -275,13 +275,6 @@ func (ccms *CCMetricStore) LoadData( } for i, row := range resBody.Results { - // Safety check to prevent index out of range errors - if i >= len(req.Queries) || i >= len(assignedScope) { - cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d", - i, len(req.Queries), len(assignedScope)) - continue - } - query := req.Queries[i] metric := query.Metric scope := assignedScope[i] @@ -318,18 +311,7 @@ func (ccms *CCMetricStore) LoadData( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ms.ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) @@ -393,6 +375,10 @@ func (ccms *CCMetricStore) LoadStats( stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) for i, res := range resBody.Results { + if i >= len(req.Queries) { + cclog.Warnf("LoadStats: result index %d exceeds queries length %d", i, len(req.Queries)) + break + } if len(res) == 0 { // No Data Found For Metric, Logged in FetchData to Warn continue @@ -481,18 +467,7 @@ func (ccms *CCMetricStore) LoadScopedStats( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ms.ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) @@ -582,6 +557,13 @@ func (ccms *CCMetricStore) LoadNodeData( qdata := res[0] if qdata.Error != nil { errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) + continue + } + + mc := archive.GetMetricConfig(cluster, metric) + if mc == nil { + cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster) + continue } ms.SanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) @@ -592,7 +574,6 @@ func (ccms *CCMetricStore) LoadNodeData( data[query.Hostname] = hostdata } - mc := archive.GetMetricConfig(cluster, metric) hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ Unit: mc.Unit, Timestep: mc.Timestep, @@ -680,13 +661,6 @@ func (ccms *CCMetricStore) LoadNodeListData( } for i, row := range resBody.Results { - // Safety check to prevent index out of range errors - if i >= len(req.Queries) || i >= len(assignedScope) { - cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d", - i, len(req.Queries), len(assignedScope)) - continue - } - var query APIQuery if resBody.Queries != nil { if i < len(resBody.Queries) { @@ -743,18 +717,7 @@ func (ccms *CCMetricStore) LoadNodeListData( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ms.ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) ms.SanitizeStats(&res.Avg, &res.Min, &res.Max) diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index ed55521f..8a349b5a 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -129,13 +129,6 @@ func (ccms *InternalMetricStore) LoadData( } for i, row := range resBody.Results { - // Safety check to prevent index out of range errors - if i >= len(req.Queries) || i >= len(assignedScope) { - cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d", - i, len(req.Queries), len(assignedScope)) - continue - } - query := req.Queries[i] metric := query.Metric scope := assignedScope[i] @@ -172,18 +165,7 @@ func (ccms *InternalMetricStore) LoadData( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) SanitizeStats(&res.Avg, &res.Min, &res.Max) @@ -251,7 +233,7 @@ func buildQueries( } queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - assignedScope := []schema.MetricScope{} + assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources)) subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) if scerr != nil { @@ -267,17 +249,8 @@ func buildQueries( } // Skip if metric is removed for subcluster - if len(mc.SubClusters) != 0 { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == job.SubCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } + if len(mc.SubClusters) != 0 && IsMetricRemovedForSubCluster(mc, job.SubCluster) { + continue } // Avoid duplicates... @@ -311,7 +284,7 @@ func buildQueries( ) if !ok { - return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } for _, sr := range scopeResults { @@ -374,6 +347,10 @@ func (ccms *InternalMetricStore) LoadStats( stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) for i, res := range resBody.Results { + if i >= len(req.Queries) { + cclog.Warnf("LoadStats: result index %d exceeds queries length %d", i, len(req.Queries)) + break + } if len(res) == 0 { // No Data Found For Metric, Logged in FetchData to Warn continue @@ -475,18 +452,7 @@ func (ccms *InternalMetricStore) LoadScopedStats( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) SanitizeStats(&res.Avg, &res.Min, &res.Max) @@ -587,6 +553,13 @@ func (ccms *InternalMetricStore) LoadNodeData( qdata := res[0] if qdata.Error != nil { errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) + continue + } + + mc := archive.GetMetricConfig(cluster, metric) + if mc == nil { + cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster) + continue } SanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max) @@ -597,7 +570,6 @@ func (ccms *InternalMetricStore) LoadNodeData( data[query.Hostname] = hostdata } - mc := archive.GetMetricConfig(cluster, metric) hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ Unit: mc.Unit, Timestep: mc.Timestep, @@ -694,13 +666,6 @@ func (ccms *InternalMetricStore) LoadNodeListData( } for i, row := range resBody.Results { - // Safety check to prevent index out of range errors - if i >= len(req.Queries) || i >= len(assignedScope) { - cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d", - i, len(req.Queries), len(assignedScope)) - continue - } - var query APIQuery if resBody.Queries != nil { if i < len(resBody.Queries) { @@ -757,18 +722,7 @@ func (ccms *InternalMetricStore) LoadNodeListData( continue } - id := (*string)(nil) - if query.Type != nil { - // Check if ndx is within the bounds of TypeIds slice - if ndx < len(query.TypeIds) { - id = new(string) - *id = query.TypeIds[ndx] - } else { - // Log the error but continue processing - cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", - ndx, len(query.TypeIds), query.Metric, query.Hostname) - } - } + id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname) SanitizeStats(&res.Avg, &res.Min, &res.Max) @@ -819,7 +773,7 @@ func buildNodeQueries( resolution int64, ) ([]APIQuery, []schema.MetricScope, error) { queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) - assignedScope := []schema.MetricScope{} + assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes)) // Get Topol before loop if subCluster given var subClusterTopol *schema.SubCluster @@ -840,17 +794,8 @@ func buildNodeQueries( } // Skip if metric is removed for subcluster - if mc.SubClusters != nil { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == subCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } + if mc.SubClusters != nil && IsMetricRemovedForSubCluster(mc, subCluster) { + continue } // Avoid duplicates... @@ -898,7 +843,7 @@ func buildNodeQueries( ) if !ok { - return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) + return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) } for _, sr := range scopeResults { diff --git a/pkg/metricstore/scopequery.go b/pkg/metricstore/scopequery.go index a414b794..a01a9cc6 100644 --- a/pkg/metricstore/scopequery.go +++ b/pkg/metricstore/scopequery.go @@ -303,6 +303,33 @@ func IntToStringSlice(is []int) []string { return ss } +// ExtractTypeID returns the type ID at the given index from a query's TypeIds slice. +// Returns nil if queryType is nil (no type filtering). Logs a warning and returns nil +// if the index is out of range. +func ExtractTypeID(queryType *string, typeIds []string, ndx int, metric, hostname string) *string { + if queryType == nil { + return nil + } + if ndx < len(typeIds) { + id := typeIds[ndx] + return &id + } + cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", + ndx, len(typeIds), metric, hostname) + return nil +} + +// IsMetricRemovedForSubCluster checks whether a metric is marked as removed +// for the given subcluster in its per-subcluster configuration. +func IsMetricRemovedForSubCluster(mc *schema.MetricConfig, subCluster string) bool { + for _, scConfig := range mc.SubClusters { + if scConfig.Name == subCluster && scConfig.Remove { + return true + } + } + return false +} + // SanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. // If ANY of avg/min/max is NaN, ALL three are zeroed for consistency. func SanitizeStats(avg, min, max *schema.Float) {