Merge branch 'main' of github.com:ClusterCockpit/cc-metric-collector

This commit is contained in:
Thomas Roehl
2026-06-30 12:49:50 +02:00
6 changed files with 158 additions and 73 deletions
+82 -52
View File
@@ -23,20 +23,29 @@ import (
"golang.org/x/sys/unix"
)
const IB_BASEPATH = "/sys/class/infiniband/"
// See: https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-infiniband
const (
ibBasePath = "/sys/class/infiniband/"
ibDataUnit = "bytes"
ibDataRateUnit = ibDataUnit + "/sec"
ibPkgUnit = "packets"
ibPkgRateUnit = ibPkgUnit + "/sec"
)
type InfinibandCollectorMetric struct {
name string
path string
unit string
scale int64
unitRates string
scaleByFourLanes bool
addToIBTotal bool
addToIBTotalPkgs bool
lastState int64
lastState uint64
lastStateAvailable bool
}
type InfinibandCollectorInfo struct {
LID string // IB local Identifier (LID)
lid string // IB local Identifier (LID)
device string // IB device
port string // IB device port
portCounterFiles []InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
@@ -56,7 +65,7 @@ type InfinibandCollector struct {
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
// Init initializes the Infiniband collector by walking through files below ibBasePath
func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Check if already initialized
if m.init {
@@ -87,7 +96,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
}
// Loop for all InfiniBand directories
globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
globPattern := filepath.Join(ibBasePath, "*", "ports", "*")
ibDirs, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("%s Init(): unable to glob files with pattern %s: %w", m.name, globPattern, err)
@@ -122,36 +131,42 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
countersDir := filepath.Join(path, "counters")
portCounterFiles := []InfinibandCollectorMetric{
{
// Total number of data octets, divided by 4 (lanes), received on all VLs.
// This is 64 bit counter
name: "ib_recv",
path: filepath.Join(countersDir, "port_rcv_data"),
unit: "bytes",
scale: 4,
unit: ibDataUnit,
unitRates: ibDataRateUnit,
scaleByFourLanes: true,
addToIBTotal: true,
lastState: -1,
},
{
// Total number of data octets, divided by 4 (lanes), transmitted on all VLs.
// This is 64 bit counter
name: "ib_xmit",
path: filepath.Join(countersDir, "port_xmit_data"),
unit: "bytes",
scale: 4,
unit: ibDataUnit,
unitRates: ibDataRateUnit,
scaleByFourLanes: true,
addToIBTotal: true,
lastState: -1,
},
{
// Total number of packets received on all VLs from this port (this may include packets containing Errors.
// This is 64 bit counter.
name: "ib_recv_pkts",
path: filepath.Join(countersDir, "port_rcv_packets"),
unit: "packets",
scale: 1,
unit: ibPkgUnit,
unitRates: ibPkgRateUnit,
addToIBTotalPkgs: true,
lastState: -1,
},
{
// Total number of packets transmitted on all VLs from this port. This may include packets with errors.
// This is 64 bit counter.
name: "ib_xmit_pkts",
path: filepath.Join(countersDir, "port_xmit_packets"),
unit: "packets",
scale: 1,
unit: ibPkgUnit,
unitRates: ibPkgRateUnit,
addToIBTotalPkgs: true,
lastState: -1,
},
}
for _, counter := range portCounterFiles {
@@ -163,7 +178,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.info = append(m.info,
InfinibandCollectorInfo{
LID: LID,
lid: LID,
device: device,
port: port,
portCounterFiles: portCounterFiles,
@@ -184,7 +199,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
return nil
}
// Read reads Infiniband counter files below IB_BASEPATH
// Read reads Infiniband counter files below ibBasePath
func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized
if !m.init {
@@ -201,9 +216,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
for i := range m.info {
info := &m.info[i]
var ib_total, ib_total_last_state,
ib_total_pkts, ib_total_pkts_last_state int64
var ib_total_last_state_available, ib_total_pkts_last_state_available bool
var ibTotal, ibTotalPkts uint64 // sum of xmit and recv counters
var ibTotalBw, ibTotalPktsBw float64 // sum of xmit and recv rates
var ibTotalBwAvailable, ibTotalPktsBwAvailable bool
for i := range info.portCounterFiles {
counterDef := &info.portCounterFiles[i]
@@ -213,24 +228,30 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
// Current counter can not be saved as last state
counterDef.lastStateAvailable = false
continue
}
data := strings.TrimSpace(string(line))
// convert counter to int64
v, err := strconv.ParseInt(data, 10, 64)
// convert counter to uint64
vRawCounter, err := strconv.ParseUint(data, 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterDef.name, data, err))
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to uint64: %v", counterDef.name, data, err))
// Current counter can not be saved as last state
counterDef.lastStateAvailable = false
continue
}
// Scale raw value
v *= counterDef.scale
vScaledCounter := vRawCounter
if counterDef.scaleByFourLanes {
vScaledCounter *= uint64(4)
}
// Send absolut values
if m.config.SendAbsoluteValues {
if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, v, now); err == nil {
if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, vScaledCounter, now); err == nil {
y.AddMeta("unit", counterDef.unit)
output <- y
}
@@ -238,63 +259,72 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
// Send derived values
if m.config.SendDerivedValues {
if counterDef.lastState >= 0 {
rate := float64((v - counterDef.lastState)) / timeDiff
if counterDef.lastStateAvailable {
var rate float64
// uint64 subtraction handles wraparound automatically
// in case vRawCounter < counterDef.lastState we would compute:
// math.MaxUint64 - lastState + vRawCounter + 1
// = (2^64 - 1) - lastState + vRawCounter + 1
// = 2^64 - lastState + vRawCounter
// ≡ vRawCounter - lastState (mod 2^64)
rate = float64(vRawCounter-counterDef.lastState) / timeDiff
if counterDef.scaleByFourLanes {
rate *= float64(4)
}
if y, err := lp.NewMetric(counterDef.name+"_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", counterDef.unit+"/sec")
y.AddMeta("unit", counterDef.unitRates)
output <- y
}
// Sum up total values of last state
// Sum up rates for total rates
if m.config.SendTotalValues {
switch {
case counterDef.addToIBTotal:
ib_total_last_state += counterDef.lastState
ib_total_last_state_available = true
ibTotalBw += rate
ibTotalBwAvailable = true
case counterDef.addToIBTotalPkgs:
ib_total_pkts_last_state += counterDef.lastState
ib_total_pkts_last_state_available = true
ibTotalPktsBw += rate
ibTotalPktsBwAvailable = true
}
}
}
counterDef.lastState = v
counterDef.lastState = vRawCounter
counterDef.lastStateAvailable = true
}
// Sum up total values
if m.config.SendTotalValues {
switch {
case counterDef.addToIBTotal:
ib_total += v
ibTotal += vScaledCounter
case counterDef.addToIBTotalPkgs:
ib_total_pkts += v
ibTotalPkts += vScaledCounter
}
}
}
// Send total values
if m.config.SendTotalValues {
if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ib_total, now); err == nil {
y.AddMeta("unit", "bytes")
if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ibTotal, now); err == nil {
y.AddMeta("unit", ibDataUnit)
output <- y
}
if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ib_total_pkts, now); err == nil {
y.AddMeta("unit", "packets")
if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ibTotalPkts, now); err == nil {
y.AddMeta("unit", ibPkgUnit)
output <- y
}
if m.config.SendDerivedValues && ib_total_last_state_available {
rate := float64((ib_total - ib_total_last_state)) / timeDiff
if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", "bytes/sec")
if m.config.SendDerivedValues && ibTotalBwAvailable {
if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, ibTotalBw, now); err == nil {
y.AddMeta("unit", ibDataRateUnit)
output <- y
}
}
if m.config.SendDerivedValues && ib_total_pkts_last_state_available {
rate := float64((ib_total_pkts - ib_total_pkts_last_state)) / timeDiff
if y, err := lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, rate, now); err == nil {
y.AddMeta("unit", "packets/sec")
if m.config.SendDerivedValues && ibTotalPktsBwAvailable {
if y, err := lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, ibTotalPktsBw, now); err == nil {
y.AddMeta("unit", ibPkgRateUnit)
output <- y
}
}
+21
View File
@@ -31,10 +31,12 @@ type IpmiCollector struct {
IpmitoolPath string `json:"ipmitool_path"`
IpmisensorsPath string `json:"ipmisensors_path"`
Sudo bool `json:"use_sudo"`
IncludeMetrics []string `json:"include_metrics"`
}
ipmitool string
ipmisensors string
includeMetrics map[string]bool
}
func (m *IpmiCollector) Init(config json.RawMessage) error {
@@ -64,6 +66,15 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
}
}
// Read metrics to include
m.includeMetrics = make(map[string]bool)
for _, metric := range m.config.IncludeMetrics {
metric = strings.ToLower(strings.TrimSpace(metric))
if metric != "" {
m.includeMetrics[metric] = true
}
}
m.ipmitool = m.config.IpmitoolPath
m.ipmisensors = m.config.IpmisensorsPath
@@ -145,6 +156,11 @@ func (m *IpmiCollector) readIpmiTool(output chan lp.CCMessage) error {
continue
}
name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_"))
if len(m.includeMetrics) > 0 && !m.includeMetrics[name] {
continue
}
unit := strings.TrimSpace(lv[2])
switch unit {
case "Volts":
@@ -212,6 +228,11 @@ func (m *IpmiCollector) readIpmiSensors(output chan lp.CCMessage) error {
continue
}
name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_"))
if len(m.includeMetrics) > 0 && !m.includeMetrics[name] {
continue
}
y, err := lp.NewMetric(name, map[string]string{"type": "node"}, m.meta, v, time.Now())
if err != nil {
cclog.ComponentErrorf(m.name, "Failed to create message: %v", err)
+4 -1
View File
@@ -15,7 +15,8 @@ hugo_path: docs/reference/cc-metric-collector/collectors/ipmi.md
"ipmistat": {
"ipmitool_path": "/path/to/ipmitool",
"ipmisensors_path": "/path/to/ipmi-sensors",
"use_sudo": true
"use_sudo": true,
"include_metrics" : []
}
```
@@ -36,3 +37,5 @@ Defaults: monitoring !log_allowed, !pam_session
monitoring ALL = (root) NOPASSWD:/usr/bin/ipmitool sensor
monitoring ALL = (root) NOPASSWD:/usr/sbin/ipmi-sensors --comma-separated-output --sdr-cache-recreate
```
If `include_ipmi_metrics` contains any entry, ipmistat collector will only submit these metrics. Any other values will get discarded.
+34 -3
View File
@@ -7,6 +7,7 @@ import (
"os/exec"
"os/user"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
@@ -45,6 +46,37 @@ type SlurmCgroupCollector struct {
const defaultCgroupBase = "/sys/fs/cgroup/system.slice/slurmstepd.scope"
var (
// Slurm cgroup v2 directory layout:
// - Slurm <= 25.11: job_<numeric job id>
// - Slurm >= 26.05: SLUID, encoded as "s" + 13 Crockford Base32 characters
jobIDDirRE = regexp.MustCompile(`^job_[0-9]+$`)
sluidDirRE = regexp.MustCompile(`(?i)^s[0-9A-HJKMNP-TV-Z]{13}$`)
)
func (m *SlurmCgroupCollector) findSlurmJobDirs() ([]string, error) {
entries, err := os.ReadDir(m.cgroupBase)
if err != nil {
return nil, err
}
jobDirs := make([]string, 0)
for _, entry := range entries {
if !entry.IsDir() {
continue
}
name := entry.Name()
if jobIDDirRE.MatchString(name) || sluidDirRE.MatchString(name) {
jobDirs = append(jobDirs, filepath.Join(m.cgroupBase, name))
}
}
return jobDirs, nil
}
func ParseCPUs(cpuset string) ([]int, error) {
var result []int
if cpuset == "" {
@@ -237,10 +269,9 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
delete(m.cpuUsed, k)
}
globPattern := filepath.Join(m.cgroupBase, "job_*")
jobDirs, err := filepath.Glob(globPattern)
jobDirs, err := m.findSlurmJobDirs()
if err != nil {
cclog.ComponentError(m.name, "Error globbing job directories:", err.Error())
cclog.ComponentError(m.name, "Error reading job directories:", err.Error())
return
}
+2 -2
View File
@@ -5,11 +5,11 @@ go 1.25.0
require (
github.com/ClusterCockpit/cc-lib/v2 v2.12.0
github.com/ClusterCockpit/go-rocm-smi v0.4.0
github.com/NVIDIA/go-nvml v0.13.0-1
github.com/NVIDIA/go-nvml v0.13.2-0
github.com/PaesslerAG/gval v1.2.4
github.com/fsnotify/fsnotify v1.10.1
github.com/tklauser/go-sysconf v0.4.0
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.design/x/thread v0.3.2
golang.org/x/sys v0.45.0
)
+4 -4
View File
@@ -12,8 +12,9 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
github.com/NVIDIA/go-nvml v0.13.0-1 h1:OLX8Jq3dONuPOQPC7rndB6+iDmDakw0XTYgzMxObkEw=
github.com/NVIDIA/go-nvml v0.13.0-1/go.mod h1:+KNA7c7gIBH7SKSJ1ntlwkfN80zdx8ovl4hrK3LmPt4=
github.com/NVIDIA/go-nvml v0.13.2-0 h1:7M4cFG62wSUHw8i0XSiNU7ejKODytTS6ZrW/vgB2NSI=
github.com/NVIDIA/go-nvml v0.13.2-0/go.mod h1:ahi2psRYoa+wYUBIrZPRO+wJs9lcvMhxSSkjjvsJJNQ=
github.com/PaesslerAG/gval v1.2.4 h1:rhX7MpjJlcxYwL2eTTYIOBUyEKZ+A96T9vQySWkVUiU=
github.com/PaesslerAG/gval v1.2.4/go.mod h1:XRFLwvmkTEdYziLdaCeCa5ImcGVrfQbeNUbVR+C6xac=
github.com/PaesslerAG/jsonpath v0.1.0 h1:gADYeifvlqK3R3i2cR5B4DGgxLXIPb3TRTH1mGi0jPI=
@@ -173,8 +174,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePPs2kHXoazz8q2KsyxHyQVGCJg=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
golang.design/x/thread v0.3.2 h1:FmD1glspGrQCe6FuQLmSrT6wz2CSzq7vKVDluyiMnqo=
golang.design/x/thread v0.3.2/go.mod h1:6+Hi2rMOgMHZdKDWaqNHyWtoFUx1HxZ06LfHPh5Z/hQ=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20231005195138-3e424a577f31 h1:9k5exFQKQglLo+RoP+4zMjOFE14P6+vyR0baDAi0Rcs=
@@ -183,7 +184,6 @@ golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=