Merge latest developments into main (#67)

* Update configuration.md

Add an additional receiver to have better alignment of components

* Change default GpfsCollector command to `mmpmon` (#53)

* Set default cmd to 'mmpmon'

* Reuse looked up path

* Cast const to string

* Just download LIKWID to get the headers (#54)

* Just download LIKWID to get the headers

* Remove perl-Data-Dumper from BuildRequires, only required by LIKWID build

* Add HttpReceiver as counterpart to the HttpSink (#49)

* Use GBytes as unit for large memory numbers

* Make maxForward configurable, save old name in meta in rename metrics and make the hostname tag key configurable

* Single release action (#55)

Building all RPMs and releasing in a single workflow

* Makefile target to build binary-only Debian packages (#61)

* Add 'install' and 'DEB' make targets to build binary-only Debian packages

* Add control file for DEB builds

* Use a single line for bash loop in make clean

* Add config options for retry intervals of InfluxDB clients (#59)

* Refactoring of LikwidCollector and metric units (#62)

* Reduce complexity of LikwidCollector and allow metric units

* Add unit to LikwidCollector docu and fix some typos

* Make library path configurable

* Use old metric name in Ganglia if rename has happened in the router (#60)

* Use old metric name if rename has happened in the router

* Also check for Ganglia renames for the oldname

* Derived metrics (#57)

* Add time-based derivatived (e.g. bandwidth) to some collectors

* Add documentation

* Add comments

* Fix: Only compute rates with a valid previous state

* Only compute rates with a valid previous state

* Define const values for net/dev fields

* Set default config values

* Add comments

* Refactor: Consolidate data structures

* Refactor: Consolidate data structures

* Refactor: Avoid struct deep copy

* Refactor: Avoid redundant tag maps

* Refactor: Use int64 type for absolut values

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Simplified iota usage

* Move unit tag to meta data tags

* Derived metrics (#65)

* Add time-based derivatived (e.g. bandwidth) to some collectors

* Add documentation

* Add comments

* Fix: Only compute rates with a valid previous state

* Only compute rates with a valid previous state

* Define const values for net/dev fields

* Set default config values

* Add comments

* Refactor: Consolidate data structures

* Refactor: Consolidate data structures

* Refactor: Avoid struct deep copy

* Refactor: Avoid redundant tag maps

* Refactor: Use int64 type for absolut values

* Update LustreCollector

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Meta to tags list and map for sinks (#63)

* Change ccMetric->Influx functions

* Use a meta_as_tags string list in config but create a lookup map afterwards

* Add meta as tag logic to sampleSink

* Fix staticcheck warnings (#66)

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
This commit is contained in:
Thomas Gruber
2022-03-15 16:41:11 +01:00
committed by GitHub
parent 3157386b3e
commit 3f76947f54
45 changed files with 1329 additions and 714 deletions

View File

@@ -1,79 +1,25 @@
# Use central installation
CENTRAL_INSTALL = false
# How to access hardware performance counters through LIKWID.
# Recommended is 'direct' mode
ACCESSMODE = direct
#######################################################################
# if CENTRAL_INSTALL == true
#######################################################################
# Path to central installation (if CENTRAL_INSTALL=true)
LIKWID_BASE=/apps/likwid/5.2.1
# LIKWID version (should be same major version as central installation, 5.2.x)
all: likwid
# LIKWID version
LIKWID_VERSION = 5.2.1
#######################################################################
# if CENTRAL_INSTALL == false and ACCESSMODE == accessdaemon
#######################################################################
# Where to install the accessdaemon
DAEMON_INSTALLDIR = /usr/local
# Which user to use for the accessdaemon
DAEMON_USER = root
# Which group to use for the accessdaemon
DAEMON_GROUP = root
.ONESHELL:
.PHONY: likwid
likwid:
INSTALL_FOLDER="$${PWD}/likwid"
BUILD_FOLDER="$${PWD}/likwidbuild"
if [ -d $${INSTALL_FOLDER} ]; then rm -r $${INSTALL_FOLDER}; fi
mkdir --parents --verbose $${INSTALL_FOLDER} $${BUILD_FOLDER}
wget -P "$${BUILD_FOLDER}" ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz
tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $${INSTALL_FOLDER}/
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $${INSTALL_FOLDER}/
rm -r $${BUILD_FOLDER}
#################################################
# No need to change anything below this line
#################################################
INSTALL_FOLDER = ./likwid
BUILD_FOLDER = ./likwid/build
ifneq ($(strip $(CENTRAL_INSTALL)),true)
LIKWID_BASE := $(shell pwd)/$(INSTALL_FOLDER)
DAEMON_BASE := $(LIKWID_BASE)
GROUPS_BASE := $(LIKWID_BASE)/groups
all: $(INSTALL_FOLDER)/liblikwid.a cleanup
else
DAEMON_BASE= $(LIKWID_BASE)/sbin
all: $(INSTALL_FOLDER)/liblikwid.a cleanup
endif
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz: $(BUILD_FOLDER)
wget -P $(BUILD_FOLDER) ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz
$(BUILD_FOLDER):
mkdir -p $(BUILD_FOLDER)
$(INSTALL_FOLDER):
mkdir -p $(INSTALL_FOLDER)
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION): $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz
tar -C $(BUILD_FOLDER) -xf $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz
$(INSTALL_FOLDER)/liblikwid.a: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION) $(INSTALL_FOLDER)
cd "$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)" && make "PREFIX=$(LIKWID_BASE)" "SHARED_LIBRARY=false" "ACCESSMODE=$(ACCESSMODE)" "INSTALLED_ACCESSDAEMON=$(DAEMON_INSTALLDIR)/likwid-accessD"
cp \
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/liblikwid.a \
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/ext/hwloc/liblikwid-hwloc.a \
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h \
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h \
$(INSTALL_FOLDER)
$(DAEMON_INSTALLDIR)/likwid-accessD: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD
sudo -u $(DAEMON_USER) -g $(DAEMON_GROUP) install -m 4775 $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD $(DAEMON_INSTALLDIR)/likwid-accessD
prepare_collector: likwidMetric.go
cp likwidMetric.go likwidMetric.go.orig
sed -i -e s+"const GROUPPATH =.*"+"const GROUPPATH = \`$(GROUPS_BASE)\`"+g likwidMetric.go
cleanup:
rm -rf $(BUILD_FOLDER)
clean: cleanup
clean:
rm -rf likwid
.PHONY: clean

View File

@@ -57,7 +57,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
const cpuInfoFile = "/proc/cpuinfo"
file, err := os.Open(cpuInfoFile)
if err != nil {
return fmt.Errorf("Failed to open file '%s': %v", cpuInfoFile, err)
return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err)
}
defer file.Close()
@@ -106,14 +106,14 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
topology.coreID = coreID
topology.coreID_int, err = strconv.ParseInt(coreID, 10, 64)
if err != nil {
return fmt.Errorf("Unable to convert coreID '%s' to int64: %v", coreID, err)
return fmt.Errorf("unable to convert coreID '%s' to int64: %v", coreID, err)
}
// Physical package ID
topology.physicalPackageID = physicalPackageID
topology.physicalPackageID_int, err = strconv.ParseInt(physicalPackageID, 10, 64)
if err != nil {
return fmt.Errorf("Unable to convert physicalPackageID '%s' to int64: %v", physicalPackageID, err)
return fmt.Errorf("unable to convert physicalPackageID '%s' to int64: %v", physicalPackageID, err)
}
// increase maximun socket / package ID, when required

View File

@@ -70,10 +70,10 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
globPattern := filepath.Join(baseDir, "cpu[0-9]*")
cpuDirs, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("Unable to glob files with pattern '%s': %v", globPattern, err)
return fmt.Errorf("unable to glob files with pattern '%s': %v", globPattern, err)
}
if cpuDirs == nil {
return fmt.Errorf("Unable to find any files with pattern '%s'", globPattern)
return fmt.Errorf("unable to find any files with pattern '%s'", globPattern)
}
// Initialize CPU topology
@@ -82,38 +82,38 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
processor := strings.TrimPrefix(cpuDir, "/sys/devices/system/cpu/cpu")
processor_int, err := strconv.ParseInt(processor, 10, 64)
if err != nil {
return fmt.Errorf("Unable to convert cpuID '%s' to int64: %v", processor, err)
return fmt.Errorf("unable to convert cpuID '%s' to int64: %v", processor, err)
}
// Read package ID
physicalPackageIDFile := filepath.Join(cpuDir, "topology", "physical_package_id")
line, err := ioutil.ReadFile(physicalPackageIDFile)
if err != nil {
return fmt.Errorf("Unable to read physical package ID from file '%s': %v", physicalPackageIDFile, err)
return fmt.Errorf("unable to read physical package ID from file '%s': %v", physicalPackageIDFile, err)
}
physicalPackageID := strings.TrimSpace(string(line))
physicalPackageID_int, err := strconv.ParseInt(physicalPackageID, 10, 64)
if err != nil {
return fmt.Errorf("Unable to convert packageID '%s' to int64: %v", physicalPackageID, err)
return fmt.Errorf("unable to convert packageID '%s' to int64: %v", physicalPackageID, err)
}
// Read core ID
coreIDFile := filepath.Join(cpuDir, "topology", "core_id")
line, err = ioutil.ReadFile(coreIDFile)
if err != nil {
return fmt.Errorf("Unable to read core ID from file '%s': %v", coreIDFile, err)
return fmt.Errorf("unable to read core ID from file '%s': %v", coreIDFile, err)
}
coreID := strings.TrimSpace(string(line))
coreID_int, err := strconv.ParseInt(coreID, 10, 64)
if err != nil {
return fmt.Errorf("Unable to convert coreID '%s' to int64: %v", coreID, err)
return fmt.Errorf("unable to convert coreID '%s' to int64: %v", coreID, err)
}
// Check access to current frequency file
scalingCurFreqFile := filepath.Join(cpuDir, "cpufreq", "scaling_cur_freq")
err = unix.Access(scalingCurFreqFile, unix.R_OK)
if err != nil {
return fmt.Errorf("Unable to access file '%s': %v", scalingCurFreqFile, err)
return fmt.Errorf("unable to access file '%s': %v", scalingCurFreqFile, err)
}
t := &m.topology[processor_int]

View File

@@ -21,11 +21,10 @@ type CpustatCollectorConfig struct {
type CpustatCollector struct {
metricCollector
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
num_cpus_metric lp.CCMetric
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
}
func (m *CpustatCollector) Init(config json.RawMessage) error {

View File

@@ -61,7 +61,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
}
}
if len(m.files) == 0 && len(m.commands) == 0 {
return errors.New("No metrics to collect")
return errors.New("no metrics to collect")
}
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)

View File

@@ -17,14 +17,24 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
const DEFAULT_GPFS_CMD = "mmpmon"
type GpfsCollectorLastState struct {
bytesRead int64
bytesWritten int64
}
type GpfsCollector struct {
metricCollector
tags map[string]string
config struct {
Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
SendBandwidths bool `json:"send_bandwidths"`
}
skipFS map[string]struct{}
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
@@ -38,7 +48,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
m.setup()
// Set default mmpmon binary
m.config.Mmpmon = "/usr/lpp/mmfs/bin/mmpmon"
m.config.Mmpmon = DEFAULT_GPFS_CMD
// Read JSON configuration
if len(config) > 0 {
@@ -64,17 +74,18 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// GPFS / IBM Spectrum Scale file system statistics can only be queried by user root
user, err := user.Current()
if err != nil {
return fmt.Errorf("Failed to get current user: %v", err)
return fmt.Errorf("failed to get current user: %v", err)
}
if user.Uid != "0" {
return fmt.Errorf("GPFS file system statistics can only be queried by user root")
}
// Check if mmpmon is in executable search path
_, err = exec.LookPath(m.config.Mmpmon)
p, err := exec.LookPath(m.config.Mmpmon)
if err != nil {
return fmt.Errorf("Failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
}
m.config.Mmpmon = p
m.init = true
return nil
@@ -86,6 +97,13 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
return
}
// Current time stamp
now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
// mmpmon:
// -p: generate output that can be parsed
// -s: suppress the prompt on input
@@ -145,6 +163,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
m.tags["filesystem"] = filesystem
if _, ok := m.lastState[filesystem]; !ok {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: -1,
bytesWritten: -1,
}
}
// return code
rc, err := strconv.Atoi(key_value["_rc_"])
@@ -188,6 +212,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
}
if m.config.SendBandwidths {
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
output <- y
}
}
}
// bytes written
bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)
@@ -200,6 +232,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y
}
if m.config.SendBandwidths {
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
output <- y
}
}
}
if m.config.SendBandwidths {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: bytesRead,
bytesWritten: bytesWritten,
}
}
// number of opens
numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)

