diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go
index 2b92fbb..fb6aca1 100644
--- a/internal/metricdata/cc-metric-store.go
+++ b/internal/metricdata/cc-metric-store.go
@@ -40,6 +40,7 @@ type CCMetricStore struct {
 	jwt           string
 	url           string
 	queryEndpoint string
+	topologyCache map[string]*schema.Topology // cluster -> topology cache
 }
 
 type ApiQueryRequest struct {
@@ -92,6 +93,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
 	ccms.client = http.Client{
 		Timeout: 10 * time.Second,
 	}
+	ccms.topologyCache = make(map[string]*schema.Topology)
 
 	if config.Renamings != nil {
 		ccms.here2there = config.Renamings
@@ -181,6 +183,12 @@ func (ccms *CCMetricStore) LoadData(
 		return nil, err
 	}
 
+	// Verify assignment is correct - log any inconsistencies for debugging
+	if len(queries) != len(assignedScope) {
+		log.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
+			len(queries), len(assignedScope))
+	}
+
 	req := ApiQueryRequest{
 		Cluster: job.Cluster,
 		From:    job.StartTime.Unix(),
@@ -198,11 +206,36 @@ func (ccms *CCMetricStore) LoadData(
 	var errors []string
 	jobData := make(schema.JobData)
+
+	// Add safety check for potential index out of range errors
+	if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
+		log.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
+			len(req.Queries), len(resBody.Results), len(assignedScope))
+		if len(resBody.Results) > len(req.Queries) {
+			resBody.Results = resBody.Results[:len(req.Queries)]
+		}
+		if len(assignedScope) > len(req.Queries) {
+			assignedScope = assignedScope[:len(req.Queries)]
+		}
+	}
+
 	for i, row := range resBody.Results {
+		// Safety check to prevent index out of range errors
+		if i >= len(req.Queries) || i >= len(assignedScope) {
+			log.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
+				i, len(req.Queries), len(assignedScope))
+			continue
+		}
+
 		query := req.Queries[i]
 		metric := ccms.toLocalName(query.Metric)
 		scope := assignedScope[i]
 		mc := archive.GetMetricConfig(job.Cluster, metric)
+		if mc == nil {
+			log.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
+			continue
+		}
+
 		if _, ok := jobData[metric]; !ok {
 			jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
 		}
@@ -231,8 +264,15 @@ func (ccms *CCMetricStore) LoadData(
 
 			id := (*string)(nil)
 			if query.Type != nil {
-				id = new(string)
-				*id = query.TypeIds[ndx]
+				// Check if ndx is within the bounds of TypeIds slice
+				if ndx < len(query.TypeIds) {
+					id = new(string)
+					*id = query.TypeIds[ndx]
+				} else {
+					// Log the error but continue processing
+					log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
+						ndx, len(query.TypeIds), query.Metric, query.Hostname)
+				}
 			}
 
 			if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -284,20 +324,19 @@ func (ccms *CCMetricStore) buildQueries(
 	scopes []schema.MetricScope,
 	resolution int,
 ) ([]ApiQuery, []schema.MetricScope, error) {
+	// Initialize both slices together
 	queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
-	assignedScope := []schema.MetricScope{}
+	assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources))
 
-	subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
-	if scerr != nil {
-		return nil, nil, scerr
+	topology, err := ccms.getTopology(job.Cluster, job.SubCluster)
+	if err != nil {
+		return nil, nil, err
 	}
-	topology := subcluster.Topology
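+	// Invariant relied on by LoadData: queries[i] and assignedScope[i] must
+	// describe the same query, so every append below pairs the two slices.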
 
 	for _, metric := range metrics {
 		remoteName := ccms.toRemoteName(metric)
 		mc := archive.GetMetricConfig(job.Cluster, metric)
 		if mc == nil {
-			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
 			log.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
 			continue
 		}
@@ -329,7 +368,6 @@ func (ccms *CCMetricStore) buildQueries(
 			// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
 			if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
 				if scope != schema.MetricScopeAccelerator {
-					// Skip all other catched cases
 					continue
 				}
@@ -502,6 +540,31 @@ func (ccms *CCMetricStore) buildQueries(
 				continue
 			}
 
+			// MemoryDomain -> Socket
+			if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
+				memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
+				socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
+				if err != nil {
+					log.Errorf("Error mapping memory domains to sockets: %v", err)
+					continue
+				}
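+
+				// Illustrative example: with memory domains 0/1 on socket 0 and
+				// 2/3 on socket 1, socketToDomains is {0: [0, 1], 1: [2, 3]}, so
+				// one aggregated query per socket is emitted below.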
cluster %s", metric, cluster) + continue + } res := mc.Timestep if len(row) > 0 { @@ -838,8 +938,15 @@ func (ccms *CCMetricStore) LoadNodeListData( id := (*string)(nil) if query.Type != nil { - id = new(string) - *id = query.TypeIds[ndx] + // Check if ndx is within the bounds of TypeIds slice + if ndx < len(query.TypeIds) { + id = new(string) + *id = query.TypeIds[ndx] + } else { + // Log the error but continue processing + log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s", + ndx, len(query.TypeIds), query.Metric, query.Hostname) + } } if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { @@ -878,26 +985,14 @@ func (ccms *CCMetricStore) buildNodeQueries( scopes []schema.MetricScope, resolution int, ) ([]ApiQuery, []schema.MetricScope, error) { - + // Initialize both slices together with the same capacity queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) - assignedScope := []schema.MetricScope{} - - // Get Topol before loop if subCluster given - var subClusterTopol *schema.SubCluster - var scterr error - if subCluster != "" { - subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) - if scterr != nil { - // TODO: Log - return nil, nil, scterr - } - } + assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes)) for _, metric := range metrics { remoteName := ccms.toRemoteName(metric) mc := archive.GetMetricConfig(cluster, metric) if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) log.Infof("metric '%s' is not specified for cluster '%s'", metric, cluster) continue } @@ -918,22 +1013,22 @@ func (ccms *CCMetricStore) buildNodeQueries( handledScopes = append(handledScopes, scope) for _, hostname := range nodes { + var topology *schema.Topology + var err error // If no subCluster given, get it by node if subCluster == "" { - subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) - if scnerr != nil { - return nil, nil, scnerr - } - subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) - if scterr != nil { - return nil, nil, scterr - } + topology, err = ccms.getTopologyByNode(cluster, hostname) + } else { + topology, err = ccms.getTopology(cluster, subCluster) + } + + if err != nil { + return nil, nil, err } // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable - topology := subClusterTopol.Topology acceleratorIds := topology.GetAcceleratorIDs() // Moved check here if metric matches hardware specs @@ -944,7 +1039,6 @@ func (ccms *CCMetricStore) buildNodeQueries( // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases continue } @@ -1117,6 +1211,31 @@ func (ccms *CCMetricStore) buildNodeQueries( continue } + // MemoryDomain -> Socket + if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket { + memDomains, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) + socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains) + if err != nil { + log.Errorf("Error mapping memory domains to sockets: %v", err) + continue + } + + // Create a query for each socket + for _, domains := range 
+// getTopology returns the topology for a given cluster and subcluster, caching it if not already present
+func (ccms *CCMetricStore) getTopology(cluster, subCluster string) (*schema.Topology, error) {
+	cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
+	if topology, ok := ccms.topologyCache[cacheKey]; ok {
+		return topology, nil
+	}
+
+	subcluster, err := archive.GetSubCluster(cluster, subCluster)
+	if err != nil {
+		return nil, err
+	}
+
+	ccms.topologyCache[cacheKey] = &subcluster.Topology
+	return &subcluster.Topology, nil
+}
+
+// getTopologyByNode returns the topology for a given cluster and node, caching it if not already present
+func (ccms *CCMetricStore) getTopologyByNode(cluster, node string) (*schema.Topology, error) {
+	subCluster, err := archive.GetSubClusterByNode(cluster, node)
+	if err != nil {
+		return nil, err
+	}
+
+	return ccms.getTopology(cluster, subCluster)
+}
diff --git a/pkg/schema/cluster.go b/pkg/schema/cluster.go
index 322f308..54f50a0 100644
--- a/pkg/schema/cluster.go
+++ b/pkg/schema/cluster.go
@@ -22,6 +22,13 @@ type Topology struct {
 	Die          [][]*int       `json:"die,omitempty"`
 	Core         [][]int        `json:"core"`
 	Accelerators []*Accelerator `json:"accelerators,omitempty"`
+
+	// Cache maps for faster lookups
+	hwthreadToSocket       map[int][]int
+	hwthreadToCore         map[int][]int
+	hwthreadToMemoryDomain map[int][]int
+	coreToSocket           map[int][]int
+	memoryDomainToSocket   map[int]int // direct mapping from memory domain to socket
 }
 
 type MetricValue struct {
@@ -92,156 +99,233 @@ type GlobalMetricListItem struct {
 	Availability []ClusterSupport `json:"availability"`
 }
 
-// Return a list of socket IDs given a list of hwthread IDs. Even if just one
-// hwthread is in that socket, add it to the list. If no hwthreads other than
-// those in the argument list are assigned to one of the sockets in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
+// InitTopologyMaps initializes the topology mapping caches.
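+//
+// Illustrative example: Socket = [[0, 1], [2, 3]] produces
+// hwthreadToSocket = {0: [0], 1: [0], 2: [1], 3: [1]} and, with one memory
+// domain per socket, memoryDomainToSocket = {0: 0, 1: 1}.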
+func (topo *Topology) InitTopologyMaps() {
+	// Initialize maps
+	topo.hwthreadToSocket = make(map[int][]int)
+	topo.hwthreadToCore = make(map[int][]int)
+	topo.hwthreadToMemoryDomain = make(map[int][]int)
+	topo.coreToSocket = make(map[int][]int)
+	topo.memoryDomainToSocket = make(map[int]int)
+
+	// Build hwthread to socket mapping
+	for socketID, hwthreads := range topo.Socket {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToSocket[hwthread] = append(topo.hwthreadToSocket[hwthread], socketID)
+		}
+	}
+
+	// Build hwthread to core mapping
+	for coreID, hwthreads := range topo.Core {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToCore[hwthread] = append(topo.hwthreadToCore[hwthread], coreID)
+		}
+	}
+
+	// Build hwthread to memory domain mapping
+	for memDomID, hwthreads := range topo.MemoryDomain {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToMemoryDomain[hwthread] = append(topo.hwthreadToMemoryDomain[hwthread], memDomID)
+		}
+	}
+
+	// Build core to socket mapping
+	for coreID, hwthreads := range topo.Core {
+		socketSet := make(map[int]struct{})
+		for _, hwthread := range hwthreads {
+			// Iterate over the socket IDs (values), not the slice indices
+			for _, socketID := range topo.hwthreadToSocket[hwthread] {
+				socketSet[socketID] = struct{}{}
+			}
+		}
+		topo.coreToSocket[coreID] = make([]int, 0, len(socketSet))
+		for socketID := range socketSet {
+			topo.coreToSocket[coreID] = append(topo.coreToSocket[coreID], socketID)
+		}
+	}
+
+	// Build memory domain to socket mapping
+	for memDomID, hwthreads := range topo.MemoryDomain {
+		if len(hwthreads) > 0 {
+			// Use the first hwthread to determine the socket
+			if socketIDs, ok := topo.hwthreadToSocket[hwthreads[0]]; ok && len(socketIDs) > 0 {
+				topo.memoryDomainToSocket[memDomID] = socketIDs[0]
+			}
+		}
+	}
+}
+
+// EnsureTopologyMaps ensures that the topology maps are initialized.
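+// The map-based lookup helpers below call it first, so the maps are built
+// lazily on first use (e.g. after a Topology was populated by JSON
+// unmarshaling, which bypasses any explicit initialization).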
+func (topo *Topology) EnsureTopologyMaps() {
+	if topo.hwthreadToSocket == nil {
+		topo.InitTopologyMaps()
+	}
+}
+
 func (topo *Topology) GetSocketsFromHWThreads(
 	hwthreads []int,
 ) (sockets []int, exclusive bool) {
-	socketsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	socketsMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for socket, hwthreadsInSocket := range topo.Socket {
-			for _, hwthreadInSocket := range hwthreadsInSocket {
-				if hwthread == hwthreadInSocket {
-					socketsMap[socket] += 1
-				}
-			}
+		for _, socketID := range topo.hwthreadToSocket[hwthread] {
+			socketsMap[socketID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
 	sockets = make([]int, 0, len(socketsMap))
 	for socket, count := range socketsMap {
 		sockets = append(sockets, socket)
-		exclusive = exclusive && count == hwthreadsPerSocket
+		// Check if all hwthreads in this socket are in our input list
+		exclusive = exclusive && count == len(topo.Socket[socket])
 	}
 
 	return sockets, exclusive
 }
 
-// Return a list of socket IDs given a list of core IDs. Even if just one
-// core is in that socket, add it to the list. If no cores other than
-// those in the argument list are assigned to one of the sockets in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
-func (topo *Topology) GetSocketsFromCores (
+func (topo *Topology) GetSocketsFromCores(
 	cores []int,
 ) (sockets []int, exclusive bool) {
-	socketsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	socketsMap := make(map[int]int)
 	for _, core := range cores {
-		for _, hwthreadInCore := range topo.Core[core] {
-			for socket, hwthreadsInSocket := range topo.Socket {
-				for _, hwthreadInSocket := range hwthreadsInSocket {
-					if hwthreadInCore == hwthreadInSocket {
-						socketsMap[socket] += 1
-					}
-				}
-			}
-		}
-	}
-
-	exclusive = true
-	hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
-	sockets = make([]int, 0, len(socketsMap))
-	for socket, count := range socketsMap {
-		sockets = append(sockets, socket)
-		exclusive = exclusive && count == hwthreadsPerSocket
+		for _, socketID := range topo.coreToSocket[core] {
+			socketsMap[socketID]++
+		}
+	}
+
+	exclusive = true
+	sockets = make([]int, 0, len(socketsMap))
+	for socket, count := range socketsMap {
+		sockets = append(sockets, socket)
+		// Count total cores in this socket
+		totalCoresInSocket := 0
+		for _, hwthreads := range topo.Core {
+			for _, hwthread := range hwthreads {
+				for _, sID := range topo.hwthreadToSocket[hwthread] {
+					if sID == socket {
+						totalCoresInSocket++
+						break
+					}
+				}
+			}
+		}
+		exclusive = exclusive && count == totalCoresInSocket
 	}
 
 	return sockets, exclusive
 }
 
-// Return a list of core IDs given a list of hwthread IDs. Even if just one
-// hwthread is in that core, add it to the list. If no hwthreads other than
-// those in the argument list are assigned to one of the cores in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
 func (topo *Topology) GetCoresFromHWThreads(
 	hwthreads []int,
 ) (cores []int, exclusive bool) {
-	coresMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	coresMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for core, hwthreadsInCore := range topo.Core {
-			for _, hwthreadInCore := range hwthreadsInCore {
-				if hwthread == hwthreadInCore {
-					coresMap[core] += 1
-				}
-			}
+		for _, coreID := range topo.hwthreadToCore[hwthread] {
+			coresMap[coreID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPerCore := len(topo.Node) / len(topo.Core)
 	cores = make([]int, 0, len(coresMap))
 	for core, count := range coresMap {
 		cores = append(cores, core)
-		exclusive = exclusive && count == hwthreadsPerCore
+		// Check if all hwthreads in this core are in our input list
+		exclusive = exclusive && count == len(topo.Core[core])
 	}
 
 	return cores, exclusive
 }
 
-// Return a list of memory domain IDs given a list of hwthread IDs. Even if
-// just one hwthread is in that memory domain, add it to the list. If no
-// hwthreads other than those in the argument list are assigned to one of the
-// memory domains in the first return value, return true as the second value.
-// TODO: Optimize this, there must be a more efficient way/algorithm.
 func (topo *Topology) GetMemoryDomainsFromHWThreads(
 	hwthreads []int,
 ) (memDoms []int, exclusive bool) {
-	memDomsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	memDomsMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for memDom, hwthreadsInmemDom := range topo.MemoryDomain {
-			for _, hwthreadInmemDom := range hwthreadsInmemDom {
-				if hwthread == hwthreadInmemDom {
-					memDomsMap[memDom] += 1
-				}
-			}
+		for _, memDomID := range topo.hwthreadToMemoryDomain[hwthread] {
+			memDomsMap[memDomID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain)
 	memDoms = make([]int, 0, len(memDomsMap))
 	for memDom, count := range memDomsMap {
 		memDoms = append(memDoms, memDom)
-		exclusive = exclusive && count == hwthreadsPermemDom
+		// Check if all hwthreads in this memory domain are in our input list
+		exclusive = exclusive && count == len(topo.MemoryDomain[memDom])
 	}
 
 	return memDoms, exclusive
 }
 
-// Temporary fix to convert back from int id to string id for accelerators
-func (topo *Topology) GetAcceleratorID(id int) (string, error) {
-	if id < 0 {
-		fmt.Printf("ID smaller than 0!\n")
-		return topo.Accelerators[0].ID, nil
-	} else if id < len(topo.Accelerators) {
-		return topo.Accelerators[id].ID, nil
-	} else {
-		return "", fmt.Errorf("index %d out of range", id)
-	}
-}
+// GetMemoryDomainsBySocket groups memory domain IDs by the socket they
+// belong to, using the direct memoryDomainToSocket mapping.
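+//
+// Illustrative example: with domains 0/1 on socket 0 and 2/3 on socket 1,
+//
+//	bySocket, err := topo.GetMemoryDomainsBySocket([]int{0, 1, 2, 3})
+//	// bySocket == map[int][]int{0: {0, 1}, 1: {2, 3}}, err == nil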
+func (topo *Topology) GetMemoryDomainsBySocket(domainIDs []int) (map[int][]int, error) {
+	topo.EnsureTopologyMaps() // make sure memoryDomainToSocket is populated
+
+	socketToDomains := make(map[int][]int)
+	for _, domainID := range domainIDs {
+		if domainID < 0 || domainID >= len(topo.MemoryDomain) || len(topo.MemoryDomain[domainID]) == 0 {
+			return nil, fmt.Errorf("MemoryDomain %d is invalid or empty", domainID)
+		}
+
+		socketID, ok := topo.memoryDomainToSocket[domainID]
+		if !ok {
+			return nil, fmt.Errorf("MemoryDomain %d could not be assigned to any socket", domainID)
+		}
+
+		socketToDomains[socketID] = append(socketToDomains[socketID], domainID)
+	}
+
+	return socketToDomains, nil
+}
 
-// Return list of hardware (string) accelerator IDs
+// GetAcceleratorID converts a numeric ID to the corresponding Accelerator ID as a string.
+// This is useful when accelerators are stored in arrays and accessed by index.
+func (topo *Topology) GetAcceleratorID(id int) (string, error) {
+	if id < 0 {
+		return "", fmt.Errorf("accelerator ID %d is negative", id)
+	}
+
+	if id >= len(topo.Accelerators) {
+		return "", fmt.Errorf("accelerator index %d out of valid range (max: %d)",
+			id, len(topo.Accelerators)-1)
+	}
+
+	return topo.Accelerators[id].ID, nil
+}
+
+// GetAcceleratorIDs returns a list of all Accelerator IDs (as strings).
+// Capacity is pre-allocated to improve efficiency.
 func (topo *Topology) GetAcceleratorIDs() []string {
-	accels := make([]string, 0)
+	if len(topo.Accelerators) == 0 {
+		return []string{}
+	}
+
+	accels := make([]string, 0, len(topo.Accelerators))
 	for _, accel := range topo.Accelerators {
 		accels = append(accels, accel.ID)
 	}
 	return accels
 }
 
-// Outdated? Or: Return indices of accelerators in parent array?
+// GetAcceleratorIDsAsInt converts all Accelerator IDs to integer values.
+// This function can fail if the IDs cannot be interpreted as numbers.
+// Capacity is pre-allocated to improve efficiency.
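+//
+// Illustrative example: accelerator IDs "0" and "1" yield []int{0, 1}; a
+// non-numeric ID such as "GPU-0" makes the function return a wrapped
+// strconv error.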
 func (topo *Topology) GetAcceleratorIDsAsInt() ([]int, error) {
-	accels := make([]int, 0)
-	for _, accel := range topo.Accelerators {
+	if len(topo.Accelerators) == 0 {
+		return []int{}, nil
+	}
+
+	accels := make([]int, 0, len(topo.Accelerators))
+	for i, accel := range topo.Accelerators {
 		id, err := strconv.Atoi(accel.ID)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("accelerator ID at position %d (%s) cannot be converted to a number: %w",
+				i, accel.ID, err)
 		}
 		accels = append(accels, id)
 	}