mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-20 11:51:40 +02:00
Compare commits
9 Commits
lustre_job
...
v0.6.2
Author | SHA1 | Date | |
---|---|---|---|
|
e79601e2e8 | ||
|
317d36c9dd | ||
|
821d104656 | ||
|
be20f956c2 | ||
|
5b6a2b9018 | ||
|
3438972237 | ||
|
88fabc2e83 | ||
|
b3c27e0af5 | ||
|
2adf9484a3 |
8
Makefile
8
Makefile
@@ -112,7 +112,9 @@ DEB: scripts/cc-metric-collector.deb.control $(APP)
|
|||||||
#@mkdir --parents --verbose $$DEBIANDIR
|
#@mkdir --parents --verbose $$DEBIANDIR
|
||||||
@CONTROLFILE="$${BASEDIR}/scripts/cc-metric-collector.deb.control"
|
@CONTROLFILE="$${BASEDIR}/scripts/cc-metric-collector.deb.control"
|
||||||
@COMMITISH="HEAD"
|
@COMMITISH="HEAD"
|
||||||
|
@git describe --tags --abbrev=0 $${COMMITISH}
|
||||||
@VERS=$$(git describe --tags --abbrev=0 $${COMMITISH})
|
@VERS=$$(git describe --tags --abbrev=0 $${COMMITISH})
|
||||||
|
@if [ -z "$$VERS" ]; then VERS=${GITHUB_REF_NAME}; fi
|
||||||
@VERS=$${VERS#v}
|
@VERS=$${VERS#v}
|
||||||
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
|
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
|
||||||
@ARCH=$$(uname -m)
|
@ARCH=$$(uname -m)
|
||||||
@@ -121,8 +123,14 @@ DEB: scripts/cc-metric-collector.deb.control $(APP)
|
|||||||
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$$WORKSPACE"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
|
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$$WORKSPACE"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
|
||||||
@SIZE="$$(awk -v size="$$SIZE_BYTES" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
|
@SIZE="$$(awk -v size="$$SIZE_BYTES" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
|
||||||
#@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANDIR}/control
|
#@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANDIR}/control
|
||||||
|
@echo "Version: $$VERS"
|
||||||
|
@echo "Size: $$SIZE"
|
||||||
|
@echo "Arch: $$ARCH"
|
||||||
@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANBINDIR}/control
|
@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANBINDIR}/control
|
||||||
@make PREFIX=$${WORKSPACE} install
|
@make PREFIX=$${WORKSPACE} install
|
||||||
@DEB_FILE="cc-metric-collector_$${VERS}_$${ARCH}.deb"
|
@DEB_FILE="cc-metric-collector_$${VERS}_$${ARCH}.deb"
|
||||||
@dpkg-deb -b $${WORKSPACE} "$$DEB_FILE"
|
@dpkg-deb -b $${WORKSPACE} "$$DEB_FILE"
|
||||||
|
@if [ "$${GITHUB_ACTIONS}" = "true" ]; then
|
||||||
|
@ echo "::set-output name=DEB::$${DEB_FILE}"
|
||||||
|
@fi
|
||||||
@rm -r "$${WORKSPACE}"
|
@rm -r "$${WORKSPACE}"
|
||||||
|
@@ -20,7 +20,6 @@ var AvailableCollectors = map[string]MetricCollector{
|
|||||||
"netstat": new(NetstatCollector),
|
"netstat": new(NetstatCollector),
|
||||||
"ibstat": new(InfinibandCollector),
|
"ibstat": new(InfinibandCollector),
|
||||||
"lustrestat": new(LustreCollector),
|
"lustrestat": new(LustreCollector),
|
||||||
"lustre_jobstat": new(LustreJobstatCollector),
|
|
||||||
"cpustat": new(CpustatCollector),
|
"cpustat": new(CpustatCollector),
|
||||||
"topprocs": new(TopProcsCollector),
|
"topprocs": new(TopProcsCollector),
|
||||||
"nvidia": new(NvidiaCollector),
|
"nvidia": new(NvidiaCollector),
|
||||||
@@ -38,6 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{
|
|||||||
"beegfs_meta": new(BeegfsMetaCollector),
|
"beegfs_meta": new(BeegfsMetaCollector),
|
||||||
"beegfs_storage": new(BeegfsStorageCollector),
|
"beegfs_storage": new(BeegfsStorageCollector),
|
||||||
"rocm_smi": new(RocmSmiCollector),
|
"rocm_smi": new(RocmSmiCollector),
|
||||||
|
"self": new(SelfCollector),
|
||||||
"schedstat": new(SchedstatCollector),
|
"schedstat": new(SchedstatCollector),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,270 +0,0 @@
|
|||||||
package collectors
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"os/exec"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
|
||||||
)
|
|
||||||
|
|
||||||
type LustreJobstatCollectorConfig struct {
|
|
||||||
LCtlCommand string `json:"lctl_command,omitempty"`
|
|
||||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
|
||||||
Sudo bool `json:"use_sudo,omitempty"`
|
|
||||||
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
|
|
||||||
SendDerivedValues bool `json:"send_derived_values,omitempty"`
|
|
||||||
SendDiffValues bool `json:"send_diff_values,omitempty"`
|
|
||||||
JobRegex string `json:"jobid_regex,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type LustreJobstatCollector struct {
|
|
||||||
metricCollector
|
|
||||||
tags map[string]string
|
|
||||||
config LustreJobstatCollectorConfig
|
|
||||||
lctl string
|
|
||||||
sudoCmd string
|
|
||||||
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
|
|
||||||
definitions []LustreMetricDefinition // Combined list without excluded metrics
|
|
||||||
//stats map[string]map[string]int64 // Data for last value per device and metric
|
|
||||||
lastMdtData *map[string]map[string]LustreMetricData
|
|
||||||
lastObdfilterData *map[string]map[string]LustreMetricData
|
|
||||||
jobidRegex *regexp.Regexp
|
|
||||||
}
|
|
||||||
|
|
||||||
var defaultJobidRegex = `^(?P<jobid>[\d\w\.]+)$`
|
|
||||||
|
|
||||||
var LustreMetricJobstatsDefinition = []LustreMetricDefinition{
|
|
||||||
{
|
|
||||||
name: "lustre_job_read_samples",
|
|
||||||
lineprefix: "read",
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_job_read_min_bytes",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
offsetname: "min",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_job_read_max_bytes",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
offsetname: "max",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LustreJobstatCollector) executeLustreCommand(option string) []string {
|
|
||||||
return executeLustreCommand(m.sudoCmd, m.lctl, LCTL_OPTION, option, m.config.Sudo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LustreJobstatCollector) Init(config json.RawMessage) error {
|
|
||||||
var err error
|
|
||||||
m.name = "LustreJobstatCollector"
|
|
||||||
m.parallel = true
|
|
||||||
m.config.JobRegex = defaultJobidRegex
|
|
||||||
m.config.SendAbsoluteValues = true
|
|
||||||
if len(config) > 0 {
|
|
||||||
err = json.Unmarshal(config, &m.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
m.setup()
|
|
||||||
m.tags = map[string]string{"type": "jobid"}
|
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Lustre", "scope": "job"}
|
|
||||||
|
|
||||||
// Lustre file system statistics can only be queried by user root
|
|
||||||
// or with password-less sudo
|
|
||||||
// if !m.config.Sudo {
|
|
||||||
// user, err := user.Current()
|
|
||||||
// if err != nil {
|
|
||||||
// cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// if user.Uid != "0" {
|
|
||||||
// cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// p, err := exec.LookPath("sudo")
|
|
||||||
// if err != nil {
|
|
||||||
// cclog.ComponentError(m.name, "Cannot find 'sudo'")
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
// m.sudoCmd = p
|
|
||||||
// }
|
|
||||||
|
|
||||||
p, err := exec.LookPath(m.config.LCtlCommand)
|
|
||||||
if err != nil {
|
|
||||||
p, err = exec.LookPath(LCTL_CMD)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
m.lctl = p
|
|
||||||
|
|
||||||
m.definitions = make([]LustreMetricDefinition, 0)
|
|
||||||
if m.config.SendAbsoluteValues {
|
|
||||||
for _, def := range LustreMetricJobstatsDefinition {
|
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
|
||||||
m.definitions = append(m.definitions, def)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(m.definitions) == 0 {
|
|
||||||
return errors.New("no metrics to collect")
|
|
||||||
}
|
|
||||||
|
|
||||||
x := make(map[string]map[string]LustreMetricData)
|
|
||||||
m.lastMdtData = &x
|
|
||||||
x = make(map[string]map[string]LustreMetricData)
|
|
||||||
m.lastObdfilterData = &x
|
|
||||||
|
|
||||||
if len(m.config.JobRegex) > 0 {
|
|
||||||
jregex := strings.ReplaceAll(m.config.JobRegex, "%", "\\")
|
|
||||||
r, err := regexp.Compile(jregex)
|
|
||||||
if err == nil {
|
|
||||||
m.jobidRegex = r
|
|
||||||
} else {
|
|
||||||
cclog.ComponentError(m.name, "Cannot compile jobid regex")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
m.lastTimestamp = time.Now()
|
|
||||||
m.init = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LustreJobstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
|
||||||
if !m.init {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
getValue := func(data map[string]map[string]LustreMetricData, device string, jobid string, operation string, field string) int64 {
|
|
||||||
var value int64 = -1
|
|
||||||
if ddata, ok := data[device]; ok {
|
|
||||||
if jdata, ok := ddata[jobid]; ok {
|
|
||||||
if opdata, ok := jdata.op_data[operation]; ok {
|
|
||||||
if v, ok := opdata[field]; ok {
|
|
||||||
value = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
jobIdToTags := func(jobregex *regexp.Regexp, job string) map[string]string {
|
|
||||||
tags := make(map[string]string)
|
|
||||||
groups := jobregex.SubexpNames()
|
|
||||||
for _, match := range jobregex.FindAllStringSubmatch(job, -1) {
|
|
||||||
for groupIdx, group := range match {
|
|
||||||
if len(groups[groupIdx]) > 0 {
|
|
||||||
tags[groups[groupIdx]] = group
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tags
|
|
||||||
}
|
|
||||||
|
|
||||||
generateMetric := func(definition LustreMetricDefinition, data map[string]map[string]LustreMetricData, last map[string]map[string]LustreMetricData, now time.Time) {
|
|
||||||
tdiff := now.Sub(m.lastTimestamp)
|
|
||||||
for dev, ddata := range data {
|
|
||||||
for jobid, jdata := range ddata {
|
|
||||||
jobtags := jobIdToTags(m.jobidRegex, jobid)
|
|
||||||
if _, ok := jobtags["jobid"]; !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cur := getValue(data, dev, jobid, definition.lineprefix, definition.offsetname)
|
|
||||||
old := getValue(last, dev, jobid, definition.lineprefix, definition.offsetname)
|
|
||||||
var x interface{} = -1
|
|
||||||
var valid = false
|
|
||||||
switch definition.calc {
|
|
||||||
case "none":
|
|
||||||
x = cur
|
|
||||||
valid = true
|
|
||||||
case "difference":
|
|
||||||
if len(last) > 0 {
|
|
||||||
if old >= 0 {
|
|
||||||
x = cur - old
|
|
||||||
valid = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "derivative":
|
|
||||||
if len(last) > 0 {
|
|
||||||
if old >= 0 {
|
|
||||||
x = float64(cur-old) / tdiff.Seconds()
|
|
||||||
valid = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if valid {
|
|
||||||
y, err := lp.New(definition.name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
|
|
||||||
if err == nil {
|
|
||||||
y.AddTag("stype", "device")
|
|
||||||
y.AddTag("stype-id", dev)
|
|
||||||
if j, ok := jobtags["jobid"]; ok {
|
|
||||||
y.AddTag("type-id", j)
|
|
||||||
} else {
|
|
||||||
y.AddTag("type-id", jobid)
|
|
||||||
}
|
|
||||||
for k, v := range jobtags {
|
|
||||||
switch k {
|
|
||||||
case "jobid":
|
|
||||||
case "hostname":
|
|
||||||
y.AddTag("hostname", v)
|
|
||||||
default:
|
|
||||||
y.AddMeta(k, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(definition.unit) > 0 {
|
|
||||||
y.AddMeta("unit", definition.unit)
|
|
||||||
} else {
|
|
||||||
if unit, ok := jdata.op_units[definition.lineprefix]; ok {
|
|
||||||
y.AddMeta("unit", unit)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
output <- y
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
mdt_lines := m.executeLustreCommand("mdt.*.job_stats")
|
|
||||||
if len(mdt_lines) > 0 {
|
|
||||||
mdt_data := readCommandOutput(mdt_lines)
|
|
||||||
for _, def := range m.definitions {
|
|
||||||
generateMetric(def, mdt_data, *m.lastMdtData, now)
|
|
||||||
}
|
|
||||||
m.lastMdtData = &mdt_data
|
|
||||||
}
|
|
||||||
|
|
||||||
obdfilter_lines := m.executeLustreCommand("obdfilter.*.job_stats")
|
|
||||||
if len(obdfilter_lines) > 0 {
|
|
||||||
obdfilter_data := readCommandOutput(obdfilter_lines)
|
|
||||||
for _, def := range m.definitions {
|
|
||||||
generateMetric(def, obdfilter_data, *m.lastObdfilterData, now)
|
|
||||||
}
|
|
||||||
m.lastObdfilterData = &obdfilter_data
|
|
||||||
}
|
|
||||||
|
|
||||||
m.lastTimestamp = now
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LustreJobstatCollector) Close() {
|
|
||||||
m.init = false
|
|
||||||
}
|
|
@@ -1,35 +0,0 @@
|
|||||||
|
|
||||||
## `lustre_jobstat` collector
|
|
||||||
|
|
||||||
**Note**: This collector is meant to run on the Lustre servers, **not** the clients
|
|
||||||
|
|
||||||
The Lustre filesystem provides a feature (`job_stats`) to group processes on client side with an identifier string (like a compute job with its jobid) and retrieve the file system operation counts on the server side. Check the section [How to configure `job_stats`]() for more information.
|
|
||||||
|
|
||||||
### Configuration
|
|
||||||
|
|
||||||
```json
|
|
||||||
"lustre_jobstat_": {
|
|
||||||
"lctl_command": "/path/to/lctl",
|
|
||||||
"use_sudo": false,
|
|
||||||
"exclude_metrics": [
|
|
||||||
"setattr",
|
|
||||||
"getattr"
|
|
||||||
],
|
|
||||||
"send_abs_values" : true,
|
|
||||||
|
|
||||||
"jobid_regex" : "^(?P<jobid>[%d%w%.]+)$"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
The `lustre_jobstat` collector uses the `lctl` application with the `get_param` option to get all `mdt.*.job_stats` and `obdfilter.*.job_stats` metrics. These metrics are only available for root users. If password-less sudo is configured, you can enable `sudo` in the configuration. In the `exclude_metrics` list, some metrics can be excluded to reduce network traffic and storage. With the `send_abs_values` flag, the collector sends absolute values for the configured metrics. The `jobid_regex` can be used to split the Lustre `job_stats` identifier into multiple parts. Since JSON cannot handle strings like `\d`, use `%` instead of `\`.
|
|
||||||
|
|
||||||
Metrics:
|
|
||||||
- `lustre_job_read_samples` (unit: `requests`)
|
|
||||||
- `lustre_job_read_min_bytes` (unit: `bytes`)
|
|
||||||
- `lustre_job_read_max_bytes` (unit: `bytes`)
|
|
||||||
|
|
||||||
The collector adds the tags: `type=jobid,typeid=<jobid_from_regex>,stype=device,stype=<device_name_from_output>`.
|
|
||||||
|
|
||||||
The collector adds the mega information: `unit=<unit>,scope=job`
|
|
||||||
|
|
||||||
### How to configure `job_stats`
|
|
@@ -14,6 +14,10 @@ import (
|
|||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const LUSTRE_SYSFS = `/sys/fs/lustre`
|
||||||
|
const LCTL_CMD = `lctl`
|
||||||
|
const LCTL_OPTION = `get_param`
|
||||||
|
|
||||||
type LustreCollectorConfig struct {
|
type LustreCollectorConfig struct {
|
||||||
LCtlCommand string `json:"lctl_command,omitempty"`
|
LCtlCommand string `json:"lctl_command,omitempty"`
|
||||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||||
@@ -23,6 +27,14 @@ type LustreCollectorConfig struct {
|
|||||||
SendDiffValues bool `json:"send_diff_values,omitempty"`
|
SendDiffValues bool `json:"send_diff_values,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type LustreMetricDefinition struct {
|
||||||
|
name string
|
||||||
|
lineprefix string
|
||||||
|
lineoffset int
|
||||||
|
unit string
|
||||||
|
calc string
|
||||||
|
}
|
||||||
|
|
||||||
type LustreCollector struct {
|
type LustreCollector struct {
|
||||||
metricCollector
|
metricCollector
|
||||||
tags map[string]string
|
tags map[string]string
|
||||||
@@ -34,209 +46,17 @@ type LustreCollector struct {
|
|||||||
stats map[string]map[string]int64 // Data for last value per device and metric
|
stats map[string]map[string]int64 // Data for last value per device and metric
|
||||||
}
|
}
|
||||||
|
|
||||||
var LustreAbsMetrics = []LustreMetricDefinition{
|
|
||||||
{
|
|
||||||
name: "lustre_read_requests",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_requests",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_read_bytes",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_bytes",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_open",
|
|
||||||
lineprefix: "open",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_close",
|
|
||||||
lineprefix: "close",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_setattr",
|
|
||||||
lineprefix: "setattr",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_getattr",
|
|
||||||
lineprefix: "getattr",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_statfs",
|
|
||||||
lineprefix: "statfs",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_inode_permission",
|
|
||||||
lineprefix: "inode_permission",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "none",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
var LustreDiffMetrics = []LustreMetricDefinition{
|
|
||||||
{
|
|
||||||
name: "lustre_read_requests_diff",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_requests_diff",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_read_bytes_diff",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_bytes_diff",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_open_diff",
|
|
||||||
lineprefix: "open",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_close_diff",
|
|
||||||
lineprefix: "close",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_setattr_diff",
|
|
||||||
lineprefix: "setattr",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_getattr_diff",
|
|
||||||
lineprefix: "getattr",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_statfs_diff",
|
|
||||||
lineprefix: "statfs",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_inode_permission_diff",
|
|
||||||
lineprefix: "inode_permission",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "",
|
|
||||||
calc: "difference",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
var LustreDeriveMetrics = []LustreMetricDefinition{
|
|
||||||
{
|
|
||||||
name: "lustre_read_requests_rate",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests/sec",
|
|
||||||
calc: "derivative",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_requests_rate",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 1,
|
|
||||||
offsetname: "samples",
|
|
||||||
unit: "requests/sec",
|
|
||||||
calc: "derivative",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_read_bw",
|
|
||||||
lineprefix: "read_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes/sec",
|
|
||||||
calc: "derivative",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "lustre_write_bw",
|
|
||||||
lineprefix: "write_bytes",
|
|
||||||
lineoffset: 6,
|
|
||||||
offsetname: "sum",
|
|
||||||
unit: "bytes/sec",
|
|
||||||
calc: "derivative",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
||||||
return executeLustreCommand(m.sudoCmd, m.lctl, LCTL_OPTION, fmt.Sprintf("llite.%s.stats", device), m.config.Sudo)
|
var command *exec.Cmd
|
||||||
|
statsfile := fmt.Sprintf("llite.%s.stats", device)
|
||||||
|
if m.config.Sudo {
|
||||||
|
command = exec.Command(m.sudoCmd, m.lctl, LCTL_OPTION, statsfile)
|
||||||
|
} else {
|
||||||
|
command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
||||||
|
}
|
||||||
|
command.Wait()
|
||||||
|
stdout, _ := command.Output()
|
||||||
|
return strings.Split(string(stdout), "\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LustreCollector) getDevices() []string {
|
func (m *LustreCollector) getDevices() []string {
|
||||||
@@ -288,6 +108,183 @@ func getMetricData(lines []string, prefix string, offset int) (int64, error) {
|
|||||||
// return strings.Split(string(buffer), "\n")
|
// return strings.Split(string(buffer), "\n")
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
var LustreAbsMetrics = []LustreMetricDefinition{
|
||||||
|
{
|
||||||
|
name: "lustre_read_requests",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_requests",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_read_bytes",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_bytes",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_open",
|
||||||
|
lineprefix: "open",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_close",
|
||||||
|
lineprefix: "close",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_setattr",
|
||||||
|
lineprefix: "setattr",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_getattr",
|
||||||
|
lineprefix: "getattr",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_statfs",
|
||||||
|
lineprefix: "statfs",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_inode_permission",
|
||||||
|
lineprefix: "inode_permission",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "none",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var LustreDiffMetrics = []LustreMetricDefinition{
|
||||||
|
{
|
||||||
|
name: "lustre_read_requests_diff",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_requests_diff",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_read_bytes_diff",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_bytes_diff",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_open_diff",
|
||||||
|
lineprefix: "open",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_close_diff",
|
||||||
|
lineprefix: "close",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_setattr_diff",
|
||||||
|
lineprefix: "setattr",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_getattr_diff",
|
||||||
|
lineprefix: "getattr",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_statfs_diff",
|
||||||
|
lineprefix: "statfs",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_inode_permission_diff",
|
||||||
|
lineprefix: "inode_permission",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "",
|
||||||
|
calc: "difference",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var LustreDeriveMetrics = []LustreMetricDefinition{
|
||||||
|
{
|
||||||
|
name: "lustre_read_requests_rate",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests/sec",
|
||||||
|
calc: "derivative",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_requests_rate",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 1,
|
||||||
|
unit: "requests/sec",
|
||||||
|
calc: "derivative",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_read_bw",
|
||||||
|
lineprefix: "read_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes/sec",
|
||||||
|
calc: "derivative",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "lustre_write_bw",
|
||||||
|
lineprefix: "write_bytes",
|
||||||
|
lineoffset: 6,
|
||||||
|
unit: "bytes/sec",
|
||||||
|
calc: "derivative",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
func (m *LustreCollector) Init(config json.RawMessage) error {
|
func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "LustreCollector"
|
m.name = "LustreCollector"
|
||||||
@@ -300,7 +297,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
m.setup()
|
m.setup()
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{"type": "node"}
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Lustre", "scope": "node"}
|
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
|
||||||
|
|
||||||
// Lustre file system statistics can only be queried by user root
|
// Lustre file system statistics can only be queried by user root
|
||||||
// or with password-less sudo
|
// or with password-less sudo
|
||||||
|
@@ -1,190 +0,0 @@
|
|||||||
package collectors
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os/exec"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
const LUSTRE_SYSFS = `/sys/fs/lustre`
|
|
||||||
const LCTL_CMD = `lctl`
|
|
||||||
const LCTL_OPTION = `get_param`
|
|
||||||
|
|
||||||
type LustreMetricDefinition struct {
|
|
||||||
name string
|
|
||||||
lineprefix string
|
|
||||||
lineoffset int
|
|
||||||
offsetname string
|
|
||||||
unit string
|
|
||||||
calc string
|
|
||||||
}
|
|
||||||
|
|
||||||
type LustreMetricData struct {
|
|
||||||
sample_time int64
|
|
||||||
start_time int64
|
|
||||||
elapsed_time int64
|
|
||||||
op_data map[string]map[string]int64
|
|
||||||
op_units map[string]string
|
|
||||||
sample_time_unit string
|
|
||||||
start_time_unit string
|
|
||||||
elapsed_time_unit string
|
|
||||||
}
|
|
||||||
|
|
||||||
var devicePattern = regexp.MustCompile(`^[\w\d\-_]+\.([\w\d\-_]+)\.[\w\d\-_]+=$`)
|
|
||||||
var jobPattern = regexp.MustCompile(`^-\s*job_id:\s*([\w\d\-_\.:]+)$`)
|
|
||||||
var snapshotPattern = regexp.MustCompile(`^\s*snapshot_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
|
||||||
var startPattern = regexp.MustCompile(`^\s*start_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
|
||||||
var elapsedPattern = regexp.MustCompile(`^\s*elapsed_time\s*:\s*([\d\.]+)\s*([\w\d\-_\.]*)$`)
|
|
||||||
var linePattern = regexp.MustCompile(`^\s*([\w\d\-_\.]+):\s*\{\s*samples:\s*([\d\.]+),\s*unit:\s*([\w\d\-_\.]+),\s*min:\s*([\d\.]+),\s*max:\s*([\d\.]+),\s*sum:\s*([\d\.]+),\s*sumsq:\s*([\d\.]+)\s*\}`)
|
|
||||||
|
|
||||||
func executeLustreCommand(sudo, lctl, option, search string, use_sudo bool) []string {
|
|
||||||
var command *exec.Cmd
|
|
||||||
if use_sudo {
|
|
||||||
command = exec.Command(sudo, lctl, option, search)
|
|
||||||
} else {
|
|
||||||
command = exec.Command(lctl, option, search)
|
|
||||||
}
|
|
||||||
command.Wait()
|
|
||||||
stdout, _ := command.Output()
|
|
||||||
return strings.Split(string(stdout), "\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
func splitTree(lines []string, splitRegex *regexp.Regexp) map[string][]string {
|
|
||||||
entries := make(map[string][]string)
|
|
||||||
ent_lines := make([]int, 0)
|
|
||||||
for i, l := range lines {
|
|
||||||
m := splitRegex.FindStringSubmatch(l)
|
|
||||||
if len(m) == 2 {
|
|
||||||
ent_lines = append(ent_lines, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(ent_lines) > 0 {
|
|
||||||
for i, idx := range ent_lines[:len(ent_lines)-1] {
|
|
||||||
m := splitRegex.FindStringSubmatch(lines[idx])
|
|
||||||
entries[m[1]] = lines[idx+1 : ent_lines[i+1]]
|
|
||||||
}
|
|
||||||
last := ent_lines[len(ent_lines)-1]
|
|
||||||
m := splitRegex.FindStringSubmatch(lines[last])
|
|
||||||
entries[m[1]] = lines[last:]
|
|
||||||
}
|
|
||||||
return entries
|
|
||||||
}
|
|
||||||
|
|
||||||
func readDevices(lines []string) map[string][]string {
|
|
||||||
return splitTree(lines, devicePattern)
|
|
||||||
}
|
|
||||||
|
|
||||||
func readJobs(lines []string) map[string][]string {
|
|
||||||
return splitTree(lines, jobPattern)
|
|
||||||
}
|
|
||||||
|
|
||||||
func readJobdata(lines []string) LustreMetricData {
|
|
||||||
|
|
||||||
jobdata := LustreMetricData{
|
|
||||||
op_data: make(map[string]map[string]int64),
|
|
||||||
op_units: make(map[string]string),
|
|
||||||
sample_time: 0,
|
|
||||||
sample_time_unit: "nsec",
|
|
||||||
start_time: 0,
|
|
||||||
start_time_unit: "nsec",
|
|
||||||
elapsed_time: 0,
|
|
||||||
elapsed_time_unit: "nsec",
|
|
||||||
}
|
|
||||||
parseTime := func(value, unit string) int64 {
|
|
||||||
var t int64 = 0
|
|
||||||
if len(unit) == 0 {
|
|
||||||
unit = "secs"
|
|
||||||
}
|
|
||||||
values := strings.Split(value, ".")
|
|
||||||
units := strings.Split(unit, ".")
|
|
||||||
if len(values) != len(units) {
|
|
||||||
fmt.Printf("Invalid time specification '%s' and '%s'\n", value, unit)
|
|
||||||
}
|
|
||||||
for i, v := range values {
|
|
||||||
if len(units) > i {
|
|
||||||
s, err := strconv.ParseInt(v, 10, 64)
|
|
||||||
if err == nil {
|
|
||||||
switch units[i] {
|
|
||||||
case "secs":
|
|
||||||
t += s * 1e9
|
|
||||||
case "msecs":
|
|
||||||
t += s * 1e6
|
|
||||||
case "usecs":
|
|
||||||
t += s * 1e3
|
|
||||||
case "nsecs":
|
|
||||||
t += s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
parseNumber := func(value string) int64 {
|
|
||||||
s, err := strconv.ParseInt(value, 10, 64)
|
|
||||||
if err == nil {
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
for _, l := range lines {
|
|
||||||
if jobdata.sample_time == 0 {
|
|
||||||
m := snapshotPattern.FindStringSubmatch(l)
|
|
||||||
if len(m) == 3 {
|
|
||||||
if len(m[2]) > 0 {
|
|
||||||
jobdata.sample_time = parseTime(m[1], m[2])
|
|
||||||
} else {
|
|
||||||
jobdata.sample_time = parseTime(m[1], "secs")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if jobdata.start_time == 0 {
|
|
||||||
m := startPattern.FindStringSubmatch(l)
|
|
||||||
if len(m) == 3 {
|
|
||||||
if len(m[2]) > 0 {
|
|
||||||
jobdata.start_time = parseTime(m[1], m[2])
|
|
||||||
} else {
|
|
||||||
jobdata.start_time = parseTime(m[1], "secs")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if jobdata.elapsed_time == 0 {
|
|
||||||
m := elapsedPattern.FindStringSubmatch(l)
|
|
||||||
if len(m) == 3 {
|
|
||||||
if len(m[2]) > 0 {
|
|
||||||
jobdata.elapsed_time = parseTime(m[1], m[2])
|
|
||||||
} else {
|
|
||||||
jobdata.elapsed_time = parseTime(m[1], "secs")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
m := linePattern.FindStringSubmatch(l)
|
|
||||||
if len(m) == 8 {
|
|
||||||
jobdata.op_units[m[1]] = m[3]
|
|
||||||
jobdata.op_data[m[1]] = map[string]int64{
|
|
||||||
"samples": parseNumber(m[2]),
|
|
||||||
"min": parseNumber(m[4]),
|
|
||||||
"max": parseNumber(m[5]),
|
|
||||||
"sum": parseNumber(m[6]),
|
|
||||||
"sumsq": parseNumber(m[7]),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return jobdata
|
|
||||||
}
|
|
||||||
|
|
||||||
func readCommandOutput(lines []string) map[string]map[string]LustreMetricData {
|
|
||||||
var data map[string]map[string]LustreMetricData = make(map[string]map[string]LustreMetricData)
|
|
||||||
devs := readDevices(lines)
|
|
||||||
for d, ddata := range devs {
|
|
||||||
data[d] = make(map[string]LustreMetricData)
|
|
||||||
jobs := readJobs(ddata)
|
|
||||||
for j, jdata := range jobs {
|
|
||||||
x := readJobdata(jdata)
|
|
||||||
data[d][j] = x
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return data
|
|
||||||
}
|
|
144
collectors/selfMetric.go
Normal file
144
collectors/selfMetric.go
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
package collectors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"runtime"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SelfCollectorConfig struct {
|
||||||
|
MemStats bool `json:"read_mem_stats"`
|
||||||
|
GoRoutines bool `json:"read_goroutines"`
|
||||||
|
CgoCalls bool `json:"read_cgo_calls"`
|
||||||
|
Rusage bool `json:"read_rusage"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SelfCollector struct {
|
||||||
|
metricCollector
|
||||||
|
config SelfCollectorConfig // the configuration structure
|
||||||
|
meta map[string]string // default meta information
|
||||||
|
tags map[string]string // default tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SelfCollector) Init(config json.RawMessage) error {
|
||||||
|
var err error = nil
|
||||||
|
m.name = "SelfCollector"
|
||||||
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
|
m.meta = map[string]string{"source": m.name, "group": "Self"}
|
||||||
|
m.tags = map[string]string{"type": "node"}
|
||||||
|
if len(config) > 0 {
|
||||||
|
err = json.Unmarshal(config, &m.config)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.init = true
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||||
|
timestamp := time.Now()
|
||||||
|
|
||||||
|
if m.config.MemStats {
|
||||||
|
var memstats runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&memstats)
|
||||||
|
|
||||||
|
y, err := lp.New("total_alloc", m.tags, m.meta, map[string]interface{}{"value": memstats.TotalAlloc}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_alloc", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapAlloc}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_sys", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapSys}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_idle", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapIdle}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_inuse", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapInuse}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_released", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapReleased}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "Bytes")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("heap_objects", m.tags, m.meta, map[string]interface{}{"value": memstats.HeapObjects}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m.config.GoRoutines {
|
||||||
|
y, err := lp.New("num_goroutines", m.tags, m.meta, map[string]interface{}{"value": runtime.NumGoroutine()}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m.config.CgoCalls {
|
||||||
|
y, err := lp.New("num_cgo_calls", m.tags, m.meta, map[string]interface{}{"value": runtime.NumCgoCall()}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m.config.Rusage {
|
||||||
|
var rusage syscall.Rusage
|
||||||
|
err := syscall.Getrusage(syscall.RUSAGE_SELF, &rusage)
|
||||||
|
if err == nil {
|
||||||
|
sec, nsec := rusage.Utime.Unix()
|
||||||
|
t := float64(sec) + (float64(nsec) * 1e-9)
|
||||||
|
y, err := lp.New("rusage_user_time", m.tags, m.meta, map[string]interface{}{"value": t}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "seconds")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
sec, nsec = rusage.Stime.Unix()
|
||||||
|
t = float64(sec) + (float64(nsec) * 1e-9)
|
||||||
|
y, err = lp.New("rusage_system_time", m.tags, m.meta, map[string]interface{}{"value": t}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
y.AddMeta("unit", "seconds")
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("rusage_vol_ctx_switch", m.tags, m.meta, map[string]interface{}{"value": rusage.Nvcsw}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("rusage_invol_ctx_switch", m.tags, m.meta, map[string]interface{}{"value": rusage.Nivcsw}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("rusage_signals", m.tags, m.meta, map[string]interface{}{"value": rusage.Nsignals}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("rusage_major_pgfaults", m.tags, m.meta, map[string]interface{}{"value": rusage.Majflt}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
y, err = lp.New("rusage_minor_pgfaults", m.tags, m.meta, map[string]interface{}{"value": rusage.Minflt}, timestamp)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SelfCollector) Close() {
|
||||||
|
m.init = false
|
||||||
|
}
|
34
collectors/selfMetric.md
Normal file
34
collectors/selfMetric.md
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
## `self` collector
|
||||||
|
|
||||||
|
```json
|
||||||
|
"self": {
|
||||||
|
"read_mem_stats" : true,
|
||||||
|
"read_goroutines" : true,
|
||||||
|
"read_cgo_calls" : true,
|
||||||
|
"read_rusage" : true
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `self` collector reads the data from the `runtime` and `syscall` packages, so monitors the execution of the cc-metric-collector itself.
|
||||||
|
|
||||||
|
Metrics:
|
||||||
|
* If `read_mem_stats == true`:
|
||||||
|
* `total_alloc`: The metric reports cumulative bytes allocated for heap objects.
|
||||||
|
* `heap_alloc`: The metric reports bytes of allocated heap objects.
|
||||||
|
* `heap_sys`: The metric reports bytes of heap memory obtained from the OS.
|
||||||
|
* `heap_idle`: The metric reports bytes in idle (unused) spans.
|
||||||
|
* `heap_inuse`: The metric reports bytes in in-use spans.
|
||||||
|
* `heap_released`: The metric reports bytes of physical memory returned to the OS.
|
||||||
|
* `heap_objects`: The metric reports the number of allocated heap objects.
|
||||||
|
* If `read_goroutines == true`:
|
||||||
|
* `num_goroutines`: The metric reports the number of goroutines that currently exist.
|
||||||
|
* If `read_cgo_calls == true`:
|
||||||
|
* `num_cgo_calls`: The metric reports the number of cgo calls made by the current process.
|
||||||
|
* If `read_rusage == true`:
|
||||||
|
* `rusage_user_time`: The metric reports the amount of time that this process has been scheduled in user mode.
|
||||||
|
* `rusage_system_time`: The metric reports the amount of time that this process has been scheduled in kernel mode.
|
||||||
|
* `rusage_vol_ctx_switch`: The metric reports the amount of voluntary context switches.
|
||||||
|
* `rusage_invol_ctx_switch`: The metric reports the amount of involuntary context switches.
|
||||||
|
* `rusage_signals`: The metric reports the number of signals received.
|
||||||
|
* `rusage_major_pgfaults`: The metric reports the number of major faults the process has made which have required loading a memory page from disk.
|
||||||
|
* `rusage_minor_pgfaults`: The metric reports the number of minor faults the process has made which have not required loading a memory page from disk.
|
2
go.mod
2
go.mod
@@ -8,6 +8,7 @@ require (
|
|||||||
github.com/NVIDIA/go-nvml v0.11.6-0
|
github.com/NVIDIA/go-nvml v0.11.6-0
|
||||||
github.com/PaesslerAG/gval v1.2.0
|
github.com/PaesslerAG/gval v1.2.0
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
|
github.com/influxdata/influxdb-client-go/v2 v2.9.0
|
||||||
github.com/influxdata/influxdb-client-go/v2 v2.9.1
|
github.com/influxdata/influxdb-client-go/v2 v2.9.1
|
||||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
|
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
|
||||||
github.com/nats-io/nats.go v1.16.0
|
github.com/nats-io/nats.go v1.16.0
|
||||||
@@ -17,6 +18,7 @@ require (
|
|||||||
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e
|
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
|
@@ -9,8 +9,8 @@ import (
|
|||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
|
||||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||||
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
|
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
|
||||||
units "github.com/ClusterCockpit/cc-units"
|
units "github.com/ClusterCockpit/cc-units"
|
||||||
)
|
)
|
||||||
@@ -281,9 +281,7 @@ func (r *metricRouter) Start() {
|
|||||||
// Foward message received from collector channel
|
// Foward message received from collector channel
|
||||||
coll_forward := func(p lp.CCMetric) {
|
coll_forward := func(p lp.CCMetric) {
|
||||||
// receive from metric collector
|
// receive from metric collector
|
||||||
if !p.HasTag(r.config.HostnameTagName) {
|
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
|
||||||
}
|
|
||||||
if r.config.IntervalStamp {
|
if r.config.IntervalStamp {
|
||||||
p.SetTime(r.timestamp)
|
p.SetTime(r.timestamp)
|
||||||
}
|
}
|
||||||
@@ -312,9 +310,7 @@ func (r *metricRouter) Start() {
|
|||||||
cache_forward := func(p lp.CCMetric) {
|
cache_forward := func(p lp.CCMetric) {
|
||||||
// receive from metric collector
|
// receive from metric collector
|
||||||
if !r.dropMetric(p) {
|
if !r.dropMetric(p) {
|
||||||
if !p.HasTag(r.config.HostnameTagName) {
|
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
|
||||||
}
|
|
||||||
forward(p)
|
forward(p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user