View File

@@ -5,7 +5,8 @@
"mmpmon_path": "/path/to/mmpmon",
"exclude_filesystem": [
"fs1"
]
],
"send_bandwidths" : true
}
```
@@ -16,15 +17,18 @@ The reported filesystems can be filtered with the `exclude_filesystem` option
in the configuration.
The path to the `mmpmon` command can be configured with the `mmpmon_path` option
in the configuration.
in the configuration. If nothing is set, the collector searches in `$PATH` for `mmpmon`.
Metrics:
* `bytes_read`
* `gpfs_bytes_read`
* `gpfs_bytes_written`
* `gpfs_num_opens`
* `gpfs_num_closes`
* `gpfs_num_reads`
* `gpfs_num_readdirs`
* `gpfs_num_inode_updates`
* `gpfs_bw_read` (if `send_bandwidths == true`)
* `gpfs_bw_write` (if `send_bandwidths == true`)
The collector adds a `filesystem` tag to all metrics

View File

@@ -16,7 +16,7 @@ import (
"time"
)
const IB_BASEPATH = `/sys/class/infiniband/`
const IB_BASEPATH = "/sys/class/infiniband/"
type InfinibandCollectorInfo struct {
LID string // IB local Identifier (LID)
@@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct {
port string // IB device port
portCounterFiles map[string]string // mapping counter name -> sysfs file
tagSet map[string]string // corresponding tag list
lastState map[string]int64 // State from last measurement
}
type InfinibandCollector struct {
metricCollector
config struct {
ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
SendAbsoluteValues bool `json:"send_abs_values"` // Send absolut values as read from sys filesystem
SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. rates
}
info []*InfinibandCollectorInfo
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@@ -49,6 +53,11 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"source": m.name,
"group": "Network",
}
// Set default configuration,
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
@@ -60,10 +69,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
ibDirs, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err)
return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err)
}
if ibDirs == nil {
return fmt.Errorf("Unable to find any directories with pattern %s", globPattern)
return fmt.Errorf("unable to find any directories with pattern %s", globPattern)
}
for _, path := range ibDirs {
@@ -106,10 +115,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
for _, counterFile := range portCounterFiles {
err := unix.Access(counterFile, unix.R_OK)
if err != nil {
return fmt.Errorf("Unable to access %s: %v", counterFile, err)
return fmt.Errorf("unable to access %s: %v", counterFile, err)
}
}
// Initialize last state
lastState := make(map[string]int64)
for counter := range portCounterFiles {
lastState[counter] = -1
}
m.info = append(m.info,
&InfinibandCollectorInfo{
LID: LID,
@@ -122,11 +137,12 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"port": port,
"lid": LID,
},
lastState: lastState,
})
}
if len(m.info) == 0 {
return fmt.Errorf("Found no IB devices")
return fmt.Errorf("found no IB devices")
}
m.init = true
@@ -141,9 +157,17 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
return
}
// Current time stamp
now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
for _, info := range m.info {
for counterName, counterFile := range info.portCounterFiles {
// Read counter file
line, err := ioutil.ReadFile(counterFile)
if err != nil {
cclog.ComponentError(
@@ -152,6 +176,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
continue
}
data := strings.TrimSpace(string(line))
// convert counter to int64
v, err := strconv.ParseInt(data, 10, 64)
if err != nil {
cclog.ComponentError(
@@ -159,8 +185,24 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err))
continue
}
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
// Send absolut values
if m.config.SendAbsoluteValues {
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
}
}
// Send derived values
if m.config.SendDerivedValues {
if info.lastState[counterName] >= 0 {
rate := float64((v - info.lastState[counterName])) / timeDiff
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
}
}
// Save current state
info.lastState[counterName] = v
}
}

View File

@@ -5,7 +5,9 @@
"ibstat": {
"exclude_devices": [
"mlx4"
]
],
"send_abs_values": true,
"send_derived_values": true
}
```
@@ -22,5 +24,9 @@ Metrics:
* `ib_xmit`
* `ib_recv_pkts`
* `ib_xmit_pkts`
* `ib_recv_bw` (if `send_derived_values == true`)
* `ib_xmit_bw` (if `send_derived_values == true`)
* `ib_recv_pkts_bw` (if `send_derived_values == true`)
* `ib_xmit_pkts_bw` (if `send_derived_values == true`)
The collector adds a `device` tag to all metrics

