add slurm_cgroup Collector

This commit is contained in:
brinkcoder
2025-07-16 07:53:19 +02:00
committed by Thomas Gruber
parent a45366646e
commit c5183feafc
4 changed files with 371 additions and 0 deletions

View File

@@ -52,6 +52,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [`beegfs_meta`](./beegfsmetaMetric.md)
* [`beegfs_storage`](./beegfsstorageMetric.md)
* [`rocm_smi`](./rocmsmiMetric.md)
* [`slurm_cgroup`](./slurmCgroupMetric.md)
## Todos

View File

@@ -47,6 +47,7 @@ var AvailableCollectors = map[string]MetricCollector{
"self": new(SelfCollector),
"schedstat": new(SchedstatCollector),
"nfsiostat": new(NfsIOStatCollector),
"slurm_cgroup": new(SlurmCgroupCollector),
}
// Metric collector manager data structure

View File

@@ -0,0 +1,321 @@
package collectors
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
)
type SlurmJobData struct {
MemoryUsage float64
MaxMemoryUsage float64
LimitMemoryUsage float64
CpuUsageUser float64
CpuUsageSys float64
CpuSet []int
}
type SlurmCgroupsConfig struct {
CgroupBase string `json:"cgroup_base"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
type SlurmCgroupCollector struct {
metricCollector
config SlurmCgroupsConfig
meta map[string]string
tags map[string]string
allCPUs []int
cpuUsed map[int]bool
cgroupBase string
excludeMetrics map[string]struct{}
}
const defaultCgroupBase = "/sys/fs/cgroup/system.slice/slurmstepd.scope"
func ParseCPUs(cpuset string) ([]int, error) {
var result []int
if cpuset == "" {
return result, nil
}
ranges := strings.Split(cpuset, ",")
for _, r := range ranges {
if strings.Contains(r, "-") {
parts := strings.Split(r, "-")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid CPU range: %s", r)
}
start, err := strconv.Atoi(strings.TrimSpace(parts[0]))
if err != nil {
return nil, fmt.Errorf("invalid CPU range start: %s", parts[0])
}
end, err := strconv.Atoi(strings.TrimSpace(parts[1]))
if err != nil {
return nil, fmt.Errorf("invalid CPU range end: %s", parts[1])
}
for i := start; i <= end; i++ {
result = append(result, i)
}
} else {
cpu, err := strconv.Atoi(strings.TrimSpace(r))
if err != nil {
return nil, fmt.Errorf("invalid CPU ID: %s", r)
}
result = append(result, cpu)
}
}
return result, nil
}
func GetAllCPUs() ([]int, error) {
data, err := os.ReadFile("/sys/devices/system/cpu/online")
if err != nil {
return nil, fmt.Errorf("failed to read /sys/devices/system/cpu/online: %v", err)
}
return ParseCPUs(strings.TrimSpace(string(data)))
}
func (m *SlurmCgroupCollector) isExcluded(metric string) bool {
_, found := m.excludeMetrics[metric]
return found
}
func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
var err error
m.name = "SlurmCgroupCollector"
m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "SLURM"}
m.tags = map[string]string{"type": "hwthread"}
m.cpuUsed = make(map[int]bool)
m.cgroupBase = defaultCgroupBase
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
m.excludeMetrics = make(map[string]struct{})
for _, metric := range m.config.ExcludeMetrics {
m.excludeMetrics[metric] = struct{}{}
}
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
if m.config.CgroupBase != "" {
m.cgroupBase = m.config.CgroupBase
}
}
m.allCPUs, err = GetAllCPUs()
if err != nil {
cclog.ComponentError(m.name, "Error reading online CPUs:", err.Error())
return err
}
m.init = true
return nil
}
func (m *SlurmCgroupCollector) ReadJobData(jobdir string) (SlurmJobData, error) {
jobdata := SlurmJobData{
MemoryUsage: 0,
MaxMemoryUsage: 0,
LimitMemoryUsage: 0,
CpuUsageUser: 0,
CpuUsageSys: 0,
CpuSet: []int{},
}
cg := func(f string) string { return filepath.Join(m.cgroupBase, jobdir, f) }
memUsage, err := os.ReadFile(cg("memory.current"))
if err == nil {
x, err := strconv.ParseFloat(strings.TrimSpace(string(memUsage)), 64)
if err == nil {
jobdata.MemoryUsage = x
}
}
maxMem, err := os.ReadFile(cg("memory.peak"))
if err == nil {
x, err := strconv.ParseFloat(strings.TrimSpace(string(maxMem)), 64)
if err == nil {
jobdata.MaxMemoryUsage = x
}
}
limitMem, err := os.ReadFile(cg("memory.max"))
if err == nil {
x, err := strconv.ParseFloat(strings.TrimSpace(string(limitMem)), 64)
if err == nil {
jobdata.LimitMemoryUsage = x
}
}
cpuStat, err := os.ReadFile(cg("cpu.stat"))
if err == nil {
lines := strings.Split(strings.TrimSpace(string(cpuStat)), "\n")
var usageUsec, userUsec, systemUsec float64
for _, line := range lines {
fields := strings.Fields(line)
if len(fields) < 2 {
continue
}
value, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
continue
}
switch fields[0] {
case "usage_usec":
usageUsec = value
case "user_usec":
userUsec = value
case "system_usec":
systemUsec = value
}
}
if usageUsec > 0 {
jobdata.CpuUsageUser = (userUsec * 100 / usageUsec)
jobdata.CpuUsageSys = (systemUsec * 100 / usageUsec)
}
}
cpuSet, err := os.ReadFile(cg("cpuset.cpus"))
if err == nil {
cpus, err := ParseCPUs(strings.TrimSpace(string(cpuSet)))
if err == nil {
jobdata.CpuSet = cpus
}
}
return jobdata, nil
}
func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMessage) {
timestamp := time.Now()
for k := range m.cpuUsed {
delete(m.cpuUsed, k)
}
globPattern := filepath.Join(m.cgroupBase, "job_*")
jobDirs, err := filepath.Glob(globPattern)
if err != nil {
cclog.ComponentError(m.name, "Error globbing job directories:", err.Error())
return
}
for _, jdir := range jobDirs {
jKey := filepath.Base(jdir)
jobdata, err := m.ReadJobData(jKey)
if err != nil {
cclog.ComponentError(m.name, "Error reading job data for", jKey, ":", err.Error())
continue
}
if len(jobdata.CpuSet) > 0 {
coreCount := float64(len(jobdata.CpuSet))
for _, cpu := range jobdata.CpuSet {
coreTags := map[string]string{
"type": "hwthread",
"type-id": fmt.Sprintf("%d", cpu),
}
if coreCount > 0 && !m.isExcluded("job_mem_used") {
memPerCore := jobdata.MemoryUsage / coreCount
if y, err := lp.NewMessage("job_mem_used", coreTags, m.meta, map[string]interface{}{"value": memPerCore}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if coreCount > 0 && !m.isExcluded("job_max_mem_used") {
maxMemPerCore := jobdata.MaxMemoryUsage / coreCount
if y, err := lp.NewMessage("job_max_mem_used", coreTags, m.meta, map[string]interface{}{"value": maxMemPerCore}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if coreCount > 0 && !m.isExcluded("job_mem_limit") {
limitPerCore := jobdata.LimitMemoryUsage / coreCount
if y, err := lp.NewMessage("job_mem_limit", coreTags, m.meta, map[string]interface{}{"value": limitPerCore}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if coreCount > 0 && !m.isExcluded("job_user_cpu") {
cpuUserPerCore := jobdata.CpuUsageUser / coreCount
if y, err := lp.NewMessage("job_user_cpu", coreTags, m.meta, map[string]interface{}{"value": cpuUserPerCore}, timestamp); err == nil {
y.AddMeta("unit", "%")
output <- y
}
}
if coreCount > 0 && !m.isExcluded("job_sys_cpu") {
cpuSysPerCore := jobdata.CpuUsageSys / coreCount
if y, err := lp.NewMessage("job_sys_cpu", coreTags, m.meta, map[string]interface{}{"value": cpuSysPerCore}, timestamp); err == nil {
y.AddMeta("unit", "%")
output <- y
}
}
m.cpuUsed[cpu] = true
}
}
}
for _, cpu := range m.allCPUs {
if !m.cpuUsed[cpu] {
coreTags := map[string]string{
"type": "hwthread",
"type-id": fmt.Sprintf("%d", cpu),
}
if !m.isExcluded("job_mem_used") {
if y, err := lp.NewMessage("job_mem_used", coreTags, m.meta, map[string]interface{}{"value": 0}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if !m.isExcluded("job_max_mem_used") {
if y, err := lp.NewMessage("job_max_mem_used", coreTags, m.meta, map[string]interface{}{"value": 0}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if !m.isExcluded("job_mem_limit") {
if y, err := lp.NewMessage("job_mem_limit", coreTags, m.meta, map[string]interface{}{"value": 0}, timestamp); err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
}
if !m.isExcluded("job_user_cpu") {
if y, err := lp.NewMessage("job_user_cpu", coreTags, m.meta, map[string]interface{}{"value": 0}, timestamp); err == nil {
y.AddMeta("unit", "%")
output <- y
}
}
if !m.isExcluded("job_sys_cpu") {
if y, err := lp.NewMessage("job_sys_cpu", coreTags, m.meta, map[string]interface{}{"value": 0}, timestamp); err == nil {
y.AddMeta("unit", "%")
output <- y
}
}
}
}
}
func (m *SlurmCgroupCollector) Close() {
m.init = false
}

View File

@@ -0,0 +1,48 @@
<!--
---
title: Slurm cgroup metric collector
description: Collect per-core memory and CPU usage for SLURM jobs from cgroup v2
categories: [cc-metric-collector]
tags: ['Admin']
weight: 3
hugo_path: docs/reference/cc-metric-collector/collectors/slurm_cgroup.md
---
-->
## `slurm_cgroup` collector
The `slurm_cgroup` collector reads job-specific resource metrics from the cgroup v2 filesystem and provides **hwthread** metrics for memory and CPU usage of running SLURM jobs.
### Example configuration
```json
"slurm_cgroup": {
"cgroup_base": "/sys/fs/cgroup/system.slice/slurmstepd.scope",
"exclude_metrics": [
"job_sys_cpu",
"job_mem_limit"
]
}
```
* The `cgroup_base` parameter (optional) can be set to specify the root path to SLURM job cgroups. The default is `/sys/fs/cgroup/system.slice/slurmstepd.scope`.
* The `exclude_metrics` array can be used to suppress individual metrics from being sent to the sink.
### Reported metrics
All metrics are available **per hardware thread** :
* `job_mem_used` (`unit=Bytes`): Current memory usage of the job
* `job_max_mem_used` (`unit=Bytes`): Peak memory usage
* `job_mem_limit` (`unit=Bytes`): Cgroup memory limit
* `job_user_cpu` (`unit=%`): User CPU utilization percentage
* `job_sys_cpu` (`unit=%`): System CPU utilization percentage
Each metric has tags:
* `type=hwthread`
* `type-id=<core_id>`
### Limitations
* **cgroups v2 required:** This collector only supports systems running with cgroups v2 (unified hierarchy).