Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git (synced 2025-08-03)
Merge branch 'develop' into exclude-devices-iostat
@@ -9,22 +9,20 @@ import (
     "time"
 
     cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
-    lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
+    lp "github.com/ClusterCockpit/cc-lib/ccMessage"
 )
 
 // "log"
 
 const MOUNTFILE = `/proc/self/mounts`
 
 type DiskstatCollectorConfig struct {
     ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
+    ExcludeMounts  []string `json:"exclude_mounts,omitempty"`
 }
 
 type DiskstatCollector struct {
     metricCollector
-    //matches map[string]int
-    config IOstatCollectorConfig
-    //devices map[string]IOstatCollectorEntry
+    config         DiskstatCollectorConfig
+    allowedMetrics map[string]bool
 }
 
 func (m *DiskstatCollector) Init(config json.RawMessage) error {
@@ -33,12 +31,21 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
     m.meta = map[string]string{"source": m.name, "group": "Disk"}
     m.setup()
     if len(config) > 0 {
-        err := json.Unmarshal(config, &m.config)
-        if err != nil {
+        if err := json.Unmarshal(config, &m.config); err != nil {
             return err
         }
     }
-    file, err := os.Open(string(MOUNTFILE))
+    m.allowedMetrics = map[string]bool{
+        "disk_total":    true,
+        "disk_free":     true,
+        "part_max_used": true,
+    }
+    for _, excl := range m.config.ExcludeMetrics {
+        if _, ok := m.allowedMetrics[excl]; ok {
+            m.allowedMetrics[excl] = false
+        }
+    }
+    file, err := os.Open(MOUNTFILE)
     if err != nil {
         cclog.ComponentError(m.name, err.Error())
         return err
@@ -53,7 +60,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
         return
     }
 
-    file, err := os.Open(string(MOUNTFILE))
+    file, err := os.Open(MOUNTFILE)
     if err != nil {
         cclog.ComponentError(m.name, err.Error())
         return
@@ -62,6 +69,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
 
     part_max_used := uint64(0)
     scanner := bufio.NewScanner(file)
+mountLoop:
     for scanner.Scan() {
         line := scanner.Text()
         if len(line) == 0 {
@@ -77,13 +85,17 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
         if strings.Contains(linefields[1], "boot") {
             continue
         }
-        path := strings.Replace(linefields[1], `\040`, " ", -1)
-        stat := syscall.Statfs_t{
-            Blocks: 0,
-            Bsize:  0,
-            Bfree:  0,
-        }
-        err := syscall.Statfs(path, &stat)
+
+        mountPath := strings.Replace(linefields[1], `\040`, " ", -1)
+
+        for _, excl := range m.config.ExcludeMounts {
+            if strings.Contains(mountPath, excl) {
+                continue mountLoop
+            }
+        }
+
+        stat := syscall.Statfs_t{}
+        err := syscall.Statfs(mountPath, &stat)
         if err != nil {
             continue
         }
@@ -92,16 +104,20 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
         }
         tags := map[string]string{"type": "node", "device": linefields[0]}
         total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
-        y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
-        if err == nil {
-            y.AddMeta("unit", "GBytes")
-            output <- y
+        if m.allowedMetrics["disk_total"] {
+            y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
+            if err == nil {
+                y.AddMeta("unit", "GBytes")
+                output <- y
+            }
         }
         free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
-        y, err = lp.NewMessage("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
-        if err == nil {
-            y.AddMeta("unit", "GBytes")
-            output <- y
+        if m.allowedMetrics["disk_free"] {
+            y, err := lp.NewMessage("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
+            if err == nil {
+                y.AddMeta("unit", "GBytes")
+                output <- y
+            }
         }
         if total > 0 {
             perc := (100 * (total - free)) / total
@@ -110,10 +126,12 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
             }
         }
     }
-    y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
-    if err == nil {
-        y.AddMeta("unit", "percent")
-        output <- y
+    if m.allowedMetrics["part_max_used"] {
+        y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
+        if err == nil {
+            y.AddMeta("unit", "percent")
+            output <- y
+        }
     }
 }
@@ -6,10 +6,13 @@
     "exclude_metrics": [
       "disk_total"
     ],
+    "exclude_mounts": [
+      "slurm-tmpfs"
+    ]
 }
 ```
 
-The `diskstat` collector reads data from `/proc/self/mounts` and outputs a handful **node** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
+The `diskstat` collector reads data from `/proc/self/mounts` and outputs a handful of **node** metrics. If a metric is not required, it can be excluded from being forwarded to the sink. Additionally, any mount point containing one of the strings specified in `exclude_mounts` is skipped during metric collection.
 
 Metrics per device (with `device` tag):
 * `disk_total` (unit `GBytes`)
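To make the values above concrete, here is a minimal, self-contained sketch (not part of the collector, Linux-only) of how `disk_total`, `disk_free`, and the per-mount usage percentage follow from `syscall.Statfs`, combined with the substring-based `exclude_mounts` check shown in the diff. The mount path and exclude list are example values.

```go
// Sketch only: mirrors the GByte conversion and the exclude_mounts substring
// check from the diskstat collector code above. Example values, Linux-only.
package main

import (
    "fmt"
    "strings"
    "syscall"
)

func main() {
    excludeMounts := []string{"slurm-tmpfs"} // example exclude list
    mountPath := "/home"                     // example mount point

    // Any mount whose path contains one of the exclude strings is skipped.
    for _, excl := range excludeMounts {
        if strings.Contains(mountPath, excl) {
            return
        }
    }

    var stat syscall.Statfs_t
    if err := syscall.Statfs(mountPath, &stat); err != nil {
        return
    }

    // Same integer arithmetic as the collector: bytes divided by 10^9.
    total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) // disk_total in GBytes
    free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)   // disk_free in GBytes
    if total > 0 {
        perc := (100 * (total - free)) / total // candidate for part_max_used in percent
        fmt.Printf("disk_total=%d GB, disk_free=%d GB, used=%d%%\n", total, free, perc)
    }
}
```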
@@ -10,15 +10,16 @@ import (
     "time"
 
     cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
-    lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
+    lp "github.com/ClusterCockpit/cc-lib/ccMessage"
 )
 
 const NETSTATFILE = "/proc/net/dev"
 
 type NetstatCollectorConfig struct {
-    IncludeDevices     []string `json:"include_devices"`
-    SendAbsoluteValues bool     `json:"send_abs_values"`
-    SendDerivedValues  bool     `json:"send_derived_values"`
+    IncludeDevices     []string            `json:"include_devices"`
+    SendAbsoluteValues bool                `json:"send_abs_values"`
+    SendDerivedValues  bool                `json:"send_derived_values"`
+    InterfaceAliases   map[string][]string `json:"interface_aliases,omitempty"`
 }
 
 type NetstatCollectorMetric struct {
@@ -32,9 +33,26 @@ type NetstatCollectorMetric struct {
 
 type NetstatCollector struct {
     metricCollector
-    config        NetstatCollectorConfig
-    matches       map[string][]NetstatCollectorMetric
-    lastTimestamp time.Time
+    config           NetstatCollectorConfig
+    aliasToCanonical map[string]string
+    matches          map[string][]NetstatCollectorMetric
+    lastTimestamp    time.Time
 }
 
+func (m *NetstatCollector) buildAliasMapping() {
+    m.aliasToCanonical = make(map[string]string)
+    for canon, aliases := range m.config.InterfaceAliases {
+        for _, alias := range aliases {
+            m.aliasToCanonical[alias] = canon
+        }
+    }
+}
+
+func getCanonicalName(raw string, aliasToCanonical map[string]string) string {
+    if canon, ok := aliasToCanonical[raw]; ok {
+        return canon
+    }
+    return raw
+}
+
 func (m *NetstatCollector) Init(config json.RawMessage) error {
@@ -77,6 +95,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
         }
     }
 
+    m.buildAliasMapping()
+
     // Check access to net statistic file
     file, err := os.Open(NETSTATFILE)
     if err != nil {
@@ -97,18 +117,20 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
         // Split line into fields
         f := strings.Fields(l)
 
-        // Get net device entry
-        dev := strings.Trim(f[0], ": ")
+        // Get raw and canonical names
+        raw := strings.Trim(f[0], ": ")
+        canonical := getCanonicalName(raw, m.aliasToCanonical)
 
         // Check if device is a included device
-        if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
-            tags := map[string]string{"stype": "network", "stype-id": dev, "type": "node"}
+        if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok {
+            // Tag will contain original device name (raw).
+            tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
             meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
             meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
             meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
             meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
 
-            m.matches[dev] = []NetstatCollectorMetric{
+            m.matches[canonical] = []NetstatCollectorMetric{
                 {
                     name:  "net_bytes_in",
                     index: fieldReceiveBytes,
@@ -143,7 +165,6 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
                 },
             }
         }
-
     }
 
     if len(m.matches) == 0 {
@@ -164,7 +185,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
     // Save current timestamp
     m.lastTimestamp = now
 
-    file, err := os.Open(string(NETSTATFILE))
+    file, err := os.Open(NETSTATFILE)
     if err != nil {
         cclog.ComponentError(m.name, err.Error())
         return
@@ -183,11 +204,12 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
         // Split line into fields
         f := strings.Fields(l)
 
-        // Get net device entry
-        dev := strings.Trim(f[0], ":")
+        // Get raw and canonical names
+        raw := strings.Trim(f[0], ":")
+        canonical := getCanonicalName(raw, m.aliasToCanonical)
 
         // Check if device is a included device
-        if devmetrics, ok := m.matches[dev]; ok {
+        if devmetrics, ok := m.matches[canonical]; ok {
             for i := range devmetrics {
                 metric := &devmetrics[i]
 
@@ -4,14 +4,19 @@
 ```json
   "netstat": {
     "include_devices": [
-      "eth0"
+      "eth0",
+      "eno1"
     ],
-    "send_abs_values" : true,
-    "send_derived_values" : true
+    "send_abs_values": true,
+    "send_derived_values": true,
+    "interface_aliases": {
+      "eno1": ["eno1np0", "eno1_alt"],
+      "eth0": ["eth0_alias"]
+    }
   }
 ```
 
-The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list.
+The `netstat` collector reads data from `/proc/net/dev` and outputs a handful of **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list. Optionally, you can define an `interface_aliases` mapping: for each canonical device (as listed in `include_devices`), you may provide an array of alias names that the system may report. When an alias is detected, it is mapped back to its canonical name for matching, while the output tag `stype-id` always carries the actual system-reported name.
 
 Metrics:
 * `net_bytes_in` (`unit=bytes`)
@@ -23,5 +28,4 @@ Metrics:
 * `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
 * `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
 
-The device name is added as tag `stype=network,stype-id=<device>`.
-
+The device name is added as tag `stype=network,stype-id=<device>`.
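As a reading aid for the alias handling added above, the following stand-alone sketch reproduces the reverse mapping built by `buildAliasMapping` and the lookup done by `getCanonicalName`. The device names are the example values from the configuration snippet and not tied to any real system.

```go
// Sketch only: alias resolution as described for the netstat collector.
// Matching uses the canonical name, the stype-id tag keeps the raw name.
package main

import "fmt"

func main() {
    // Example config: canonical device -> aliases the system may report.
    interfaceAliases := map[string][]string{
        "eno1": {"eno1np0", "eno1_alt"},
    }

    // Build the reverse lookup, as buildAliasMapping does.
    aliasToCanonical := make(map[string]string)
    for canon, aliases := range interfaceAliases {
        for _, alias := range aliases {
            aliasToCanonical[alias] = canon
        }
    }

    raw := "eno1np0" // name as reported in /proc/net/dev
    canonical := raw
    if canon, ok := aliasToCanonical[raw]; ok {
        canonical = canon
    }

    // canonical ("eno1") is checked against include_devices;
    // the stype-id tag keeps the raw name ("eno1np0").
    fmt.Println(canonical, raw)
}
```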
@@ -10,7 +10,7 @@ import (
     "time"
 
     cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
-    lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
+    lp "github.com/ClusterCockpit/cc-lib/ccMessage"
 )
 
 // These are the fields we read from the JSON configuration
@@ -18,17 +18,20 @@ type NfsIOStatCollectorConfig struct {
     ExcludeMetrics          []string `json:"exclude_metrics,omitempty"`
     ExcludeFilesystem       []string `json:"exclude_filesystem,omitempty"`
     UseServerAddressAsSType bool     `json:"use_server_as_stype,omitempty"`
+    SendAbsoluteValues      bool     `json:"send_abs_values"`
+    SendDerivedValues       bool     `json:"send_derived_values"`
 }
 
 // This contains all variables we need during execution and the variables
 // defined by metricCollector (name, init, ...)
 type NfsIOStatCollector struct {
     metricCollector
-    config NfsIOStatCollectorConfig    // the configuration structure
-    meta   map[string]string           // default meta information
-    tags   map[string]string           // default tags
-    data   map[string]map[string]int64 // data storage for difference calculation
-    key    string                      // which device info should be used as subtype ID? 'server' or 'mntpoint', see NfsIOStatCollectorConfig.UseServerAddressAsSType
+    config        NfsIOStatCollectorConfig    // the configuration structure
+    meta          map[string]string           // default meta information
+    tags          map[string]string           // default tags
+    data          map[string]map[string]int64 // data storage for difference calculation
+    key           string                      // which device info should be used as subtype ID? 'server' or 'mntpoint'
+    lastTimestamp time.Time
 }
 
 var deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`)
@@ -81,7 +84,6 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
                 data[current[m.key]][name] = val
             }
         }
-
     }
     current = nil
 }
@@ -98,6 +100,9 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
     m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
     m.tags = map[string]string{"type": "node"}
     m.config.UseServerAddressAsSType = false
+    // Set default configuration
+    m.config.SendAbsoluteValues = true
+    m.config.SendDerivedValues = false
     if len(config) > 0 {
         err = json.Unmarshal(config, &m.config)
         if err != nil {
@@ -110,12 +115,15 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
         m.key = "server"
     }
     m.data = m.readNfsiostats()
+    m.lastTimestamp = time.Now()
     m.init = true
     return err
 }
 
 func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
-    timestamp := time.Now()
+    now := time.Now()
+    timeDiff := now.Sub(m.lastTimestamp).Seconds()
+    m.lastTimestamp = now
 
     // Get the current values for all mountpoints
     newdata := m.readNfsiostats()
@@ -123,21 +131,30 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
     for mntpoint, values := range newdata {
         // Was the mount point already present in the last iteration
         if old, ok := m.data[mntpoint]; ok {
-            // Calculate the difference of old and new values
-            for i := range values {
-                x := values[i] - old[i]
-                y, err := lp.NewMessage(fmt.Sprintf("nfsio_%s", i), m.tags, m.meta, map[string]interface{}{"value": x}, timestamp)
-                if err == nil {
-                    if strings.HasPrefix(i, "page") {
-                        y.AddMeta("unit", "4K_Pages")
-                    }
-                    y.AddTag("stype", "filesystem")
-                    y.AddTag("stype-id", mntpoint)
-                    // Send it to output channel
-                    output <- y
-                }
-                // Update old to the new value for the next iteration
-                old[i] = values[i]
+            for name, newVal := range values {
+                if m.config.SendAbsoluteValues {
+                    msg, err := lp.NewMessage(fmt.Sprintf("nfsio_%s", name), m.tags, m.meta, map[string]interface{}{"value": newVal}, now)
+                    if err == nil {
+                        msg.AddTag("stype", "filesystem")
+                        msg.AddTag("stype-id", mntpoint)
+                        output <- msg
+                    }
+                }
+                if m.config.SendDerivedValues {
+                    rate := float64(newVal-old[name]) / timeDiff
+                    msg, err := lp.NewMessage(fmt.Sprintf("nfsio_%s_bw", name), m.tags, m.meta, map[string]interface{}{"value": rate}, now)
+                    if err == nil {
+                        if strings.HasPrefix(name, "page") {
+                            msg.AddMeta("unit", "4K_pages/s")
+                        } else {
+                            msg.AddMeta("unit", "bytes/sec")
+                        }
+                        msg.AddTag("stype", "filesystem")
+                        msg.AddTag("stype-id", mntpoint)
+                        output <- msg
+                    }
+                }
+                old[name] = newVal
             }
         } else {
             // First time we see this mount point, store all values
@@ -157,7 +174,6 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
             m.data[mntpoint] = nil
         }
     }
-
 }
 
 func (m *NfsIOStatCollector) Close() {
|
||||
```json
|
||||
"nfsiostat": {
|
||||
"exclude_metrics": [
|
||||
"nfsio_oread"
|
||||
"oread", "pageread"
|
||||
],
|
||||
"exclude_filesystems" : [
|
||||
"/mnt",
|
||||
"exclude_filesystems": [
|
||||
"/mnt"
|
||||
],
|
||||
"use_server_as_stype": false
|
||||
"use_server_as_stype": false,
|
||||
"send_abs_values": false,
|
||||
"send_derived_values": true
|
||||
}
|
||||
```
|
||||
|
||||
The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from forwarding it to the sink.
|
||||
The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from forwarding it to the sink. **Note:** When excluding metrics, you must provide the base metric name (e.g. pageread) without the nfsio_ prefix. This exclusion applies to both absolute and derived values.
|
||||
|
||||
Metrics:
|
||||
* `nfsio_nread`: Bytes transferred by normal `read()` calls
|
||||
@@ -24,4 +26,9 @@ Metrics:
|
||||
* `nfsio_nfsread`: Bytes transferred for reading from the server
|
||||
* `nfsio_nfswrite`: Pages transferred by writing to the server
|
||||
|
||||
The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting.
|
||||
For each of these, if derived values are enabled, an additional metric is sent with the `_bw` suffix, which represents the rate:
|
||||
|
||||
* For normal byte metrics: `unit=bytes/sec`
|
||||
* For page metrics: `unit=4K_pages/s`
|
||||
|
||||
The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting.
|
||||
|
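A minimal sketch of the derived-value calculation the text describes: the `_bw` metric is the counter difference between two reads divided by the elapsed seconds, with the unit chosen by the `page` prefix. All numbers are made up for illustration.

```go
// Sketch only: rate calculation behind the nfsio_*_bw metrics.
package main

import (
    "fmt"
    "strings"
    "time"
)

func main() {
    lastTimestamp := time.Now().Add(-10 * time.Second) // previous Read() call
    now := time.Now()
    timeDiff := now.Sub(lastTimestamp).Seconds()

    name := "pageread"    // base metric name, as used in exclude_metrics
    old := int64(1000)    // counter value from the previous iteration
    newVal := int64(1500) // counter value from the current iteration

    rate := float64(newVal-old) / timeDiff
    unit := "bytes/sec"
    if strings.HasPrefix(name, "page") {
        unit = "4K_pages/s"
    }
    fmt.Printf("nfsio_%s_bw = %.1f %s\n", name, rate, unit)
}
```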
@@ -1,11 +1,9 @@
-# Running average power limit (RAPL) metric collector
+## `rapl` collector
 
 This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
 
 The Likwid metric collector provides similar functionality.
 
-## Configuration
-
 ```json
 "rapl": {
     "exclude_device_by_id": ["0:1", "0:2"],
@@ -13,6 +11,5 @@ The Likwid metric collector provides similar functionality.
 }
 ```
 
-## Metrics
-
+Metrics:
 * `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
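The averaging described for `rapl_average_power` can be pictured with a short sketch: power is the increase of an energy counter divided by the elapsed time. The sketch assumes the powercap `energy_uj` attribute (microjoules) referenced by the linked kernel documentation; the counter values are invented and this is not the collector's implementation.

```go
// Sketch only: average power as energy difference over elapsed time.
// energy_uj values (microjoules) are invented example readings.
package main

import (
    "fmt"
    "time"
)

func main() {
    interval := 10 * time.Second // time between the last and the current measurement

    prevEnergyMicroJoules := uint64(1_000_000_000) // energy_uj at the last measurement
    currEnergyMicroJoules := uint64(1_450_000_000) // energy_uj at the current measurement

    deltaJoules := float64(currEnergyMicroJoules-prevEnergyMicroJoules) / 1e6
    averagePowerWatts := deltaJoules / interval.Seconds()

    fmt.Printf("rapl_average_power = %.1f W\n", averagePowerWatts) // 45.0 W
}
```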