mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-19 11:21:41 +02:00
Compare commits
7 Commits
app_metric
...
lustre_job
Author | SHA1 | Date | |
---|---|---|---|
|
1fcb302620 | ||
|
00d8041254 | ||
|
6b0ac45d07 | ||
|
9e746006e8 | ||
|
fbcacf1a4f | ||
|
0b343171a7 | ||
|
b973e8ac9c |
2
Makefile
2
Makefile
@@ -22,7 +22,7 @@ GOBIN = $(shell which go)
|
||||
.PHONY: all
|
||||
all: $(APP)
|
||||
|
||||
$(APP): $(GOSRC) go.mod
|
||||
$(APP): $(GOSRC)
|
||||
make -C collectors
|
||||
$(GOBIN) get
|
||||
$(GOBIN) build -o $(APP) $(GOSRC_APP)
|
||||
|
@@ -51,7 +51,7 @@ A collector reads data from any source, parses it to metrics and submits these m
|
||||
* `Name() string`: Return the name of the collector
|
||||
* `Init(config json.RawMessage) error`: Initializes the collector using the given collector-specific config in JSON. Check if needed files/commands exists, ...
|
||||
* `Initialized() bool`: Check if a collector is successfully initialized
|
||||
* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
||||
* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
||||
* `Close()`: Closes down the collector.
|
||||
|
||||
It is recommanded to call `setup()` in the `Init()` function.
|
||||
|
@@ -20,6 +20,7 @@ var AvailableCollectors = map[string]MetricCollector{
|
||||
"netstat": new(NetstatCollector),
|
||||
"ibstat": new(InfinibandCollector),
|
||||
"lustrestat": new(LustreCollector),
|
||||
"lustre_jobstat": new(LustreJobstatCollector),
|
||||
"cpustat": new(CpustatCollector),
|
||||
"topprocs": new(TopProcsCollector),
|
||||
"nvidia": new(NvidiaCollector),
|
||||
@@ -36,9 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{
|
||||
"numastats": new(NUMAStatsCollector),
|
||||
"beegfs_meta": new(BeegfsMetaCollector),
|
||||
"beegfs_storage": new(BeegfsStorageCollector),
|
||||
"rapl": new(RAPLCollector),
|
||||
"rocm_smi": new(RocmSmiCollector),
|
||||
"self": new(SelfCollector),
|
||||
"schedstat": new(SchedstatCollector),
|
||||
}
|
||||
|
||||
|
@@ -142,11 +142,6 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if at least one CPU with frequency information was detected
|
||||
if len(m.topology) == 0 {
|
||||
return fmt.Errorf("No CPU frequency info found in %s", cpuInfoFile)
|
||||
}
|
||||
|
||||
numPhysicalPackageID_int := maxPhysicalPackageID + 1
|
||||
numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int)
|
||||
numNonHT := fmt.Sprint(numNonHT_int)
|
||||
|
@@ -23,18 +23,20 @@ type CPUFreqCollectorTopology struct {
|
||||
numPhysicalPackages string // number of sockets / packages
|
||||
numPhysicalPackages_int int64
|
||||
isHT bool
|
||||
numNonHT string // number of non hyper-threading processors
|
||||
numNonHT string // number of non hyperthreading processors
|
||||
numNonHT_int int64
|
||||
scalingCurFreqFile string
|
||||
tagSet map[string]string
|
||||
}
|
||||
|
||||
//
|
||||
// CPUFreqCollector
|
||||
// a metric collector to measure the current frequency of the CPUs
|
||||
// as obtained from the hardware (in KHz)
|
||||
// Only measure on the first hyper-thread
|
||||
// Only measure on the first hyper thread
|
||||
//
|
||||
// See: https://www.kernel.org/doc/html/latest/admin-guide/pm/cpufreq.html
|
||||
//
|
||||
type CPUFreqCollector struct {
|
||||
metricCollector
|
||||
topology []CPUFreqCollectorTopology
|
||||
@@ -124,7 +126,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
t.scalingCurFreqFile = scalingCurFreqFile
|
||||
}
|
||||
|
||||
// is processor a hyper-thread?
|
||||
// is processor a hyperthread?
|
||||
coreSeenBefore := make(map[string]bool)
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
@@ -134,20 +136,23 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
coreSeenBefore[globalID] = true
|
||||
}
|
||||
|
||||
// number of non hyper-thread cores and packages / sockets
|
||||
// number of non hyper thread cores and packages / sockets
|
||||
var numNonHT_int int64 = 0
|
||||
PhysicalPackageIDs := make(map[int64]struct{})
|
||||
var maxPhysicalPackageID int64 = 0
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
|
||||
// Update maxPackageID
|
||||
if t.physicalPackageID_int > maxPhysicalPackageID {
|
||||
maxPhysicalPackageID = t.physicalPackageID_int
|
||||
}
|
||||
|
||||
if !t.isHT {
|
||||
numNonHT_int++
|
||||
}
|
||||
|
||||
PhysicalPackageIDs[t.physicalPackageID_int] = struct{}{}
|
||||
}
|
||||
|
||||
numPhysicalPackageID_int := int64(len(PhysicalPackageIDs))
|
||||
numPhysicalPackageID_int := maxPhysicalPackageID + 1
|
||||
numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int)
|
||||
numNonHT := fmt.Sprint(numNonHT_int)
|
||||
for i := range m.topology {
|
||||
@@ -163,13 +168,6 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(
|
||||
m.name,
|
||||
"initialized",
|
||||
numPhysicalPackageID_int, "physical packages,",
|
||||
len(cpuDirs), "CPUs,",
|
||||
numNonHT, "non-hyper-threading CPUs")
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -184,7 +182,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
|
||||
// skip hyper-threads
|
||||
// skip hyperthreads
|
||||
if t.isHT {
|
||||
continue
|
||||
}
|
||||
|
@@ -1,5 +1,4 @@
|
||||
## `cpufreq_cpuinfo` collector
|
||||
|
||||
```json
|
||||
"cpufreq": {
|
||||
"exclude_metrics": []
|
||||
@@ -9,5 +8,4 @@
|
||||
The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **hwthread** metrics.
|
||||
|
||||
Metrics:
|
||||
|
||||
* `cpufreq`
|
||||
* `cpufreq`
|
@@ -1,6 +1,5 @@
|
||||
|
||||
## `cpustat` collector
|
||||
|
||||
```json
|
||||
"cpustat": {
|
||||
"exclude_metrics": [
|
||||
@@ -9,10 +8,9 @@
|
||||
}
|
||||
```
|
||||
|
||||
The `cpustat` collector reads data from `/proc/stat` and outputs a handful **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
|
||||
The `cpustat` collector reads data from `/proc/stats` and outputs a handful **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
|
||||
|
||||
Metrics:
|
||||
|
||||
* `cpu_user`
|
||||
* `cpu_nice`
|
||||
* `cpu_system`
|
||||
|
@@ -1,58 +1,51 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
const IPMITOOL_PATH = `ipmitool`
|
||||
const IPMISENSORS_PATH = `ipmi-sensors`
|
||||
|
||||
type IpmiCollectorConfig struct {
|
||||
ExcludeDevices []string `json:"exclude_devices"`
|
||||
IpmitoolPath string `json:"ipmitool_path"`
|
||||
IpmisensorsPath string `json:"ipmisensors_path"`
|
||||
}
|
||||
|
||||
type IpmiCollector struct {
|
||||
metricCollector
|
||||
config struct {
|
||||
ExcludeDevices []string `json:"exclude_devices"`
|
||||
IpmitoolPath string `json:"ipmitool_path"`
|
||||
IpmisensorsPath string `json:"ipmisensors_path"`
|
||||
}
|
||||
//tags map[string]string
|
||||
//matches map[string]string
|
||||
config IpmiCollectorConfig
|
||||
ipmitool string
|
||||
ipmisensors string
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
// Check if already initialized
|
||||
if m.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.name = "IpmiCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "IPMI",
|
||||
}
|
||||
// default path to IPMI tools
|
||||
m.config.IpmitoolPath = "ipmitool"
|
||||
m.config.IpmisensorsPath = "ipmi-sensors"
|
||||
m.meta = map[string]string{"source": m.name, "group": "IPMI"}
|
||||
m.config.IpmitoolPath = string(IPMITOOL_PATH)
|
||||
m.config.IpmisensorsPath = string(IPMISENSORS_PATH)
|
||||
m.ipmitool = ""
|
||||
m.ipmisensors = ""
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Check if executables ipmitool or ipmisensors are found
|
||||
p, err := exec.LookPath(m.config.IpmitoolPath)
|
||||
if err == nil {
|
||||
m.ipmitool = p
|
||||
@@ -69,33 +62,25 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
||||
|
||||
// Setup ipmitool command
|
||||
command := exec.Command(cmd, "sensor")
|
||||
stdout, _ := command.StdoutPipe()
|
||||
errBuf := new(bytes.Buffer)
|
||||
command.Stderr = errBuf
|
||||
|
||||
// start command
|
||||
if err := command.Start(); err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("readIpmiTool(): Failed to start command \"%s\": %v", command.String(), err),
|
||||
)
|
||||
command.Wait()
|
||||
stdout, err := command.Output()
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Read command output
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
for scanner.Scan() {
|
||||
lv := strings.Split(scanner.Text(), "|")
|
||||
ll := strings.Split(string(stdout), "\n")
|
||||
|
||||
for _, line := range ll {
|
||||
lv := strings.Split(line, "|")
|
||||
if len(lv) < 3 {
|
||||
continue
|
||||
}
|
||||
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
|
||||
v, err := strconv.ParseFloat(strings.Trim(lv[1], " "), 64)
|
||||
if err == nil {
|
||||
name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1))
|
||||
unit := strings.TrimSpace(lv[2])
|
||||
name := strings.ToLower(strings.Replace(strings.Trim(lv[0], " "), " ", "_", -1))
|
||||
unit := strings.Trim(lv[2], " ")
|
||||
if unit == "Volts" {
|
||||
unit = "Volts"
|
||||
} else if unit == "degrees C" {
|
||||
@@ -113,17 +98,6 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for command end
|
||||
if err := command.Wait(); err != nil {
|
||||
errMsg, _ := io.ReadAll(errBuf)
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
|
||||
fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", string(errMsg)),
|
||||
)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
|
||||
@@ -157,16 +131,16 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
|
||||
// Check if already initialized
|
||||
if !m.init {
|
||||
return
|
||||
}
|
||||
|
||||
if len(m.config.IpmitoolPath) > 0 {
|
||||
m.readIpmiTool(m.config.IpmitoolPath, output)
|
||||
_, err := os.Stat(m.config.IpmitoolPath)
|
||||
if err == nil {
|
||||
m.readIpmiTool(m.config.IpmitoolPath, output)
|
||||
}
|
||||
} else if len(m.config.IpmisensorsPath) > 0 {
|
||||
m.readIpmiSensors(m.config.IpmisensorsPath, output)
|
||||
_, err := os.Stat(m.config.IpmisensorsPath)
|
||||
if err == nil {
|
||||
m.readIpmiSensors(m.config.IpmisensorsPath, output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -8,6 +8,9 @@
|
||||
}
|
||||
```
|
||||
|
||||
The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`).
|
||||
The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`).
|
||||
|
||||
The metrics depend on the output of the underlying tools but contain temperature, power and energy metrics.
|
||||
|
||||
|
||||
|
||||
|
270
collectors/lustreJobstatMetric.go
Normal file
270
collectors/lustreJobstatMetric.go
Normal file
@@ -0,0 +1,270 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
type LustreJobstatCollectorConfig struct {
|
||||
LCtlCommand string `json:"lctl_command,omitempty"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
Sudo bool `json:"use_sudo,omitempty"`
|
||||
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
|
||||
SendDerivedValues bool `json:"send_derived_values,omitempty"`
|
||||
SendDiffValues bool `json:"send_diff_values,omitempty"`
|
||||
JobRegex string `json:"jobid_regex,omitempty"`
|
||||
}
|
||||
|
||||
type LustreJobstatCollector struct {
|
||||
metricCollector
|
||||
tags map[string]string
|
||||
config LustreJobstatCollectorConfig
|
||||
lctl string
|
||||
sudoCmd string
|
||||
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
|
||||
definitions []LustreMetricDefinition // Combined list without excluded metrics
|
||||
//stats map[string]map[string]int64 // Data for last value per device and metric
|
||||
lastMdtData *map[string]map[string]LustreMetricData
|
||||
lastObdfilterData *map[string]map[string]LustreMetricData
|
||||
jobidRegex *regexp.Regexp
|
||||
}
|
||||
|
||||
var defaultJobidRegex = `^(?P<jobid>[\d\w\.]+)$`
|
||||
|
||||
var LustreMetricJobstatsDefinition = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_job_read_samples",
|
||||
lineprefix: "read",
|
||||
offsetname: "samples",
|
||||
unit: "requests",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_job_read_min_bytes",
|
||||
lineprefix: "read_bytes",
|
||||
offsetname: "min",
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_job_read_max_bytes",
|
||||
lineprefix: "read_bytes",
|
||||
offsetname: "max",
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
}
|
||||
|
||||
func (m *LustreJobstatCollector) executeLustreCommand(option string) []string {
|
||||
return executeLustreCommand(m.sudoCmd, m.lctl, LCTL_OPTION, option, m.config.Sudo)
|
||||
}
|
||||
|
||||
func (m *LustreJobstatCollector) Init(config json.RawMessage) error {
|
||||
var err error
|
||||
m.name = "LustreJobstatCollector"
|
||||
m.parallel = true
|
||||
m.config.JobRegex = defaultJobidRegex
|
||||
m.config.SendAbsoluteValues = true
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.setup()
|
||||
m.tags = map[string]string{"type": "jobid"}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Lustre", "scope": "job"}
|
||||
|
||||
// Lustre file system statistics can only be queried by user root
|
||||
// or with password-less sudo
|
||||
// if !m.config.Sudo {
|
||||
// user, err := user.Current()
|
||||
// if err != nil {
|
||||
// cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
|
||||
// return err
|
||||
// }
|
||||
// if user.Uid != "0" {
|
||||
// cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
|
||||
// return err
|
||||
// }
|
||||
// } else {
|
||||
// p, err := exec.LookPath("sudo")
|
||||
// if err != nil {
|
||||
// cclog.ComponentError(m.name, "Cannot find 'sudo'")
|
||||
// return err
|
||||
// }
|
||||
// m.sudoCmd = p
|
||||
// }
|
||||
|
||||
p, err := exec.LookPath(m.config.LCtlCommand)
|
||||
if err != nil {
|
||||
p, err = exec.LookPath(LCTL_CMD)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.lctl = p
|
||||
|
||||
m.definitions = make([]LustreMetricDefinition, 0)
|
||||
if m.config.SendAbsoluteValues {
|
||||
for _, def := range LustreMetricJobstatsDefinition {
|
||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
||||
m.definitions = append(m.definitions, def)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(m.definitions) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
}
|
||||
|
||||
x := make(map[string]map[string]LustreMetricData)
|
||||
m.lastMdtData = &x
|
||||
x = make(map[string]map[string]LustreMetricData)
|
||||
m.lastObdfilterData = &x
|
||||
|
||||
if len(m.config.JobRegex) > 0 {
|
||||
jregex := strings.ReplaceAll(m.config.JobRegex, "%", "\\")
|
||||
r, err := regexp.Compile(jregex)
|
||||
if err == nil {
|
||||
m.jobidRegex = r
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Cannot compile jobid regex")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.lastTimestamp = time.Now()
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LustreJobstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
if !m.init {
|
||||
return
|
||||
}
|
||||
|
||||
getValue := func(data map[string]map[string]LustreMetricData, device string, jobid string, operation string, field string) int64 {
|
||||
var value int64 = -1
|
||||
if ddata, ok := data[device]; ok {
|
||||
if jdata, ok := ddata[jobid]; ok {
|
||||
if opdata, ok := jdata.op_data[operation]; ok {
|
||||
if v, ok := opdata[field]; ok {
|
||||
value = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
jobIdToTags := func(jobregex *regexp.Regexp, job string) map[string]string {
|
||||
tags := make(map[string]string)
|
||||
groups := jobregex.SubexpNames()
|
||||
for _, match := range jobregex.FindAllStringSubmatch(job, -1) {
|
||||
for groupIdx, group := range match {
|
||||
if len(groups[groupIdx]) > 0 {
|
||||
tags[groups[groupIdx]] = group
|
||||
}
|
||||
}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
generateMetric := func(definition LustreMetricDefinition, data map[string]map[string]LustreMetricData, last map[string]map[string]LustreMetricData, now time.Time) {
|
||||
tdiff := now.Sub(m.lastTimestamp)
|
||||
for dev, ddata := range data {
|
||||
for jobid, jdata := range ddata {
|
||||
jobtags := jobIdToTags(m.jobidRegex, jobid)
|
||||
if _, ok := jobtags["jobid"]; !ok {
|
||||
continue
|
||||
}
|
||||
cur := getValue(data, dev, jobid, definition.lineprefix, definition.offsetname)
|
||||
old := getValue(last, dev, jobid, definition.lineprefix, definition.offsetname)
|
||||
var x interface{} = -1
|
||||
var valid = false
|
||||
switch definition.calc {
|
||||
case "none":
|
||||
x = cur
|
||||
valid = true
|
||||
case "difference":
|
||||
if len(last) > 0 {
|
||||
if old >= 0 {
|
||||
x = cur - old
|
||||
valid = true
|
||||
}
|
||||
}
|
||||
case "derivative":
|
||||
if len(last) > 0 {
|
||||
if old >= 0 {
|
||||
x = float64(cur-old) / tdiff.Seconds()
|
||||
valid = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if valid {
|
||||
y, err := lp.New(definition.name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
|
||||
if err == nil {
|
||||
y.AddTag("stype", "device")
|
||||
y.AddTag("stype-id", dev)
|
||||
if j, ok := jobtags["jobid"]; ok {
|
||||
y.AddTag("type-id", j)
|
||||
} else {
|
||||
y.AddTag("type-id", jobid)
|
||||
}
|
||||
for k, v := range jobtags {
|
||||
switch k {
|
||||
case "jobid":
|
||||
case "hostname":
|
||||
y.AddTag("hostname", v)
|
||||
default:
|
||||
y.AddMeta(k, v)
|
||||
}
|
||||
}
|
||||
if len(definition.unit) > 0 {
|
||||
y.AddMeta("unit", definition.unit)
|
||||
} else {
|
||||
if unit, ok := jdata.op_units[definition.lineprefix]; ok {
|
||||
y.AddMeta("unit", unit)
|
||||
}
|
||||
}
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
mdt_lines := m.executeLustreCommand("mdt.*.job_stats")
|
||||
if len(mdt_lines) > 0 {
|
||||
mdt_data := readCommandOutput(mdt_lines)
|
||||
for _, def := range m.definitions {
|
||||
generateMetric(def, mdt_data, *m.lastMdtData, now)
|
||||
}
|
||||
m.lastMdtData = &mdt_data
|
||||
}
|
||||
|
||||
obdfilter_lines := m.executeLustreCommand("obdfilter.*.job_stats")
|
||||
if len(obdfilter_lines) > 0 {
|
||||
obdfilter_data := readCommandOutput(obdfilter_lines)
|
||||
for _, def := range m.definitions {
|
||||
generateMetric(def, obdfilter_data, *m.lastObdfilterData, now)
|
||||
}
|
||||
m.lastObdfilterData = &obdfilter_data
|
||||
}
|
||||
|
||||
m.lastTimestamp = now
|
||||
}
|
||||
|
||||
func (m *LustreJobstatCollector) Close() {
|
||||
m.init = false
|
||||
}
|
35
collectors/lustreJobstatMetric.md
Normal file
35
collectors/lustreJobstatMetric.md
Normal file
@@ -0,0 +1,35 @@
|
||||
|
||||
## `lustre_jobstat` collector
|
||||
|
||||
**Note**: This collector is meant to run on the Lustre servers, **not** the clients
|
||||
|
||||
The Lustre filesystem provides a feature (`job_stats`) to group processes on client side with an identifier string (like a compute job with its jobid) and retrieve the file system operation counts on the server side. Check the section [How to configure `job_stats`]() for more information.
|
||||
|
||||
### Configuration
|
||||
|
||||
```json
|
||||
"lustre_jobstat_": {
|
||||
"lctl_command": "/path/to/lctl",
|
||||
"use_sudo": false,
|
||||
"exclude_metrics": [
|
||||
"setattr",
|
||||
"getattr"
|
||||
],
|
||||
"send_abs_values" : true,
|
||||
|
||||
"jobid_regex" : "^(?P<jobid>[%d%w%.]+)$"
|
||||
}
|
||||
```
|
||||
|
||||
The `lustre_jobstat` collector uses the `lctl` application with the `get_param` option to get all `mdt.*.job_stats` and `obdfilter.*.job_stats` metrics. These metrics are only available for root users. If password-less sudo is configured, you can enable `sudo` in the configuration. In the `exclude_metrics` list, some metrics can be excluded to reduce network traffic and storage. With the `send_abs_values` flag, the collector sends absolute values for the configured metrics. The `jobid_regex` can be used to split the Lustre `job_stats` identifier into multiple parts. Since JSON cannot handle strings like `\d`, use `%` instead of `\`.
|
||||
|
||||
Metrics:
|
||||
- `lustre_job_read_samples` (unit: `requests`)
|
||||
- `lustre_job_read_min_bytes` (unit: `bytes`)
|
||||
- `lustre_job_read_max_bytes` (unit: `bytes`)
|
||||
|
||||
The collector adds the tags: `type=jobid,typeid=<jobid_from_regex>,stype=device,stype=<device_name_from_output>`.
|
||||
|
||||
The collector adds the mega information: `unit=<unit>,scope=job`
|
||||
|
||||
### How to configure `job_stats`
|
@@ -14,10 +14,6 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
const LUSTRE_SYSFS = `/sys/fs/lustre`
|
||||
const LCTL_CMD = `lctl`
|
||||
const LCTL_OPTION = `get_param`
|
||||
|
||||
type LustreCollectorConfig struct {
|
||||
LCtlCommand string `json:"lctl_command,omitempty"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
@@ -27,14 +23,6 @@ type LustreCollectorConfig struct {
|
||||
SendDiffValues bool `json:"send_diff_values,omitempty"`
|
||||
}
|
||||
|
||||
type LustreMetricDefinition struct {
|
||||
name string
|
||||
lineprefix string
|
||||
lineoffset int
|
||||
unit string
|
||||
calc string
|
||||
}
|
||||
|
||||
type LustreCollector struct {
|
||||
metricCollector
|
||||
tags map[string]string
|
||||
@@ -46,17 +34,209 @@ type LustreCollector struct {
|
||||
stats map[string]map[string]int64 // Data for last value per device and metric
|
||||
}
|
||||
|
||||
var LustreAbsMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bytes",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bytes",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_open",
|
||||
lineprefix: "open",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_close",
|
||||
lineprefix: "close",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_setattr",
|
||||
lineprefix: "setattr",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_getattr",
|
||||
lineprefix: "getattr",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_statfs",
|
||||
lineprefix: "statfs",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_inode_permission",
|
||||
lineprefix: "inode_permission",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
}
|
||||
|
||||
var LustreDiffMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests_diff",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests_diff",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bytes_diff",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bytes_diff",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_open_diff",
|
||||
lineprefix: "open",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_close_diff",
|
||||
lineprefix: "close",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_setattr_diff",
|
||||
lineprefix: "setattr",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_getattr_diff",
|
||||
lineprefix: "getattr",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_statfs_diff",
|
||||
lineprefix: "statfs",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_inode_permission_diff",
|
||||
lineprefix: "inode_permission",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
}
|
||||
|
||||
var LustreDeriveMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests_rate",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests_rate",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
offsetname: "samples",
|
||||
unit: "requests/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bw",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bw",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
offsetname: "sum",
|
||||
unit: "bytes/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
}
|
||||
|
||||
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
||||
var command *exec.Cmd
|
||||
statsfile := fmt.Sprintf("llite.%s.stats", device)
|
||||
if m.config.Sudo {
|
||||
command = exec.Command(m.sudoCmd, m.lctl, LCTL_OPTION, statsfile)
|
||||
} else {
|
||||
command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
||||
}
|
||||
command.Wait()
|
||||
stdout, _ := command.Output()
|
||||
return strings.Split(string(stdout), "\n")
|
||||
return executeLustreCommand(m.sudoCmd, m.lctl, LCTL_OPTION, fmt.Sprintf("llite.%s.stats", device), m.config.Sudo)
|
||||
}
|
||||
|
||||
func (m *LustreCollector) getDevices() []string {
|
||||
@@ -108,183 +288,6 @@ func getMetricData(lines []string, prefix string, offset int) (int64, error) {
|
||||
// return strings.Split(string(buffer), "\n")
|
||||
// }
|
||||
|
||||
var LustreAbsMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bytes",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bytes",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_open",
|
||||
lineprefix: "open",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_close",
|
||||
lineprefix: "close",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_setattr",
|
||||
lineprefix: "setattr",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_getattr",
|
||||
lineprefix: "getattr",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_statfs",
|
||||
lineprefix: "statfs",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
{
|
||||
name: "lustre_inode_permission",
|
||||
lineprefix: "inode_permission",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "none",
|
||||
},
|
||||
}
|
||||
|
||||
var LustreDiffMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests_diff",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests_diff",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bytes_diff",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bytes_diff",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_open_diff",
|
||||
lineprefix: "open",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_close_diff",
|
||||
lineprefix: "close",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_setattr_diff",
|
||||
lineprefix: "setattr",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_getattr_diff",
|
||||
lineprefix: "getattr",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_statfs_diff",
|
||||
lineprefix: "statfs",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
{
|
||||
name: "lustre_inode_permission_diff",
|
||||
lineprefix: "inode_permission",
|
||||
lineoffset: 1,
|
||||
unit: "",
|
||||
calc: "difference",
|
||||
},
|
||||
}
|
||||
|
||||
var LustreDeriveMetrics = []LustreMetricDefinition{
|
||||
{
|
||||
name: "lustre_read_requests_rate",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_requests_rate",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 1,
|
||||
unit: "requests/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_read_bw",
|
||||
lineprefix: "read_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
{
|
||||
name: "lustre_write_bw",
|
||||
lineprefix: "write_bytes",
|
||||
lineoffset: 6,
|
||||
unit: "bytes/sec",
|
||||
calc: "derivative",
|
||||
},
|
||||
}
|
||||
|
||||
func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
var err error
|
||||
m.name = "LustreCollector"
|
||||
@@ -297,7 +300,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
m.setup()
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Lustre", "scope": "node"}
|
||||
|
||||
// Lustre file system statistics can only be queried by user root
|
||||
// or with password-less sudo
|
||||
|
190
collectors/lustreMetricCommon.go
Normal file
190
collectors/lustreMetricCommon.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const LUSTRE_SYSFS = `/sys/fs/lustre`
|
||||
const LCTL_CMD = `lctl`
|
||||
const LCTL_OPTION = `get_param`
|
||||
|
||||
type LustreMetricDefinition struct {
|
||||
name string
|
||||
lineprefix string
|
||||
lineoffset int
|
||||
offsetname string
|
||||
unit string
|
||||
calc string
|
||||
}
|
||||
|
||||
type LustreMetricData struct {
|
||||
sample_time int64
|
||||
start_time int64
|
||||
elapsed_time int64
|
||||
op_data map[string]map[string]int64
|
||||
op_units map[string]string
|
||||
sample_time_unit string
|
||||
start_time_unit string
|
||||
elapsed_time_unit string
|
||||
}
|
||||
|
||||
var devicePattern = regexp.MustCompile(`^[\w\d\-_]+\.([\w\d\-_]+)\.[\w\d\-_]+=$`)
|
||||
var jobPattern = regexp.MustCompile(`^-\s*job_id:\s*([\w\d\-_\.:]+)$`)
|
||||
var snapshotPattern = regexp.MustCompile(`^\s*snapshot_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
||||
var startPattern = regexp.MustCompile(`^\s*start_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
||||
var elapsedPattern = regexp.MustCompile(`^\s*elapsed_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
||||
var linePattern = regexp.MustCompile(`^\s*([\w\d\-_\.]+):\s*\{\s*samples:\s*([\d\.]+),\s*unit:\s*([\w\d\-_\.]+),\s*min:\s*([\d\.]+),\s*max:\s*([\d\.]+),\s*sum:\s*([\d\.]+),\s*sumsq:\s*([\d\.]+)\s*\}`)
|
||||
|
||||
func executeLustreCommand(sudo, lctl, option, search string, use_sudo bool) []string {
|
||||
var command *exec.Cmd
|
||||
if use_sudo {
|
||||
command = exec.Command(sudo, lctl, option, search)
|
||||
} else {
|
||||
command = exec.Command(lctl, option, search)
|
||||
}
|
||||
command.Wait()
|
||||
stdout, _ := command.Output()
|
||||
return strings.Split(string(stdout), "\n")
|
||||
}
|
||||
|
||||
func splitTree(lines []string, splitRegex *regexp.Regexp) map[string][]string {
|
||||
entries := make(map[string][]string)
|
||||
ent_lines := make([]int, 0)
|
||||
for i, l := range lines {
|
||||
m := splitRegex.FindStringSubmatch(l)
|
||||
if len(m) == 2 {
|
||||
ent_lines = append(ent_lines, i)
|
||||
}
|
||||
}
|
||||
if len(ent_lines) > 0 {
|
||||
for i, idx := range ent_lines[:len(ent_lines)-1] {
|
||||
m := splitRegex.FindStringSubmatch(lines[idx])
|
||||
entries[m[1]] = lines[idx+1 : ent_lines[i+1]]
|
||||
}
|
||||
last := ent_lines[len(ent_lines)-1]
|
||||
m := splitRegex.FindStringSubmatch(lines[last])
|
||||
entries[m[1]] = lines[last:]
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func readDevices(lines []string) map[string][]string {
|
||||
return splitTree(lines, devicePattern)
|
||||
}
|
||||
|
||||
func readJobs(lines []string) map[string][]string {
|
||||
return splitTree(lines, jobPattern)
|
||||
}
|
||||
|
||||
func readJobdata(lines []string) LustreMetricData {
|
||||
|
||||
jobdata := LustreMetricData{
|
||||
op_data: make(map[string]map[string]int64),
|
||||
op_units: make(map[string]string),
|
||||
sample_time: 0,
|
||||
sample_time_unit: "nsec",
|
||||
start_time: 0,
|
||||
start_time_unit: "nsec",
|
||||
elapsed_time: 0,
|
||||
elapsed_time_unit: "nsec",
|
||||
}
|
||||
parseTime := func(value, unit string) int64 {
|
||||
var t int64 = 0
|
||||
if len(unit) == 0 {
|
||||
unit = "secs"
|
||||
}
|
||||
values := strings.Split(value, ".")
|
||||
units := strings.Split(unit, ".")
|
||||
if len(values) != len(units) {
|
||||
fmt.Printf("Invalid time specification '%s' and '%s'\n", value, unit)
|
||||
}
|
||||
for i, v := range values {
|
||||
if len(units) > i {
|
||||
s, err := strconv.ParseInt(v, 10, 64)
|
||||
if err == nil {
|
||||
switch units[i] {
|
||||
case "secs":
|
||||
t += s * 1e9
|
||||
case "msecs":
|
||||
t += s * 1e6
|
||||
case "usecs":
|
||||
t += s * 1e3
|
||||
case "nsecs":
|
||||
t += s
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return t
|
||||
}
|
||||
parseNumber := func(value string) int64 {
|
||||
s, err := strconv.ParseInt(value, 10, 64)
|
||||
if err == nil {
|
||||
return s
|
||||
}
|
||||
return 0
|
||||
}
|
||||
for _, l := range lines {
|
||||
if jobdata.sample_time == 0 {
|
||||
m := snapshotPattern.FindStringSubmatch(l)
|
||||
if len(m) == 3 {
|
||||
if len(m[2]) > 0 {
|
||||
jobdata.sample_time = parseTime(m[1], m[2])
|
||||
} else {
|
||||
jobdata.sample_time = parseTime(m[1], "secs")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if jobdata.start_time == 0 {
|
||||
m := startPattern.FindStringSubmatch(l)
|
||||
if len(m) == 3 {
|
||||
if len(m[2]) > 0 {
|
||||
jobdata.start_time = parseTime(m[1], m[2])
|
||||
} else {
|
||||
jobdata.start_time = parseTime(m[1], "secs")
|
||||
}
|
||||
}
|
||||
}
|
||||
if jobdata.elapsed_time == 0 {
|
||||
m := elapsedPattern.FindStringSubmatch(l)
|
||||
if len(m) == 3 {
|
||||
if len(m[2]) > 0 {
|
||||
jobdata.elapsed_time = parseTime(m[1], m[2])
|
||||
} else {
|
||||
jobdata.elapsed_time = parseTime(m[1], "secs")
|
||||
}
|
||||
}
|
||||
}
|
||||
m := linePattern.FindStringSubmatch(l)
|
||||
if len(m) == 8 {
|
||||
jobdata.op_units[m[1]] = m[3]
|
||||
jobdata.op_data[m[1]] = map[string]int64{
|
||||
"samples": parseNumber(m[2]),
|
||||
"min": parseNumber(m[4]),
|
||||
"max": parseNumber(m[5]),
|
||||
"sum": parseNumber(m[6]),
|
||||
"sumsq": parseNumber(m[7]),
|
||||
}
|
||||
}
|
||||
}
|
||||
return jobdata
|
||||
}
|
||||
|
||||
func readCommandOutput(lines []string) map[string]map[string]LustreMetricData {
|
||||
var data map[string]map[string]LustreMetricData = make(map[string]map[string]LustreMetricData)
|
||||
devs := readDevices(lines)
|
||||
for d, ddata := range devs {
|
||||
data[d] = make(map[string]LustreMetricData)
|
||||
jobs := readJobs(ddata)
|
||||
for j, jdata := range jobs {
|
||||
x := readJobdata(jdata)
|
||||
data[d][j] = x
|
||||
}
|
||||
}
|
||||
return data
|
||||
}
|
@@ -14,38 +14,29 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
// Non-Uniform Memory Access (NUMA) policy hit/miss statistics
|
||||
//
|
||||
// Numa policy hit/miss statistics
|
||||
//
|
||||
// numa_hit:
|
||||
//
|
||||
// A process wanted to allocate memory from this node, and succeeded.
|
||||
//
|
||||
// A process wanted to allocate memory from this node, and succeeded.
|
||||
// numa_miss:
|
||||
//
|
||||
// A process wanted to allocate memory from another node,
|
||||
// but ended up with memory from this node.
|
||||
//
|
||||
// A process wanted to allocate memory from another node,
|
||||
// but ended up with memory from this node.
|
||||
// numa_foreign:
|
||||
//
|
||||
// A process wanted to allocate on this node,
|
||||
// but ended up with memory from another node.
|
||||
//
|
||||
// A process wanted to allocate on this node,
|
||||
// but ended up with memory from another node.
|
||||
// local_node:
|
||||
//
|
||||
// A process ran on this node's CPU,
|
||||
// and got memory from this node.
|
||||
//
|
||||
// A process ran on this node's CPU,
|
||||
// and got memory from this node.
|
||||
// other_node:
|
||||
//
|
||||
// A process ran on a different node's CPU
|
||||
// and got memory from this node.
|
||||
//
|
||||
// A process ran on a different node's CPU
|
||||
// and got memory from this node.
|
||||
// interleave_hit:
|
||||
//
|
||||
// Interleaving wanted to allocate from this node
|
||||
// and succeeded.
|
||||
// Interleaving wanted to allocate from this node
|
||||
// and succeeded.
|
||||
//
|
||||
// See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html
|
||||
//
|
||||
type NUMAStatsCollectorTopolgy struct {
|
||||
file string
|
||||
tagSet map[string]string
|
||||
@@ -91,8 +82,6 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
||||
})
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(m.name, "initialized", len(m.topology), "NUMA domains")
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
|
@@ -1,17 +1,15 @@
|
||||
|
||||
## `numastat` collector
|
||||
|
||||
```json
|
||||
"numastats": {}
|
||||
"numastat": {}
|
||||
```
|
||||
|
||||
The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful **memoryDomain** metrics. See: <https://www.kernel.org/doc/html/latest/admin-guide/numastat.html>
|
||||
The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful **memoryDomain** metrics. See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html
|
||||
|
||||
Metrics:
|
||||
|
||||
* `numastats_numa_hit`: A process wanted to allocate memory from this node, and succeeded.
|
||||
* `numastats_numa_miss`: A process wanted to allocate memory from another node, but ended up with memory from this node.
|
||||
* `numastats_numa_foreign`: A process wanted to allocate on this node, but ended up with memory from another node.
|
||||
* `numastats_local_node`: A process ran on this node's CPU, and got memory from this node.
|
||||
* `numastats_other_node`: A process ran on a different node's CPU, and got memory from this node.
|
||||
* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded.
|
||||
* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded.
|
@@ -1,262 +0,0 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
// running average power limit (RAPL) monitoring attributes for a zone
|
||||
type RAPLZoneInfo struct {
|
||||
// tags describing the RAPL zone:
|
||||
// * zone_name, subzone_name: e.g. psys, dram, core, uncore, package-0
|
||||
// * zone_id: e.g. 0:1 (zone 0 sub zone 1)
|
||||
tags map[string]string
|
||||
energyFilepath string // path to a file containing the zones current energy counter in micro joules
|
||||
energy int64 // current reading of the energy counter in micro joules
|
||||
energyTimestamp time.Time // timestamp when energy counter was read
|
||||
maxEnergyRange int64 // Range of the above energy counter in micro-joules
|
||||
}
|
||||
|
||||
type RAPLCollector struct {
|
||||
metricCollector
|
||||
config struct {
|
||||
// Exclude IDs for RAPL zones, e.g.
|
||||
// * 0 for zone 0
|
||||
// * 0:1 for zone 0 subzone 1
|
||||
ExcludeByID []string `json:"exclude_device_by_id,omitempty"`
|
||||
// Exclude names for RAPL zones, e.g. psys, dram, core, uncore, package-0
|
||||
ExcludeByName []string `json:"exclude_device_by_name,omitempty"`
|
||||
}
|
||||
RAPLZoneInfo []RAPLZoneInfo
|
||||
meta map[string]string // default meta information
|
||||
}
|
||||
|
||||
// Init initializes the running average power limit (RAPL) collector
|
||||
func (m *RAPLCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Check if already initialized
|
||||
if m.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error = nil
|
||||
m.name = "RAPLCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "energy",
|
||||
"unit": "Watt",
|
||||
}
|
||||
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Configure excluded RAPL zones
|
||||
isIDExcluded := make(map[string]bool)
|
||||
if m.config.ExcludeByID != nil {
|
||||
for _, ID := range m.config.ExcludeByID {
|
||||
isIDExcluded[ID] = true
|
||||
}
|
||||
}
|
||||
isNameExcluded := make(map[string]bool)
|
||||
if m.config.ExcludeByName != nil {
|
||||
for _, name := range m.config.ExcludeByName {
|
||||
isNameExcluded[name] = true
|
||||
}
|
||||
}
|
||||
|
||||
// readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath
|
||||
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
|
||||
readZoneInfo := func(zonePath string) (z struct {
|
||||
name string // zones name e.g. psys, dram, core, uncore, package-0
|
||||
energyFilepath string // path to a file containing the zones current energy counter in micro joules
|
||||
energy int64 // current reading of the energy counter in micro joules
|
||||
energyTimestamp time.Time // timestamp when energy counter was read
|
||||
maxEnergyRange int64 // Range of the above energy counter in micro-joules
|
||||
ok bool // Are all information available?
|
||||
}) {
|
||||
// zones name e.g. psys, dram, core, uncore, package-0
|
||||
foundName := false
|
||||
if v, err :=
|
||||
os.ReadFile(
|
||||
filepath.Join(zonePath, "name")); err == nil {
|
||||
foundName = true
|
||||
z.name = strings.TrimSpace(string(v))
|
||||
}
|
||||
|
||||
// path to a file containing the zones current energy counter in micro joules
|
||||
z.energyFilepath = filepath.Join(zonePath, "energy_uj")
|
||||
|
||||
// current reading of the energy counter in micro joules
|
||||
foundEnergy := false
|
||||
if v, err := os.ReadFile(z.energyFilepath); err == nil {
|
||||
// timestamp when energy counter was read
|
||||
z.energyTimestamp = time.Now()
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
foundEnergy = true
|
||||
z.energy = i
|
||||
}
|
||||
}
|
||||
|
||||
// Range of the above energy counter in micro-joules
|
||||
foundMaxEnergyRange := false
|
||||
if v, err :=
|
||||
os.ReadFile(
|
||||
filepath.Join(zonePath, "max_energy_range_uj")); err == nil {
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
foundMaxEnergyRange = true
|
||||
z.maxEnergyRange = i
|
||||
}
|
||||
}
|
||||
|
||||
// Are all information available?
|
||||
z.ok = foundName && foundEnergy && foundMaxEnergyRange
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
powerCapPrefix := "/sys/devices/virtual/powercap"
|
||||
controlType := "intel-rapl"
|
||||
controlTypePath := filepath.Join(powerCapPrefix, controlType)
|
||||
|
||||
// Find all RAPL zones
|
||||
zonePrefix := filepath.Join(controlTypePath, controlType+":")
|
||||
zonesPath, err := filepath.Glob(zonePrefix + "*")
|
||||
if err != nil || zonesPath == nil {
|
||||
return fmt.Errorf("unable to find any zones under %s", controlTypePath)
|
||||
}
|
||||
|
||||
for _, zonePath := range zonesPath {
|
||||
zoneID := strings.TrimPrefix(zonePath, zonePrefix)
|
||||
z := readZoneInfo(zonePath)
|
||||
if z.ok &&
|
||||
!isIDExcluded[zoneID] &&
|
||||
!isNameExcluded[z.name] {
|
||||
|
||||
// Add RAPL monitoring attributes for a zone
|
||||
m.RAPLZoneInfo =
|
||||
append(
|
||||
m.RAPLZoneInfo,
|
||||
RAPLZoneInfo{
|
||||
tags: map[string]string{
|
||||
"id": zoneID,
|
||||
"zone_name": z.name,
|
||||
},
|
||||
energyFilepath: z.energyFilepath,
|
||||
energy: z.energy,
|
||||
energyTimestamp: z.energyTimestamp,
|
||||
maxEnergyRange: z.maxEnergyRange,
|
||||
})
|
||||
}
|
||||
|
||||
// find all sub zones for the given zone
|
||||
subZonePrefix := filepath.Join(zonePath, controlType+":"+zoneID+":")
|
||||
subZonesPath, err := filepath.Glob(subZonePrefix + "*")
|
||||
if err != nil || subZonesPath == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, subZonePath := range subZonesPath {
|
||||
subZoneID := strings.TrimPrefix(subZonePath, subZonePrefix)
|
||||
sz := readZoneInfo(subZonePath)
|
||||
if len(zoneID) > 0 && len(z.name) > 0 &&
|
||||
sz.ok &&
|
||||
!isIDExcluded[zoneID+":"+subZoneID] &&
|
||||
!isNameExcluded[sz.name] {
|
||||
m.RAPLZoneInfo =
|
||||
append(
|
||||
m.RAPLZoneInfo,
|
||||
RAPLZoneInfo{
|
||||
tags: map[string]string{
|
||||
"id": zoneID + ":" + subZoneID,
|
||||
"zone_name": z.name,
|
||||
"sub_zone_name": sz.name,
|
||||
},
|
||||
energyFilepath: sz.energyFilepath,
|
||||
energy: sz.energy,
|
||||
energyTimestamp: sz.energyTimestamp,
|
||||
maxEnergyRange: sz.maxEnergyRange,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if m.RAPLZoneInfo == nil {
|
||||
return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath)
|
||||
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(
|
||||
m.name,
|
||||
"initialized",
|
||||
len(m.RAPLZoneInfo),
|
||||
"zones with running average power limit (RAPL) monitoring attributes")
|
||||
m.init = true
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Read reads running average power limit (RAPL) monitoring attributes for all initialized zones
|
||||
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
|
||||
func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
|
||||
for i := range m.RAPLZoneInfo {
|
||||
p := &m.RAPLZoneInfo[i]
|
||||
|
||||
// Read current value of the energy counter in micro joules
|
||||
if v, err := os.ReadFile(p.energyFilepath); err == nil {
|
||||
energyTimestamp := time.Now()
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
energy := i
|
||||
|
||||
// Compute average power (Δ energy / Δ time)
|
||||
energyDiff := energy - p.energy
|
||||
if energyDiff < 0 {
|
||||
// Handle overflow:
|
||||
// ( p.maxEnergyRange - p.energy ) + energy
|
||||
// = p.maxEnergyRange + ( energy - p.energy )
|
||||
// = p.maxEnergyRange + diffEnergy
|
||||
energyDiff += p.maxEnergyRange
|
||||
}
|
||||
timeDiff := energyTimestamp.Sub(p.energyTimestamp)
|
||||
averagePower := float64(energyDiff) / float64(timeDiff.Microseconds())
|
||||
|
||||
y, err := lp.New(
|
||||
"rapl_average_power",
|
||||
p.tags,
|
||||
m.meta,
|
||||
map[string]interface{}{"value": averagePower},
|
||||
energyTimestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
|
||||
// Save current energy counter state
|
||||
p.energy = energy
|
||||
p.energyTimestamp = energyTimestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes running average power limit (RAPL) metric collector
|
||||
func (m *RAPLCollector) Close() {
|
||||
// Unset flag
|
||||
m.init = false
|
||||
}
|
@@ -1,18 +0,0 @@
|
||||
# Running average power limit (RAPL) metric collector
|
||||
|
||||
This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
|
||||
|
||||
The Likwid metric collector provides similar functionality.
|
||||
|
||||
## Configuration
|
||||
|
||||
```json
|
||||
"rapl": {
|
||||
"exclude_device_by_id": ["0:1", "0:2"],
|
||||
"exclude_device_by_name": ["psys"]
|
||||
}
|
||||
```
|
||||
|
||||
## Metrics
|
||||
|
||||
* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
|
@@ -17,9 +17,9 @@ type SampleCollectorConfig struct {
|
||||
// defined by metricCollector (name, init, ...)
|
||||
type SampleCollector struct {
|
||||
metricCollector
|
||||
config SampleCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
config SampleTimerCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
}
|
||||
|
||||
// Functions to implement MetricCollector interface
|
||||
@@ -36,14 +36,14 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
// This is for later use, also call it early
|
||||
m.setup()
|
||||
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
||||
// or it should be run serially, mostly for collectors actually doing measurements
|
||||
// or it should be run serially, mostly for collectors acutally doing measurements
|
||||
// because they should not measure the execution of the other collectors
|
||||
m.parallel = true
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// die -> CPU die (requires CPU die ID as 'type-id' tag)
|
||||
|
@@ -38,7 +38,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
|
||||
@@ -60,7 +60,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
|
||||
// Storage for output channel
|
||||
m.output = nil
|
||||
// Management channel for the timer function.
|
||||
// Mangement channel for the timer function.
|
||||
m.done = make(chan bool)
|
||||
// Create the own ticker
|
||||
m.ticker = time.NewTicker(m.interval)
|
||||
@@ -94,7 +94,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
|
||||
|
||||
value := 1.0
|
||||
|
||||
// If you want to measure something for a specific amount of time, use interval
|
||||
// If you want to measure something for a specific amout of time, use interval
|
||||
// start := readState()
|
||||
// time.Sleep(interval)
|
||||
// stop := readState()
|
||||
|
@@ -1,144 +0,0 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
type SelfCollectorConfig struct {
|
||||
MemStats bool `json:"read_mem_stats"`
|
||||
GoRoutines bool `json:"read_goroutines"`
|
||||
CgoCalls bool `json:"read_cgo_calls"`
|
||||
Rusage bool `json:"read_rusage"`
|
||||
}
|
||||
|
||||
type SelfCollector struct {
|
||||
metricCollector
|
||||
config SelfCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
}
|
||||
|
||||
func (m *SelfCollector) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
m.name = "SelfCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{"source": m.name, "group": "Self"}
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
timestamp := time.Now()
|
||||
|
||||
if m.config.MemStats {
|
||||
var memstats runtime.MemStats
|
||||
runtime.ReadMemStats(&memstats)
|
||||
|
||||
y, err := lp.New("total_alloc", m.tags, m.meta, map[string]interface{}{"value": memstats.TotalAlloc}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_alloc", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapAlloc}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_sys", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapSys}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_idle", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapIdle}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_inuse", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapInuse}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_released", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapReleased}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("heap_objects", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapObjects}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.GoRoutines {
|
||||
y, err := lp.New("num_goroutines", m.tags, m.meta, map[string]interface{}{"value": runtime.NumGoroutine()}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.CgoCalls {
|
||||
y, err := lp.New("num_cgo_calls", m.tags, m.meta, map[string]interface{}{"value": runtime.NumCgoCall()}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.Rusage {
|
||||
var rusage syscall.Rusage
|
||||
err := syscall.Getrusage(syscall.RUSAGE_SELF, &rusage)
|
||||
if err == nil {
|
||||
sec, nsec := rusage.Utime.Unix()
|
||||
t := float64(sec) + (float64(nsec) * 1e-9)
|
||||
y, err := lp.New("rusage_user_time", m.tags, m.meta, map[string]interface{}{"value": t}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "seconds")
|
||||
output <- y
|
||||
}
|
||||
sec, nsec = rusage.Stime.Unix()
|
||||
t = float64(sec) + (float64(nsec) * 1e-9)
|
||||
y, err = lp.New("rusage_system_time", m.tags, m.meta, map[string]interface{}{"value": t}, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "seconds")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("rusage_vol_ctx_switch", m.tags, m.meta, map[string]interface{}{"value": rusage.Nvcsw}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("rusage_invol_ctx_switch", m.tags, m.meta, map[string]interface{}{"value": rusage.Nivcsw}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("rusage_signals", m.tags, m.meta, map[string]interface{}{"value": rusage.Nsignals}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("rusage_major_pgfaults", m.tags, m.meta, map[string]interface{}{"value": rusage.Majflt}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.New("rusage_minor_pgfaults", m.tags, m.meta, map[string]interface{}{"value": rusage.Minflt}, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SelfCollector) Close() {
|
||||
m.init = false
|
||||
}
|
@@ -1,34 +0,0 @@
|
||||
## `self` collector
|
||||
|
||||
```json
|
||||
"self": {
|
||||
"read_mem_stats" : true,
|
||||
"read_goroutines" : true,
|
||||
"read_cgo_calls" : true,
|
||||
"read_rusage" : true
|
||||
}
|
||||
```
|
||||
|
||||
The `self` collector reads the data from the `runtime` and `syscall` packages, so monitors the execution of the cc-metric-collector itself.
|
||||
|
||||
Metrics:
|
||||
* If `read_mem_stats == true`:
|
||||
* `total_alloc`: The metric reports cumulative bytes allocated for heap objects.
|
||||
* `heap_alloc`: The metric reports bytes of allocated heap objects.
|
||||
* `heap_sys`: The metric reports bytes of heap memory obtained from the OS.
|
||||
* `heap_idle`: The metric reports bytes in idle (unused) spans.
|
||||
* `heap_inuse`: The metric reports bytes in in-use spans.
|
||||
* `heap_released`: The metric reports bytes of physical memory returned to the OS.
|
||||
* `heap_objects`: The metric reports the number of allocated heap objects.
|
||||
* If `read_goroutines == true`:
|
||||
* `num_goroutines`: The metric reports the number of goroutines that currently exist.
|
||||
* If `read_cgo_calls == true`:
|
||||
* `num_cgo_calls`: The metric reports the number of cgo calls made by the current process.
|
||||
* If `read_rusage == true`:
|
||||
* `rusage_user_time`: The metric reports the amount of time that this process has been scheduled in user mode.
|
||||
* `rusage_system_time`: The metric reports the amount of time that this process has been scheduled in kernel mode.
|
||||
* `rusage_vol_ctx_switch`: The metric reports the amount of voluntary context switches.
|
||||
* `rusage_invol_ctx_switch`: The metric reports the amount of involuntary context switches.
|
||||
* `rusage_signals`: The metric reports the number of signals received.
|
||||
* `rusage_major_pgfaults`: The metric reports the number of major faults the process has made which have required loading a memory page from disk.
|
||||
* `rusage_minor_pgfaults`: The metric reports the number of minor faults the process has made which have not required loading a memory page from disk.
|
30
go.mod
30
go.mod
@@ -6,35 +6,35 @@ require (
|
||||
github.com/ClusterCockpit/cc-units v0.3.0
|
||||
github.com/ClusterCockpit/go-rocm-smi v0.3.0
|
||||
github.com/NVIDIA/go-nvml v0.11.6-0
|
||||
github.com/PaesslerAG/gval v1.2.1
|
||||
github.com/PaesslerAG/gval v1.2.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.12.0
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.9.1
|
||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
|
||||
github.com/nats-io/nats.go v1.20.0
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/nats-io/nats.go v1.16.0
|
||||
github.com/prometheus/client_golang v1.12.2
|
||||
github.com/stmcginnis/gofish v0.13.0
|
||||
github.com/tklauser/go-sysconf v0.3.11
|
||||
golang.org/x/sys v0.2.0
|
||||
github.com/tklauser/go-sysconf v0.3.10
|
||||
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.12.3 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.11.0 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.8.4 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.37.0 // indirect
|
||||
github.com/prometheus/procfs v0.8.0 // indirect
|
||||
github.com/prometheus/procfs v0.7.3 // indirect
|
||||
github.com/shopspring/decimal v1.3.1 // indirect
|
||||
github.com/tklauser/numcpus v0.6.0 // indirect
|
||||
golang.org/x/crypto v0.3.0 // indirect
|
||||
golang.org/x/net v0.2.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
github.com/tklauser/numcpus v0.4.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
|
||||
golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
|
||||
google.golang.org/protobuf v1.28.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
@@ -9,8 +9,8 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
|
||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
|
||||
units "github.com/ClusterCockpit/cc-units"
|
||||
)
|
||||
@@ -281,7 +281,9 @@ func (r *metricRouter) Start() {
|
||||
// Foward message received from collector channel
|
||||
coll_forward := func(p lp.CCMetric) {
|
||||
// receive from metric collector
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
if !p.HasTag(r.config.HostnameTagName) {
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
}
|
||||
if r.config.IntervalStamp {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
@@ -310,7 +312,9 @@ func (r *metricRouter) Start() {
|
||||
cache_forward := func(p lp.CCMetric) {
|
||||
// receive from metric collector
|
||||
if !r.dropMetric(p) {
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
if !p.HasTag(r.config.HostnameTagName) {
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
}
|
||||
forward(p)
|
||||
}
|
||||
}
|
||||
|
@@ -1,8 +1,8 @@
|
||||
{
|
||||
"natsrecv": {
|
||||
"natsrecv" : {
|
||||
"type": "nats",
|
||||
"address": "nats://my-url",
|
||||
"port": "4222",
|
||||
"port" : "4222",
|
||||
"database": "testcluster"
|
||||
},
|
||||
"redfish_recv": {
|
||||
@@ -21,30 +21,5 @@
|
||||
"endpoint": "https://my-endpoint-2"
|
||||
}
|
||||
]
|
||||
},
|
||||
"ipmi_recv": {
|
||||
"type": "ipmi",
|
||||
"exclude_metrics": [
|
||||
"fan_speed",
|
||||
"voltage"
|
||||
],
|
||||
"client_config": [
|
||||
{
|
||||
"username": "username-1",
|
||||
"password": "password-1",
|
||||
"endpoint": "ipmi-sensors://my-endpoint-1",
|
||||
"host_list": [
|
||||
"my-host-1"
|
||||
]
|
||||
},
|
||||
{
|
||||
"username": "username-2",
|
||||
"password": "password-2",
|
||||
"endpoint": "ipmi-sensors://my-endpoint-2",
|
||||
"host_list": [
|
||||
"my-host-2"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,164 +0,0 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
influx "github.com/influxdata/line-protocol"
|
||||
)
|
||||
|
||||
// SampleReceiver configuration: receiver type, listen address, port
|
||||
type AppMetricReceiverConfig struct {
|
||||
Type string `json:"type"`
|
||||
SocketFile string `json:"socket_file"`
|
||||
}
|
||||
|
||||
type AppMetricReceiver struct {
|
||||
receiver
|
||||
config AppMetricReceiverConfig
|
||||
|
||||
// Storage for static information
|
||||
meta map[string]string
|
||||
// Use in case of own go routine
|
||||
done chan bool
|
||||
wg sync.WaitGroup
|
||||
// Influx stuff
|
||||
handler *influx.MetricHandler
|
||||
parser *influx.Parser
|
||||
// WaitGroup for individual connections
|
||||
connWg sync.WaitGroup
|
||||
listener net.Listener
|
||||
}
|
||||
|
||||
func (r *AppMetricReceiver) newConnection(conn net.Conn) {
|
||||
//defer conn.Close()
|
||||
//defer wg.Done()
|
||||
|
||||
buffer, err := bufio.NewReader(conn).ReadBytes('\n')
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
metrics, err := r.parser.Parse(buffer)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "failed to parse received metrics")
|
||||
return
|
||||
}
|
||||
for _, m := range metrics {
|
||||
y := lp.FromInfluxMetric(m)
|
||||
for k, v := range r.meta {
|
||||
y.AddMeta(k, v)
|
||||
}
|
||||
if r.sink != nil {
|
||||
r.sink <- y
|
||||
}
|
||||
}
|
||||
|
||||
r.newConnection(conn)
|
||||
|
||||
}
|
||||
|
||||
func (r *AppMetricReceiver) newAccepter(listenSocket net.Listener) {
|
||||
accept_loop:
|
||||
for {
|
||||
select {
|
||||
case <-r.done:
|
||||
break accept_loop
|
||||
default:
|
||||
conn, err := listenSocket.Accept()
|
||||
if err == nil {
|
||||
r.connWg.Add(1)
|
||||
go func() {
|
||||
r.newConnection(conn)
|
||||
r.connWg.Done()
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
r.wg.Done()
|
||||
}
|
||||
|
||||
// Implement functions required for Receiver interface
|
||||
// Start(), Close()
|
||||
// See: metricReceiver.go
|
||||
|
||||
func (r *AppMetricReceiver) Start() {
|
||||
var err error = nil
|
||||
cclog.ComponentDebug(r.name, "START")
|
||||
|
||||
r.listener, err = net.Listen("unix", r.config.SocketFile)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "failed to listen at socket", r.config.SocketFile)
|
||||
}
|
||||
if _, err := os.Stat(r.config.SocketFile); err != nil {
|
||||
cclog.ComponentError(r.name, "failed to create socket", r.config.SocketFile)
|
||||
}
|
||||
|
||||
r.done = make(chan bool)
|
||||
r.wg.Add(1)
|
||||
go r.newAccepter(r.listener)
|
||||
|
||||
}
|
||||
|
||||
// Close receiver: close network connection, close files, close libraries, ...
|
||||
func (r *AppMetricReceiver) Close() {
|
||||
cclog.ComponentDebug(r.name, "CLOSE")
|
||||
|
||||
if _, err := os.Stat(r.config.SocketFile); err == nil {
|
||||
if err := os.RemoveAll(r.config.SocketFile); err != nil {
|
||||
cclog.ComponentError(r.name, "Failed to remove UNIX socket", r.config.SocketFile)
|
||||
}
|
||||
}
|
||||
|
||||
// in case of own go routine, send the signal and wait
|
||||
r.listener.Close()
|
||||
r.done <- true
|
||||
close(r.done)
|
||||
r.connWg.Wait()
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
// New function to create a new instance of the receiver
|
||||
// Initialize the receiver by giving it a name and reading in the config JSON
|
||||
func NewAppMetricReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(AppMetricReceiver)
|
||||
|
||||
// Set name of SampleReceiver
|
||||
// The name should be chosen in such a way that different instances of SampleReceiver can be distinguished
|
||||
r.name = fmt.Sprintf("AppMetricReceiver(%s)", name)
|
||||
|
||||
// Set static information
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
|
||||
// Set defaults in r.config
|
||||
// Allow overwriting these defaults by reading config JSON
|
||||
r.config.SocketFile = "/tmp/cc.sock"
|
||||
|
||||
// Read the sample receiver specific JSON config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(r.config.SocketFile) == 0 {
|
||||
cclog.ComponentError(r.name, "Invalid socket_file setting:", r.config.SocketFile)
|
||||
return nil, fmt.Errorf("invalid socket_file setting: %s", r.config.SocketFile)
|
||||
}
|
||||
|
||||
// Check that all required fields in the configuration are set
|
||||
// Use 'if len(r.config.Option) > 0' for strings
|
||||
r.handler = influx.NewMetricHandler()
|
||||
r.parser = influx.NewParser(r.handler)
|
||||
r.parser.SetTimeFunc(DefaultTime)
|
||||
|
||||
return r, nil
|
||||
}
|
@@ -1,23 +0,0 @@
|
||||
## `appmetrics` receiver
|
||||
|
||||
The `appmetrics` receiver can be used to submit metrics from an application into the monitoring system. It listens for incoming connections on a UNIX socket.
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<name>": {
|
||||
"type": "appmetrics",
|
||||
"socket_file" : "/tmp/cc.sock",
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `type`: makes the receiver a `appmetrics` receiver
|
||||
- `socket_file`: Listen UNIX socket
|
||||
|
||||
### Inputs from applications
|
||||
|
||||
Applcations can connect to the `appmetrics` socket and provide metric in the [InfluxDB line protocol](https://github.com/influxdata/line-protocol). It is currently not possible to submit meta information as the Influx line protocol does not know them.
|
||||
|
||||
|
@@ -1,525 +0,0 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
type IPMIReceiverClientConfig struct {
|
||||
|
||||
// Hostname the IPMI service belongs to
|
||||
Protocol string // Protocol / tool to use for IPMI sensor reading
|
||||
DriverType string // Out of band IPMI driver
|
||||
Fanout int // Maximum number of simultaneous IPMI connections
|
||||
NumHosts int // Number of remote IPMI devices with the same configuration
|
||||
IPMIHosts string // List of remote IPMI devices to communicate with
|
||||
IPMI2HostMapping map[string]string // Mapping between IPMI device name and host name
|
||||
Username string // User name to authenticate with
|
||||
Password string // Password to use for authentication
|
||||
CLIOptions []string // Additional command line options for ipmi-sensors
|
||||
isExcluded map[string]bool // is metric excluded
|
||||
}
|
||||
|
||||
type IPMIReceiver struct {
|
||||
receiver
|
||||
config struct {
|
||||
Interval time.Duration
|
||||
|
||||
// Client config for each IPMI hosts
|
||||
ClientConfigs []IPMIReceiverClientConfig
|
||||
}
|
||||
|
||||
// Storage for static information
|
||||
meta map[string]string
|
||||
|
||||
done chan bool // channel to finish / stop IPMI receiver
|
||||
wg sync.WaitGroup // wait group for IPMI receiver
|
||||
}
|
||||
|
||||
// doReadMetrics reads metrics from all configure IPMI hosts.
|
||||
func (r *IPMIReceiver) doReadMetric() {
|
||||
for i := range r.config.ClientConfigs {
|
||||
clientConfig := &r.config.ClientConfigs[i]
|
||||
var cmd_options []string
|
||||
if clientConfig.Protocol == "ipmi-sensors" {
|
||||
cmd_options = append(cmd_options,
|
||||
"--always-prefix",
|
||||
"--sdr-cache-recreate",
|
||||
// Attempt to interpret OEM data, such as event data, sensor readings, or general extra info
|
||||
"--interpret-oem-data",
|
||||
// Ignore not-available (i.e. N/A) sensors in output
|
||||
"--ignore-not-available-sensors",
|
||||
// Ignore unrecognized sensor events
|
||||
"--ignore-unrecognized-events",
|
||||
// Output fields in comma separated format
|
||||
"--comma-separated-output",
|
||||
// Do not output column headers
|
||||
"--no-header-output",
|
||||
// Output non-abbreviated units (e.g. 'Amps' instead of 'A').
|
||||
// May aid in disambiguation of units (e.g. 'C' for Celsius or Coulombs).
|
||||
"--non-abbreviated-units",
|
||||
"--fanout", fmt.Sprint(clientConfig.Fanout),
|
||||
"--driver-type", clientConfig.DriverType,
|
||||
"--hostname", clientConfig.IPMIHosts,
|
||||
"--username", clientConfig.Username,
|
||||
"--password", clientConfig.Password,
|
||||
)
|
||||
cmd_options := append(cmd_options, clientConfig.CLIOptions...)
|
||||
|
||||
command := exec.Command("ipmi-sensors", cmd_options...)
|
||||
stdout, _ := command.StdoutPipe()
|
||||
errBuf := new(bytes.Buffer)
|
||||
command.Stderr = errBuf
|
||||
|
||||
// start command
|
||||
if err := command.Start(); err != nil {
|
||||
cclog.ComponentError(
|
||||
r.name,
|
||||
fmt.Sprintf("doReadMetric(): Failed to start command \"%s\": %v", command.String(), err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
// Read command output
|
||||
const (
|
||||
idxID = iota
|
||||
idxName
|
||||
idxType
|
||||
idxReading
|
||||
idxUnits
|
||||
idxEvent
|
||||
)
|
||||
numPrefixRegex := regexp.MustCompile("^[[:digit:]][[:digit:]]-(.*)$")
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
for scanner.Scan() {
|
||||
// Read host
|
||||
v1 := strings.Split(scanner.Text(), ": ")
|
||||
if len(v1) != 2 {
|
||||
continue
|
||||
}
|
||||
host, ok := clientConfig.IPMI2HostMapping[v1[0]]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Read sensors
|
||||
v2 := strings.Split(v1[1], ",")
|
||||
if len(v2) != 6 {
|
||||
continue
|
||||
}
|
||||
// Skip sensors with non available sensor readings
|
||||
if v2[idxReading] == "N/A" {
|
||||
continue
|
||||
}
|
||||
|
||||
metric := strings.ToLower(v2[idxType])
|
||||
name := strings.ToLower(
|
||||
strings.Replace(
|
||||
strings.TrimSpace(
|
||||
v2[idxName]), " ", "_", -1))
|
||||
// remove prefix enumeration like 01-...
|
||||
if v := numPrefixRegex.FindStringSubmatch(name); v != nil {
|
||||
name = v[1]
|
||||
}
|
||||
unit := v2[idxUnits]
|
||||
if unit == "Watts" {
|
||||
|
||||
// Power
|
||||
metric = "power"
|
||||
name = strings.TrimSuffix(name, "_power")
|
||||
name = strings.TrimSuffix(name, "_pwr")
|
||||
name = strings.TrimPrefix(name, "pwr_")
|
||||
} else if metric == "voltage" &&
|
||||
unit == "Volts" {
|
||||
|
||||
// Voltage
|
||||
name = strings.TrimPrefix(name, "volt_")
|
||||
} else if metric == "current" &&
|
||||
unit == "Amps" {
|
||||
|
||||
// Current
|
||||
unit = "Ampere"
|
||||
} else if metric == "temperature" &&
|
||||
unit == "degrees C" {
|
||||
|
||||
// Temperature
|
||||
name = strings.TrimSuffix(name, "_temp")
|
||||
unit = "degC"
|
||||
} else if metric == "temperature" &&
|
||||
unit == "degrees F" {
|
||||
|
||||
// Temperature
|
||||
name = strings.TrimSuffix(name, "_temp")
|
||||
unit = "degF"
|
||||
} else if metric == "fan" && unit == "RPM" {
|
||||
|
||||
// Fan speed
|
||||
metric = "fan_speed"
|
||||
name = strings.TrimSuffix(name, "_tach")
|
||||
name = strings.TrimPrefix(name, "spd_")
|
||||
} else if (metric == "cooling device" ||
|
||||
metric == "other units based sensor") &&
|
||||
name == "system_air_flow" &&
|
||||
unit == "CFM" {
|
||||
|
||||
// Air flow
|
||||
metric = "air_flow"
|
||||
name = strings.TrimSuffix(name, "_air_flow")
|
||||
unit = "CubicFeetPerMinute"
|
||||
} else if (metric == "processor" ||
|
||||
metric == "other units based sensor") &&
|
||||
(name == "cpu_utilization" ||
|
||||
name == "io_utilization" ||
|
||||
name == "mem_utilization" ||
|
||||
name == "sys_utilization") &&
|
||||
(unit == "unspecified" ||
|
||||
unit == "%") {
|
||||
|
||||
// Utilization
|
||||
metric = "utilization"
|
||||
name = strings.TrimSuffix(name, "_utilization")
|
||||
unit = "percent"
|
||||
} else {
|
||||
if false {
|
||||
// Debug output for unprocessed metrics
|
||||
fmt.Printf(
|
||||
"host: '%s', metric: '%s', name: '%s', unit: '%s'\n",
|
||||
host, metric, name, unit)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip excluded metrics
|
||||
if clientConfig.isExcluded[metric] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse sensor value
|
||||
value, err := strconv.ParseFloat(v2[idxReading], 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
y, err := lp.New(
|
||||
metric,
|
||||
map[string]string{
|
||||
"hostname": host,
|
||||
"type": "node",
|
||||
"name": name,
|
||||
},
|
||||
map[string]string{
|
||||
"source": r.name,
|
||||
"group": "IPMI",
|
||||
"unit": unit,
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": value,
|
||||
},
|
||||
time.Now())
|
||||
if err == nil {
|
||||
r.sink <- y
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for command end
|
||||
if err := command.Wait(); err != nil {
|
||||
errMsg, _ := io.ReadAll(errBuf)
|
||||
cclog.ComponentError(
|
||||
r.name,
|
||||
fmt.Sprintf("doReadMetric(): Failed to wait for the end of command \"%s\": %v\n",
|
||||
strings.Replace(command.String(), clientConfig.Password, "<PW>", -1), err),
|
||||
fmt.Sprintf("doReadMetric(): command stderr: \"%s\"\n", string(errMsg)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *IPMIReceiver) Start() {
|
||||
cclog.ComponentDebug(r.name, "START")
|
||||
|
||||
// Start IPMI receiver
|
||||
r.wg.Add(1)
|
||||
go func() {
|
||||
defer r.wg.Done()
|
||||
|
||||
// Create ticker
|
||||
ticker := time.NewTicker(r.config.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
r.doReadMetric()
|
||||
|
||||
select {
|
||||
case tickerTime := <-ticker.C:
|
||||
// Check if we missed the ticker event
|
||||
if since := time.Since(tickerTime); since > 5*time.Second {
|
||||
cclog.ComponentInfo(r.name, "Missed ticker event for more then", since)
|
||||
}
|
||||
|
||||
// process ticker event -> continue
|
||||
continue
|
||||
case <-r.done:
|
||||
// process done event
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
cclog.ComponentDebug(r.name, "STARTED")
|
||||
}
|
||||
|
||||
// Close receiver: close network connection, close files, close libraries, ...
|
||||
func (r *IPMIReceiver) Close() {
|
||||
cclog.ComponentDebug(r.name, "CLOSE")
|
||||
|
||||
// Send the signal and wait
|
||||
close(r.done)
|
||||
r.wg.Wait()
|
||||
|
||||
cclog.ComponentDebug(r.name, "DONE")
|
||||
}
|
||||
|
||||
// NewIPMIReceiver creates a new instance of the redfish receiver
|
||||
// Initialize the receiver by giving it a name and reading in the config JSON
|
||||
func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(IPMIReceiver)
|
||||
|
||||
// Config options from config file
|
||||
configJSON := struct {
|
||||
Type string `json:"type"`
|
||||
|
||||
// How often the IPMI sensor metrics should be read and send to the sink (default: 30 s)
|
||||
IntervalString string `json:"interval,omitempty"`
|
||||
|
||||
// Maximum number of simultaneous IPMI connections (default: 64)
|
||||
Fanout int `json:"fanout,omitempty"`
|
||||
|
||||
// Out of band IPMI driver (default: LAN_2_0)
|
||||
DriverType string `json:"driver_type,omitempty"`
|
||||
|
||||
// Default client username, password and endpoint
|
||||
Username *string `json:"username"` // User name to authenticate with
|
||||
Password *string `json:"password"` // Password to use for authentication
|
||||
Endpoint *string `json:"endpoint"` // URL of the IPMI device
|
||||
|
||||
// Globally excluded metrics
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
|
||||
ClientConfigs []struct {
|
||||
Fanout int `json:"fanout,omitempty"` // Maximum number of simultaneous IPMI connections (default: 64)
|
||||
DriverType string `json:"driver_type,omitempty"` // Out of band IPMI driver (default: LAN_2_0)
|
||||
HostList []string `json:"host_list"` // List of hosts with the same client configuration
|
||||
Username *string `json:"username"` // User name to authenticate with
|
||||
Password *string `json:"password"` // Password to use for authentication
|
||||
Endpoint *string `json:"endpoint"` // URL of the IPMI service
|
||||
|
||||
// Per client excluded metrics
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
|
||||
// Additional command line options for ipmi-sensors
|
||||
CLIOptions []string `json:"cli_options,omitempty"`
|
||||
} `json:"client_config"`
|
||||
}{
|
||||
// Set defaults values
|
||||
// Allow overwriting these defaults by reading config JSON
|
||||
Fanout: 64,
|
||||
DriverType: "LAN_2_0",
|
||||
IntervalString: "30s",
|
||||
}
|
||||
|
||||
// Set name of IPMIReceiver
|
||||
r.name = fmt.Sprintf("IPMIReceiver(%s)", name)
|
||||
|
||||
// Create done channel
|
||||
r.done = make(chan bool)
|
||||
|
||||
// Set static information
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
|
||||
// Read the IPMI receiver specific JSON config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &configJSON)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Convert interval string representation to duration
|
||||
var err error
|
||||
r.config.Interval, err = time.ParseDuration(configJSON.IntervalString)
|
||||
if err != nil {
|
||||
err := fmt.Errorf(
|
||||
"Failed to parse duration string interval='%s': %w",
|
||||
configJSON.IntervalString,
|
||||
err,
|
||||
)
|
||||
cclog.Error(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create client config from JSON config
|
||||
totalNumHosts := 0
|
||||
for i := range configJSON.ClientConfigs {
|
||||
clientConfigJSON := &configJSON.ClientConfigs[i]
|
||||
|
||||
var endpoint string
|
||||
if clientConfigJSON.Endpoint != nil {
|
||||
endpoint = *clientConfigJSON.Endpoint
|
||||
} else if configJSON.Endpoint != nil {
|
||||
endpoint = *configJSON.Endpoint
|
||||
} else {
|
||||
err := fmt.Errorf("client config number %v requires endpoint", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fanout := configJSON.Fanout
|
||||
if clientConfigJSON.Fanout != 0 {
|
||||
fanout = clientConfigJSON.Fanout
|
||||
}
|
||||
|
||||
driverType := configJSON.DriverType
|
||||
if clientConfigJSON.DriverType != "" {
|
||||
driverType = clientConfigJSON.DriverType
|
||||
}
|
||||
if driverType != "LAN" && driverType != "LAN_2_0" {
|
||||
err := fmt.Errorf("client config number %v has invalid driver type %s", i, driverType)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var protocol string
|
||||
var host_pattern string
|
||||
if e := strings.Split(endpoint, "://"); len(e) == 2 {
|
||||
protocol = e[0]
|
||||
host_pattern = e[1]
|
||||
} else {
|
||||
err := fmt.Errorf("client config number %v has invalid endpoint %s", i, endpoint)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var username string
|
||||
if clientConfigJSON.Username != nil {
|
||||
username = *clientConfigJSON.Username
|
||||
} else if configJSON.Username != nil {
|
||||
username = *configJSON.Username
|
||||
} else {
|
||||
err := fmt.Errorf("client config number %v requires username", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var password string
|
||||
if clientConfigJSON.Password != nil {
|
||||
password = *clientConfigJSON.Password
|
||||
} else if configJSON.Password != nil {
|
||||
password = *configJSON.Password
|
||||
} else {
|
||||
err := fmt.Errorf("client config number %v requires password", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create mapping between ipmi hostname and node hostname
|
||||
// This also guaranties that all ipmi hostnames are uniqu
|
||||
ipmi2HostMapping := make(map[string]string)
|
||||
for _, host := range clientConfigJSON.HostList {
|
||||
ipmiHost := strings.Replace(host_pattern, "%h", host, -1)
|
||||
ipmi2HostMapping[ipmiHost] = host
|
||||
}
|
||||
|
||||
numHosts := len(ipmi2HostMapping)
|
||||
totalNumHosts += numHosts
|
||||
ipmiHostList := make([]string, 0, numHosts)
|
||||
for ipmiHost := range ipmi2HostMapping {
|
||||
ipmiHostList = append(ipmiHostList, ipmiHost)
|
||||
}
|
||||
|
||||
// Additional command line options
|
||||
for _, v := range clientConfigJSON.CLIOptions {
|
||||
switch {
|
||||
case v == "-u" || strings.HasPrefix(v, "--username"):
|
||||
err := fmt.Errorf("client config number %v: do not set username in cli_options. Use json config username instead", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
case v == "-p" || strings.HasPrefix(v, "--password"):
|
||||
err := fmt.Errorf("client config number %v: do not set password in cli_options. Use json config password instead", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
case v == "-h" || strings.HasPrefix(v, "--hostname"):
|
||||
err := fmt.Errorf("client config number %v: do not set hostname in cli_options. Use json config host_list instead", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
case v == "-D" || strings.HasPrefix(v, "--driver-type"):
|
||||
err := fmt.Errorf("client config number %v: do not set driver type in cli_options. Use json config driver_type instead", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
case v == "-F" || strings.HasPrefix(v, " --fanout"):
|
||||
err := fmt.Errorf("client config number %v: do not set fanout in cli_options. Use json config fanout instead", i)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
case v == "--always-prefix" ||
|
||||
v == "--sdr-cache-recreate" ||
|
||||
v == "--interpret-oem-data" ||
|
||||
v == "--ignore-not-available-sensors" ||
|
||||
v == "--ignore-unrecognized-events" ||
|
||||
v == "--comma-separated-output" ||
|
||||
v == "--no-header-output" ||
|
||||
v == "--non-abbreviated-units":
|
||||
err := fmt.Errorf("client config number %v: Do not use option %s in cli_options, it is used internally", i, v)
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cliOptions := make([]string, 0)
|
||||
cliOptions = append(cliOptions, clientConfigJSON.CLIOptions...)
|
||||
|
||||
// Is metrics excluded globally or per client
|
||||
isExcluded := make(map[string]bool)
|
||||
for _, key := range clientConfigJSON.ExcludeMetrics {
|
||||
isExcluded[key] = true
|
||||
}
|
||||
for _, key := range configJSON.ExcludeMetrics {
|
||||
isExcluded[key] = true
|
||||
}
|
||||
|
||||
r.config.ClientConfigs = append(
|
||||
r.config.ClientConfigs,
|
||||
IPMIReceiverClientConfig{
|
||||
Protocol: protocol,
|
||||
Fanout: fanout,
|
||||
DriverType: driverType,
|
||||
NumHosts: numHosts,
|
||||
IPMIHosts: strings.Join(ipmiHostList, ","),
|
||||
IPMI2HostMapping: ipmi2HostMapping,
|
||||
Username: username,
|
||||
Password: password,
|
||||
CLIOptions: cliOptions,
|
||||
isExcluded: isExcluded,
|
||||
})
|
||||
}
|
||||
|
||||
if totalNumHosts == 0 {
|
||||
err := fmt.Errorf("at least one IPMI host config is required")
|
||||
cclog.ComponentError(r.name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cclog.ComponentInfo(r.name, "monitoring", totalNumHosts, "IPMI hosts")
|
||||
return r, nil
|
||||
}
|
@@ -1,48 +0,0 @@
|
||||
## IPMI Receiver
|
||||
|
||||
The IPMI Receiver uses `ipmi-sensors` from the [FreeIPMI](https://www.gnu.org/software/freeipmi/) project to read IPMI sensor readings and sensor data repository (SDR) information. The available metrics depend on the sensors provided by the hardware vendor but typically contain temperature, fan speed, voltage and power metrics.
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<IPMI receiver name>": {
|
||||
"type": "ipmi",
|
||||
"interval": "30s",
|
||||
"fanout": 256,
|
||||
"username": "<Username>",
|
||||
"password": "<Password>",
|
||||
"endpoint": "ipmi-sensors://%h-p",
|
||||
"exclude_metrics": [ "fan_speed", "voltage" ],
|
||||
"client_config": [
|
||||
{
|
||||
"host_list": ["n1", "n2", "n3", "n4" ]
|
||||
},
|
||||
{
|
||||
"host_list": [ "n5", "n6" ],
|
||||
"driver_type": "LAN",
|
||||
"cli_options": [ "--workaround-flags=..." ],
|
||||
"password": "<Password 2>"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Global settings:
|
||||
|
||||
- `interval`: How often the IPMI sensor metrics should be read and send to the sink (default: 30 s)
|
||||
|
||||
Global and per IPMI device settings (per IPMI device settings overwrite the global settings):
|
||||
|
||||
- `exclude_metrics`: list of excluded metrics e.g. fan_speed, power, temperature, utilization, voltage
|
||||
- `fanout`: Maximum number of simultaneous IPMI connections (default: 64)
|
||||
- `driver_type`: Out of band IPMI driver (default: LAN_2_0)
|
||||
- `username`: User name to authenticate with
|
||||
- `password`: Password to use for authentication
|
||||
- `endpoint`: URL of the IPMI device (placeholder `%h` gets replaced by the hostname)
|
||||
|
||||
Per IPMI device settings:
|
||||
|
||||
- `host_list`: List of hosts with the same client configuration
|
||||
- `cli_options`: Additional command line options for ipmi-sensors
|
@@ -2,7 +2,6 @@ package receivers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
@@ -11,10 +10,8 @@ import (
|
||||
)
|
||||
|
||||
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
|
||||
"ipmi": NewIPMIReceiver,
|
||||
"nats": NewNatsReceiver,
|
||||
"redfish": NewRedfishReceiver,
|
||||
"appmetrics": NewAppMetricReceiver,
|
||||
"nats": NewNatsReceiver,
|
||||
"redfish": NewRedfishReceiver,
|
||||
}
|
||||
|
||||
type receiveManager struct {
|
||||
@@ -74,13 +71,9 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "JSON config error:", err.Error())
|
||||
return err
|
||||
}
|
||||
if config.Type == "" {
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", "JSON config for receiver", name, "does not contain a receiver type")
|
||||
return fmt.Errorf("JSON config for receiver %s does not contain a receiver type", name)
|
||||
}
|
||||
if _, found := AvailableReceivers[config.Type]; !found {
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", "unknown receiver type:", config.Type)
|
||||
return fmt.Errorf("unknown receiver type: %s", config.Type)
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error())
|
||||
return err
|
||||
}
|
||||
r, err := AvailableReceivers[config.Type](name, rawConfig)
|
||||
if err != nil {
|
||||
|
@@ -20,9 +20,7 @@ The configuration file for the sinks is a list of configurations. The `type` fie
|
||||
[
|
||||
"mystdout" : {
|
||||
"type" : "stdout",
|
||||
"meta_as_tags" : [
|
||||
"unit"
|
||||
]
|
||||
"meta_as_tags" : false
|
||||
},
|
||||
"metricstore" : {
|
||||
"type" : "http",
|
||||
@@ -105,4 +103,4 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return s, err
|
||||
}
|
||||
|
||||
```
|
||||
```
|
Reference in New Issue
Block a user