View File

@@ -54,7 +54,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
m.ipmisensors = p
}
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("No IPMI reader found")
return errors.New("no IPMI reader found")
}
m.init = true
return nil

View File

@@ -15,7 +15,6 @@ import (
"io/ioutil"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"
@@ -28,48 +27,6 @@ import (
"github.com/NVIDIA/go-nvml/pkg/dl"
)
type MetricScope string
const (
METRIC_SCOPE_HWTHREAD = iota
METRIC_SCOPE_CORE
METRIC_SCOPE_LLC
METRIC_SCOPE_NUMA
METRIC_SCOPE_DIE
METRIC_SCOPE_SOCKET
METRIC_SCOPE_NODE
)
func (ms MetricScope) String() string {
return string(ms)
}
func (ms MetricScope) Likwid() string {
LikwidDomains := map[string]string{
"cpu": "",
"core": "",
"llc": "C",
"numadomain": "M",
"die": "D",
"socket": "S",
"node": "N",
}
return LikwidDomains[string(ms)]
}
func (ms MetricScope) Granularity() int {
for i, g := range GetAllMetricScopes() {
if ms == g {
return i
}
}
return -1
}
func GetAllMetricScopes() []MetricScope {
return []MetricScope{"cpu" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"}
}
const (
LIKWID_LIB_NAME = "liblikwid.so"
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
@@ -77,18 +34,16 @@ const (
)
type LikwidCollectorMetricConfig struct {
Name string `json:"name"` // Name of the metric
Calc string `json:"calc"` // Calculation for the metric using
//Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median)
Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function
Publish bool `json:"publish"`
granulatity MetricScope
Name string `json:"name"` // Name of the metric
Calc string `json:"calc"` // Calculation for the metric using
Type string `json:"type"` // Metric type (aka node, socket, cpu, ...)
Publish bool `json:"publish"`
Unit string `json:"unit"` // Unit of metric if any
}
type LikwidCollectorEventsetConfig struct {
Events map[string]string `json:"events"`
granulatity map[string]MetricScope
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
Events map[string]string `json:"events"`
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
}
type LikwidCollectorConfig struct {
@@ -98,28 +53,28 @@ type LikwidCollectorConfig struct {
InvalidToZero bool `json:"invalid_to_zero,omitempty"`
AccessMode string `json:"access_mode,omitempty"`
DaemonPath string `json:"accessdaemon_path,omitempty"`
LibraryPath string `json:"liblikwid_path,omitempty"`
}
type LikwidCollector struct {
metricCollector
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
scopeRespTids map[MetricScope]map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
results map[int]map[int]map[string]interface{}
mresults map[int]map[int]map[string]float64
gmresults map[int]map[string]float64
basefreq float64
running bool
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
results map[int]map[int]map[string]interface{}
mresults map[int]map[int]map[string]float64
gmresults map[int]map[string]float64
basefreq float64
running bool
}
type LikwidMetric struct {
name string
search string
scope MetricScope
scope string
group_idx int
}
@@ -131,152 +86,43 @@ func eventsToEventStr(events map[string]string) string {
return strings.Join(elist, ",")
}
func getGranularity(counter, event string) MetricScope {
if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") {
return "cpu"
} else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") {
return "socket"
} else if strings.HasPrefix(counter, "PWR") {
if event == "RAPL_CORE_ENERGY" {
return "cpu"
} else {
return "socket"
}
}
return "unknown"
}
func getBaseFreq() float64 {
var freq float64 = math.NaN()
C.power_init(0)
info := C.get_powerInfo()
if float64(info.baseFrequency) != 0 {
freq = float64(info.baseFrequency) * 1e3
freq = float64(info.baseFrequency) * 1e6
} else {
buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
if err == nil {
data := strings.Replace(string(buffer), "\n", "", -1)
x, err := strconv.ParseInt(data, 0, 64)
if err == nil {
freq = float64(x) * 1e3
freq = float64(x) * 1e6
}
}
}
return freq
}
func (m *LikwidCollector) initGranularity() {
splitRegex := regexp.MustCompile("[+-/*()]")
for _, evset := range m.config.Eventsets {
evset.granulatity = make(map[string]MetricScope)
for counter, event := range evset.Events {
gran := getGranularity(counter, event)
if gran.Granularity() >= 0 {
evset.granulatity[counter] = gran
}
}
for i, metric := range evset.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("cpu")
evset.Metrics[i].granulatity = gran
for _, x := range s {
if _, ok := evset.Events[x]; ok {
if evset.granulatity[x].Granularity() > gran.Granularity() {
gran = evset.granulatity[x]
}
}
}
evset.Metrics[i].granulatity = gran
}
}
for i, metric := range m.config.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("cpu")
m.config.Metrics[i].granulatity = gran
for _, x := range s {
for _, evset := range m.config.Eventsets {
for _, m := range evset.Metrics {
if m.Name == x && m.granulatity.Granularity() > gran.Granularity() {
gran = m.granulatity
}
}
}
}
m.config.Metrics[i].granulatity = gran
}
}
type TopoResolveFunc func(cpuid int) int
func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int {
get_cpus := func(scope MetricScope) map[int]int {
var slist []int
var cpu C.int
var input func(index int) string
switch scope {
case "node":
slist = []int{0}
input = func(index int) string { return "N:0" }
case "socket":
input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
slist = topo.SocketList()
// case "numadomain":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.NumaNodeList()
// cclog.Debug(scope, " ", input(0), " ", slist)
// case "die":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.DieList()
// case "llc":
// input = fmt.Sprintf("%s%d:0", scope.Likwid(), s)
// slist = topo.LLCacheList()
case "cpu":
input = func(index int) string { return fmt.Sprintf("%d", index) }
slist = topo.CpuList()
case "hwthread":
input = func(index int) string { return fmt.Sprintf("%d", index) }
slist = topo.CpuList()
}
outmap := make(map[int]int)
for _, s := range slist {
t := C.CString(input(s))
clen := C.cpustr_to_cpulist(t, &cpu, 1)
if int(clen) == 1 {
outmap[s] = m.cpu2tid[int(cpu)]
} else {
cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s)))
outmap[s] = -1
}
C.free(unsafe.Pointer(t))
}
return outmap
}
scopes := GetAllMetricScopes()
complete := make(map[MetricScope]map[int]int)
for _, s := range scopes {
complete[s] = get_cpus(s)
}
return complete
}
func (m *LikwidCollector) Init(config json.RawMessage) error {
var ret C.int
m.name = "LikwidCollector"
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
m.config.LibraryPath = LIKWID_LIB_NAME
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
}
}
lib := dl.New(LIKWID_LIB_NAME, LIKWID_LIB_DL_FLAGS)
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", LIKWID_LIB_NAME)
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
}
err := lib.Open()
if err != nil {
return fmt.Errorf("error opening %s: %v", LIKWID_LIB_NAME, err)
return fmt.Errorf("error opening %s: %v", m.config.LibraryPath, err)
}
if m.config.ForceOverwrite {
@@ -306,10 +152,6 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
return err
}
// Determine which counter works at which level. PMC*: cpu, *BOX*: socket, ...
m.initGranularity()
// Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist)
m.scopeRespTids = m.getResponsiblities()
switch m.config.AccessMode {
case "direct":
C.HPMmode(0)
@@ -336,29 +178,36 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
globalParams["inverseClock"] = float64(1.0)
// While adding the events, we test the metrics whether they can be computed at all
for i, evset := range m.config.Eventsets {
estr := eventsToEventStr(evset.Events)
// Generate parameter list for the metric computing test
params := make(map[string]interface{})
params["time"] = float64(1.0)
params["inverseClock"] = float64(1.0)
for counter := range evset.Events {
params[counter] = float64(1.0)
}
for _, metric := range evset.Metrics {
// Try to evaluate the metric
_, err := agg.EvalFloat64Condition(metric.Calc, params)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
var gid C.int
var cstr *C.char
if len(evset.Events) > 0 {
estr := eventsToEventStr(evset.Events)
// Generate parameter list for the metric computing test
params := make(map[string]interface{})
params["time"] = float64(1.0)
params["inverseClock"] = float64(1.0)
for counter := range evset.Events {
params[counter] = float64(1.0)
}
// If the metric is not in the parameter list for the global metrics, add it
if _, ok := globalParams[metric.Name]; !ok {
globalParams[metric.Name] = float64(1.0)
for _, metric := range evset.Metrics {
// Try to evaluate the metric
_, err := agg.EvalFloat64Condition(metric.Calc, params)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
}
// If the metric is not in the parameter list for the global metrics, add it
if _, ok := globalParams[metric.Name]; !ok {
globalParams[metric.Name] = float64(1.0)
}
}
// Now we add the list of events to likwid
cstr = C.CString(estr)
gid = C.perfmon_addEventSet(cstr)
} else {
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
continue
}
// Now we add the list of events to likwid
cstr := C.CString(estr)
gid := C.perfmon_addEventSet(cstr)
if gid >= 0 {
m.groups = append(m.groups, gid)
}
@@ -434,15 +283,9 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Go over events and get the results
for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
ctr := C.perfmon_getCounterName(gid, eidx)
ev := C.perfmon_getEventName(gid, eidx)
gctr := C.GoString(ctr)
gev := C.GoString(ev)
// MetricScope for the counter (and if needed the event)
scope := getGranularity(gctr, gev)
// Get the map scope-id -> tids
// This way we read less counters like only the responsible hardware thread for a socket
scopemap := m.scopeRespTids[scope]
for _, tid := range scopemap {
for _, tid := range m.cpu2tid {
if tid >= 0 {
m.results[group][tid]["time"] = interval.Seconds()
m.results[group][tid]["inverseClock"] = invClock
@@ -456,7 +299,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
for _, metric := range evset.Metrics {
// The metric scope is determined in the Init() function
// Get the map scope-id -> tids
scopemap := m.scopeRespTids[metric.Scope]
scopemap := m.cpu2tid
if metric.Type == "socket" {
scopemap = m.sock2tid
}
for domain, tid := range scopemap {
if tid >= 0 {
value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
@@ -474,13 +320,15 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
tags := map[string]string{"type": metric.Scope.String()}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
y, err := lp.New(metric.Name, map[string]string{"type": metric.Type}, m.meta, fields, time.Now())
if err == nil {
if metric.Type != "node" {
y.AddTag("type-id", fmt.Sprintf("%d", domain))
}
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
output <- y
}
}
@@ -495,7 +343,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Go over the global metrics, derive the value out of the event sets' metric values and send it
func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error {
for _, metric := range m.config.Metrics {
scopemap := m.scopeRespTids[metric.Scope]
scopemap := m.cpu2tid
if metric.Type == "socket" {
scopemap = m.sock2tid
}
for domain, tid := range scopemap {
if tid >= 0 {
// Here we generate parameter list
@@ -521,13 +372,16 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
tags := map[string]string{"type": metric.Scope.String()}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
tags := map[string]string{"type": metric.Type}
fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
if err == nil {
if metric.Type != "node" {
y.AddTag("type-id", fmt.Sprintf("%d", domain))
}
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
output <- y
}
}

View File

@@ -4,14 +4,17 @@
The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.
The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases.
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`publish=false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
Additional options:
- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode)
- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin`
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`.
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD`
- `liblikwid_path`: Location of `liblikwid.so`
### Available metric scopes
@@ -54,7 +57,8 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
"calc": "time",
"name": "Runtime (RDTSC) [s]",
"publish": true,
"scope": "hwthread"
"unit": "seconds"
"scope": "cpu"
},
{
"..." : "..."
@@ -104,25 +108,28 @@ $ chwon $CCUSER /var/run/likwid.lock
{
"name": "ipc",
"calc": "PMC0/PMC1",
"scope": "cpu",
"type": "cpu",
"publish": true
},
{
"name": "flops_any",
"calc": "0.000001*PMC2/time",
"scope": "cpu",
"unit": "MFlops/s",
"type": "cpu",
"publish": true
},
{
"name": "clock_mhz",
"name": "clock",
"calc": "0.000001*(FIXC1/FIXC2)/inverseClock",
"scope": "cpu",
"type": "cpu",
"unit": "MHz",
"publish": true
},
{
"name": "mem1",
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
"scope": "socket",
"unit": "Mbyte/s",
"type": "socket",
"publish": false
}
]
@@ -140,19 +147,22 @@ $ chwon $CCUSER /var/run/likwid.lock
{
"name": "pwr_core",
"calc": "PWR0/time",
"scope": "socket",
"unit": "Watt"
"type": "socket",
"publish": true
},
{
"name": "pwr_pkg",
"calc": "PWR1/time",
"scope": "socket",
"type": "socket",
"unit": "Watt"
"publish": true
},
{
"name": "mem2",
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
"scope": "socket",
"unit": "Mbyte/s",
"type": "socket",
"publish": false
}
]
@@ -162,7 +172,8 @@ $ chwon $CCUSER /var/run/likwid.lock
{
"name": "mem_bw",
"calc": "mem1+mem2",
"scope": "socket",
"type": "socket",
"unit": "Mbyte/s",
"publish": true
}
]
@@ -198,3 +209,4 @@ IPC PMC0/PMC1 -> {
-> ]
```
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.

View File

@@ -19,20 +19,31 @@ const LCTL_CMD = `lctl`
const LCTL_OPTION = `get_param`
type LustreCollectorConfig struct {
LCtlCommand string `json:"lctl_command"`
ExcludeMetrics []string `json:"exclude_metrics"`
SendAllMetrics bool `json:"send_all_metrics"`
Sudo bool `json:"use_sudo"`
LCtlCommand string `json:"lctl_command,omitempty"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
Sudo bool `json:"use_sudo,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
SendDerivedValues bool `json:"send_derived_values,omitempty"`
SendDiffValues bool `json:"send_diff_values,omitempty"`
}
type LustreMetricDefinition struct {
name string
lineprefix string
lineoffset int
unit string
calc string
}
type LustreCollector struct {
metricCollector
tags map[string]string
matches map[string]map[string]int
stats map[string]map[string]int64
config LustreCollectorConfig
lctl string
sudoCmd string
tags map[string]string
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
}
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@@ -75,6 +86,16 @@ func (m *LustreCollector) getDevices() []string {
return devices
}
func getMetricData(lines []string, prefix string, offset int) (int64, error) {
for _, line := range lines {
if strings.HasPrefix(line, prefix) {
lf := strings.Fields(line)
return strconv.ParseInt(lf[offset], 0, 64)
}
}
return 0, errors.New("no such line in data")
}
// //Version reading the stats data of a device from sysfs
// func (m *LustreCollector) getDeviceDataSysfs(device string) []string {
// llitedir := filepath.Join(LUSTRE_SYSFS, "llite")
@@ -87,6 +108,183 @@ func (m *LustreCollector) getDevices() []string {
// return strings.Split(string(buffer), "\n")
// }
var LustreAbsMetrics = []LustreMetricDefinition{
{
name: "lustre_read_requests",
lineprefix: "read_bytes",
lineoffset: 1,
unit: "requests",
calc: "none",
},
{
name: "lustre_write_requests",
lineprefix: "write_bytes",
lineoffset: 1,
unit: "requests",
calc: "none",
},
{
name: "lustre_read_bytes",
lineprefix: "read_bytes",
lineoffset: 6,
unit: "bytes",
calc: "none",
},
{
name: "lustre_write_bytes",
lineprefix: "write_bytes",
lineoffset: 6,
unit: "bytes",
calc: "none",
},
{
name: "lustre_open",
lineprefix: "open",
lineoffset: 1,
unit: "",
calc: "none",
},
{
name: "lustre_close",
lineprefix: "close",
lineoffset: 1,
unit: "",
calc: "none",
},
{
name: "lustre_setattr",
lineprefix: "setattr",
lineoffset: 1,
unit: "",
calc: "none",
},
{
name: "lustre_getattr",
lineprefix: "getattr",
lineoffset: 1,
unit: "",
calc: "none",
},
{
name: "lustre_statfs",
lineprefix: "statfs",
lineoffset: 1,
unit: "",
calc: "none",
},
{
name: "lustre_inode_permission",
lineprefix: "inode_permission",
lineoffset: 1,
unit: "",
calc: "none",
},
}
var LustreDiffMetrics = []LustreMetricDefinition{
{
name: "lustre_read_requests_diff",
lineprefix: "read_bytes",
lineoffset: 1,
unit: "requests",
calc: "difference",
},
{
name: "lustre_write_requests_diff",
lineprefix: "write_bytes",
lineoffset: 1,
unit: "requests",
calc: "difference",
},
{
name: "lustre_read_bytes_diff",
lineprefix: "read_bytes",
lineoffset: 6,
unit: "bytes",
calc: "difference",
},
{
name: "lustre_write_bytes_diff",
lineprefix: "write_bytes",
lineoffset: 6,
unit: "bytes",
calc: "difference",
},
{
name: "lustre_open_diff",
lineprefix: "open",
lineoffset: 1,
unit: "",
calc: "difference",
},
{
name: "lustre_close_diff",
lineprefix: "close",
lineoffset: 1,
unit: "",
calc: "difference",
},
{
name: "lustre_setattr_diff",
lineprefix: "setattr",
lineoffset: 1,
unit: "",
calc: "difference",
},
{
name: "lustre_getattr_diff",
lineprefix: "getattr",
lineoffset: 1,
unit: "",
calc: "difference",
},
{
name: "lustre_statfs_diff",
lineprefix: "statfs",
lineoffset: 1,
unit: "",
calc: "difference",
},
{
name: "lustre_inode_permission_diff",
lineprefix: "inode_permission",
lineoffset: 1,
unit: "",
calc: "difference",
},
}
var LustreDeriveMetrics = []LustreMetricDefinition{
{
name: "lustre_read_requests_rate",
lineprefix: "read_bytes",
lineoffset: 1,
unit: "requests/sec",
calc: "derivative",
},
{
name: "lustre_write_requests_rate",
lineprefix: "write_bytes",
lineoffset: 1,
unit: "requests/sec",
calc: "derivative",
},
{
name: "lustre_read_bw",
lineprefix: "read_bytes",
lineoffset: 6,
unit: "bytes/sec",
calc: "derivative",
},
{
name: "lustre_write_bw",
lineprefix: "write_bytes",
lineoffset: 6,
unit: "bytes/sec",
calc: "derivative",
},
}
func (m *LustreCollector) Init(config json.RawMessage) error {
var err error
m.name = "LustreCollector"
@@ -99,17 +297,9 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
m.setup()
m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
defmatches := map[string]map[string]int{
"read_bytes": {"lustre_read_bytes": 6, "lustre_read_requests": 1},
"write_bytes": {"lustre_write_bytes": 6, "lustre_write_requests": 1},
"open": {"lustre_open": 1},
"close": {"lustre_close": 1},
"setattr": {"lustre_setattr": 1},
"getattr": {"lustre_getattr": 1},
"statfs": {"lustre_statfs": 1},
"inode_permission": {"lustre_inode_permission": 1}}
// Lustre file system statistics can only be queried by user root
// or with password-less sudo
if !m.config.Sudo {
user, err := user.Current()
if err != nil {
@@ -120,23 +310,15 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
return err
}
} else {
p, err := exec.LookPath("sudo")
if err != nil {
cclog.ComponentError(m.name, "Cannot find 'sudo'")
return err
}
m.sudoCmd = p
}
m.matches = make(map[string]map[string]int)
for lineprefix, names := range defmatches {
for metricname, offset := range names {
_, skip := stringArrayContains(m.config.ExcludeMetrics, metricname)
if skip {
continue
}
if _, prefixExist := m.matches[lineprefix]; !prefixExist {
m.matches[lineprefix] = make(map[string]int)
}
if _, metricExist := m.matches[lineprefix][metricname]; !metricExist {
m.matches[lineprefix][metricname] = offset
}
}
}
p, err := exec.LookPath(m.config.LCtlCommand)
if err != nil {
p, err = exec.LookPath(LCTL_CMD)
@@ -145,26 +327,51 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
m.lctl = p
if m.config.Sudo {
p, err := exec.LookPath("sudo")
if err != nil {
m.sudoCmd = p
m.definitions = []LustreMetricDefinition{}
if m.config.SendAbsoluteValues {
for _, def := range LustreAbsMetrics {
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def)
}
}
}
if m.config.SendDiffValues {
for _, def := range LustreDiffMetrics {
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def)
}
}
}
if m.config.SendDerivedValues {
for _, def := range LustreDeriveMetrics {
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def)
}
}
}
if len(m.definitions) == 0 {
return errors.New("no metrics to collect")
}
devices := m.getDevices()
if len(devices) == 0 {
return errors.New("no metrics to collect")
return errors.New("no Lustre devices found")
}
m.stats = make(map[string]map[string]int64)
for _, d := range devices {
m.stats[d] = make(map[string]int64)
for _, names := range m.matches {
for metricname := range names {
m.stats[d][metricname] = 0
data := m.getDeviceDataCommand(d)
for _, def := range m.definitions {
x, err := getMetricData(data, def.lineprefix, def.lineoffset)
if err == nil {
m.stats[d][def.name] = x
} else {
m.stats[d][def.name] = 0
}
}
}
m.lastTimestamp = time.Now()
m.init = true
return nil
}
@@ -173,54 +380,49 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init {
return
}
now := time.Now()
tdiff := now.Sub(m.lastTimestamp)
for device, devData := range m.stats {
stats := m.getDeviceDataCommand(device)
processed := []string{}
for _, line := range stats {
lf := strings.Fields(line)
if len(lf) > 1 {
if fields, ok := m.matches[lf[0]]; ok {
for name, idx := range fields {
x, err := strconv.ParseInt(lf[idx], 0, 64)
if err != nil {
continue
}
value := x - devData[name]
devData[name] = x
if value < 0 {
value = 0
}
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
y.AddTag("device", device)
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
}
output <- y
if m.config.SendAllMetrics {
processed = append(processed, name)
}
}
}
}
data := m.getDeviceDataCommand(device)
for _, def := range m.definitions {
var use_x int64
var err error
var y lp.CCMetric
x, err := getMetricData(data, def.lineprefix, def.lineoffset)
if err == nil {
use_x = x
} else {
use_x = devData[def.name]
}
}
if m.config.SendAllMetrics {
for name := range devData {
if _, done := stringArrayContains(processed, name); !done {
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": 0}, time.Now())
if err == nil {
y.AddTag("device", device)
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
}
output <- y
}
var value interface{}
switch def.calc {
case "none":
value = use_x
y, err = lp.New(def.name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
case "difference":
value = use_x - devData[def.name]
if value.(int64) < 0 {
value = 0
}
y, err = lp.New(def.name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
case "derivative":
value = float64(use_x-devData[def.name]) / tdiff.Seconds()
if value.(float64) < 0 {
value = 0
}
y, err = lp.New(def.name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
}
if err == nil {
y.AddTag("device", device)
if len(def.unit) > 0 {
y.AddMeta("unit", def.unit)
}
output <- y
}
devData[def.name] = use_x
}
}
m.lastTimestamp = now
}
func (m *LustreCollector) Close() {

View File

@@ -3,27 +3,44 @@
```json
"lustrestat": {
"procfiles" : [
"/proc/fs/lustre/llite/lnec-XXXXXX/stats"
],
"lctl_command": "/path/to/lctl",
"exclude_metrics": [
"setattr",
"getattr"
]
],
"send_abs_values" : true,
"send_derived_values" : true,
"send_diff_values": true,
"use_sudo": false
}
```
The `lustrestat` collector reads from the procfs stat files for Lustre like `/proc/fs/lustre/llite/lnec-XXXXXX/stats`.
The `lustrestat` collector uses the `lctl` application with the `get_param` option to get all `llite` metrics (Lustre client). The `llite` metrics are only available for root users. If password-less sudo is configured, you can enable `sudo` in the configuration.
Metrics:
* `read_bytes`
* `read_requests`
* `write_bytes`
* `write_requests`
* `open`
* `close`
* `getattr`
* `setattr`
* `statfs`
* `inode_permission`
* `lustre_read_bytes` (unit `bytes`)
* `lustre_read_requests` (unit `requests`)
* `lustre_write_bytes` (unit `bytes`)
* `lustre_write_requests` (unit `requests`)
* `lustre_open`
* `lustre_close`
* `lustre_getattr`
* `lustre_setattr`
* `lustre_statfs`
* `lustre_inode_permission`
* `lustre_read_bw` (if `send_derived_values == true`, unit `bytes/sec`)
* `lustre_write_bw` (if `send_derived_values == true`, unit `bytes/sec`)
* `lustre_read_requests_rate` (if `send_derived_values == true`, unit `requests/sec`)
* `lustre_write_requests_rate` (if `send_derived_values == true`, unit `requests/sec`)
* `lustre_read_bytes_diff` (if `send_diff_values == true`, unit `bytes`)
* `lustre_read_requests_diff` (if `send_diff_values == true`, unit `requests`)
* `lustre_write_bytes_diff` (if `send_diff_values == true`, unit `bytes`)
* `lustre_write_requests_diff` (if `send_diff_values == true`, unit `requests`)
* `lustre_open_diff` (if `send_diff_values == true`)
* `lustre_close_diff` (if `send_diff_values == true`)
* `lustre_getattr_diff` (if `send_diff_values == true`)
* `lustre_setattr_diff` (if `send_diff_values == true`)
* `lustre_statfs_diff` (if `send_diff_values == true`)
* `lustre_inode_permission_diff` (if `send_diff_values == true`)
This collector adds an `device` tag.

View File

@@ -32,11 +32,12 @@ type MemstatCollectorNode struct {
type MemstatCollector struct {
metricCollector
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
}
func getStats(filename string) map[string]float64 {
@@ -77,7 +78,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
return err
}
}
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"}
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"}
m.stats = make(map[string]int64)
m.matches = make(map[string]string)
m.tags = map[string]string{"type": "node"}
@@ -99,6 +100,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.matches[k] = v
}
}
m.sendMemUsed = false
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
m.sendMemUsed = true
}
if len(m.matches) == 0 {
return errors.New("no metrics to collect")
}
@@ -152,23 +157,26 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if v, ok := stats[match]; ok {
value = v
}
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now())
if err == nil {
output <- y
}
}
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
if freeVal, free := stats["MemFree"]; free {
if bufVal, buffers := stats["Buffers"]; buffers {
if cacheVal, cached := stats["Cached"]; cached {
memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal)
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
if err == nil {
output <- y
if m.sendMemUsed {
memUsed := 0.0
if totalVal, total := stats["MemTotal"]; total {
if freeVal, free := stats["MemFree"]; free {
if bufVal, buffers := stats["Buffers"]; buffers {
if cacheVal, cached := stats["Cached"]; cached {
memUsed = totalVal - (freeVal + bufVal + cacheVal)
}
}
}
}
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now())
if err == nil {
output <- y
}
}
}

View File

@@ -125,5 +125,5 @@ func RemoveFromStringList(s []string, r string) ([]string, error) {
return append(s[:i], s[i+1:]...), nil
}
}
return s, fmt.Errorf("No such string in list")
return s, fmt.Errorf("no such string in list")
}

View File

@@ -13,22 +13,27 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
const NETSTATFILE = `/proc/net/dev`
const NETSTATFILE = "/proc/net/dev"
type NetstatCollectorConfig struct {
IncludeDevices []string `json:"include_devices"`
IncludeDevices []string `json:"include_devices"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
}
type NetstatCollectorMetric struct {
index int
lastValue float64
name string
index int
tags map[string]string
meta map[string]string
meta_rates map[string]string
lastValue int64
}
type NetstatCollector struct {
metricCollector
config NetstatCollectorConfig
matches map[string]map[string]NetstatCollectorMetric
devtags map[string]map[string]string
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
}
@@ -36,15 +41,33 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector"
m.setup()
m.lastTimestamp = time.Now()
m.meta = map[string]string{"source": m.name, "group": "Network"}
m.devtags = make(map[string]map[string]string)
nameIndexMap := map[string]int{
"net_bytes_in": 1,
"net_pkts_in": 2,
"net_bytes_out": 9,
"net_pkts_out": 10,
}
m.matches = make(map[string]map[string]NetstatCollectorMetric)
const (
fieldInterface = iota
fieldReceiveBytes
fieldReceivePackets
fieldReceiveErrs
fieldReceiveDrop
fieldReceiveFifo
fieldReceiveFrame
fieldReceiveCompressed
fieldReceiveMulticast
fieldTransmitBytes
fieldTransmitPackets
fieldTransmitErrs
fieldTransmitDrop
fieldTransmitFifo
fieldTransmitColls
fieldTransmitCarrier
fieldTransmitCompressed
)
m.matches = make(map[string][]NetstatCollectorMetric)
// Set default configuration,
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
@@ -52,7 +75,9 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
return err
}
}
file, err := os.Open(string(NETSTATFILE))
// Check access to net statistic file
file, err := os.Open(NETSTATFILE)
if err != nil {
cclog.ComponentError(m.name, err.Error())
return err
@@ -62,23 +87,65 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
l := scanner.Text()
// Skip lines with no net device entry
if !strings.Contains(l, ":") {
continue
}
// Split line into fields
f := strings.Fields(l)
// Get net device entry
dev := strings.Trim(f[0], ": ")
// Check if device is a included device
if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
m.matches[dev] = make(map[string]NetstatCollectorMetric)
for name, idx := range nameIndexMap {
m.matches[dev][name] = NetstatCollectorMetric{
index: idx,
lastValue: 0,
}
tags := map[string]string{"device": dev, "type": "node"}
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
m.matches[dev] = []NetstatCollectorMetric{
{
name: "net_bytes_in",
index: fieldReceiveBytes,
lastValue: -1,
tags: tags,
meta: meta_unit_byte,
meta_rates: meta_unit_byte_per_sec,
},
{
name: "net_pkts_in",
index: fieldReceivePackets,
lastValue: -1,
tags: tags,
meta: meta_unit_pkts,
meta_rates: meta_unit_pkts_per_sec,
},
{
name: "net_bytes_out",
index: fieldTransmitBytes,
lastValue: -1,
tags: tags,
meta: meta_unit_byte,
meta_rates: meta_unit_byte_per_sec,
},
{
name: "net_pkts_out",
index: fieldTransmitPackets,
lastValue: -1,
tags: tags,
meta: meta_unit_pkts,
meta_rates: meta_unit_pkts_per_sec,
},
}
m.devtags[dev] = map[string]string{"device": dev, "type": "node"}
}
}
if len(m.devtags) == 0 {
if len(m.matches) == 0 {
return errors.New("no devices to collector metrics found")
}
m.init = true
@@ -89,50 +156,62 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init {
return
}
// Current time stamp
now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
file, err := os.Open(string(NETSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
return
}
defer file.Close()
tdiff := now.Sub(m.lastTimestamp)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
l := scanner.Text()
// Skip lines with no net device entry
if !strings.Contains(l, ":") {
continue
}
// Split line into fields
f := strings.Fields(l)
// Get net device entry
dev := strings.Trim(f[0], ":")
// Check if device is a included device
if devmetrics, ok := m.matches[dev]; ok {
for name, data := range devmetrics {
v, err := strconv.ParseFloat(f[data.index], 64)
if err == nil {
vdiff := v - data.lastValue
value := vdiff / tdiff.Seconds()
if data.lastValue == 0 {
value = 0
}
data.lastValue = v
y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now)
if err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "bytes/sec")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "packets/sec")
}
for i := range devmetrics {
metric := &devmetrics[i]
// Read value
v, err := strconv.ParseInt(f[metric.index], 10, 64)
if err != nil {
continue
}
if m.config.SendAbsoluteValues {
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
}
devmetrics[name] = data
}
if m.config.SendDerivedValues {
if metric.lastValue >= 0 {
rate := float64(v-metric.lastValue) / timeDiff
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
}
}
metric.lastValue = v
}
}
}
}
m.lastTimestamp = time.Now()
}
func (m *NetstatCollector) Close() {

View File

@@ -5,17 +5,23 @@
"netstat": {
"include_devices": [
"eth0"
]
],
"send_abs_values" : true,
"send_derived_values" : true
}
```
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list.
Metrics:
* `net_bytes_in` (`unit=bytes/sec`)
* `net_bytes_out` (`unit=bytes/sec`)
* `net_pkts_in` (`unit=packets/sec`)
* `net_pkts_out` (`unit=packets/sec`)
* `net_bytes_in` (`unit=bytes`)
* `net_bytes_out` (`unit=bytes`)
* `net_pkts_in` (`unit=packets`)
* `net_pkts_out` (`unit=packets`)
* `net_bytes_in_bw` (`unit=bytes/sec` if `send_derived_values == true`)
* `net_bytes_out_bw` (`unit=bytes/sec` if `send_derived_values == true`)
* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
The device name is added as tag `device`.

View File

@@ -70,10 +70,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
globPattern := filepath.Join("/sys/class/hwmon", "*", "temp*_input")
inputFiles, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("Unable to glob files with pattern '%s': %v", globPattern, err)
return fmt.Errorf("unable to glob files with pattern '%s': %v", globPattern, err)
}
if inputFiles == nil {
return fmt.Errorf("Unable to find any files with pattern '%s'", globPattern)
return fmt.Errorf("unable to find any files with pattern '%s'", globPattern)
}
// Get sensor name for each temperature sensor file
@@ -158,7 +158,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
// Empty sensors map
if len(m.sensors) == 0 {
return fmt.Errorf("No temperature sensors found")
return fmt.Errorf("no temperature sensors found")
}
// Finished initialization

View File

@@ -39,14 +39,14 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
m.config.Num_procs = int(DEFAULT_NUM_PROCS)
}
if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS {
return errors.New(fmt.Sprintf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS))
return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)
}
m.setup()
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
command.Wait()
_, err = command.Output()
if err != nil {
return errors.New("Failed to execute command")
return errors.New("failed to execute command")
}
m.init = true
return nil