Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2025-04-01 18:15:54 +02:00)

Commit 1db9050315: Merge d009a065f116388503cf664d919b0b379f7e73aa into 0e27ae779594e07e2b1c0afb92b44749e36eb721
@@ -40,6 +40,7 @@ type CCMetricStore struct {
 	jwt           string
 	url           string
 	queryEndpoint string
+	topologyCache map[string]*schema.Topology // cluster -> topology cache
 }
 
 type ApiQueryRequest struct {
@@ -92,6 +93,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
 	ccms.client = http.Client{
 		Timeout: 10 * time.Second,
 	}
+	ccms.topologyCache = make(map[string]*schema.Topology)
 
 	if config.Renamings != nil {
 		ccms.here2there = config.Renamings
@@ -181,6 +183,12 @@ func (ccms *CCMetricStore) LoadData(
 		return nil, err
 	}
 
+	// Verify assignment is correct - log any inconsistencies for debugging
+	if len(queries) != len(assignedScope) {
+		log.Errorf("Critical error: queries and assignedScope have different lengths after buildQueries: %d vs %d",
+			len(queries), len(assignedScope))
+	}
+
 	req := ApiQueryRequest{
 		Cluster: job.Cluster,
 		From:    job.StartTime.Unix(),
@@ -198,11 +206,36 @@ func (ccms *CCMetricStore) LoadData(
 
 	var errors []string
 	jobData := make(schema.JobData)
+
+	// Add safety check for potential index out of range errors
+	if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
+		log.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
+			len(req.Queries), len(resBody.Results), len(assignedScope))
+		if len(resBody.Results) > len(req.Queries) {
+			resBody.Results = resBody.Results[:len(req.Queries)]
+		}
+		if len(assignedScope) > len(req.Queries) {
+			assignedScope = assignedScope[:len(req.Queries)]
+		}
+	}
+
 	for i, row := range resBody.Results {
+		// Safety check to prevent index out of range errors
+		if i >= len(req.Queries) || i >= len(assignedScope) {
+			log.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
+				i, len(req.Queries), len(assignedScope))
+			continue
+		}
+
 		query := req.Queries[i]
 		metric := ccms.toLocalName(query.Metric)
 		scope := assignedScope[i]
 		mc := archive.GetMetricConfig(job.Cluster, metric)
+		if mc == nil {
+			log.Warnf("Metric config not found for %s on cluster %s", metric, job.Cluster)
+			continue
+		}
+
 		if _, ok := jobData[metric]; !ok {
 			jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
 		}
@@ -231,8 +264,15 @@ func (ccms *CCMetricStore) LoadData(
 
 			id := (*string)(nil)
 			if query.Type != nil {
-				id = new(string)
-				*id = query.TypeIds[ndx]
+				// Check if ndx is within the bounds of TypeIds slice
+				if ndx < len(query.TypeIds) {
+					id = new(string)
+					*id = query.TypeIds[ndx]
+				} else {
+					// Log the error but continue processing
+					log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
+						ndx, len(query.TypeIds), query.Metric, query.Hostname)
+				}
 			}
 
 			if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -284,20 +324,19 @@ func (ccms *CCMetricStore) buildQueries(
 	scopes []schema.MetricScope,
 	resolution int,
 ) ([]ApiQuery, []schema.MetricScope, error) {
+	// Initialize both slices together
 	queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
-	assignedScope := []schema.MetricScope{}
+	assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(job.Resources))
 
-	subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
-	if scerr != nil {
-		return nil, nil, scerr
+	topology, err := ccms.getTopology(job.Cluster, job.SubCluster)
+	if err != nil {
+		return nil, nil, err
 	}
-	topology := subcluster.Topology
 
 	for _, metric := range metrics {
 		remoteName := ccms.toRemoteName(metric)
 		mc := archive.GetMetricConfig(job.Cluster, metric)
 		if mc == nil {
-			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
 			log.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
 			continue
 		}
@@ -329,7 +368,6 @@ func (ccms *CCMetricStore) buildQueries(
 			// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
 			if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
 				if scope != schema.MetricScopeAccelerator {
-					// Skip all other catched cases
 					continue
 				}
 
@@ -502,6 +540,31 @@ func (ccms *CCMetricStore) buildQueries(
 				continue
 			}
 
+			// MemoryDomain -> Socket
+			if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
+				memDomains, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
+				socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
+				if err != nil {
+					log.Errorf("Error mapping memory domains to sockets: %v", err)
+					continue
+				}
+
+				// Create a query for each socket
+				for _, domains := range socketToDomains {
+					queries = append(queries, ApiQuery{
+						Metric:     remoteName,
+						Hostname:   host.Hostname,
+						Aggregate:  true,
+						Type:       &memoryDomainString,
+						TypeIds:    intToStringSlice(domains),
+						Resolution: resolution,
+					})
+					// Add scope for each query, not just once
+					assignedScope = append(assignedScope, scope)
+				}
+				continue
+			}
+
 			// Socket -> Socket
 			if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
 				sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
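
Note: the new MemoryDomain -> Socket branch groups the job's memory domains by the socket they belong to and emits one aggregated ApiQuery per socket, appending one assignedScope entry per emitted query so both slices stay the same length. The following standalone sketch (not part of the commit) illustrates the grouping with hypothetical domain-to-socket data instead of the real schema types:

package main

import "fmt"

func main() {
	// Hypothetical layout: memory domains 0 and 1 sit on socket 0, domains 2 and 3 on socket 1.
	memoryDomainToSocket := map[int]int{0: 0, 1: 0, 2: 1, 3: 1}
	memDomains := []int{0, 1, 2, 3}

	// Group domains by socket, as topology.GetMemoryDomainsBySocket does in this commit.
	socketToDomains := map[int][]int{}
	for _, d := range memDomains {
		s := memoryDomainToSocket[d]
		socketToDomains[s] = append(socketToDomains[s], d)
	}

	// One aggregated query (and one assignedScope entry) per socket.
	for socket, domains := range socketToDomains {
		fmt.Printf("socket %d -> one ApiQuery aggregating memory domains %v\n", socket, domains)
	}
}
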
@@ -772,6 +835,12 @@ func (ccms *CCMetricStore) LoadNodeListData(
 		return nil, totalNodes, hasNextPage, err
 	}
 
+	// Verify assignment is correct - log any inconsistencies for debugging
+	if len(queries) != len(assignedScope) {
+		log.Errorf("Critical error: queries and assignedScope have different lengths after buildNodeQueries: %d vs %d",
+			len(queries), len(assignedScope))
+	}
+
 	req := ApiQueryRequest{
 		Cluster: cluster,
 		Queries: queries,
@@ -789,17 +858,48 @@ func (ccms *CCMetricStore) LoadNodeListData(
 
 	var errors []string
 	data := make(map[string]schema.JobData)
+
+	// Add safety check for index out of range issues
+	if len(resBody.Results) != len(req.Queries) || len(assignedScope) != len(req.Queries) {
+		log.Warnf("Mismatch in query results count: queries=%d, results=%d, assignedScope=%d",
+			len(req.Queries), len(resBody.Results), len(assignedScope))
+		if len(resBody.Results) > len(req.Queries) {
+			resBody.Results = resBody.Results[:len(req.Queries)]
+		}
+		if len(assignedScope) > len(req.Queries) {
+			assignedScope = assignedScope[:len(req.Queries)]
+		}
+	}
+
 	for i, row := range resBody.Results {
+		// Safety check to prevent index out of range errors
+		if i >= len(req.Queries) || i >= len(assignedScope) {
+			log.Warnf("Index out of range prevented: i=%d, queries=%d, assignedScope=%d",
+				i, len(req.Queries), len(assignedScope))
+			continue
+		}
+
 		var query ApiQuery
 		if resBody.Queries != nil {
-			query = resBody.Queries[i]
+			if i < len(resBody.Queries) {
+				query = resBody.Queries[i]
+			} else {
+				log.Warnf("Index out of range prevented for resBody.Queries: i=%d, len=%d",
+					i, len(resBody.Queries))
+				continue
+			}
 		} else {
 			query = req.Queries[i]
 		}
 
 		// qdata := res[0]
 		metric := ccms.toLocalName(query.Metric)
 		scope := assignedScope[i]
 		mc := archive.GetMetricConfig(cluster, metric)
+		if mc == nil {
+			log.Warnf("Metric config not found for %s on cluster %s", metric, cluster)
+			continue
+		}
+
 		res := mc.Timestep
 		if len(row) > 0 {
@@ -838,8 +938,15 @@ func (ccms *CCMetricStore) LoadNodeListData(
 
 			id := (*string)(nil)
 			if query.Type != nil {
-				id = new(string)
-				*id = query.TypeIds[ndx]
+				// Check if ndx is within the bounds of TypeIds slice
+				if ndx < len(query.TypeIds) {
+					id = new(string)
+					*id = query.TypeIds[ndx]
+				} else {
+					// Log the error but continue processing
+					log.Warnf("TypeIds index out of range: %d with length %d for metric %s on host %s",
+						ndx, len(query.TypeIds), query.Metric, query.Hostname)
+				}
 			}
 
 			if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -878,26 +985,14 @@ func (ccms *CCMetricStore) buildNodeQueries(
 	scopes []schema.MetricScope,
 	resolution int,
 ) ([]ApiQuery, []schema.MetricScope, error) {
+	// Initialize both slices together with the same capacity
 	queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes))
-	assignedScope := []schema.MetricScope{}
+	assignedScope := make([]schema.MetricScope, 0, len(metrics)*len(scopes)*len(nodes))
 
-	// Get Topol before loop if subCluster given
-	var subClusterTopol *schema.SubCluster
-	var scterr error
-	if subCluster != "" {
-		subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster)
-		if scterr != nil {
-			// TODO: Log
-			return nil, nil, scterr
-		}
-	}
-
 	for _, metric := range metrics {
 		remoteName := ccms.toRemoteName(metric)
 		mc := archive.GetMetricConfig(cluster, metric)
 		if mc == nil {
-			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
 			log.Infof("metric '%s' is not specified for cluster '%s'", metric, cluster)
 			continue
 		}
@@ -918,22 +1013,22 @@ func (ccms *CCMetricStore) buildNodeQueries(
 		handledScopes = append(handledScopes, scope)
 
 		for _, hostname := range nodes {
+			var topology *schema.Topology
+			var err error
 
 			// If no subCluster given, get it by node
 			if subCluster == "" {
-				subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname)
-				if scnerr != nil {
-					return nil, nil, scnerr
-				}
-				subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName)
-				if scterr != nil {
-					return nil, nil, scterr
-				}
+				topology, err = ccms.getTopologyByNode(cluster, hostname)
+			} else {
+				topology, err = ccms.getTopology(cluster, subCluster)
+			}
+
+			if err != nil {
+				return nil, nil, err
 			}
 
 			// Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable
 			// Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable
-			topology := subClusterTopol.Topology
 			acceleratorIds := topology.GetAcceleratorIDs()
 
 			// Moved check here if metric matches hardware specs
@@ -944,7 +1039,6 @@ func (ccms *CCMetricStore) buildNodeQueries(
 			// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
 			if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
 				if scope != schema.MetricScopeAccelerator {
-					// Skip all other catched cases
 					continue
 				}
 
@@ -1117,6 +1211,31 @@ func (ccms *CCMetricStore) buildNodeQueries(
 				continue
 			}
 
+			// MemoryDomain -> Socket
+			if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeSocket {
+				memDomains, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
+				socketToDomains, err := topology.GetMemoryDomainsBySocket(memDomains)
+				if err != nil {
+					log.Errorf("Error mapping memory domains to sockets: %v", err)
+					continue
+				}
+
+				// Create a query for each socket
+				for _, domains := range socketToDomains {
+					queries = append(queries, ApiQuery{
+						Metric:     remoteName,
+						Hostname:   hostname,
+						Aggregate:  true,
+						Type:       &memoryDomainString,
+						TypeIds:    intToStringSlice(domains),
+						Resolution: resolution,
+					})
+					// Add scope for each query, not just once
+					assignedScope = append(assignedScope, scope)
+				}
+				continue
+			}
+
 			// Socket -> Socket
 			if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
 				sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
@@ -1173,3 +1292,29 @@ func intToStringSlice(is []int) []string {
 	}
 	return ss
 }
+
+// getTopology returns the topology for a given cluster and subcluster, caching it if not already present
+func (ccms *CCMetricStore) getTopology(cluster, subCluster string) (*schema.Topology, error) {
+	cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
+	if topology, ok := ccms.topologyCache[cacheKey]; ok {
+		return topology, nil
+	}
+
+	subcluster, err := archive.GetSubCluster(cluster, subCluster)
+	if err != nil {
+		return nil, err
+	}
+
+	ccms.topologyCache[cacheKey] = &subcluster.Topology
+	return &subcluster.Topology, nil
+}
+
+// getTopologyByNode returns the topology for a given cluster and node, caching it if not already present
+func (ccms *CCMetricStore) getTopologyByNode(cluster, node string) (*schema.Topology, error) {
+	subCluster, err := archive.GetSubClusterByNode(cluster, node)
+	if err != nil {
+		return nil, err
+	}
+
+	return ccms.getTopology(cluster, subCluster)
+}
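
Note: getTopology keys the cache on "cluster:subcluster", so archive.GetSubCluster is consulted at most once per subcluster across repeated buildQueries/buildNodeQueries calls; getTopologyByNode resolves the node's subcluster first and then reuses the same cache. A minimal standalone sketch of the pattern (not part of the commit; the loader function is a hypothetical stand-in for the archive lookup):

package main

import "fmt"

type topology struct{ name string }

// Cache keyed by "cluster:subcluster", mirroring ccms.topologyCache in the commit.
var topologyCache = map[string]*topology{}

// loadSubCluster is a hypothetical stand-in for archive.GetSubCluster.
func loadSubCluster(cluster, subCluster string) (*topology, error) {
	fmt.Println("expensive archive lookup:", cluster, subCluster)
	return &topology{name: cluster + "/" + subCluster}, nil
}

func getTopology(cluster, subCluster string) (*topology, error) {
	cacheKey := fmt.Sprintf("%s:%s", cluster, subCluster)
	if topo, ok := topologyCache[cacheKey]; ok {
		return topo, nil // cache hit: no archive access
	}
	topo, err := loadSubCluster(cluster, subCluster)
	if err != nil {
		return nil, err
	}
	topologyCache[cacheKey] = topo
	return topo, nil
}

func main() {
	getTopology("clusterA", "sub0") // performs the lookup
	getTopology("clusterA", "sub0") // served from the cache
}

No locking is visible around topologyCache in the commit, so concurrent LoadData/LoadNodeListData calls would need additional synchronization around the cache.
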
@@ -22,6 +22,13 @@ type Topology struct {
 	Die          [][]*int       `json:"die,omitempty"`
 	Core         [][]int        `json:"core"`
 	Accelerators []*Accelerator `json:"accelerators,omitempty"`
+
+	// Cache maps for faster lookups
+	hwthreadToSocket       map[int][]int
+	hwthreadToCore         map[int][]int
+	hwthreadToMemoryDomain map[int][]int
+	coreToSocket           map[int][]int
+	memoryDomainToSocket   map[int]int // New: Direct mapping from memory domain to socket
 }
 
 type MetricValue struct {
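
Note: the new cache fields are unexported and carry no json tags, so they never show up in the serialized cluster topology; they are filled lazily by EnsureTopologyMaps further down. A tiny standalone illustration (not part of the commit) of the serialization point:

package main

import (
	"encoding/json"
	"fmt"
)

type topology struct {
	Core             [][]int       `json:"core"`
	hwthreadToSocket map[int][]int // unexported: ignored by encoding/json
}

func main() {
	t := topology{
		Core:             [][]int{{0, 1}},
		hwthreadToSocket: map[int][]int{0: {0}, 1: {0}},
	}
	out, _ := json.Marshal(t)
	fmt.Println(string(out)) // {"core":[[0,1]]} - the cache map is not serialized
}
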
@@ -92,156 +99,233 @@ type GlobalMetricListItem struct {
 	Availability []ClusterSupport `json:"availability"`
 }
 
-// Return a list of socket IDs given a list of hwthread IDs. Even if just one
-// hwthread is in that socket, add it to the list. If no hwthreads other than
-// those in the argument list are assigned to one of the sockets in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
+// InitTopologyMaps initializes the topology mapping caches
+func (topo *Topology) InitTopologyMaps() {
+	// Initialize maps
+	topo.hwthreadToSocket = make(map[int][]int)
+	topo.hwthreadToCore = make(map[int][]int)
+	topo.hwthreadToMemoryDomain = make(map[int][]int)
+	topo.coreToSocket = make(map[int][]int)
+	topo.memoryDomainToSocket = make(map[int]int)
+
+	// Build hwthread to socket mapping
+	for socketID, hwthreads := range topo.Socket {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToSocket[hwthread] = append(topo.hwthreadToSocket[hwthread], socketID)
+		}
+	}
+
+	// Build hwthread to core mapping
+	for coreID, hwthreads := range topo.Core {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToCore[hwthread] = append(topo.hwthreadToCore[hwthread], coreID)
+		}
+	}
+
+	// Build hwthread to memory domain mapping
+	for memDomID, hwthreads := range topo.MemoryDomain {
+		for _, hwthread := range hwthreads {
+			topo.hwthreadToMemoryDomain[hwthread] = append(topo.hwthreadToMemoryDomain[hwthread], memDomID)
+		}
+	}
+
+	// Build core to socket mapping
+	for coreID, hwthreads := range topo.Core {
+		socketSet := make(map[int]struct{})
+		for _, hwthread := range hwthreads {
+			for socketID := range topo.hwthreadToSocket[hwthread] {
+				socketSet[socketID] = struct{}{}
+			}
+		}
+		topo.coreToSocket[coreID] = make([]int, 0, len(socketSet))
+		for socketID := range socketSet {
+			topo.coreToSocket[coreID] = append(topo.coreToSocket[coreID], socketID)
+		}
+	}
+
+	// Build memory domain to socket mapping
+	for memDomID, hwthreads := range topo.MemoryDomain {
+		if len(hwthreads) > 0 {
+			// Use the first hwthread to determine the socket
+			if socketIDs, ok := topo.hwthreadToSocket[hwthreads[0]]; ok && len(socketIDs) > 0 {
+				topo.memoryDomainToSocket[memDomID] = socketIDs[0]
+			}
+		}
+	}
+}
+
+// EnsureTopologyMaps ensures that the topology maps are initialized
+func (topo *Topology) EnsureTopologyMaps() {
+	if topo.hwthreadToSocket == nil {
+		topo.InitTopologyMaps()
+	}
+}
+
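
Note: InitTopologyMaps inverts the Socket/Core/MemoryDomain lists into per-hwthread lookup maps once, and EnsureTopologyMaps builds them lazily on first use (keyed on hwthreadToSocket being nil). A standalone sketch (not part of the commit) of the inversion for a hypothetical two-socket layout:

package main

import "fmt"

func main() {
	// Hypothetical topology: socket 0 holds hwthreads 0-3, socket 1 holds hwthreads 4-7.
	socket := [][]int{{0, 1, 2, 3}, {4, 5, 6, 7}}

	// Invert "socket -> hwthreads" into "hwthread -> sockets",
	// the same shape as topo.hwthreadToSocket in this commit.
	hwthreadToSocket := map[int][]int{}
	for socketID, hwthreads := range socket {
		for _, hwthread := range hwthreads {
			hwthreadToSocket[hwthread] = append(hwthreadToSocket[hwthread], socketID)
		}
	}

	fmt.Println(hwthreadToSocket[5]) // [1]: hwthread 5 lives on socket 1
}
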
 func (topo *Topology) GetSocketsFromHWThreads(
 	hwthreads []int,
 ) (sockets []int, exclusive bool) {
-	socketsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	socketsMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for socket, hwthreadsInSocket := range topo.Socket {
-			for _, hwthreadInSocket := range hwthreadsInSocket {
-				if hwthread == hwthreadInSocket {
-					socketsMap[socket] += 1
-				}
-			}
+		for _, socketID := range topo.hwthreadToSocket[hwthread] {
+			socketsMap[socketID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
 	sockets = make([]int, 0, len(socketsMap))
 	for socket, count := range socketsMap {
 		sockets = append(sockets, socket)
-		exclusive = exclusive && count == hwthreadsPerSocket
+		// Check if all hwthreads in this socket are in our input list
+		exclusive = exclusive && count == len(topo.Socket[socket])
 	}
 
 	return sockets, exclusive
 }
 
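
Note: the exclusivity test now compares the per-socket hit count against the size of that socket's own hwthread list instead of the uniform average len(topo.Node)/len(topo.Socket), which also gives the right answer when sockets are unevenly populated. A standalone sketch (not part of the commit) with a hypothetical uneven topology:

package main

import "fmt"

func main() {
	// Hypothetical uneven topology: socket 0 has 2 hwthreads, socket 1 has 4.
	socket := [][]int{{0, 1}, {2, 3, 4, 5}}
	node := []int{0, 1, 2, 3, 4, 5}

	// A job that owns exactly the two hwthreads of socket 0.
	count := 2 // hwthreads from the input list that map to socket 0

	oldCheck := count == len(node)/len(socket) // 2 == 3 -> false: wrongly reported as non-exclusive
	newCheck := count == len(socket[0])        // 2 == 2 -> true: socket 0 is fully covered
	fmt.Println(oldCheck, newCheck)
}
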
-// Return a list of socket IDs given a list of core IDs. Even if just one
-// core is in that socket, add it to the list. If no cores other than
-// those in the argument list are assigned to one of the sockets in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
-func (topo *Topology) GetSocketsFromCores (
+func (topo *Topology) GetSocketsFromCores(
 	cores []int,
 ) (sockets []int, exclusive bool) {
-	socketsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	socketsMap := make(map[int]int)
 	for _, core := range cores {
-		for _, hwthreadInCore := range topo.Core[core] {
-			for socket, hwthreadsInSocket := range topo.Socket {
-				for _, hwthreadInSocket := range hwthreadsInSocket {
-					if hwthreadInCore == hwthreadInSocket {
-						socketsMap[socket] += 1
-					}
-				}
-			}
+		for _, socketID := range topo.coreToSocket[core] {
+			socketsMap[socketID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPerSocket := len(topo.Node) / len(topo.Socket)
 	sockets = make([]int, 0, len(socketsMap))
 	for socket, count := range socketsMap {
 		sockets = append(sockets, socket)
-		exclusive = exclusive && count == hwthreadsPerSocket
+		// Count total cores in this socket
+		totalCoresInSocket := 0
+		for _, hwthreads := range topo.Core {
+			for _, hwthread := range hwthreads {
+				for _, sID := range topo.hwthreadToSocket[hwthread] {
+					if sID == socket {
+						totalCoresInSocket++
+						break
+					}
+				}
+			}
+		}
+		exclusive = exclusive && count == totalCoresInSocket
 	}
 
 	return sockets, exclusive
 }
 
-// Return a list of core IDs given a list of hwthread IDs. Even if just one
-// hwthread is in that core, add it to the list. If no hwthreads other than
-// those in the argument list are assigned to one of the cores in the first
-// return value, return true as the second value. TODO: Optimize this, there
-// must be a more efficient way/algorithm.
 func (topo *Topology) GetCoresFromHWThreads(
 	hwthreads []int,
 ) (cores []int, exclusive bool) {
-	coresMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	coresMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for core, hwthreadsInCore := range topo.Core {
-			for _, hwthreadInCore := range hwthreadsInCore {
-				if hwthread == hwthreadInCore {
-					coresMap[core] += 1
-				}
-			}
+		for _, coreID := range topo.hwthreadToCore[hwthread] {
+			coresMap[coreID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPerCore := len(topo.Node) / len(topo.Core)
 	cores = make([]int, 0, len(coresMap))
 	for core, count := range coresMap {
 		cores = append(cores, core)
-		exclusive = exclusive && count == hwthreadsPerCore
+		// Check if all hwthreads in this core are in our input list
+		exclusive = exclusive && count == len(topo.Core[core])
 	}
 
 	return cores, exclusive
 }
 
-// Return a list of memory domain IDs given a list of hwthread IDs. Even if
-// just one hwthread is in that memory domain, add it to the list. If no
-// hwthreads other than those in the argument list are assigned to one of the
-// memory domains in the first return value, return true as the second value.
-// TODO: Optimize this, there must be a more efficient way/algorithm.
 func (topo *Topology) GetMemoryDomainsFromHWThreads(
 	hwthreads []int,
 ) (memDoms []int, exclusive bool) {
-	memDomsMap := map[int]int{}
+	topo.EnsureTopologyMaps()
+
+	memDomsMap := make(map[int]int)
 	for _, hwthread := range hwthreads {
-		for memDom, hwthreadsInmemDom := range topo.MemoryDomain {
-			for _, hwthreadInmemDom := range hwthreadsInmemDom {
-				if hwthread == hwthreadInmemDom {
-					memDomsMap[memDom] += 1
-				}
-			}
+		for _, memDomID := range topo.hwthreadToMemoryDomain[hwthread] {
+			memDomsMap[memDomID]++
 		}
 	}
 
 	exclusive = true
-	hwthreadsPermemDom := len(topo.Node) / len(topo.MemoryDomain)
 	memDoms = make([]int, 0, len(memDomsMap))
 	for memDom, count := range memDomsMap {
 		memDoms = append(memDoms, memDom)
-		exclusive = exclusive && count == hwthreadsPermemDom
+		// Check if all hwthreads in this memory domain are in our input list
+		exclusive = exclusive && count == len(topo.MemoryDomain[memDom])
 	}
 
 	return memDoms, exclusive
 }
 
-// Temporary fix to convert back from int id to string id for accelerators
-func (topo *Topology) GetAcceleratorID(id int) (string, error) {
-	if id < 0 {
-		fmt.Printf("ID smaller than 0!\n")
-		return topo.Accelerators[0].ID, nil
-	} else if id < len(topo.Accelerators) {
-		return topo.Accelerators[id].ID, nil
-	} else {
-		return "", fmt.Errorf("index %d out of range", id)
+// GetMemoryDomainsBySocket can now use the direct mapping
+func (topo *Topology) GetMemoryDomainsBySocket(domainIDs []int) (map[int][]int, error) {
+	socketToDomains := make(map[int][]int)
+	for _, domainID := range domainIDs {
+		if domainID < 0 || domainID >= len(topo.MemoryDomain) || len(topo.MemoryDomain[domainID]) == 0 {
+			return nil, fmt.Errorf("MemoryDomain %d is invalid or empty", domainID)
+		}
+
+		socketID, ok := topo.memoryDomainToSocket[domainID]
+		if !ok {
+			return nil, fmt.Errorf("MemoryDomain %d could not be assigned to any socket", domainID)
+		}
+
+		socketToDomains[socketID] = append(socketToDomains[socketID], domainID)
 	}
+
+	return socketToDomains, nil
 }
 
-// Return list of hardware (string) accelerator IDs
+// GetAcceleratorID converts a numeric ID to the corresponding Accelerator ID as a string.
+// This is useful when accelerators are stored in arrays and accessed by index.
+func (topo *Topology) GetAcceleratorID(id int) (string, error) {
+	if id < 0 {
+		return "", fmt.Errorf("accelerator ID %d is negative", id)
+	}
+
+	if id >= len(topo.Accelerators) {
+		return "", fmt.Errorf("accelerator index %d out of valid range (max: %d)",
+			id, len(topo.Accelerators)-1)
+	}
+
+	return topo.Accelerators[id].ID, nil
+}
+
+// GetAcceleratorIDs returns a list of all Accelerator IDs (as strings).
+// Capacity is pre-allocated to improve efficiency.
 func (topo *Topology) GetAcceleratorIDs() []string {
-	accels := make([]string, 0)
+	if len(topo.Accelerators) == 0 {
+		return []string{}
+	}
+
+	accels := make([]string, 0, len(topo.Accelerators))
 	for _, accel := range topo.Accelerators {
 		accels = append(accels, accel.ID)
 	}
 	return accels
 }
 
-// Outdated? Or: Return indices of accelerators in parent array?
+// GetAcceleratorIDsAsInt converts all Accelerator IDs to integer values.
+// This function can fail if the IDs cannot be interpreted as numbers.
+// Capacity is pre-allocated to improve efficiency.
 func (topo *Topology) GetAcceleratorIDsAsInt() ([]int, error) {
-	accels := make([]int, 0)
-	for _, accel := range topo.Accelerators {
+	if len(topo.Accelerators) == 0 {
+		return []int{}, nil
+	}
+
+	accels := make([]int, 0, len(topo.Accelerators))
+	for i, accel := range topo.Accelerators {
 		id, err := strconv.Atoi(accel.ID)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("accelerator ID at position %d (%s) cannot be converted to a number: %w",
+				i, accel.ID, err)
 		}
 		accels = append(accels, id)
 	}