Compare commits

..

2 Commits

Author SHA1 Message Date
Holger Obermaier
bed5491068 Fix Overflows in Infiniband collector (#219)
* Add information about the used infiniband counters
* Change datatype from int64 to uint64
* uint64 subtraction handles wraparound automatically
* Compute total rates by summing up the xmit and recv rates.
This avoids overflows in the raw counters
* Check for cases where the current counter can not be saved as last state
* Use golang variable naming convention (camelCase)
2026-06-08 14:00:09 +02:00
dependabot[bot]
a2eba41150 Bump golang.design/x/thread
Bumps [golang.design/x/thread](https://github.com/golang-design/thread) from 0.0.0-20210122121316-335e9adffdf1 to 0.3.2.
- [Release notes](https://github.com/golang-design/thread/releases)
- [Commits](https://github.com/golang-design/thread/commits/v0.3.2)

---
updated-dependencies:
- dependency-name: golang.design/x/thread
  dependency-version: 0.3.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-08 13:10:27 +02:00
6 changed files with 101 additions and 523 deletions

View File

@@ -50,7 +50,6 @@ var AvailableCollectors = map[string]MetricCollector{
"nfsiostat": new(NfsIOStatCollector),
"slurm_cgroup": new(SlurmCgroupCollector),
"smartmon": new(SmartMonCollector),
"nvidia_gpm": new(NvidiaGPMCollector),
}
// Metric collector manager data structure
@@ -100,17 +99,17 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
// Initialize configured collectors
for collectorName, collectorCfg := range cm.config {
if _, found := AvailableCollectors[collectorName]; !found {
cclog.ComponentErrorf("CollectorManager", "SKIP unknown collector %s", collectorName)
cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName)
continue
}
collector := AvailableCollectors[collectorName]
err := collector.Init(collectorCfg)
if err != nil {
cclog.ComponentErrorf("CollectorManager", "Collector %s initialization failed: %v", collectorName, err)
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
continue
}
cclog.ComponentDebugf("CollectorManager", "ADD COLLECTOR %s", collector.Name())
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
if collector.Parallel() {
cm.collectors = append(cm.collectors, collector)
} else {
@@ -156,7 +155,7 @@ func (cm *collectorManager) Start() {
return
default:
// Read metrics from collector c via goroutine
cclog.ComponentDebugf("CollectorManager: Read %s at %v", c.Name(), t)
cclog.ComponentDebug("CollectorManager", c.Name(), t)
cm.collector_wg.Add(1)
go func(myc MetricCollector) {
myc.Read(cm.duration, cm.output)
@@ -174,7 +173,7 @@ func (cm *collectorManager) Start() {
return
default:
// Read metrics from collector c
cclog.ComponentDebugf("CollectorManager: Read %s at %v", c.Name(), t)
cclog.ComponentDebug("CollectorManager", c.Name(), t)
c.Read(cm.duration, cm.output)
}
}

View File

@@ -23,20 +23,29 @@ import (
"golang.org/x/sys/unix"
)
const IB_BASEPATH = "/sys/class/infiniband/"
// See: https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-infiniband
const (
ibBasePath = "/sys/class/infiniband/"
ibDataUnit = "bytes"
ibDataRateUnit = ibDataUnit + "/sec"
ibPkgUnit = "packets"
ibPkgRateUnit = ibPkgUnit + "/sec"
)
type InfinibandCollectorMetric struct {
name string
path string
unit string
scale int64
addToIBTotal bool
addToIBTotalPkgs bool
lastState int64
name string
path string
unit string
unitRates string
scaleByFourLanes bool
addToIBTotal bool
addToIBTotalPkgs bool
lastState uint64
lastStateAvailable bool
}
type InfinibandCollectorInfo struct {
LID string // IB local Identifier (LID)
lid string // IB local Identifier (LID)
device string // IB device
port string // IB device port
portCounterFiles []InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
@@ -56,7 +65,7 @@ type InfinibandCollector struct {
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
// Init initializes the Infiniband collector by walking through files below ibBasePath
func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Check if already initialized
if m.init {
@@ -87,7 +96,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
}
// Loop for all InfiniBand directories
globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
globPattern := filepath.Join(ibBasePath, "*", "ports", "*")
ibDirs, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("%s Init(): unable to glob files with pattern %s: %w", m.name, globPattern, err)
@@ -122,36 +131,42 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
countersDir := filepath.Join(path, "counters")
portCounterFiles := []InfinibandCollectorMetric{
{
name: "ib_recv",
path: filepath.Join(countersDir, "port_rcv_data"),
unit: "bytes",
scale: 4,
addToIBTotal: true,
lastState: -1,
// Total number of data octets, divided by 4 (lanes), received on all VLs.
// This is 64 bit counter
name: "ib_recv",
path: filepath.Join(countersDir, "port_rcv_data"),
unit: ibDataUnit,
unitRates: ibDataRateUnit,
scaleByFourLanes: true,
addToIBTotal: true,
},
{
name: "ib_xmit",
path: filepath.Join(countersDir, "port_xmit_data"),
unit: "bytes",
scale: 4,
addToIBTotal: true,
lastState: -1,
// Total number of data octets, divided by 4 (lanes), transmitted on all VLs.
// This is 64 bit counter
name: "ib_xmit",
path: filepath.Join(countersDir, "port_xmit_data"),
unit: ibDataUnit,
unitRates: ibDataRateUnit,
scaleByFourLanes: true,
addToIBTotal: true,
},
{
// Total number of packets received on all VLs from this port (this may include packets containing Errors.
// This is 64 bit counter.
name: "ib_recv_pkts",
path: filepath.Join(countersDir, "port_rcv_packets"),
unit: "packets",
scale: 1,
unit: ibPkgUnit,
unitRates: ibPkgRateUnit,
addToIBTotalPkgs: true,
lastState: -1,
},
{
// Total number of packets transmitted on all VLs from this port. This may include packets with errors.
// This is 64 bit counter.
name: "ib_xmit_pkts",
path: filepath.Join(countersDir, "port_xmit_packets"),
unit: "packets",
scale: 1,
unit: ibPkgUnit,
unitRates: ibPkgRateUnit,
addToIBTotalPkgs: true,
lastState: -1,
},
}
for _, counter := range portCounterFiles {
@@ -163,7 +178,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.info = append(m.info,
InfinibandCollectorInfo{
LID: LID,
lid: LID,
device: device,
port: port,
portCounterFiles: portCounterFiles,
@@ -184,7 +199,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
return nil
}
// Read reads Infiniband counter files below IB_BASEPATH
// Read reads Infiniband counter files below ibBasePath
func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized
if !m.init {
@@ -201,9 +216,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
for i := range m.info {
info := &m.info[i]
var ib_total, ib_total_last_state,
ib_total_pkts, ib_total_pkts_last_state int64
var ib_total_last_state_available, ib_total_pkts_last_state_available bool
var ibTotal, ibTotalPkts uint64 // sum of xmit and recv counters
var ibTotalBw, ibTotalPktsBw float64 // sum of xmit and recv rates
var ibTotalBwAvailable, ibTotalPktsBwAvailable bool
for i := range info.portCounterFiles {
counterDef := &info.portCounterFiles[i]
@@ -213,24 +228,30 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
// Current counter can not be saved as last state
counterDef.lastStateAvailable = false
continue
}
data := strings.TrimSpace(string(line))
// convert counter to int64
v, err := strconv.ParseInt(data, 10, 64)
// convert counter to uint64
vRawCounter, err := strconv.ParseUint(data, 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterDef.name, data, err))
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to uint64: %v", counterDef.name, data, err))
// Current counter can not be saved as last state
counterDef.lastStateAvailable = false
continue
}
// Scale raw value
v *= counterDef.scale
vScaledCounter := vRawCounter
if counterDef.scaleByFourLanes {
vScaledCounter *= uint64(4)
}
// Send absolut values
if m.config.SendAbsoluteValues {
if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, v, now); err == nil {
if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, vScaledCounter, now); err == nil {
y.AddMeta("unit", counterDef.unit)
output <- y
}
@@ -238,63 +259,72 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
// Send derived values
if m.config.SendDerivedValues {
if counterDef.lastState >= 0 {
rate := float64((v - counterDef.lastState)) / timeDiff
if counterDef.lastStateAvailable {
var rate float64
// uint64 subtraction handles wraparound automatically
// in case vRawCounter < counterDef.lastState we would compute:
// math.MaxUint64 - lastState + vRawCounter + 1
// = (2^64 - 1) - lastState + vRawCounter + 1
// = 2^64 - lastState + vRawCounter
// ≡ vRawCounter - lastState (mod 2^64)
rate = float64(vRawCounter-counterDef.lastState) / timeDiff
if counterDef.scaleByFourLanes {
rate *= float64(4)
}
if y, err := lp.NewMetric(counterDef.name+"_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", counterDef.unit+"/sec")
y.AddMeta("unit", counterDef.unitRates)
output <- y
}
// Sum up total values of last state
// Sum up rates for total rates
if m.config.SendTotalValues {
switch {
case counterDef.addToIBTotal:
ib_total_last_state += counterDef.lastState
ib_total_last_state_available = true
ibTotalBw += rate
ibTotalBwAvailable = true
case counterDef.addToIBTotalPkgs:
ib_total_pkts_last_state += counterDef.lastState
ib_total_pkts_last_state_available = true
ibTotalPktsBw += rate
ibTotalPktsBwAvailable = true
}
}
}
counterDef.lastState = v
counterDef.lastState = vRawCounter
counterDef.lastStateAvailable = true
}
// Sum up total values
if m.config.SendTotalValues {
switch {
case counterDef.addToIBTotal:
ib_total += v
ibTotal += vScaledCounter
case counterDef.addToIBTotalPkgs:
ib_total_pkts += v
ibTotalPkts += vScaledCounter
}
}
}
// Send total values
if m.config.SendTotalValues {
if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ib_total, now); err == nil {
y.AddMeta("unit", "bytes")
if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ibTotal, now); err == nil {
y.AddMeta("unit", ibDataUnit)
output <- y
}
if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ib_total_pkts, now); err == nil {
y.AddMeta("unit", "packets")
if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ibTotalPkts, now); err == nil {
y.AddMeta("unit", ibPkgUnit)
output <- y
}
if m.config.SendDerivedValues && ib_total_last_state_available {
rate := float64((ib_total - ib_total_last_state)) / timeDiff
if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", "bytes/sec")
if m.config.SendDerivedValues && ibTotalBwAvailable {
if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, ibTotalBw, now); err == nil {
y.AddMeta("unit", ibDataRateUnit)
output <- y
}
}
if m.config.SendDerivedValues && ib_total_pkts_last_state_available {
rate := float64((ib_total_pkts - ib_total_pkts_last_state)) / timeDiff
if y, err := lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", "packets/sec")
if m.config.SendDerivedValues && ibTotalPktsBwAvailable {
if y, err := lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, ibTotalPktsBw, now); err == nil {
y.AddMeta("unit", ibPkgRateUnit)
output <- y
}
}

View File

@@ -1,396 +0,0 @@
package collectors
import (
"encoding/json"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)
// NvidiaGPMMetricDef describes a single GPM metric known to the collector.
type NvidiaGPMMetricDef struct {
	// name is the short metric name; it is accepted in the "metrics"
	// configuration list as an alternative to outname.
	name string
	// outname is the metric name used for the messages sent to the output.
	outname string
	// id is the NVML identifier of the GPM metric.
	id nvml.GpmMetricId
	// unit is attached to each message as "unit" meta information.
	unit string
}
// NvidiaGPMMetrics lists all GPM metrics that can be selected through the
// "metrics" configuration option, either by their short name or by their
// output name.
var NvidiaGPMMetrics = []NvidiaGPMMetricDef{
	{name: "GRAPHICS_UTIL", outname: "nv_gpm_graphics_util", id: nvml.GPM_METRIC_GRAPHICS_UTIL, unit: "%"},
	{name: "SM_UTIL", outname: "nv_gpm_sm_util", id: nvml.GPM_METRIC_SM_UTIL, unit: "%"},
	{name: "SM_OCCUPANCY", outname: "nv_gpm_sm_occupancy", id: nvml.GPM_METRIC_SM_OCCUPANCY, unit: "%"},
	{name: "INTEGER_UTIL", outname: "nv_gpm_integer_util", id: nvml.GPM_METRIC_INTEGER_UTIL, unit: "%"},
	{name: "ANY_TENSOR_UTIL", outname: "nv_gpm_any_tensor_util", id: nvml.GPM_METRIC_ANY_TENSOR_UTIL, unit: "%"},
	{name: "DFMA_TENSOR_UTIL", outname: "nv_gpm_dfma_tensor_util", id: nvml.GPM_METRIC_DFMA_TENSOR_UTIL, unit: "%"},
	{name: "HMMA_TENSOR_UTIL", outname: "nv_gpm_hmma_tensor_util", id: nvml.GPM_METRIC_HMMA_TENSOR_UTIL, unit: "%"},
	{name: "IMMA_TENSOR_UTIL", outname: "nv_gpm_imma_tensor_util", id: nvml.GPM_METRIC_IMMA_TENSOR_UTIL, unit: "%"},
	{name: "DRAM_BW_UTIL", outname: "nv_gpm_dram_bw_util", id: nvml.GPM_METRIC_DRAM_BW_UTIL, unit: "%"},
	{name: "FP64_UTIL", outname: "nv_gpm_fp64_util", id: nvml.GPM_METRIC_FP64_UTIL, unit: "%"},
	{name: "FP32_UTIL", outname: "nv_gpm_fp32_util", id: nvml.GPM_METRIC_FP32_UTIL, unit: "%"},
	{name: "FP16_UTIL", outname: "nv_gpm_fp16_util", id: nvml.GPM_METRIC_FP16_UTIL, unit: "%"},
}
// NvidiaGPMCollectorConfig is the JSON configuration of the NvidiaGPMCollector.
// Unknown JSON fields are rejected when the configuration is decoded.
type NvidiaGPMCollectorConfig struct {
	// Metrics selects the GPM metrics to read (short name or output name).
	Metrics []string `json:"metrics,omitempty"`
	// ExcludeDevices lists devices to skip, given as device index or as
	// PCI ID in NVML format.
	ExcludeDevices []string `json:"exclude_devices,omitempty"`
	// AddPciInfoTag adds the PCI ID as "pci_identifier" tag, unless the
	// PCI ID is already used as "type-id".
	AddPciInfoTag bool `json:"add_pci_info_tag,omitempty"`
	// UsePciInfoAsTypeId uses the PCI ID instead of the device index as
	// "type-id" tag.
	UsePciInfoAsTypeId bool `json:"use_pci_info_as_type_id,omitempty"`
	// AddUuidMeta adds the device UUID as "uuid" meta information.
	AddUuidMeta bool `json:"add_uuid_meta,omitempty"`
	// AddBoardNumberMeta adds the board part number as "board_number"
	// meta information.
	AddBoardNumberMeta bool `json:"add_board_number_meta,omitempty"`
	// AddSerialMeta adds the device serial as "serial" meta information.
	AddSerialMeta bool `json:"add_serial_meta,omitempty"`

	// The MIG related options below are accepted in the configuration but
	// are not evaluated by the collector code in this file.
	ProcessMigDevices     bool `json:"process_mig_devices,omitempty"`
	UseUuidForMigDevices  bool `json:"use_uuid_for_mig_device,omitempty"`
	UseSliceForMigDevices bool `json:"use_slice_for_mig_device,omitempty"`
}
// NvidiaGPMCollectorDevice holds the per-GPU state of the collector.
type NvidiaGPMCollectorDevice struct {
	// device is the NVML handle of the GPU.
	device nvml.Device
	// tags and meta are attached to every metric sent for this GPU.
	tags, meta map[string]string
	// startTime and endTime are set in Read() when the two GPM samples
	// are requested.
	startTime, endTime time.Time
	// measurement bundles both GPM samples and the requested metrics.
	measurement nvml.GpmMetricsGetType
	// metricsLookup maps an index into measurement.Metrics to the
	// definition of the metric stored at that index.
	metricsLookup map[int]NvidiaGPMMetricDef
}
// NvidiaGPMCollector reads GPM metrics of Nvidia GPUs through NVML.
type NvidiaGPMCollector struct {
	metricCollector

	// config is the decoded JSON configuration.
	config NvidiaGPMCollectorConfig
	// gpus contains one entry per monitored GPU.
	gpus []NvidiaGPMCollectorDevice
	// num_gpus is set to len(gpus) at the end of Init().
	num_gpus int
}
// Init decodes the JSON configuration, initializes NVML and builds the list
// of GPUs for which GPM metrics are read.
//
// A device is skipped (with a log message) when it is excluded by the
// configuration, does not support GPM, or when a required NVML query or the
// allocation of its GPM samples fails. Failing to fetch optional meta
// information (board number, serial, UUID) is only logged.
//
// An error is returned only when the collector as a whole cannot be used:
// setup() fails, the configuration cannot be decoded, NVML cannot be
// initialized or the device count cannot be determined. Per-device problems
// never make Init fail.
func (m *NvidiaGPMCollector) Init(config json.RawMessage) error {
	m.name = "NvidiaGPMCollector"
	m.parallel = true
	if err := m.setup(); err != nil {
		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
	}
	if len(config) > 0 {
		d := json.NewDecoder(strings.NewReader(string(config)))
		d.DisallowUnknownFields()
		if err := d.Decode(&m.config); err != nil {
			return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
		}
	}
	m.meta = map[string]string{
		"source": m.name,
		"group":  "NvidiaGPM",
	}

	// Initialize NVIDIA Management Library (NVML)
	ret := nvml.Init()

	// Error: NVML library not found
	// (nvml.ErrorString can not be used in this case)
	if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
		return fmt.Errorf("%s Init(): NVML library not found", m.name)
	}
	if ret != nvml.SUCCESS {
		return fmt.Errorf("%s Init(): Unable to initialize NVML: %w", m.name, errors.New(nvml.ErrorString(ret)))
	}

	// Number of NVIDIA GPUs
	numGPUs, ret := nvml.DeviceGetCount()
	if ret != nvml.SUCCESS {
		return fmt.Errorf("%s Init(): Unable to get device count: %w", m.name, errors.New(nvml.ErrorString(ret)))
	}

	// For all GPUs
	m.gpus = make([]NvidiaGPMCollectorDevice, 0, numGPUs)
	for i := range numGPUs {
		// Skip excluded devices by ID
		index := strconv.Itoa(i)
		if slices.Contains(m.config.ExcludeDevices, index) {
			cclog.ComponentDebugf(m.name, "Skipping excluded device %s", index)
			continue
		}

		// Get device handle
		device, ret := nvml.DeviceGetHandleByIndex(i)
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to get device at index %d: %s", i, nvml.ErrorString(ret))
			continue
		}

		// Only devices with GPM support can be monitored
		supportInfo, ret := nvml.GpmQueryDeviceSupport(device)
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to query GPM support for device at index %d: %s", i, nvml.ErrorString(ret))
			continue
		}
		if supportInfo.IsSupportedDevice == uint32(nvml.FEATURE_DISABLED) {
			cclog.ComponentErrorf(m.name, "Device at index %d does not support GPM metrics", i)
			continue
		}

		// Enable GPM streaming if it is not enabled yet. A failure to
		// enable it is logged, the device is still added.
		stream, ret := nvml.GpmQueryIfStreamingEnabled(device)
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to query GPM streaming for device at index %d: %s", i, nvml.ErrorString(ret))
			continue
		}
		if stream == uint32(nvml.FEATURE_DISABLED) {
			if ret := nvml.GpmSetStreamingEnabled(device, uint32(nvml.FEATURE_ENABLED)); ret != nvml.SUCCESS {
				cclog.ComponentErrorf(m.name, "Unable to set streaming mode for device at index %d: %s", i, nvml.ErrorString(ret))
			}
		}

		// Get device's PCI info
		pciInfo, ret := nvml.DeviceGetPciInfo(device)
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to get PCI info for device at index %d: %s", i, nvml.ErrorString(ret))
			continue
		}

		// Create PCI ID in the common format used by the NVML.
		pciID := fmt.Sprintf(
			nvml.DEVICE_PCI_BUS_ID_FMT,
			pciInfo.Domain,
			pciInfo.Bus,
			pciInfo.Device)

		// Skip excluded devices specified by PCI ID
		if slices.Contains(m.config.ExcludeDevices, pciID) {
			cclog.ComponentDebugf(m.name, "Skipping excluded device %s", pciID)
			continue
		}

		// Allocate the two samples a GPM measurement is computed from.
		// The error message has to be derived from the return code of the
		// allocation itself, not from an earlier (successful) NVML call.
		startSample, ret := nvml.GpmSampleAlloc()
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Failed to allocate GPM sample for device %d: %s", i, nvml.ErrorString(ret))
			continue
		}
		endSample, ret := nvml.GpmSampleAlloc()
		if ret != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Failed to allocate GPM sample for device %d: %s", i, nvml.ErrorString(ret))
			// The device is skipped, so nobody would free the first sample later
			if freeRet := startSample.Free(); freeRet != nvml.SUCCESS {
				cclog.ComponentErrorf(m.name, "Unable to free start sample for device at index %d: %s", i, nvml.ErrorString(freeRet))
			}
			continue
		}

		// Select which value to use as 'type-id'.
		// The PCI ID is commonly required in SLURM environments because the
		// numeric IDs used by SLURM and the ones used by NVML might differ
		// depending on the job type. The PCI ID is more reliable but is commonly
		// not recorded for a job, so it must be added manually in prologue or epilogue
		// e.g. to the comment field
		typeID := index
		if m.config.UsePciInfoAsTypeId {
			typeID = pciID
		}

		// Now we got all infos together, populate the device list
		g := NvidiaGPMCollectorDevice{
			device: device,
			tags: map[string]string{
				"type":    "accelerator",
				"type-id": typeID,
			},
			meta: map[string]string{
				"source": m.name,
				"group":  "Nvidia",
			},
			metricsLookup: make(map[int]NvidiaGPMMetricDef),
		}

		// Add PCI info as tag if not already used as 'type-id'
		if m.config.AddPciInfoTag && !m.config.UsePciInfoAsTypeId {
			g.tags["pci_identifier"] = pciID
		}

		// Optional meta information, failures are not fatal for the device
		if m.config.AddBoardNumberMeta {
			if board, ret := nvml.DeviceGetBoardPartNumber(device); ret != nvml.SUCCESS {
				cclog.ComponentError(m.name, "Unable to get board part number for device at index", i, ":", nvml.ErrorString(ret))
			} else {
				g.meta["board_number"] = board
			}
		}
		if m.config.AddSerialMeta {
			if serial, ret := nvml.DeviceGetSerial(device); ret != nvml.SUCCESS {
				cclog.ComponentError(m.name, "Unable to get serial number for device at index", i, ":", nvml.ErrorString(ret))
			} else {
				g.meta["serial"] = serial
			}
		}
		if m.config.AddUuidMeta {
			if uuid, ret := nvml.DeviceGetUUID(device); ret != nvml.SUCCESS {
				cclog.ComponentError(m.name, "Unable to get UUID for device at index", i, ":", nvml.ErrorString(ret))
			} else {
				g.meta["uuid"] = uuid
			}
		}

		g.measurement.Sample1 = startSample
		g.measurement.Sample2 = endSample
		g.measurement.Version = nvml.GPM_METRICS_GET_VERSION

		// Register every configured metric that matches a known definition,
		// either by its output name or by its short name.
		metIdx := 0
	selectMetrics:
		for _, inmetric := range m.config.Metrics {
			for _, defmetric := range NvidiaGPMMetrics {
				if inmetric != defmetric.outname && inmetric != defmetric.name {
					continue
				}
				// Repeated entries in the configuration must not write
				// beyond the end of the metrics storage.
				if metIdx >= len(g.measurement.Metrics) {
					cclog.ComponentErrorf(m.name, "Too many metrics configured for device at index %d, using only the first %d", i, metIdx)
					break selectMetrics
				}
				g.measurement.Metrics[metIdx] = nvml.GpmMetric{
					MetricId: uint32(defmetric.id),
				}
				g.metricsLookup[metIdx] = defmetric
				metIdx++
			}
		}
		g.measurement.NumMetrics = uint32(metIdx)

		m.gpus = append(m.gpus, g)
	}
	cclog.ComponentDebugf(m.name, "Found %d Nvidia GPUs with GPM support", len(m.gpus))
	m.num_gpus = len(m.gpus)

	m.init = true
	// Per-device problems were logged above and are not an initialization
	// failure: the collector is marked as initialized, so report success.
	return nil
}
// Read takes a GPM sample of every GPU, sleeps for interval, takes a second
// sample and sends the metrics NVML computes from both samples to output.
//
// Note that this call blocks for at least the given interval. A GPU for
// which one of the two samples could not be taken is skipped for this read,
// so no metrics are derived from stale or missing samples.
func (m *NvidiaGPMCollector) Read(interval time.Duration, output chan lp.CCMessage) {
	if !m.init {
		return
	}

	// sampled[i] is true as long as all samples of GPU i were taken successfully
	sampled := make([]bool, len(m.gpus))

	// Start samples. Index into the slice so that the time stamps are stored
	// in the collector state and not in a per-iteration copy.
	for i := range m.gpus {
		gpu := &m.gpus[i]
		gpu.startTime = time.Now()
		if ret := gpu.measurement.Sample1.Get(gpu.device); ret != nvml.SUCCESS {
			cclog.ComponentError(m.name, "Unable to get start GPM sample for device at index", i, ":", nvml.ErrorString(ret))
			continue
		}
		sampled[i] = true
	}

	time.Sleep(interval)

	// Stop samples
	for i := range m.gpus {
		if !sampled[i] {
			continue
		}
		gpu := &m.gpus[i]
		gpu.endTime = time.Now()
		if ret := gpu.measurement.Sample2.Get(gpu.device); ret != nvml.SUCCESS {
			cclog.ComponentError(m.name, "Unable to get stop GPM sample for device at index", i, ":", nvml.ErrorString(ret))
			sampled[i] = false
		}
	}

	// Evaluate both samples and send the resulting metrics
	for i := range m.gpus {
		if !sampled[i] {
			continue
		}
		gpu := &m.gpus[i]
		if ret := nvml.GpmMetricsGet(&gpu.measurement); ret != nvml.SUCCESS {
			cclog.ComponentError(m.name, "Unable to get evaluate GPM sample for device at index", i, ":", nvml.ErrorString(ret))
			continue
		}
		for idx, metricDef := range gpu.metricsLookup {
			y, err := lp.NewMetric(metricDef.outname, gpu.tags, gpu.meta, gpu.measurement.Metrics[idx].Value, time.Now())
			if err == nil {
				y.AddMeta("unit", metricDef.unit)
				output <- y
			}
		}
	}
}
// Close frees the GPM samples of all GPUs, shuts NVML down and marks the
// collector as uninitialized. It does nothing if the collector is not
// initialized. Failures are logged only.
func (m *NvidiaGPMCollector) Close() {
	if !m.init {
		return
	}

	for idx, dev := range m.gpus {
		if rc := dev.measurement.Sample1.Free(); rc != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to free start sample for device at index %d: %s", idx, nvml.ErrorString(rc))
		}
		if rc := dev.measurement.Sample2.Free(); rc != nvml.SUCCESS {
			cclog.ComponentErrorf(m.name, "Unable to free stop sample for device at index %d: %s", idx, nvml.ErrorString(rc))
		}
	}

	if nvml.Shutdown() != nvml.SUCCESS {
		cclog.ComponentError(m.name, "nvml.Shutdown() not successful")
	}

	m.init = false
}

