Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev

This commit is contained in:
brinkcoder
2025-03-14 10:52:39 +01:00
23 changed files with 665 additions and 292 deletions

View File

@@ -40,6 +40,7 @@ type CCMetricStore struct {
jwt string
url string
queryEndpoint string
topologyCache map[string]*schema.Topology // cluster -> topology cache
}
type ApiQueryRequest struct {
@@ -92,6 +93,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
ccms.client = http.Client{
Timeout: 10 * time.Second,
}
ccms.topologyCache = make(map[string]*schema.Topology)
if config.Renamings != nil {
ccms.here2there = config.Renamings
@@ -181,6 +183,12 @@ func (ccms *CCMetricStore) LoadData(
return nil, err
}
// Verify assignment is correct - log any inconsistencies for debugging
if len(queries) != len(assignedScope) {
log.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
len(queries), len(assignedScope))
}
req := ApiQueryRequest{
Cluster: job.Cluster,
From: job.StartTime.Unix(),
@@ -198,11 +206,36 @@ func (ccms *CCMetricStore) LoadData(
var errors []string
jobData := make(schema.JobData)
// Add safety check for potential index out of range errors
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
log.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
len(req.Queries), len(resBody.Results), len(assignedScope))
if len(resBody.Results) > len(req.Queries) {
resBody.Results = resBody.Results[:len(req.Queries)]
}
if len(assignedScope) > len(req.Queries) {
assignedScope = assignedScope[:len(req.Queries)]
}
}
for i, row := range resBody.Results {
// Safety check to prevent index out of range errors
if i >= len(req.Queries) || i >= len(assignedScope) {
log.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
i, len(req.Queries), len(assignedScope))
continue
}
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
scope := assignedScope[i]
mc := archive.GetMetricConfig(job.Cluster, metric)
if mc == nil {
log.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
continue
}
if _, ok := jobData[metric]; !ok {
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
}
@@ -231,8 +264,15 @@ func (ccms *CCMetricStore) LoadData(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[ndx]
// Check if ndx is within the bounds of TypeIds slice
if ndx < len(query.TypeIds) {
id = new(string)
*id = query.TypeIds[ndx]
} else {
// Log the error but continue processing
log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
ndx, len(query.TypeIds), query.Metric, query.Hostname)
}
}
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -284,20 +324,19 @@ func (ccms *CCMetricStore) buildQueries(
scopes []schema.MetricScope,
resolution int,
) ([]ApiQuery, []schema.MetricScope, error) {
// Initialize both slices together
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources))
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
if scerr != nil {
return nil, nil, scerr
topology, err := ccms.getTopology(job.Cluster, job.SubCluster)
if err != nil {
return nil, nil, err
}
topology := subcluster.Topology
for _, metric := range metrics {
remoteName := ccms.toRemoteName(metric)
mc := archive.GetMetricConfig(job.Cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
log.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
continue
}
@@ -329,7 +368,6 @@ func (ccms *CCMetricStore) buildQueries(
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
if scope != schema.MetricScopeAccelerator {
// Skip all other catched cases
continue
}
@@ -502,6 +540,31 @@ func (ccms *CCMetricStore) buildQueries(
continue
}
// MemoryDomain -> Socket
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
if err != nil {
log.Errorf("Error mapping memory domains to sockets: %v", err)
continue
}
// Create a query for each socket
for _, domains := range socketToDomains {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(domains),
Resolution: resolution,
})
// Add scope for each query, not just once
assignedScope = append(assignedScope, scope)
}
continue
}
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
@@ -772,6 +835,12 @@ func (ccms *CCMetricStore) LoadNodeListData(
return nil, totalNodes, hasNextPage, err
}
// Verify assignment is correct - log any inconsistencies for debugging
if len(queries) != len(assignedScope) {
log.Errorf("Critical error: queries and assignedScope have different lengths after buildNodeQueries: %d vs %d",
len(queries), len(assignedScope))
}
req := ApiQueryRequest{
Cluster: cluster,
Queries: queries,
@@ -789,17 +858,48 @@ func (ccms *CCMetricStore) LoadNodeListData(
var errors []string
data := make(map[string]schema.JobData)
// Add safety check for index out of range issues
if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
log.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
len(req.Queries), len(resBody.Results), len(assignedScope))
if len(resBody.Results) > len(req.Queries) {
resBody.Results = resBody.Results[:len(req.Queries)]
}
if len(assignedScope) > len(req.Queries) {
assignedScope = assignedScope[:len(req.Queries)]
}
}
for i, row := range resBody.Results {
// Safety check to prevent index out of range errors
if i >= len(req.Queries) || i >= len(assignedScope) {
log.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
i, len(req.Queries), len(assignedScope))
continue
}
var query ApiQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
if i < len(resBody.Queries) {
query = resBody.Queries[i]
} else {
log.Warnf("Index out of range prevented for resBody.Queries: i=%d, len=%d",
i, len(resBody.Queries))
continue
}
} else {
query = req.Queries[i]
}
// qdata := res[0]
metric := ccms.toLocalName(query.Metric)
scope := assignedScope[i]
mc := archive.GetMetricConfig(cluster, metric)
if mc == nil {
log.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
continue
}
res := mc.Timestep
if len(row) > 0 {
@@ -838,8 +938,15 @@ func (ccms *CCMetricStore) LoadNodeListData(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[ndx]
// Check if ndx is within the bounds of TypeIds slice
if ndx < len(query.TypeIds) {
id = new(string)
*id = query.TypeIds[ndx]
} else {
// Log the error but continue processing
log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
ndx, len(query.TypeIds), query.Metric, query.Hostname)
}
}
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -878,26 +985,14 @@ func (ccms *CCMetricStore) buildNodeQueries(
scopes []schema.MetricScope,
resolution int,
) ([]ApiQuery, []schema.MetricScope, error) {
// Initialize both slices together with the same capacity
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes))
assignedScope := []schema.MetricScope{}
// Get Topol before loop if subCluster given
var subClusterTopol *schema.SubCluster
var scterr error
if subCluster != "" {
subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster)
if scterr != nil {
// TODO: Log
return nil, nil, scterr
}
}
assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes))
for _, metric := range metrics {
remoteName := ccms.toRemoteName(metric)
mc := archive.GetMetricConfig(cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
log.Infof("metric '%s' is not specified for cluster '%s'", metric, cluster)
continue
}
@@ -918,22 +1013,22 @@ func (ccms *CCMetricStore) buildNodeQueries(
handledScopes = append(handledScopes, scope)
for _, hostname := range nodes {
var topology *schema.Topology
var err error
// If no subCluster given, get it by node
if subCluster == "" {
subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname)
if scnerr != nil {
return nil, nil, scnerr
}
subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName)
if scterr != nil {
return nil, nil, scterr
}
topology, err = ccms.getTopologyByNode(cluster, hostname)
} else {
topology, err = ccms.getTopology(cluster, subCluster)
}
if err != nil {
return nil, nil, err
}
// Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable
// Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable
topology := subClusterTopol.Topology
acceleratorIds := topology.GetAcceleratorIDs()
// Moved check here if metric matches hardware specs
@@ -944,7 +1039,6 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
if scope != schema.MetricScopeAccelerator {
// Skip all other catched cases
continue
}
@@ -1117,6 +1211,31 @@ func (ccms *CCMetricStore) buildNodeQueries(
continue
}
// MemoryDomain -> Socket
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
memDomains, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
if err != nil {
log.Errorf("Error mapping memory domains to sockets: %v", err)
continue
}
// Create a query for each socket
for _, domains := range socketToDomains {
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
Type: &memoryDomainString,
TypeIds: intToStringSlice(domains),
Resolution: resolution,
})
// Add scope for each query, not just once
assignedScope = append(assignedScope, scope)
}
continue
}
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
@@ -1173,3 +1292,29 @@ func intToStringSlice(is []int) []string {
}
return ss
}
// getTopology returns the topology for a given cluster and subcluster, caching it if not already present
func (ccms *CCMetricStore) getTopology(cluster, subCluster string) (*schema.Topology, error) {
cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
if topology, ok := ccms.topologyCache[cacheKey]; ok {
return topology, nil
}
subcluster, err := archive.GetSubCluster(cluster, subCluster)
if err != nil {
return nil, err
}
ccms.topologyCache[cacheKey] = &subcluster.Topology
return &subcluster.Topology, nil
}
// getTopologyByNode returns the topology for a given cluster and node, caching it if not already present
func (ccms *CCMetricStore) getTopologyByNode(cluster, node string) (*schema.Topology, error) {
subCluster, err := archive.GetSubClusterByNode(cluster, node)
if err != nil {
return nil, err
}
return ccms.getTopology(cluster, subCluster)
}