mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-05 07:37:30 +01:00
Further consolidate and improve ccms query builder
Entire-Checkpoint: d10e6221ee4f
This commit is contained in:
@@ -129,13 +129,6 @@ func (ccms *InternalMetricStore) LoadData(
|
||||
}
|
||||
|
||||
for i, row := range resBody.Results {
|
||||
// Safety check to prevent index out of range errors
|
||||
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||
i, len(req.Queries), len(assignedScope))
|
||||
continue
|
||||
}
|
||||
|
||||
query := req.Queries[i]
|
||||
metric := query.Metric
|
||||
scope := assignedScope[i]
|
||||
@@ -172,18 +165,7 @@ func (ccms *InternalMetricStore) LoadData(
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
// Check if ndx is within the bounds of TypeIds slice
|
||||
if ndx < len(query.TypeIds) {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
} else {
|
||||
// Log the error but continue processing
|
||||
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||
}
|
||||
}
|
||||
id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname)
|
||||
|
||||
SanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
@@ -251,7 +233,7 @@ func buildQueries(
|
||||
}
|
||||
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||
|
||||
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
|
||||
if scerr != nil {
|
||||
@@ -267,17 +249,8 @@ func buildQueries(
|
||||
}
|
||||
|
||||
// Skip if metric is removed for subcluster
|
||||
if len(mc.SubClusters) != 0 {
|
||||
isRemoved := false
|
||||
for _, scConfig := range mc.SubClusters {
|
||||
if scConfig.Name == job.SubCluster && scConfig.Remove {
|
||||
isRemoved = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isRemoved {
|
||||
continue
|
||||
}
|
||||
if len(mc.SubClusters) != 0 && IsMetricRemovedForSubCluster(mc, job.SubCluster) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
@@ -311,7 +284,7 @@ func buildQueries(
|
||||
)
|
||||
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
}
|
||||
|
||||
for _, sr := range scopeResults {
|
||||
@@ -374,6 +347,10 @@ func (ccms *InternalMetricStore) LoadStats(
|
||||
|
||||
stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
|
||||
for i, res := range resBody.Results {
|
||||
if i >= len(req.Queries) {
|
||||
cclog.Warnf("LoadStats: result index %d exceeds queries length %d", i, len(req.Queries))
|
||||
break
|
||||
}
|
||||
if len(res) == 0 {
|
||||
// No Data Found For Metric, Logged in FetchData to Warn
|
||||
continue
|
||||
@@ -475,18 +452,7 @@ func (ccms *InternalMetricStore) LoadScopedStats(
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
// Check if ndx is within the bounds of TypeIds slice
|
||||
if ndx < len(query.TypeIds) {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
} else {
|
||||
// Log the error but continue processing
|
||||
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||
}
|
||||
}
|
||||
id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname)
|
||||
|
||||
SanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
@@ -587,6 +553,13 @@ func (ccms *InternalMetricStore) LoadNodeData(
|
||||
qdata := res[0]
|
||||
if qdata.Error != nil {
|
||||
errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error))
|
||||
continue
|
||||
}
|
||||
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
if mc == nil {
|
||||
cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
|
||||
continue
|
||||
}
|
||||
|
||||
SanitizeStats(&qdata.Avg, &qdata.Min, &qdata.Max)
|
||||
@@ -597,7 +570,6 @@ func (ccms *InternalMetricStore) LoadNodeData(
|
||||
data[query.Hostname] = hostdata
|
||||
}
|
||||
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: mc.Timestep,
|
||||
@@ -694,13 +666,6 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
||||
}
|
||||
|
||||
for i, row := range resBody.Results {
|
||||
// Safety check to prevent index out of range errors
|
||||
if i >= len(req.Queries) || i >= len(assignedScope) {
|
||||
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
|
||||
i, len(req.Queries), len(assignedScope))
|
||||
continue
|
||||
}
|
||||
|
||||
var query APIQuery
|
||||
if resBody.Queries != nil {
|
||||
if i < len(resBody.Queries) {
|
||||
@@ -757,18 +722,7 @@ func (ccms *InternalMetricStore) LoadNodeListData(
|
||||
continue
|
||||
}
|
||||
|
||||
id := (*string)(nil)
|
||||
if query.Type != nil {
|
||||
// Check if ndx is within the bounds of TypeIds slice
|
||||
if ndx < len(query.TypeIds) {
|
||||
id = new(string)
|
||||
*id = query.TypeIds[ndx]
|
||||
} else {
|
||||
// Log the error but continue processing
|
||||
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||
ndx, len(query.TypeIds), query.Metric, query.Hostname)
|
||||
}
|
||||
}
|
||||
id := ExtractTypeID(query.Type, query.TypeIds, ndx, query.Metric, query.Hostname)
|
||||
|
||||
SanitizeStats(&res.Avg, &res.Min, &res.Max)
|
||||
|
||||
@@ -819,7 +773,7 @@ func buildNodeQueries(
|
||||
resolution int64,
|
||||
) ([]APIQuery, []schema.MetricScope, error) {
|
||||
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes))
|
||||
|
||||
// Get Topol before loop if subCluster given
|
||||
var subClusterTopol *schema.SubCluster
|
||||
@@ -840,17 +794,8 @@ func buildNodeQueries(
|
||||
}
|
||||
|
||||
// Skip if metric is removed for subcluster
|
||||
if mc.SubClusters != nil {
|
||||
isRemoved := false
|
||||
for _, scConfig := range mc.SubClusters {
|
||||
if scConfig.Name == subCluster && scConfig.Remove {
|
||||
isRemoved = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isRemoved {
|
||||
continue
|
||||
}
|
||||
if mc.SubClusters != nil && IsMetricRemovedForSubCluster(mc, subCluster) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Avoid duplicates...
|
||||
@@ -898,7 +843,7 @@ func buildNodeQueries(
|
||||
)
|
||||
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
return nil, nil, fmt.Errorf("METRICDATA/INTERNAL-CCMS > unsupported scope transformation: native-scope=%s, requested-scope=%s", nativeScope, requestedScope)
|
||||
}
|
||||
|
||||
for _, sr := range scopeResults {
|
||||
|
||||
@@ -303,6 +303,33 @@ func IntToStringSlice(is []int) []string {
|
||||
return ss
|
||||
}
|
||||
|
||||
// ExtractTypeID returns the type ID at the given index from a query's TypeIds slice.
|
||||
// Returns nil if queryType is nil (no type filtering). Logs a warning and returns nil
|
||||
// if the index is out of range.
|
||||
func ExtractTypeID(queryType *string, typeIds []string, ndx int, metric, hostname string) *string {
|
||||
if queryType == nil {
|
||||
return nil
|
||||
}
|
||||
if ndx < len(typeIds) {
|
||||
id := typeIds[ndx]
|
||||
return &id
|
||||
}
|
||||
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
|
||||
ndx, len(typeIds), metric, hostname)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsMetricRemovedForSubCluster checks whether a metric is marked as removed
|
||||
// for the given subcluster in its per-subcluster configuration.
|
||||
func IsMetricRemovedForSubCluster(mc *schema.MetricConfig, subCluster string) bool {
|
||||
for _, scConfig := range mc.SubClusters {
|
||||
if scConfig.Name == subCluster && scConfig.Remove {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// SanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
|
||||
// If ANY of avg/min/max is NaN, ALL three are zeroed for consistency.
|
||||
func SanitizeStats(avg, min, max *schema.Float) {
|
||||
|
||||
Reference in New Issue
Block a user