View File

@@ -1,54 +0,0 @@
<!--
---
title: "Nvidia NVML GPM metric collector"
description: Collect metrics for Nvidia GPUs using the NVML GPM interface
categories: [cc-metric-collector]
tags: ['Admin']
weight: 2
hugo_path: docs/reference/cc-metric-collector/collectors/nvidiaGPM.md
---
-->
## `nvidiaGPM` collector
```json
"nvidia_gpm": {
"metrics": [
"nv_gpm_sm_util",
"nv_gpm_dram_bw_util"
],
"exclude_devices": [
"0","1", "00000000:FF:01.0"
],
"process_mig_devices": false,
"use_pci_info_as_type_id": true,
"add_pci_info_tag": false,
"add_uuid_meta": false,
"add_board_number_meta": false,
"add_serial_meta": false,
"use_uuid_for_mig_device": false,
"use_slice_for_mig_device": false
}
```
The `nvidia_gpm` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes IDs as supplied to the NVML with `nvmlDeviceGetHandleByIndex()` or the PCI address in NVML format (`%08X:%02X:%02X.0`). Commonly only the physical GPUs are monitored. If MIG devices should be analyzed as well, set `process_mig_devices` (adds `stype=mig,stype-id=<mig_index>`). With the options `use_uuid_for_mig_device` and `use_slice_for_mig_device`, the `<mig_index>` can be replaced with the UUID (e.g. `MIG-6a9f7cc8-6d5b-5ce0-92de-750edc4d8849`) or the MIG slice name (e.g. `1g.5gb`).
The metrics sent by the `nvidia_gpm` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag.
Optionally, the UUID, the board part number and the serial number can be added to the meta information. Meta information is not sent to the sinks unless configured otherwise.
Available Metrics:
* `nv_gpm_graphics_util`
* `nv_gpm_sm_util`
* `nv_gpm_sm_occupancy`
* `nv_gpm_integer_util`
* `nv_gpm_any_tensor_util`
* `nv_gpm_dfma_tensor_util`
* `nv_gpm_hmma_tensor_util`
* `nv_gpm_imma_tensor_util`
* `nv_gpm_dram_bw_util`
* `nv_gpm_fp64_util`
* `nv_gpm_fp32_util`
* `nv_gpm_fp16_util`

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/PaesslerAG/gval v1.2.4
github.com/fsnotify/fsnotify v1.10.1
github.com/tklauser/go-sysconf v0.4.0
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.design/x/thread v0.3.2
golang.org/x/sys v0.45.0
)

5
go.sum
View File

@@ -173,8 +173,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePPs2kHXoazz8q2KsyxHyQVGCJg=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
golang.design/x/thread v0.3.2 h1:FmD1glspGrQCe6FuQLmSrT6wz2CSzq7vKVDluyiMnqo=
golang.design/x/thread v0.3.2/go.mod h1:6+Hi2rMOgMHZdKDWaqNHyWtoFUx1HxZ06LfHPh5Z/hQ=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20231005195138-3e424a577f31 h1:9k5exFQKQglLo+RoP+4zMjOFE14P6+vyR0baDAi0Rcs=
@@ -183,7 +183,6 @@ golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=