Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git (synced 2025-07-21 12:21:41 +02:00)

Compare commits: snmp_recei...slurm_cgro (12 commits)

Commits: a7bf94a52b, 2e8182adb8, 8055d1425c, 8450bc4342, 113ccb3ac5, 48335dd872, bace84bad0, 9b671ce68f, 226e8425cb, a37f6603c8, 78902305e8, e7b77f7721
.github/workflows/Release.yml (vendored, 2 changes)

@@ -195,7 +195,7 @@ jobs:
   Release:
     runs-on: ubuntu-latest
     # We need the RPMs, so add dependency
-    needs: [AlmaLinux-RPM-build, UBI-8-RPM-build, Ubuntu-jammy-build]
+    needs: [AlmaLinux-RPM-build, UBI-8-RPM-build, Ubuntu-focal-build]

     steps:
       # See: https://github.com/actions/download-artifact
collectors/README.md:

@@ -40,6 +40,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
 * [`beegfs_meta`](./beegfsmetaMetric.md)
 * [`beegfs_storage`](./beegfsstorageMetric.md)
 * [`rocm_smi`](./rocmsmiMetric.md)
+* [`slurm`](./slurmJobDetector.md)

 ## Todos
collectors/collectorManager.go:

@@ -40,6 +40,7 @@ var AvailableCollectors = map[string]MetricCollector{
 	"rocm_smi":  new(RocmSmiCollector),
 	"self":      new(SelfCollector),
 	"schedstat": new(SchedstatCollector),
+	"slurm":     new(SlurmJobDetector),
 	"nfsiostat": new(NfsIOStatCollector),
 }
collectors/slurmJobDetector.go (new file, 620 lines)

@@ -0,0 +1,620 @@
package collectors

import (
	"encoding/json"
	"fmt"
	"os"
	osuser "os/user"
	filepath "path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)

// These are the fields we read from the JSON configuration
type SlurmJobDetectorConfig struct {
	Interval        string `json:"interval"`
	SendJobEvents   bool   `json:"send_job_events,omitempty"`
	SendStepEvents  bool   `json:"send_step_events,omitempty"`
	SendJobMetrics  bool   `json:"send_job_metrics,omitempty"`
	SendStepMetrics bool   `json:"send_step_metrics,omitempty"`
	BaseDirectory   string `json:"sysfs_base,omitempty"`
	CgroupVersion   string `json:"cgroup_version"`
}

// This information is sent as JSON when an event occurs
type SlurmJobMetadata struct {
	UID             string   `json:"uid,omitempty"`
	JobId           string   `json:"jobid"`
	Timestamp       uint64   `json:"timestamp"`
	Status          string   `json:"status"`
	Step            string   `json:"step,omitempty"`
	Cpus            []int    `json:"cpus,omitempty"`
	Memories        []int    `json:"memories,omitempty"`
	MemoryLimitHard int64    `json:"memory_limit_hard,omitempty"`
	MemoryLimitSoft int64    `json:"memory_limit_soft,omitempty"`
	Devices         []string `json:"devices,omitempty"`
}

type SlurmJobMetrics struct {
	MemoryUsage      int64
	MaxMemoryUsage   int64
	LimitMemoryUsage int64
	CpuUsageUser     int64
	CpuUsageSys      int64
}

// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type SlurmJobDetector struct {
	metricCollector
	config      SlurmJobDetectorConfig      // the configuration structure
	meta        map[string]string           // default meta information
	interval    time.Duration               // the interval parsed from configuration
	ticker      *time.Ticker                // own timer for event checking
	output      chan lp.CCMetric            // variable to save output channel at Read() for event sending
	wg          sync.WaitGroup              // sync group for event checking management
	done        chan bool                   // channel for event checking management
	directories map[string]SlurmJobMetadata // directory -> data mapping (data stored to re-send data in end job events)
}

const default_base_dir = "/sys/fs/cgroup"
const default_cgroup_version = "v1"

// Not required to be pre-initialized; these are overwritten in Init() based on configuration
var cpuacct_base = filepath.Join(default_base_dir, "cpuacct", "slurm")
var memory_base = filepath.Join(default_base_dir, "memory", "slurm")
var cpuset_base = filepath.Join(default_base_dir, "cpuset", "slurm")
var devices_base = filepath.Join(default_base_dir, "devices", "slurm")

// Filenames for cgroup/v1
var limit_in_bytes_file = "memory.limit_in_bytes"
var soft_limit_in_bytes_file = "memory.soft_limit_in_bytes"
var cpus_effective_file = "cpuset.effective_cpus"
var mems_effective_file = "cpuset.effective_mems"
var devices_list_file = "devices.list"
var usage_in_bytes_file = "memory.usage_in_bytes"
var max_usage_in_bytes_file = "memory.max_usage_in_bytes"
var cpuacct_usage_file = "cpuacct.usage"
var cpuacct_usage_user_file = "cpuacct.usage_user"

// Filenames for cgroup/v2
// In Init() the filenames are set based on configuration
const soft_limit_in_bytes_file_v2 = "memory.high"
const limit_in_bytes_file_v2 = "memory.max"
const cpus_effective_file_v2 = "cpuset.cpus.effective"
const mems_effective_file_v2 = "cpuset.mems.effective"
const devices_list_file_v2 = "devices.list"
const usage_in_bytes_file_v2 = "memory.usage_in_bytes"
const max_usage_in_bytes_file_v2 = "memory.max_usage_in_bytes"
const cpuacct_usage_file_v2 = "cpuacct.usage"
const cpuacct_usage_user_file_v2 = "cpuacct.usage_user"

func fileToInt64(filename string) (int64, error) {
	data, err := os.ReadFile(filename)
	if err != nil {
		return 0, err
	}
	// cgroup files end with a newline, so trim before parsing; return the
	// parse error instead of the (nil) read error when parsing fails
	x, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64)
	if err != nil {
		return 0, err
	}
	return x, nil
}
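// ExpandList expands a cpuset-style list string such as "0-3,7" into the
// corresponding integer slice, e.g. []int{0, 1, 2, 3, 7}.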
func ExpandList(strlist string) []int {
	out := make([]int, 0)
	level1 := strings.Split(strlist, ",")
	if len(level1) > 0 {
		for _, entry := range level1 {
			var s, e int
			_, err := fmt.Sscanf(entry, "%d-%d", &s, &e)
			if err == nil {
				if s < e {
					for i := s; i <= e; i++ {
						out = append(out, i)
					}
				} else {
					// reversed range like "5-2": count upwards from e to s
					// (the original decremented here, which never terminates)
					for i := e; i <= s; i++ {
						out = append(out, i)
					}
				}
			} else {
				_, err := fmt.Sscanf(entry, "%d", &s)
				if err == nil {
					out = append(out, s)
				}
			}
		}
	}
	return out
}

// ParseDevices parses a devices.list entry like "a *:* rwm".
// Parsing is not implemented yet, so an empty list is returned.
func ParseDevices(devlist string) []string {
	out := make([]string, 0)
	return out
}
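// GetPathParts returns the trailing uid_*, job_* and step_* components of a
// cgroup path, e.g. "<base>/slurm/uid_1000/job_42" yields []string{"uid_1000", "job_42"}.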
func GetPathParts(path string) []string {
	out := make([]string, 0)
	uid := ""
	jobid := ""
	step := ""
	parts := strings.Split(path, "/")
	// the folders of interest are at the end of the list, so traverse
	// from the back
	for i := len(parts) - 1; i >= 0; i-- {
		if strings.HasPrefix(parts[i], "uid_") {
			uid = parts[i]
		} else if strings.HasPrefix(parts[i], "job_") {
			jobid = parts[i]
		} else if strings.HasPrefix(parts[i], "step_") {
			step = parts[i]
		}
	}
	// only cgroup/v1 provides a uid, but it needs to be the first entry
	if len(uid) > 0 {
		out = append(out, uid)
	}
	if len(jobid) > 0 {
		out = append(out, jobid)
	}
	// only if it's a step folder
	if len(step) > 0 {
		out = append(out, step)
	}
	return out
}
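// GetIdsFromParts strips the prefixes from the path parts, e.g.
// []string{"uid_1000", "job_42", "step_0"} yields ("1000", "42", "0").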
func GetIdsFromParts(parts []string) (string, string, string) {
	uid := ""
	jobid := ""
	step := ""

	for _, p := range parts {
		if strings.HasPrefix(p, "job_") {
			jobid = strings.TrimPrefix(p, "job_")
		} else if strings.HasPrefix(p, "uid_") {
			uid = strings.TrimPrefix(p, "uid_")
		} else if strings.HasPrefix(p, "step_") {
			step = strings.TrimPrefix(p, "step_")
		}
	}
	return uid, jobid, step
}
func (m *SlurmJobDetector) CheckEvents(timestamp time.Time) {
	// allocate an empty slice with capacity 3; make([]string, 3) would
	// prepend three empty path components to the glob pattern
	parts := make([]string, 0, 3)
	parts = append(parts, cpuacct_base)
	if m.config.CgroupVersion == "v1" {
		parts = append(parts, "uid_[0-9]*")
	}
	parts = append(parts, "job_[0-9]*")

	dirs, err := filepath.Glob(filepath.Join(parts...))
	if err != nil {
		cclog.ComponentError(m.name, "Cannot get directory list for SLURM jobs")
		return
	}
	if m.config.SendStepEvents {
		parts = append(parts, "step_*")
		sdirs, err := filepath.Glob(filepath.Join(parts...))
		if err != nil {
			cclog.ComponentError(m.name, "Cannot get directory list for SLURM steps")
			return
		}
		dirs = append(dirs, sdirs...)
	}

	for _, d := range dirs {
		// Folder not in known directories map -> New job
		if _, ok := m.directories[d]; !ok {
			dirParts := GetPathParts(d)
			data, err := m.NewJobEvent(dirParts, timestamp, m.output)
			if err == nil {
				// Add the directory to the map
				cclog.ComponentDebug(m.name, "Adding directory ", d, " to known directories")
				m.directories[d] = data
			}
		}
	}
	for d, data := range m.directories {
		// Known directory but it does not exist anymore -> Vanished/Finished job
		if _, ok := stringArrayContains(dirs, d); !ok {
			dirParts := GetPathParts(d)
			err := m.EndJobEvent(dirParts, data, timestamp, m.output)
			if err != nil {
				uid, jobid, step := GetIdsFromParts(dirParts)
				if len(step) == 0 {
					cclog.ComponentError(m.name, "Failed to end job for user ", uid, " jobid ", jobid)
				} else {
					cclog.ComponentError(m.name, "Failed to end job for user ", uid, " jobid ", jobid, " step ", step)
				}
			}
			// Remove the directory from the map
			cclog.ComponentDebug(m.name, "Removing directory ", d, " from known directories")
			delete(m.directories, d)
		}
	}
}
func (m *SlurmJobDetector) NewJobEvent(parts []string, timestamp time.Time, output chan lp.CCMetric) (SlurmJobMetadata, error) {
	uid, jobid, step := GetIdsFromParts(parts)
	pathstr := filepath.Join(parts...)
	// the job ID is required; bail out if it is missing
	if len(jobid) == 0 {
		cclog.ComponentError(m.name, "No jobid in path ", pathstr)
		return SlurmJobMetadata{}, fmt.Errorf("no jobid in path %s", pathstr)
	}
	jobtags := map[string]string{
		"type":    "job",
		"type-id": jobid,
	}

	// Fill job JSON with data from cgroup
	md := SlurmJobMetadata{
		JobId:     jobid,
		Timestamp: uint64(timestamp.Unix()),
		Status:    "start",
	}
	// cgroup/v2 has no uid in parts
	if len(uid) > 0 {
		md.UID = uid
	}
	if len(step) > 0 {
		md.Step = step
		jobtags["stype"] = "step"
		jobtags["stype-id"] = step
	}

	job_cpus, err := os.ReadFile(filepath.Join(cpuset_base, pathstr, cpus_effective_file))
	if err == nil {
		md.Cpus = ExpandList(string(job_cpus))
	}
	job_mems, err := os.ReadFile(filepath.Join(cpuset_base, pathstr, mems_effective_file))
	if err == nil {
		md.Memories = ExpandList(string(job_mems))
	}
	job_devs, err := os.ReadFile(filepath.Join(devices_base, pathstr, devices_list_file))
	if err == nil {
		md.Devices = ParseDevices(string(job_devs))
	}
	x, err := fileToInt64(filepath.Join(memory_base, pathstr, limit_in_bytes_file))
	if err == nil {
		md.MemoryLimitHard = x
	}
	x, err = fileToInt64(filepath.Join(memory_base, pathstr, soft_limit_in_bytes_file))
	if err == nil {
		md.MemoryLimitSoft = x
	}

	jobjson, err := json.Marshal(md)
	if err == nil {
		y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp)
		if err == nil {
			if len(uid) > 0 {
				y.AddMeta("uid", uid)
				uname, err := osuser.LookupId(uid)
				if err == nil {
					y.AddMeta("username", uname.Username)
				}
			}
			y.AddMeta("metric_type", "event")
			output <- y
		}
	}
	return md, nil
}
// Not sure if it works with steps since the folders commonly do not vanish when a job step is finished
func (m *SlurmJobDetector) EndJobEvent(parts []string, data SlurmJobMetadata, timestamp time.Time, output chan lp.CCMetric) error {
	uid, jobid, step := GetIdsFromParts(parts)
	pathstr := filepath.Join(parts...)
	// the job ID is required; bail out if it is missing
	if len(jobid) == 0 {
		err := fmt.Errorf("no jobid in path %s", pathstr)
		cclog.ComponentError(m.name, err.Error())
		return err
	}
	jobtags := map[string]string{
		"type":    "job",
		"type-id": jobid,
	}

	// Fill job JSON with data from cgroup
	md := SlurmJobMetadata{
		JobId:           jobid,
		Timestamp:       uint64(timestamp.Unix()),
		Cpus:            data.Cpus,
		Memories:        data.Memories,
		Devices:         data.Devices,
		MemoryLimitHard: data.MemoryLimitHard,
		MemoryLimitSoft: data.MemoryLimitSoft,
		Status:          "end",
	}
	// cgroup/v2 has no uid in parts
	if len(uid) > 0 {
		md.UID = uid
	}
	if len(step) > 0 {
		md.Step = step
		jobtags["stype"] = "step"
		jobtags["stype-id"] = step
	}

	jobjson, err := json.Marshal(md)
	if err == nil {
		y, err := lp.New("slurm", jobtags, m.meta, map[string]interface{}{"value": string(jobjson)}, timestamp)
		if err == nil {
			if len(uid) > 0 {
				y.AddMeta("uid", uid)
				uname, err := osuser.LookupId(uid)
				if err == nil {
					y.AddMeta("username", uname.Username)
				}
			}
			y.AddMeta("metric_type", "event")
			output <- y
		} else {
			return err
		}
	} else {
		return err
	}
	return nil
}
func (m *SlurmJobDetector) ReadMetrics(parts []string) (SlurmJobMetrics, error) {
	jobdata := SlurmJobMetrics{
		MemoryUsage:      0,
		MaxMemoryUsage:   0,
		LimitMemoryUsage: 0,
		CpuUsageUser:     0,
		CpuUsageSys:      0,
	}

	part := filepath.Join(parts...)

	x, err := fileToInt64(filepath.Join(memory_base, part, usage_in_bytes_file))
	if err == nil {
		jobdata.MemoryUsage = x
	}
	x, err = fileToInt64(filepath.Join(memory_base, part, max_usage_in_bytes_file))
	if err == nil {
		jobdata.MaxMemoryUsage = x
	}
	tu, err := fileToInt64(filepath.Join(cpuacct_base, part, cpuacct_usage_file))
	if err == nil && tu > 0 {
		uu, err := fileToInt64(filepath.Join(cpuacct_base, part, cpuacct_usage_user_file))
		if err == nil {
			// integer percentage of user time in the total CPU time;
			// multiply before dividing, otherwise uu/tu truncates to 0
			jobdata.CpuUsageUser = uu * 100 / tu
			jobdata.CpuUsageSys = 100 - jobdata.CpuUsageUser
		}
	}
	return jobdata, nil
}
func (m *SlurmJobDetector) SendMetrics(jobtags, jobmeta map[string]string, jobmetrics SlurmJobMetrics, timestamp time.Time, output chan lp.CCMetric) {

	y, err := lp.New("job_mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MemoryUsage}, timestamp)
	if err == nil {
		y.AddMeta("unit", "Bytes")
		for k, v := range jobmeta {
			y.AddMeta(k, v)
		}
		output <- y
	}
	y, err = lp.New("job_max_mem_used", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.MaxMemoryUsage}, timestamp)
	if err == nil {
		y.AddMeta("unit", "Bytes")
		for k, v := range jobmeta {
			y.AddMeta(k, v)
		}
		output <- y
	}
	y, err = lp.New("job_cpu_user", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageUser}, timestamp)
	if err == nil {
		y.AddMeta("unit", "%")
		for k, v := range jobmeta {
			y.AddMeta(k, v)
		}
		output <- y
	}
	y, err = lp.New("job_cpu_sys", jobtags, m.meta, map[string]interface{}{"value": jobmetrics.CpuUsageSys}, timestamp)
	if err == nil {
		y.AddMeta("unit", "%")
		for k, v := range jobmeta {
			y.AddMeta(k, v)
		}
		output <- y
	}
}
// Init initializes the SlurmJobDetector collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *SlurmJobDetector) Init(config json.RawMessage) error {
	var err error = nil
	m.name = "SlurmJobDetector"
	// This is for later use, also call it early
	m.setup()
	// Can be run in parallel with others
	m.parallel = true
	// Define meta information sent with each metric
	m.meta = map[string]string{"source": m.name, "group": "SLURM"}
	// Set configuration defaults
	m.config.SendJobEvents = false
	m.config.SendJobMetrics = false
	m.config.SendStepEvents = false
	m.config.SendStepMetrics = false
	m.config.CgroupVersion = default_cgroup_version
	m.config.BaseDirectory = default_base_dir
	// Read in the JSON configuration
	if len(config) > 0 {
		err = json.Unmarshal(config, &m.config)
		if err != nil {
			cclog.ComponentError(m.name, "Error reading config:", err.Error())
			return err
		}
	}

	// Parse the read interval duration
	m.interval, err = time.ParseDuration(m.config.Interval)
	if err != nil {
		cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
		return err
	}
	if m.config.CgroupVersion != "v1" && m.config.CgroupVersion != "v2" {
		// construct a proper error here; err is still nil at this point
		err := fmt.Errorf("invalid cgroup version %s", m.config.CgroupVersion)
		cclog.ComponentError(m.name, err.Error())
		return err
	}

	// Storage for output channel
	m.output = nil
	// Management channel for the timer function.
	m.done = make(chan bool)
	// Create the own ticker
	m.ticker = time.NewTicker(m.interval)
	// Create space for storing files
	m.directories = make(map[string]SlurmJobMetadata)

	if _, err := os.Stat(m.config.BaseDirectory); err != nil {
		err := fmt.Errorf("cannot find base folder %s", m.config.BaseDirectory)
		cclog.ComponentError(m.name, err.Error())
		return err
	}
	cclog.ComponentDebug(m.name, "Using base directory", m.config.BaseDirectory)
	cpuacct_base = filepath.Join(m.config.BaseDirectory, "cpuacct", "slurm")
	memory_base = filepath.Join(m.config.BaseDirectory, "memory", "slurm")
	cpuset_base = filepath.Join(m.config.BaseDirectory, "cpuset", "slurm")
	devices_base = filepath.Join(m.config.BaseDirectory, "devices", "slurm")
	if m.config.CgroupVersion == "v2" {
		cclog.ComponentDebug(m.name, "Reconfiguring folders and filenames for cgroup/v2")
		cpuacct_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope")
		memory_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope")
		cpuset_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope")
		devices_base = filepath.Join(m.config.BaseDirectory, "system.slice", "slurmstepd.scope")
		cpus_effective_file = cpus_effective_file_v2
		mems_effective_file = mems_effective_file_v2
		devices_list_file = devices_list_file_v2
		limit_in_bytes_file = limit_in_bytes_file_v2
		soft_limit_in_bytes_file = soft_limit_in_bytes_file_v2
		usage_in_bytes_file = usage_in_bytes_file_v2
		max_usage_in_bytes_file = max_usage_in_bytes_file_v2
		cpuacct_usage_file = cpuacct_usage_file_v2
		cpuacct_usage_user_file = cpuacct_usage_user_file_v2
	}
	if _, err := os.Stat(cpuacct_base); err != nil {
		err := fmt.Errorf("cannot find SLURM cgroup folder %s", cpuacct_base)
		cclog.ComponentError(m.name, err.Error())
		return err
	}

	m.wg.Add(1)
	go func() {
		for {
			select {
			case <-m.done:
				// Exit the timer loop
				cclog.ComponentDebug(m.name, "Closing...")
				m.wg.Done()
				return
			case timestamp := <-m.ticker.C:
				if m.output != nil {
					cclog.ComponentDebug(m.name, "Checking events")
					m.CheckEvents(timestamp)
				}
			}
		}
	}()

	// Set this flag only if everything is initialized properly, all required files exist, ...
	m.init = true
	return err
}
// Read collects all metrics belonging to the SlurmJobDetector collector
// and sends them through the output channel to the collector manager
func (m *SlurmJobDetector) Read(interval time.Duration, output chan lp.CCMetric) {
	// Timestamp for this read cycle
	timestamp := time.Now()
	// Capture output channel for event sending in goroutine, so at startup, the event checking
	// waits until the first call of Read()
	m.output = output

	// This is the reading of metrics for all running jobs. For the event checking, check
	// the goroutine in Init()

	parts := make([]string, 0)
	parts = append(parts, cpuacct_base)
	// Only cgroup/v1 has a uid_* folder
	if m.config.CgroupVersion == "v1" {
		parts = append(parts, "uid_[0-9]*")
	}
	parts = append(parts, "job_[0-9]*")
	// Get folders based on constructed glob path
	dirs, err := filepath.Glob(filepath.Join(parts...))
	if err != nil {
		cclog.ComponentError(m.name, "Cannot get directory list for SLURM jobs")
		return
	}
	// Add step lookup if we process step metrics (the original gated this on
	// SendStepEvents, but Read() sends metrics, not events)
	if m.config.SendStepMetrics {
		parts = append(parts, "step_*")
		// Get step folders based on constructed glob path
		sdirs, err := filepath.Glob(filepath.Join(parts...))
		if err != nil {
			cclog.ComponentError(m.name, "Cannot get directory list for SLURM steps")
			return
		}
		// Add step folders to directory list for processing
		dirs = append(dirs, sdirs...)
	}

	for _, d := range dirs {
		dirParts := GetPathParts(d)                   // Gets uid_*, job_* and step_* (if available)
		uid, jobid, step := GetIdsFromParts(dirParts) // extracts the IDs from the available parts

		// Create tags map for the job
		jobtags := map[string]string{
			"type":    "job",
			"type-id": jobid,
		}
		// Create meta map for the job
		jobmeta := make(map[string]string)

		// if cgroup/v1, we have a uid
		if len(uid) > 0 {
			jobmeta["uid"] = uid
			uname, err := osuser.LookupId(uid)
			if err == nil {
				jobmeta["username"] = uname.Username
			}
		}

		// if this is a step directory, add the sub type with value
		if len(step) > 0 {
			jobtags["stype"] = "step"
			jobtags["stype-id"] = step
		}
		// read this job's metrics from its own directory parts (not from the
		// glob pattern) and send them only if reading succeeded
		jobmetrics, err := m.ReadMetrics(dirParts)
		if err == nil {
			// Send all metrics for the job
			m.SendMetrics(jobtags, jobmeta, jobmetrics, timestamp, output)
		}
	}
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *SlurmJobDetector) Close() {
	m.done <- true
	m.wg.Wait()
	// Unset flag
	m.init = false
}
collectors/slurmJobDetector.md (new file, 45 lines)

@@ -0,0 +1,45 @@
# `slurm` collector

```json
"slurm": {
  "interval" : "1s",
  "send_job_events" : true,
  "send_job_metrics" : true,
  "send_step_events": false,
  "send_step_metrics" : false,
  "cgroup_version" : "v1"
}
```

The `slurm` collector reads data from `/sys/fs/cgroup/` to detect the creation and deletion of SLURM jobs on the node. When it detects an event, it collects some event-related information and sends the event. The event detection happens every `interval`.

Additionally, for all running jobs, it can collect metrics and send them out. This collection is done in the global collector interval.

Options:
* `interval`: Time interval in which the folders are checked for new or vanished SLURM jobs
* `send_job_events`: Send events when a job starts or ends
* `send_job_metrics`: Send metrics of each running job with the global collector interval
* `send_step_events`: Send events when a job step starts
* `send_step_metrics`: Send metrics of each job step with the global collector interval
* `cgroup_version`: Which cgroup version is in use. Possible values are `v1` and `v2`. `v1` is the default
* `sysfs_base`: (Testing only) Set the base path for lookups, default `/sys/fs/cgroup`.

For `cgroup_version = v2`, the collector searches for jobs at `<sysfs_base>/system.slice/slurmstepd.scope`, by default with `<sysfs_base>=/sys/fs/cgroup`. If the cgroup folders are created below `/sys/fs/cgroup/unified`, adjust the `sysfs_base` option to `/sys/fs/cgroup/unified`.
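Job events carry the job metadata serialized as JSON in the metric value. A sketch of a start-event payload, with illustrative values (the exact fields depend on what the job's cgroup provides; `uid` is only present for cgroup/v1):

```json
{
  "uid": "1000",
  "jobid": "12345",
  "timestamp": 1702050709,
  "status": "start",
  "cpus": [0, 1, 2, 3],
  "memories": [0, 1],
  "memory_limit_hard": 249036800000,
  "memory_limit_soft": 249036800000
}
```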
## Testing
For testing the collector, you can specify a different base directory that should be checked for new events. The default is `/sys/fs/cgroup/`. By specifying a `sysfs_base` in the configuration, this can be changed. Moreover, with the `slurmJobDetector_dummy.sh` script, you can create and delete "jobs" for testing. Use the same directory with `--basedir`. It generates only cgroup/v1 directory structures at the moment.

```sh
$ slurmJobDetector_dummy.sh -h

Usage: slurmJobDetector_dummy.sh <opts>
  [ -h | --help ]
  [ -v | --verbosity ]
  [ -u | --uid <UID> (default: XXXX) ]
  [ -j | --jobid <JOBID> (default: random) ]
  [ -b | --basedir <DIR> (default: ./slurm-test) ]
  [ -d | --delete ]
  [ -l | --list ]
```

With no options, it creates a job with the executing user's UID and a random JOBID. For deletion, use `-d -j JOBID`; deletion requires a JOBID. If you want to get a list of all UIDs and JOBIDs that currently exist, you can get the list with `-l`. A typical test cycle is shown below.
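A minimal round trip with the dummy script might look like this (the job ID is arbitrary; `--basedir` must match the collector's `sysfs_base`):

```sh
# create a fake job for the current user
./slurmJobDetector_dummy.sh -j 12345 -b ./slurm-test
# list all fake UIDs and JOBIDs
./slurmJobDetector_dummy.sh -l -b ./slurm-test
# remove the fake job again
./slurmJobDetector_dummy.sh -d -j 12345 -b ./slurm-test
```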
collectors/slurmJobDetector_dummy.sh (new executable file, 139 lines)

@@ -0,0 +1,139 @@
#!/bin/bash -l

# Some settings for scripting with less headache

# when a command fails, bash exits instead of continuing
set -o errexit

# make the script fail, when accessing an unset variable
# use "${VARNAME-}" instead of "$VARNAME" when you want to access
# a variable that may or may not have been set
set -o nounset

# ensure that a pipeline command is treated as failed, even if one command in the pipeline fails
set -o pipefail

# enable debug mode, by running your script as TRACE=1 ./script.sh
if [[ "${TRACE-0}" == "1" ]]; then
    set -o xtrace
fi

# Default values for variables
# bash's UID variable is readonly, so keep the configurable user ID in TUID
: ${TUID=${UID:-$(id -u)}}
: ${VERBOSITY=0}
: ${DELETE=0}
: ${LIST=0}
: ${JOBID="random"}
: ${BASE=./slurmJobDetector-sys-fs-cgroup}

# Print usage if needed
usage()
{
    echo "
Usage: $(basename $0) <opts>
    [ -h | --help ]
    [ -v | --verbosity ]
    [ -u | --uid <UID> (default: ${TUID}) ]
    [ -j | --jobid <JOBID> (default: ${JOBID}) ]
    [ -b | --basedir <DIR> (default: ${BASE}) ]
    [ -d | --delete ]
    [ -l | --list ]
"
    exit $1;
}

cd "$(dirname "$0")"

main() {
    PARSED_ARGUMENTS=$(getopt -a -n $(basename $0) -o hj:u:vb:dl --long help,verbosity,uid:,jobid:,basedir:,delete,list -- "$@")
    VALID_ARGUMENTS=$?
    # Parsing failed
    if [[ "$VALID_ARGUMENTS" != "0" ]]; then
        usage 2
    fi
    # No argument (comment out if command should work without any arguments)
    # if [[ "${PARSED_ARGUMENTS}" == " --" ]]; then
    #     usage 0
    # fi
    # Evaluate arguments
    eval set -- "$PARSED_ARGUMENTS"
    while :
    do
        case "$1" in
            -h | --help) usage 0; shift ;;
            -v | --verbosity) VERBOSITY=1; shift ;;
            -d | --delete) DELETE=1; shift ;;
            -l | --list) LIST=1; shift ;;
            -u | --uid) TUID=$2 ; shift 2 ;;
            -j | --jobid) JOBID=$2 ; shift 2 ;;
            -b | --basedir) BASE=$2 ; shift 2 ;;
            --) shift; break ;;
            *) echo "Unexpected option: $1 - this should not happen."
               usage 2;;
        esac
    done

    if [[ ${LIST} -eq 1 ]]; then
        for F in $(ls -d ${BASE}/cpuset/slurm/uid_*/job_*); do
            JOBID=$(echo "$F" | rev | cut -d '/' -f 1 | rev | cut -d '_' -f 2)
            MYUID=$(echo "$F" | rev | cut -d '/' -f 2 | rev | cut -d '_' -f 2)
            echo "UID ${MYUID} JOBID ${JOBID}"
        done
        exit 0
    fi

    if [[ ${JOBID} == "random" ]]; then
        if [[ ${DELETE} -eq 1 ]]; then
            echo "Cannot use random JOBID for deletion"
            exit 1
        else
            JOBID=$RANDOM
        fi
    fi

    FOLDERS="cpuset cpuacct memory devices"

    if [[ ${DELETE} -eq 1 ]]; then
        for F in ${FOLDERS}; do
            rm -r --force "${BASE}/${F}/slurm/uid_${TUID}/job_${JOBID}"
        done
    else
        for F in ${FOLDERS}; do
            if [[ $VERBOSITY -eq 1 ]]; then
                echo "${BASE}/${F}/slurm/uid_${TUID}/job_${JOBID}"
            fi
            mkdir -p "${BASE}/${F}/slurm/uid_${TUID}/job_${JOBID}"
        done

        echo "0-71" > "${BASE}/cpuset/slurm/uid_${TUID}/job_${JOBID}/cpuset.effective_cpus"
        echo "0-3" > "${BASE}/cpuset/slurm/uid_${TUID}/job_${JOBID}/cpuset.effective_mems"

        echo "249036800000" > "${BASE}/memory/slurm/uid_${TUID}/job_${JOBID}/memory.limit_in_bytes"
        echo "249036800000" > "${BASE}/memory/slurm/uid_${TUID}/job_${JOBID}/memory.soft_limit_in_bytes"
        echo "13987840" > "${BASE}/memory/slurm/uid_${TUID}/job_${JOBID}/memory.usage_in_bytes"
        echo "14966784" > "${BASE}/memory/slurm/uid_${TUID}/job_${JOBID}/memory.max_usage_in_bytes"
        echo "60" > "${BASE}/memory/slurm/uid_${TUID}/job_${JOBID}/memory.swappiness"

        echo "474140369" > "${BASE}/cpuacct/slurm/uid_${TUID}/job_${JOBID}/cpuacct.usage"
        echo "169078878" > "${BASE}/cpuacct/slurm/uid_${TUID}/job_${JOBID}/cpuacct.usage_user"
        echo "315684619" > "${BASE}/cpuacct/slurm/uid_${TUID}/job_${JOBID}/cpuacct.usage_sys"

        echo "a *:* rwm" > "${BASE}/devices/slurm/uid_${TUID}/job_${JOBID}/devices.list"
        # sample content for memory.numa_stat (not written by this script):
        # total=0 N0=0 N1=0 N2=0 N3=0
        # file=0 N0=0 N1=0 N2=0 N3=0
        # anon=0 N0=0 N1=0 N2=0 N3=0
        # unevictable=0 N0=0 N1=0 N2=0 N3=0
        # hierarchical_total=958 N0=28 N1=579 N2=180 N3=171
        # hierarchical_file=194 N0=0 N1=194 N2=0 N3=0
        # hierarchical_anon=764 N0=28 N1=385 N2=180 N3=171
        # hierarchical_unevictable=0 N0=0 N1=0 N2=0 N3=0
    fi
}

main "$@"
go.mod (1 changed line)

@@ -9,7 +9,6 @@ require (
 	github.com/PaesslerAG/gval v1.2.2
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/gorilla/mux v1.8.0
-	github.com/gosnmp/gosnmp v1.37.0
 	github.com/influxdata/influxdb-client-go/v2 v2.12.3
 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
 	github.com/influxdata/line-protocol/v2 v2.2.1
go.sum (4 changed lines)

@@ -87,7 +87,7 @@ github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386 h1:EcQR3gusLHN
 github.com/gomarkdown/markdown v0.0.0-20230922112808-5421fefb8386/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
@@ -97,8 +97,6 @@ github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
-github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y=
-github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M=
 github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk=
 github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0IpXeMSkY/uJa/O/vC4=
 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
receivers/README.md:

@@ -24,7 +24,6 @@ This allows to specify
 - [`http`](./httpReceiver.md): Listen for HTTP Post requests transporting metrics in InfluxDB line protocol
 - [`ipmi`](./ipmiReceiver.md): Read IPMI sensor readings
 - [`redfish`](redfishReceiver.md) Use the Redfish (specification) to query thermal and power metrics
-- [`snmp`](./snmpReceiver.md) Query SNMP endpoints in the network

 ## Contributing own receivers
receivers/receiveManager.go:

@@ -15,7 +15,6 @@ var AvailableReceivers = map[string]func(name string, config json.RawMessage) (R
 	"ipmi":    NewIPMIReceiver,
 	"nats":    NewNatsReceiver,
 	"redfish": NewRedfishReceiver,
-	"snmp":    NewSNMPReceiver,
 }

 type receiveManager struct {
receivers/snmpReceiver.go (file deleted)

@@ -1,294 +0,0 @@
package receivers

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
	"github.com/gosnmp/gosnmp"
)

type SNMPReceiverTargetConfig struct {
	Hostname  string `json:"hostname"`
	Port      int    `json:"port,omitempty"`
	Community string `json:"community,omitempty"`
	Timeout   string `json:"timeout,omitempty"`
	timeout   time.Duration
	Version   string `json:"version,omitempty"`
	Type      string `json:"type,omitempty"`
	TypeId    string `json:"type-id,omitempty"`
	SubType   string `json:"subtype,omitempty"`
	SubTypeId string `json:"subtype-id,omitempty"`
}

type SNMPReceiverMetricConfig struct {
	Name string `json:"name"`
	OID  string `json:"oid"`
	Unit string `json:"unit,omitempty"`
}

// SNMPReceiver configuration: receiver type, targets and metrics
type SNMPReceiverConfig struct {
	Type         string                     `json:"type"`
	Targets      []SNMPReceiverTargetConfig `json:"targets"`
	Metrics      []SNMPReceiverMetricConfig `json:"metrics"`
	ReadInterval string                     `json:"read_interval,omitempty"`
}

type SNMPReceiver struct {
	receiver
	config SNMPReceiverConfig

	// Storage for static information
	meta map[string]string
	tags map[string]string
	// Use in case of own go routine
	done     chan bool
	wg       sync.WaitGroup
	interval time.Duration
}
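// validOid reports whether the string looks like an SNMP OID with an
// optional leading dot, e.g. ".1.3.6.1.2.1.1.4.0".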
func validOid(oid string) bool {
	// Regex from https://github.com/BornToBeRoot/NETworkManager/blob/6805740762bf19b95051c7eaa73cf2b4727733c3/Source/NETworkManager.Utilities/RegexHelper.cs#L88
	// Match on leading dot added by Thomas Gruber <thomas.gruber@fau.de>
	match, err := regexp.MatchString(`^[\.]?[012]\.(?:[0-9]|[1-3][0-9])(\.\d+)*$`, oid)
	if err != nil {
		return false
	}
	return match
}

func (r *SNMPReceiver) readTarget(target SNMPReceiverTargetConfig, output chan lp.CCMetric) {
	port := uint16(161)
	comm := "public"
	timeout := time.Duration(1) * time.Second
	version := gosnmp.Version2c
	timestamp := time.Now()
	if target.Port > 0 {
		port = uint16(target.Port)
	}
	if len(target.Community) > 0 {
		comm = target.Community
	}
	if target.timeout > 0 {
		timeout = target.timeout
	}
	if len(target.Version) > 0 {
		switch target.Version {
		case "1":
			version = gosnmp.Version1
		case "2c":
			version = gosnmp.Version2c
		case "3":
			version = gosnmp.Version3
		default:
			cclog.ComponentError(r.name, "Invalid SNMP version ", target.Version)
			return
		}
	}
	params := &gosnmp.GoSNMP{
		Target:    target.Hostname,
		Port:      port,
		Community: comm,
		Version:   version,
		Timeout:   timeout,
	}
	err := params.Connect()
	if err != nil {
		cclog.ComponentError(r.name, err.Error())
		return
	}
	for _, metric := range r.config.Metrics {
		if !validOid(metric.OID) {
			cclog.ComponentDebug(r.name, "Skipping ", metric.Name, ", not valid OID: ", metric.OID)
			continue
		}
		oids := make([]string, 0)
		name := gosnmp.SnmpPDU{
			Value: metric.Name,
			Name:  metric.Name,
		}
		nameidx := -1
		value := gosnmp.SnmpPDU{
			Value: nil,
			Name:  metric.OID,
		}
		valueidx := -1
		unit := gosnmp.SnmpPDU{
			Value: metric.Unit,
			Name:  metric.Unit,
		}
		unitidx := -1
		idx := 0
		if validOid(metric.Name) {
			oids = append(oids, metric.Name)
			nameidx = idx
			idx = idx + 1
		}
		if validOid(metric.OID) {
			oids = append(oids, metric.OID)
			valueidx = idx
			idx = idx + 1
		}
		if len(metric.Unit) > 0 && validOid(metric.Unit) {
			oids = append(oids, metric.Unit)
			unitidx = idx
		}
		//cclog.ComponentDebug(r.name, len(oids), oids)
		result, err := params.Get(oids)
		if err != nil {
			cclog.ComponentError(r.name, "failed to get data for OIDs ", strings.Join(oids, ","), ": ", err.Error())
			continue
		}
		if nameidx >= 0 && len(result.Variables) > nameidx {
			name = result.Variables[nameidx]
		}
		if valueidx >= 0 && len(result.Variables) > valueidx {
			value = result.Variables[valueidx]
		}
		if unitidx >= 0 && len(result.Variables) > unitidx {
			unit = result.Variables[unitidx]
		}
		// copy the default tags so that per-target tags do not leak into r.tags
		tags := make(map[string]string)
		for k, v := range r.tags {
			tags[k] = v
		}
		if len(target.Type) > 0 {
			tags["type"] = target.Type
		}
		if len(target.TypeId) > 0 {
			tags["type-id"] = target.TypeId
		}
		if len(target.SubType) > 0 {
			tags["stype"] = target.SubType
		}
		if len(target.SubTypeId) > 0 {
			tags["stype-id"] = target.SubTypeId
		}
		if value.Value != nil {
			y, err := lp.New(name.Value.(string), tags, r.meta, map[string]interface{}{"value": value.Value}, timestamp)
			if err == nil {
				if len(unit.Name) > 0 && unit.Value != nil {
					y.AddMeta("unit", unit.Value.(string))
				}
				output <- y
			}
		}
	}
	params.Conn.Close()
}
// Implement functions required for Receiver interface
// Start(), Close()
// See: metricReceiver.go

func (r *SNMPReceiver) Start() {
	cclog.ComponentDebug(r.name, "START")

	r.done = make(chan bool)
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()

		// Create ticker
		ticker := time.NewTicker(r.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// process ticker event -> continue
				if r.sink != nil {
					for _, t := range r.config.Targets {
						select {
						case <-r.done:
							return
						default:
							r.readTarget(t, r.sink)
						}
					}
				}
				continue
			case <-r.done:
				return
			}
		}
	}()
}

// Close receiver: close network connection, close files, close libraries, ...
func (r *SNMPReceiver) Close() {
	cclog.ComponentDebug(r.name, "CLOSE")

	r.done <- true
	r.wg.Wait()
}

// New function to create a new instance of the receiver
// Initialize the receiver by giving it a name and reading in the config JSON
func NewSNMPReceiver(name string, config json.RawMessage) (Receiver, error) {
	var err error = nil
	r := new(SNMPReceiver)

	// Set name of SNMPReceiver
	// The name should be chosen in such a way that different instances of SNMPReceiver can be distinguished
	r.name = fmt.Sprintf("SNMPReceiver(%s)", name)

	// Set static information
	r.meta = map[string]string{"source": r.name, "group": "SNMP"}
	r.tags = map[string]string{"type": "node"}

	// Set defaults in r.config
	r.interval = time.Duration(30) * time.Second

	// Read the SNMP receiver specific JSON config
	if len(config) > 0 {
		err := json.Unmarshal(config, &r.config)
		if err != nil {
			cclog.ComponentError(r.name, "Error reading config:", err.Error())
			return nil, err
		}
	}

	// Check that all required fields in the configuration are set
	if len(r.config.Targets) == 0 {
		err = fmt.Errorf("no targets configured, exiting")
		cclog.ComponentError(r.name, err.Error())
		return nil, err
	}

	if len(r.config.Metrics) == 0 {
		err = fmt.Errorf("no metrics configured, exiting")
		cclog.ComponentError(r.name, err.Error())
		return nil, err
	}

	if len(r.config.ReadInterval) > 0 {
		d, err := time.ParseDuration(r.config.ReadInterval)
		if err != nil {
			err = fmt.Errorf("failed to parse read interval, exiting")
			cclog.ComponentError(r.name, err.Error())
			return nil, err
		}
		r.interval = d
	}
	newtargets := make([]SNMPReceiverTargetConfig, 0)
	for _, t := range r.config.Targets {
		t.timeout = time.Duration(1) * time.Second
		if len(t.Timeout) > 0 {
			d, err := time.ParseDuration(t.Timeout)
			if err != nil {
				err = fmt.Errorf("failed to parse timeout for target %s", t.Hostname)
				cclog.ComponentError(r.name, err.Error())
				continue
			}
			t.timeout = d
		}
		newtargets = append(newtargets, t)
	}
	r.config.Targets = newtargets

	return r, nil
}
receivers/snmpReceiver.md (file deleted)

@@ -1,60 +0,0 @@
# SNMP Receiver

```json
"<name>": {
  "type": "snmp",
  "read_interval": "30s",
  "targets" : [{
    "hostname" : "host1.example.com",
    "port" : 161,
    "community": "public",
    "timeout" : "1s"
  }],
  "metrics" : [
    {
      "name": "sensor1",
      "value": "1.3.6.1.2.1.1.4.0",
      "unit": "1.3.6.1.2.1.1.7.0"
    },
    {
      "name": "1.3.6.1.2.1.1.2.0",
      "value": "1.3.6.1.2.1.1.4.0",
      "unit": "mb/s"
    }
  ]
}
```

The `snmp` receiver uses [gosnmp](https://github.com/gosnmp/gosnmp) to read metrics from network-attached devices.

The configuration of SNMP is quite extensive due to its flexibility.

## Configuration

- `type` has to be `snmp`
- `read_interval` as duration like '1s' or '20s' (default '30s')

For the receiver, the configuration is split in two parts:
### Target configuration

Each target is a network-attached device that should be queried. A target consists of
- `hostname`
- `port` (default 161)
- `community` (default `public`)
- `timeout` as duration like '1s' or '20s' (default '1s')
- `version` SNMP version `X` (`X` in `1`, `2c`, `3`) (default `2c`)
- `type` to specify `type` tag for the target (default `node`)
- `type-id` to specify `type-id` tag for the target
- `stype` to specify `stype` tag (sub type) for the target
- `stype-id` to specify `stype-id` tag for the target

### Metric configuration
- `name` can be an OID or a user-given string
- `value` has to be an OID
- `unit` can be empty, an OID or a user-given string

If an OID is used for `name` or `unit`, the receiver will use the returned values to create the output metric. If there are any issues with the returned values, it uses the `OID`.

## Testing

For testing an SNMP endpoint and OIDs, you can use [`scripts/snmpReceiverTest`](../scripts/snmpReceiverTest)
@@ -25,7 +25,7 @@ CC_USER=clustercockpit
 CC_GROUP=clustercockpit
 CONF_DIR=/etc/cc-metric-collector
 PID_FILE=/var/run/$NAME.pid
-DAEMON=/usr/sbin/$NAME
+DAEMON=/usr/bin/$NAME
 CONF_FILE=${CONF_DIR}/cc-metric-collector.json

 umask 0027
scripts/snmpReceiverTest README (file deleted)

@@ -1,37 +0,0 @@
# snmpReceiverTest

This script is a basic implementation of the SNMPReceiver logic. It can be used to test the connection before configuring the collector to get the data periodically.

It does not support the specification of `type`, `type-id`, `stype` and `stype-id`, but since they are not required to test the functionality, they are left out.

## Usage

```sh
$ go run snmpReceiverTest -h
Usage of snmpReceiverTest:
  -community string
        SNMP community (default "public")
  -hostname string
        Hostname (default "127.0.0.1")
  -name string
        Name of metric or OID
  -port string
        Port number (default "161")
  -timeout string
        Timeout for SNMP request (default "1s")
  -unit string
        Unit of metric or OID
  -value string
        Value OID
  -version string
        SNMP version (default "2c")
```

## Example

```sh
$ go run scripts/snmpReceiverTest/snmpReceiverTest.go -name serialNumber -value .1.3.6.1.4.1.6574.1.5.2.0 -hostname $IP -community $COMMUNITY
Name: serialNumber, Tags: map[type:node], Meta: map[], fields: map[value:18B0PCNXXXXX], Timestamp: 1702050709599311288
```
scripts/snmpReceiverTest/snmpReceiverTest.go (file deleted)

@@ -1,167 +0,0 @@
package main

import (
	"flag"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"time"

	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
	"github.com/gosnmp/gosnmp"
)

func ReadCLI() map[string]string {
	args := map[string]string{
		"port":      "161",
		"community": "public",
		"version":   "2c",
		"hostname":  "127.0.0.1",
		"timeout":   "1s",
	}

	host_cfg := flag.String("hostname", "127.0.0.1", "Hostname")
	port_cfg := flag.String("port", "161", "Port number")
	comm_cfg := flag.String("community", "public", "SNMP community")
	vers_cfg := flag.String("version", "2c", "SNMP version")
	time_cfg := flag.String("timeout", "1s", "Timeout for SNMP request")

	name_cfg := flag.String("name", "", "Name of metric or OID")
	value_cfg := flag.String("value", "", "Value OID")
	unit_cfg := flag.String("unit", "", "Unit of metric or OID")

	flag.Parse()

	args["port"] = *port_cfg
	args["community"] = *comm_cfg
	args["hostname"] = *host_cfg
	args["version"] = *vers_cfg
	args["timeout"] = *time_cfg

	args["name"] = *name_cfg
	args["value"] = *value_cfg
	args["unit"] = *unit_cfg

	if len(args["name"]) == 0 || len(args["value"]) == 0 {
		fmt.Printf("Required arguments: --name and --value\n")
		flag.Usage()
	}

	return args
}

func validOid(oid string) bool {
	// Regex from https://github.com/BornToBeRoot/NETworkManager/blob/6805740762bf19b95051c7eaa73cf2b4727733c3/Source/NETworkManager.Utilities/RegexHelper.cs#L88
	// Match on leading dot added by Thomas Gruber <thomas.gruber@fau.de>
	match, err := regexp.MatchString(`^[\.]?[012]\.(?:[0-9]|[1-3][0-9])(\.\d+)*$`, oid)
	if err != nil {
		return false
	}
	return match
}
func main() {

	args := ReadCLI()

	if len(args["name"]) == 0 || len(args["value"]) == 0 {
		return
	}

	version := gosnmp.Version2c
	if len(args["version"]) > 0 {
		switch args["version"] {
		case "1":
			version = gosnmp.Version1
		case "2c":
			version = gosnmp.Version2c
		case "3":
			version = gosnmp.Version3
		default:
			fmt.Printf("Invalid SNMP version '%s'\n", args["version"])
			return
		}
	}
	// ParseUint with bitSize 16 covers the full port range 0-65535,
	// which a signed 16-bit parse would not
	v, err := strconv.ParseUint(args["port"], 10, 16)
	if err != nil {
		fmt.Printf("Failed to parse port number '%s'\n", args["port"])
		return
	}
	port := uint16(v)

	t, err := time.ParseDuration(args["timeout"])
	if err != nil {
		fmt.Printf("Failed to parse timeout '%s'\n", args["timeout"])
		return
	}
	timeout := t

	params := &gosnmp.GoSNMP{
		Target:    args["hostname"],
		Port:      port,
		Community: args["community"],
		Version:   version,
		Timeout:   timeout,
	}
	err = params.Connect()
	if err != nil {
		fmt.Printf("Failed to connect to %s:%d : %v\n", params.Target, params.Port, err.Error())
		return
	}

	oids := make([]string, 0)
	idx := 0
	name := gosnmp.SnmpPDU{
		Value: args["name"],
		Name:  args["name"],
	}
	nameidx := -1
	value := gosnmp.SnmpPDU{
		Value: nil,
		Name:  args["value"],
	}
	valueidx := -1
	unit := gosnmp.SnmpPDU{
		Value: args["unit"],
		Name:  args["unit"],
	}
	unitidx := -1
	if validOid(args["name"]) {
		oids = append(oids, args["name"])
		nameidx = idx
		idx++
	}
	if validOid(args["value"]) {
		oids = append(oids, args["value"])
		valueidx = idx
		idx++
	}
	if len(args["unit"]) > 0 && validOid(args["unit"]) {
		oids = append(oids, args["unit"])
		unitidx = idx
	}
	result, err := params.Get(oids)
	if err != nil {
		fmt.Printf("Failed to get data for OIDs [%s] : %v\n", strings.Join(oids, ", "), err.Error())
		return
	}
	if nameidx >= 0 && len(result.Variables) > nameidx {
		name = result.Variables[nameidx]
	}
	if valueidx >= 0 && len(result.Variables) > valueidx {
		value = result.Variables[valueidx]
	}
	if unitidx >= 0 && len(result.Variables) > unitidx {
		unit = result.Variables[unitidx]
	}
	if value.Value != nil {
		y, err := lp.New(name.Value.(string), map[string]string{"type": "node"}, map[string]string{}, map[string]interface{}{"value": value.Value}, time.Now())
		if err == nil {
			if len(unit.Name) > 0 && unit.Value != nil {
				y.AddMeta("unit", unit.Value.(string))
			}
			fmt.Println(y)
		}
	}
}
sinks/httpSink.go:

@@ -45,6 +45,9 @@ type HttpSinkConfig struct {

 	// Maximum number of retries to connect to the http server (default: 3)
 	MaxRetries int `json:"max_retries,omitempty"`
+
+	// Timestamp precision
+	Precision string `json:"precision,omitempty"`
 }

 type key_value_pair struct {
@@ -141,7 +144,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {

 	// Check that encoding worked
 	if err != nil {
-		return fmt.Errorf("Encoding failed: %v", err)
+		return fmt.Errorf("encoding failed: %v", err)
 	}

 	if s.config.flushDelay == 0 {
@@ -268,6 +271,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
 	s.config.Timeout = "5s"
 	s.config.FlushDelay = "5s"
 	s.config.MaxRetries = 3
+	s.config.Precision = "ns"
 	cclog.ComponentDebug(s.name, "Init()")

 	// Read config
@@ -315,6 +319,19 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
 			cclog.ComponentDebug(s.name, "Init(): flushDelay", t)
 		}
 	}
+	precision := influx.Nanosecond
+	if len(s.config.Precision) > 0 {
+		switch s.config.Precision {
+		case "s":
+			precision = influx.Second
+		case "ms":
+			precision = influx.Millisecond
+		case "us":
+			precision = influx.Microsecond
+		case "ns":
+			precision = influx.Nanosecond
+		}
+	}

 	// Create http client
 	s.client = &http.Client{
@@ -326,7 +343,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
 	}

 	// Configure influx line protocol encoder
-	s.encoder.SetPrecision(influx.Nanosecond)
+	s.encoder.SetPrecision(precision)
 	s.extended_tag_list = make([]key_value_pair, 0)

 	return s, nil
sinks/httpSink.md:

@@ -18,7 +18,8 @@ The `http` sink uses POST requests to an HTTP server to submit the metrics in the
     "timeout": "5s",
     "idle_connection_timeout" : "5s",
     "flush_delay": "2s",
-    "batch_size": 1000
+    "batch_size": 1000,
+    "precision": "s"
   }
 }
 ```
@@ -34,3 +35,8 @@ The `http` sink uses POST requests to an HTTP server to submit the metrics in the
 - `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open
 - `flush_delay`: Batch all writes arriving during this duration (default '1s'; batching can be disabled by setting it to 0)
 - `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
+- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns' (default is 'ns')
+
+### Using HttpSink for communication with cc-metric-store
+
+The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to set `"precision": "s"`.
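A minimal sketch of such a sink entry (the sink name is a placeholder, and any connection options such as the target URL follow the full example above; only `precision` is the point here):

```json
"<name>": {
  "type": "http",
  "precision": "s"
}
```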