cc-metric-collector/pkg/ccTopology/ccTopology.go

package ccTopology

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"

	cclogger "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	"golang.org/x/exp/slices"
)

const SYSFS_CPUBASE = `/sys/devices/system/cpu`

// HwthreadEntry holds all information about a hardware thread.
// See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-devices-system-cpu
type HwthreadEntry struct {
	// for each CPUx:
	CpuID        int   // CPU / hardware thread ID
	SMT          int   // Simultaneous multithreading (SMT) ID
	CoreCPUsList []int // CPUs within the same core
	Core         int   // Socket-local core ID
	Socket       int   // Socket (physical package) ID
	Die          int   // Die ID
	NumaDomain   int   // NUMA domain ID
}
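
// A purely illustrative sketch of one entry, e.g. CPU 65 on a hypothetical
// two-socket, 64-core, SMT-2 node (actual values depend on the hardware):
//
//	HwthreadEntry{
//		CpuID:        65,
//		SMT:          1,            // second hardware thread of its core
//		CoreCPUsList: []int{1, 65}, // both hardware threads of core 1
//		Core:         1,
//		Socket:       0,
//		Die:          0,
//		NumaDomain:   0,
//	}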

// cache stores the topology information read once from sysfs at init time
var cache struct {
	HwthreadList   []int // List of CPU hardware thread IDs
	SMTList        []int // List of simultaneous multithreading (SMT) IDs
	CoreList       []int // List of CPU core IDs
	SocketList     []int // List of CPU socket (physical package) IDs
	DieList        []int // List of CPU die IDs
	NumaDomainList []int // List of NUMA domain IDs
	CpuData        []HwthreadEntry
}

// fileToInt reads an integer value from a sysfs file.
// In case of an error -1 is returned.
func fileToInt(path string) int {
	buffer, err := os.ReadFile(path)
	if err != nil {
		log.Print(err)
		cclogger.ComponentError("ccTopology", "fileToInt", "Reading", path, ":", err.Error())
		return -1
	}

	stringBuffer := strings.TrimSpace(string(buffer))
	id, err := strconv.Atoi(stringBuffer)
	if err != nil {
		cclogger.ComponentError("ccTopology", "fileToInt", "Parsing", path, ":", stringBuffer, err.Error())
		return -1
	}
	return id
}
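
// For example, /sys/devices/system/cpu/cpu0/topology/core_id typically
// contains a single line such as "0", for which fileToInt returns 0
// (illustrative only; the value is hardware dependent).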

// fileToList reads a list from a sysfs file.
// A list consists of value ranges separated by commas.
// A range is either a single value or a startValue-endValue pair.
// In case of an error nil is returned.
func fileToList(path string) []int {
	// Read list
	buffer, err := os.ReadFile(path)
	if err != nil {
		log.Print(err)
		cclogger.ComponentError("ccTopology", "fileToList", "Reading", path, ":", err.Error())
		return nil
	}

	// Create list
	list := make([]int, 0)
	stringBuffer := strings.TrimSpace(string(buffer))
	for _, valueRangeString := range strings.Split(stringBuffer, ",") {
		valueRange := strings.Split(valueRangeString, "-")
		switch len(valueRange) {
		case 1:
			singleValue, err := strconv.Atoi(valueRange[0])
			if err != nil {
				cclogger.ComponentError("CCTopology", "fileToList", "Parsing", valueRange[0], ":", err.Error())
				return nil
			}
			list = append(list, singleValue)
		case 2:
			startValue, err := strconv.Atoi(valueRange[0])
			if err != nil {
				cclogger.ComponentError("CCTopology", "fileToList", "Parsing", valueRange[0], ":", err.Error())
				return nil
			}
			endValue, err := strconv.Atoi(valueRange[1])
			if err != nil {
				cclogger.ComponentError("CCTopology", "fileToList", "Parsing", valueRange[1], ":", err.Error())
				return nil
			}
			for value := startValue; value <= endValue; value++ {
				list = append(list, value)
			}
		}
	}
	return list
}
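
// For example, a sysfs list file containing "0-2,4" is parsed into
// []int{0, 1, 2, 4} (illustrative input; this is the kernel's bitmap list
// format used by files such as core_cpus_list).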

// init initializes the cache structure
func init() {

	getHWThreads :=
		func() []int {
			globPath := filepath.Join(SYSFS_CPUBASE, "cpu[0-9]*")
			regexPath := filepath.Join(SYSFS_CPUBASE, "cpu([[:digit:]]+)")
			regex := regexp.MustCompile(regexPath)

			// File globbing for hardware threads
			files, err := filepath.Glob(globPath)
			if err != nil {
				cclogger.ComponentError("CCTopology", "init:getHWThreads", err.Error())
				return nil
			}

			hwThreadIDs := make([]int, len(files))
			for i, file := range files {
				// Extract hardware thread ID
				matches := regex.FindStringSubmatch(file)
				if len(matches) != 2 {
					cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to extract hardware thread ID from ", file)
					return nil
				}

				// Convert hardware thread ID to int
				id, err := strconv.Atoi(matches[1])
				if err != nil {
					cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to convert hardware thread ID to int: ", matches[1])
					return nil
				}

				hwThreadIDs[i] = id
			}

			// Sort hardware thread IDs
			slices.Sort(hwThreadIDs)
			return hwThreadIDs
		}

	getNumaDomain :=
		func(basePath string) int {
			globPath := filepath.Join(basePath, "node*")
			regexPath := filepath.Join(basePath, "node([[:digit:]]+)")
			regex := regexp.MustCompile(regexPath)

			// File globbing for NUMA node
			files, err := filepath.Glob(globPath)
			if err != nil {
				cclogger.ComponentError("CCTopology", "init:getNumaDomain", err.Error())
				return -1
			}

			// Check that exactly one NUMA domain was found
			if len(files) != 1 {
				cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Number of NUMA domains != 1: ", len(files))
				return -1
			}

			// Extract NUMA node ID
			matches := regex.FindStringSubmatch(files[0])
			if len(matches) != 2 {
				cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to extract NUMA node ID from: ", files[0])
				return -1
			}

			id, err := strconv.Atoi(matches[1])
			if err != nil {
				cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to parse NUMA node ID from: ", matches[1])
				return -1
			}

			return id
		}

	cache.HwthreadList = getHWThreads()
	cache.CoreList = make([]int, len(cache.HwthreadList))
	cache.SocketList = make([]int, len(cache.HwthreadList))
	cache.DieList = make([]int, len(cache.HwthreadList))
	cache.SMTList = make([]int, len(cache.HwthreadList))
	cache.NumaDomainList = make([]int, len(cache.HwthreadList))
	cache.CpuData = make([]HwthreadEntry, len(cache.HwthreadList))
	for i, c := range cache.HwthreadList {
		// Set cpuBase directory for topology lookup
		cpuBase := filepath.Join(SYSFS_CPUBASE, fmt.Sprintf("cpu%d", c))
		topoBase := filepath.Join(cpuBase, "topology")

		// Lookup core ID
		cache.CoreList[i] = fileToInt(filepath.Join(topoBase, "core_id"))

		// Lookup socket / physical package ID
		cache.SocketList[i] = fileToInt(filepath.Join(topoBase, "physical_package_id"))

		// Lookup CPU die ID; fall back to the socket ID on CPUs without die information
		cache.DieList[i] = fileToInt(filepath.Join(topoBase, "die_id"))
		if cache.DieList[i] < 0 {
			cache.DieList[i] = cache.SocketList[i]
		}

		// Lookup list of CPUs within the same core
		coreCPUsList := fileToList(filepath.Join(topoBase, "core_cpus_list"))

		// The SMT ID is the index of the CPU ID in the list of CPUs within the
		// same core; slices.Index returns -1 if the CPU ID is not found
		cache.SMTList[i] = slices.Index(coreCPUsList, c)

		// Lookup NUMA domain ID
		cache.NumaDomainList[i] = getNumaDomain(cpuBase)

		cache.CpuData[i] =
			HwthreadEntry{
				CpuID:        cache.HwthreadList[i],
				SMT:          cache.SMTList[i],
				CoreCPUsList: coreCPUsList,
				Socket:       cache.SocketList[i],
				NumaDomain:   cache.NumaDomainList[i],
				Die:          cache.DieList[i],
				Core:         cache.CoreList[i],
			}
	}

	slices.Sort(cache.HwthreadList)
	cache.HwthreadList = slices.Compact(cache.HwthreadList)

	slices.Sort(cache.SMTList)
	cache.SMTList = slices.Compact(cache.SMTList)

	slices.Sort(cache.CoreList)
	cache.CoreList = slices.Compact(cache.CoreList)

	slices.Sort(cache.SocketList)
	cache.SocketList = slices.Compact(cache.SocketList)

	slices.Sort(cache.DieList)
	cache.DieList = slices.Compact(cache.DieList)

	slices.Sort(cache.NumaDomainList)
	cache.NumaDomainList = slices.Compact(cache.NumaDomainList)
}
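
// The per-CPU sysfs layout read above looks roughly like this (paths for cpu0
// shown as illustration; see the kernel ABI documentation linked at the top):
//
//	/sys/devices/system/cpu/cpu0/topology/core_id
//	/sys/devices/system/cpu/cpu0/topology/physical_package_id
//	/sys/devices/system/cpu/cpu0/topology/die_id
//	/sys/devices/system/cpu/cpu0/topology/core_cpus_list
//	/sys/devices/system/cpu/cpu0/node0    (NUMA domain directory)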

// SocketList gets the list of CPU socket IDs
func SocketList() []int {
	return slices.Clone(cache.SocketList)
}

// HwthreadList gets the sorted list of hardware thread IDs
// (read from /sys/devices/system/cpu)
func HwthreadList() []int {
	return slices.Clone(cache.HwthreadList)
}

// CpuList gets the sorted list of hardware thread IDs.
// Deprecated: use HwthreadList() instead.
func CpuList() []int {
	return HwthreadList()
}

// CoreList gets the sorted list of unique CPU core IDs
func CoreList() []int {
	return slices.Clone(cache.CoreList)
}

// NumaNodeList gets the list of NUMA node IDs
func NumaNodeList() []int {
	return slices.Clone(cache.NumaDomainList)
}

// DieList gets the list of CPU die IDs
func DieList() []int {
	if len(cache.DieList) > 0 {
		return slices.Clone(cache.DieList)
	}
	return SocketList()
}

// GetTypeList gets the list of IDs for the given topology type, using the
// type names defined inside ClusterCockpit
func GetTypeList(topology_type string) []int {
	switch topology_type {
	case "node":
		return []int{0}
	case "socket":
		return SocketList()
	case "die":
		return DieList()
	case "memoryDomain":
		return NumaNodeList()
	case "core":
		return CoreList()
	case "hwthread":
		return HwthreadList()
	}
	return []int{}
}
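
// A minimal usage sketch (hypothetical values for a two-socket SMT-2 node
// with 64 cores; actual results depend on the machine):
//
//	GetTypeList("socket")   // -> []int{0, 1}
//	GetTypeList("hwthread") // -> []int{0, 1, ..., 127}
//	GetTypeList("gpu")      // unknown type -> []int{}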

// CpuData returns CPU data for each hardware thread
func CpuData() []HwthreadEntry {
	// return a deep copy to protect cache data
	c := slices.Clone(cache.CpuData)
	for i := range c {
		c[i].CoreCPUsList = slices.Clone(cache.CpuData[i].CoreCPUsList)
	}
	return c
}
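
// Because CpuData returns a deep copy, callers may modify the result freely,
// e.g. (sketch):
//
//	for _, t := range CpuData() {
//		fmt.Printf("cpu%d: socket %d, core %d, NUMA domain %d\n",
//			t.CpuID, t.Socket, t.Core, t.NumaDomain)
//	}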

// CpuInformation holds basic information about a CPU
type CpuInformation struct {
	NumHWthreads   int
	SMTWidth       int
	NumSockets     int
	NumDies        int
	NumCores       int
	NumNumaDomains int
}

// CpuInfo reports basic information about the CPU
func CpuInfo() CpuInformation {
	return CpuInformation{
		NumNumaDomains: len(cache.NumaDomainList),
		SMTWidth:       len(cache.SMTList),
		NumDies:        len(cache.DieList),
		NumCores:       len(cache.CoreList),
		NumSockets:     len(cache.SocketList),
		NumHWthreads:   len(cache.HwthreadList),
	}
}
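
// On the hypothetical two-socket, 64-core, SMT-2 node with four NUMA domains
// used in the examples above, CpuInfo would report (illustrative only):
//
//	CpuInformation{
//		NumHWthreads:   128,
//		SMTWidth:       2,
//		NumSockets:     2,
//		NumDies:        2,
//		NumCores:       64,
//		NumNumaDomains: 4,
//	}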

// GetHwthreadSocket gets the CPU socket ID for a given hardware thread ID.
// If the hardware thread ID is not found, -1 is returned.
func GetHwthreadSocket(cpuID int) int {
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.CpuID == cpuID {
			return d.Socket
		}
	}
	return -1
}

// GetHwthreadNumaDomain gets the NUMA domain ID for a given hardware thread ID.
// If the hardware thread ID is not found, -1 is returned.
func GetHwthreadNumaDomain(cpuID int) int {
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.CpuID == cpuID {
			return d.NumaDomain
		}
	}
	return -1
}

// GetHwthreadDie gets the CPU die ID for a given hardware thread ID.
// If the hardware thread ID is not found, -1 is returned.
func GetHwthreadDie(cpuID int) int {
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.CpuID == cpuID {
			return d.Die
		}
	}
	return -1
}

// GetHwthreadCore gets the CPU core ID for a given hardware thread ID.
// If the hardware thread ID is not found, -1 is returned.
func GetHwthreadCore(cpuID int) int {
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.CpuID == cpuID {
			return d.Core
		}
	}
	return -1
}

// GetSocketHwthreads gets all hardware thread IDs associated with a CPU socket
func GetSocketHwthreads(socket int) []int {
	cpuList := make([]int, 0)
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.Socket == socket {
			cpuList = append(cpuList, d.CpuID)
		}
	}
	return cpuList
}

// GetNumaDomainHwthreads gets all hardware thread IDs associated with a NUMA domain
func GetNumaDomainHwthreads(numaDomain int) []int {
	cpuList := make([]int, 0)
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.NumaDomain == numaDomain {
			cpuList = append(cpuList, d.CpuID)
		}
	}
	return cpuList
}

// GetDieHwthreads gets all hardware thread IDs associated with a CPU die
func GetDieHwthreads(die int) []int {
	cpuList := make([]int, 0)
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.Die == die {
			cpuList = append(cpuList, d.CpuID)
		}
	}
	return cpuList
}

// GetCoreHwthreads gets all hardware thread IDs associated with a CPU core
func GetCoreHwthreads(core int) []int {
	cpuList := make([]int, 0)
	for i := range cache.CpuData {
		d := &cache.CpuData[i]
		if d.Core == core {
			cpuList = append(cpuList, d.CpuID)
		}
	}
	return cpuList
}
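
// A short sketch combining the lookups above (hypothetical values):
//
//	socket := GetHwthreadSocket(5)        // e.g. 0
//	threads := GetSocketHwthreads(socket)
//	// threads now holds all hardware thread IDs on the same socket as cpu5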