migrate changes from cc-backend PR#364

This commit is contained in:
Christoph Kluge
2026-02-20 15:10:02 +01:00
parent e1c1148160
commit bae7ec11b4
4 changed files with 266 additions and 53 deletions

View File

@@ -71,10 +71,11 @@ import (
// CCMetricStore is the HTTP client for communicating with cc-metric-store.
// It manages connection details, authentication, and provides methods for querying metrics.
type CCMetricStore struct {
client http.Client // HTTP client with 10-second timeout
jwt string // JWT Bearer token for authentication
url string // Base URL of cc-metric-store instance
queryEndpoint string // Full URL to query API endpoint
client http.Client // HTTP client with 10-second timeout
jwt string // JWT Bearer token for authentication
url string // Base URL of cc-metric-store instance
queryEndpoint string // Full URL to query API endpoint
topologyCache map[string]*schema.Topology // cluster -> topology cache
}
// APIQueryRequest represents a request to the cc-metric-store query API.
@@ -133,6 +134,7 @@ func NewCCMetricStore(url string, token string) *CCMetricStore {
client: http.Client{
Timeout: 10 * time.Second,
},
topologyCache: make(map[string]*schema.Topology),
}
}
@@ -185,6 +187,32 @@ func (ccms *CCMetricStore) doRequest(
return &resBody, nil
}
// getTopology returns the topology for a given cluster and subcluster, caching it if not already present
func (ccms *CCMetricStore) getTopology(cluster, subCluster string) (*schema.Topology, error) {
cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
if topology, ok := ccms.topologyCache[cacheKey]; ok {
return topology, nil
}
subcluster, err := archive.GetSubCluster(cluster, subCluster)
if err != nil {
return nil, err
}
ccms.topologyCache[cacheKey] = &subcluster.Topology
return &subcluster.Topology, nil
}
// getTopologyByNode returns the topology for a given cluster and node, caching it if not already present
func (ccms *CCMetricStore) getTopologyByNode(cluster, node string) (*schema.Topology, error) {
subCluster, err := archive.GetSubClusterByNode(cluster, node)
if err != nil {
return nil, err
}
return ccms.getTopology(cluster, subCluster)
}
// LoadData retrieves time series data and statistics for the specified job and metrics.
// It queries data for the job's time range and resources, handling scope transformations automatically.
//
@@ -210,6 +238,12 @@ func (ccms *CCMetricStore) LoadData(
return nil, err
}
// Verify assignment is correct - log any inconsistencies for debugging
if len(queries) != len(assignedScope) {
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
len(queries), len(assignedScope))
}
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
@@ -227,11 +261,37 @@ func (ccms *CCMetricStore) LoadData(
var errors []string
jobData := make(schema.JobData)
// Add safety check for potential index out of range errors
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
len(req.Queries), len(resBody.Results), len(assignedScope))
if len(resBody.Results) > len(req.Queries) {
resBody.Results = resBody.Results[:len(req.Queries)]
}
if len(assignedScope) > len(req.Queries) {
assignedScope = assignedScope[:len(req.Queries)]
}
}
for i, row := range resBody.Results {
// Safety check to prevent index out of range errors
if i >= len(req.Queries) || i >= len(assignedScope) {
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
i, len(req.Queries), len(assignedScope))
continue
}
query := req.Queries[i]
metric := query.Metric
scope := assignedScope[i]
mc := archive.GetMetricConfig(job.Cluster, metric)
if mc == nil {
cclog.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
continue
}
if _, ok := jobData[metric]; !ok {
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
}
@@ -260,8 +320,15 @@ func (ccms *CCMetricStore) LoadData(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[ndx]
// Check if ndx is within the bounds of TypeIds slice
if ndx < len(query.TypeIds) {
id = new(string)
*id = query.TypeIds[ndx]
} else {
// Log the error but continue processing
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
ndx, len(query.TypeIds), query.Metric, query.Hostname)
}
}
sanitizeStats(&res.Avg, &res.Min, &res.Max)
@@ -412,8 +479,15 @@ func (ccms *CCMetricStore) LoadScopedStats(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[ndx]
// Check if ndx is within the bounds of TypeIds slice
if ndx < len(query.TypeIds) {
id = new(string)
*id = query.TypeIds[ndx]
} else {
// Log the error but continue processing
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
ndx, len(query.TypeIds), query.Metric, query.Hostname)
}
}
sanitizeStats(&res.Avg, &res.Min, &res.Max)
@@ -561,6 +635,12 @@ func (ccms *CCMetricStore) LoadNodeListData(
return nil, err
}
// Verify assignment is correct - log any inconsistencies for debugging
if len(queries) != len(assignedScope) {
cclog.Errorf("Critical error: queries and assignedScope have different lengths after buildNodeQueries: %d vs %d",
len(queries), len(assignedScope))
}
req := APIQueryRequest{
Cluster: cluster,
Queries: queries,
@@ -578,17 +658,47 @@ func (ccms *CCMetricStore) LoadNodeListData(
var errors []string
data := make(map[string]schema.JobData)
// Add safety check for index out of range issues
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
cclog.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
len(req.Queries), len(resBody.Results), len(assignedScope))
if len(resBody.Results) > len(req.Queries) {
resBody.Results = resBody.Results[:len(req.Queries)]
}
if len(assignedScope) > len(req.Queries) {
assignedScope = assignedScope[:len(req.Queries)]
}
}
for i, row := range resBody.Results {
// Safety check to prevent index out of range errors
if i >= len(req.Queries) || i >= len(assignedScope) {
cclog.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
i, len(req.Queries), len(assignedScope))
continue
}
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
if i < len(resBody.Queries) {
query = resBody.Queries[i]
} else {
cclog.Warnf("Index out of range prevented for resBody.Queries: i=%d, len=%d",
i, len(resBody.Queries))
continue
}
} else {
query = req.Queries[i]
}
// qdata := res[0]
metric := query.Metric
scope := assignedScope[i]
mc := archive.GetMetricConfig(cluster, metric)
if mc == nil {
cclog.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
continue
}
res := mc.Timestep
if len(row) > 0 {
@@ -627,8 +737,15 @@ func (ccms *CCMetricStore) LoadNodeListData(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[ndx]
// Check if ndx is within the bounds of TypeIds slice
if ndx < len(query.TypeIds) {
id = new(string)
*id = query.TypeIds[ndx]
} else {
// Log the error but continue processing
cclog.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
ndx, len(query.TypeIds), query.Metric, query.Hostname)
}
}
sanitizeStats(&res.Avg, &res.Min, &res.Max)