mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-23 05:11:40 +02:00
Merge develop
branch into main
(#96)
* InfiniBandCollector: Scale raw readings from octets to bytes * Fix clock frequency coming from LikwidCollector and update docs * Build DEB package for Ubuntu 20.04 for releases * Fix memstat collector with numa_stats option * Remove useless prints from MemstatCollector * Replace ioutils with os and io (#87) * Use lower case for error strings in RocmSmiCollector * move maybe-usable-by-other-cc-components to pkg. Fix all files to use the new paths (#88) * Add collector for monitoring the execution of cc-metric-collector itself (#81) * Add collector to monitor execution of cc-metric-collector itself * Register SelfCollector * Fix import paths for moved packages * Check if at least one CPU with frequency information was detected * Correct type: /proc/stats -> /proc/stat * Update README.md * Run ipmitool asynchron. Improved error handling. * Corrected some typos * Add running average power limit (RAPL) metric collector * Add running average power limit (RAPL) metric collector * Do not mess up with the orignal configuration * * Corrected json config in numastatsMetric.md * Added some debug output to numastatsMetric.go * Fixed computing number of physical packages for non continous physical package IDs (e.g. on Ampere Altra Q80-30) * Fix kernel panic for receiver config with missing receiver type * Add receiver to gather remote IPMI sensor metrics * Added config option to add ipmi-sensors command line options * Add documentaion for IPMI receiver * Update to latest version of included go modules * Add go.mod to App dependency * Try to use common metric tags across hardware vendors * Add IPMI metric: current * remove prefix enumeration like 01-... 
* Add IPMI receiver example configuration to receivers.json * Minimal formating changes * Add hostlist package * Added tests for hostlist Expand() * Use package hostlist to expand a host list * Use package hostlist to expand a host list * Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null * Updated to latest package versions * Do not allow unknown fields in JSON configuration file * Add workflow to customize packages to docs * NFS I/O Stats Collector (#91) * Initial version * Delete values for vanished mount points and comments * Fix for Likwid collector (#95) * Run LIKWID in separate thread and check metric type * Change LIKWID collector documentation to use 'type' instead of 'scope' * Re-initialize LIKWID after one read is missing due to lock toggle * Register cc-metric-collector at Zenodo (#93) * Add initial version of Zenodo project file * Orcid ID added * Update .zenodo.json Co-authored-by: Holger Obermaier <holger.obermaier@kit.edu> * Update ipmiMetric.go Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
This commit is contained in:
@@ -51,7 +51,7 @@ A collector reads data from any source, parses it to metrics and submits these m
|
||||
* `Name() string`: Return the name of the collector
|
||||
* `Init(config json.RawMessage) error`: Initializes the collector using the given collector-specific config in JSON. Check if needed files/commands exist, ...
|
||||
* `Initialized() bool`: Check if a collector is successfully initialized
|
||||
* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
||||
* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
||||
* `Close()`: Closes down the collector.
|
||||
|
||||
It is recommended to call `setup()` in the `Init()` function.
|
||||
|
@@ -36,6 +36,7 @@ var AvailableCollectors = map[string]MetricCollector{
|
||||
"numastats": new(NUMAStatsCollector),
|
||||
"beegfs_meta": new(BeegfsMetaCollector),
|
||||
"beegfs_storage": new(BeegfsStorageCollector),
|
||||
"rapl": new(RAPLCollector),
|
||||
"rocm_smi": new(RocmSmiCollector),
|
||||
"self": new(SelfCollector),
|
||||
"schedstat": new(SchedstatCollector),
|
||||
|
@@ -142,6 +142,11 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if at least one CPU with frequency information was detected
|
||||
if len(m.topology) == 0 {
|
||||
return fmt.Errorf("No CPU frequency info found in %s", cpuInfoFile)
|
||||
}
|
||||
|
||||
numPhysicalPackageID_int := maxPhysicalPackageID + 1
|
||||
numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int)
|
||||
numNonHT := fmt.Sprint(numNonHT_int)
|
||||
|
@@ -23,20 +23,18 @@ type CPUFreqCollectorTopology struct {
|
||||
numPhysicalPackages string // number of sockets / packages
|
||||
numPhysicalPackages_int int64
|
||||
isHT bool
|
||||
numNonHT string // number of non hyperthreading processors
|
||||
numNonHT string // number of non hyper-threading processors
|
||||
numNonHT_int int64
|
||||
scalingCurFreqFile string
|
||||
tagSet map[string]string
|
||||
}
|
||||
|
||||
//
|
||||
// CPUFreqCollector
|
||||
// a metric collector to measure the current frequency of the CPUs
|
||||
// as obtained from the hardware (in KHz)
|
||||
// Only measure on the first hyper thread
|
||||
// Only measure on the first hyper-thread
|
||||
//
|
||||
// See: https://www.kernel.org/doc/html/latest/admin-guide/pm/cpufreq.html
|
||||
//
|
||||
type CPUFreqCollector struct {
|
||||
metricCollector
|
||||
topology []CPUFreqCollectorTopology
|
||||
@@ -126,7 +124,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
t.scalingCurFreqFile = scalingCurFreqFile
|
||||
}
|
||||
|
||||
// is processor a hyperthread?
|
||||
// is processor a hyper-thread?
|
||||
coreSeenBefore := make(map[string]bool)
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
@@ -136,23 +134,20 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
coreSeenBefore[globalID] = true
|
||||
}
|
||||
|
||||
// number of non hyper thread cores and packages / sockets
|
||||
// number of non hyper-thread cores and packages / sockets
|
||||
var numNonHT_int int64 = 0
|
||||
var maxPhysicalPackageID int64 = 0
|
||||
PhysicalPackageIDs := make(map[int64]struct{})
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
|
||||
// Update maxPackageID
|
||||
if t.physicalPackageID_int > maxPhysicalPackageID {
|
||||
maxPhysicalPackageID = t.physicalPackageID_int
|
||||
}
|
||||
|
||||
if !t.isHT {
|
||||
numNonHT_int++
|
||||
}
|
||||
|
||||
PhysicalPackageIDs[t.physicalPackageID_int] = struct{}{}
|
||||
}
|
||||
|
||||
numPhysicalPackageID_int := maxPhysicalPackageID + 1
|
||||
numPhysicalPackageID_int := int64(len(PhysicalPackageIDs))
|
||||
numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int)
|
||||
numNonHT := fmt.Sprint(numNonHT_int)
|
||||
for i := range m.topology {
|
||||
@@ -168,6 +163,13 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(
|
||||
m.name,
|
||||
"initialized",
|
||||
numPhysicalPackageID_int, "physical packages,",
|
||||
len(cpuDirs), "CPUs,",
|
||||
numNonHT, "non-hyper-threading CPUs")
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -182,7 +184,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
for i := range m.topology {
|
||||
t := &m.topology[i]
|
||||
|
||||
// skip hyperthreads
|
||||
// skip hyper-threads
|
||||
if t.isHT {
|
||||
continue
|
||||
}
|
||||
|
@@ -1,4 +1,5 @@
|
||||
## `cpufreq_cpuinfo` collector
|
||||
|
||||
```json
|
||||
"cpufreq": {
|
||||
"exclude_metrics": []
|
||||
@@ -8,4 +9,5 @@
|
||||
The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful of **hwthread** metrics.
|
||||
|
||||
Metrics:
|
||||
* `cpufreq`
|
||||
|
||||
* `cpufreq`
|
||||
|
@@ -1,5 +1,6 @@
|
||||
|
||||
## `cpustat` collector
|
||||
|
||||
```json
|
||||
"cpustat": {
|
||||
"exclude_metrics": [
|
||||
@@ -8,9 +9,10 @@
|
||||
}
|
||||
```
|
||||
|
||||
The `cpustat` collector reads data from `/proc/stats` and outputs a handful **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
|
||||
The `cpustat` collector reads data from `/proc/stat` and outputs a handful of **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
|
||||
|
||||
Metrics:
|
||||
|
||||
* `cpu_user`
|
||||
* `cpu_nice`
|
||||
* `cpu_system`
|
||||
|
@@ -1,51 +1,57 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
const IPMITOOL_PATH = `ipmitool`
|
||||
const IPMISENSORS_PATH = `ipmi-sensors`
|
||||
|
||||
type IpmiCollectorConfig struct {
|
||||
ExcludeDevices []string `json:"exclude_devices"`
|
||||
IpmitoolPath string `json:"ipmitool_path"`
|
||||
IpmisensorsPath string `json:"ipmisensors_path"`
|
||||
}
|
||||
|
||||
type IpmiCollector struct {
|
||||
metricCollector
|
||||
//tags map[string]string
|
||||
//matches map[string]string
|
||||
config IpmiCollectorConfig
|
||||
config struct {
|
||||
ExcludeDevices []string `json:"exclude_devices"`
|
||||
IpmitoolPath string `json:"ipmitool_path"`
|
||||
IpmisensorsPath string `json:"ipmisensors_path"`
|
||||
}
|
||||
ipmitool string
|
||||
ipmisensors string
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
// Check if already initialized
|
||||
if m.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.name = "IpmiCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{"source": m.name, "group": "IPMI"}
|
||||
m.config.IpmitoolPath = string(IPMITOOL_PATH)
|
||||
m.config.IpmisensorsPath = string(IPMISENSORS_PATH)
|
||||
m.ipmitool = ""
|
||||
m.ipmisensors = ""
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "IPMI",
|
||||
}
|
||||
// default path to IPMI tools
|
||||
m.config.IpmitoolPath = "ipmitool"
|
||||
m.config.IpmisensorsPath = "ipmi-sensors"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Check if executables ipmitool or ipmisensors are found
|
||||
p, err := exec.LookPath(m.config.IpmitoolPath)
|
||||
if err == nil {
|
||||
m.ipmitool = p
|
||||
@@ -62,25 +68,33 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
||||
|
||||
// Setup ipmitool command
|
||||
command := exec.Command(cmd, "sensor")
|
||||
command.Wait()
|
||||
stdout, err := command.Output()
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
stdout, _ := command.StdoutPipe()
|
||||
errBuf := new(bytes.Buffer)
|
||||
command.Stderr = errBuf
|
||||
|
||||
// start command
|
||||
if err := command.Start(); err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("readIpmiTool(): Failed to start command \"%s\": %v", command.String(), err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
ll := strings.Split(string(stdout), "\n")
|
||||
|
||||
for _, line := range ll {
|
||||
lv := strings.Split(line, "|")
|
||||
// Read command output
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
for scanner.Scan() {
|
||||
lv := strings.Split(scanner.Text(), "|")
|
||||
if len(lv) < 3 {
|
||||
continue
|
||||
}
|
||||
v, err := strconv.ParseFloat(strings.Trim(lv[1], " "), 64)
|
||||
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
|
||||
if err == nil {
|
||||
name := strings.ToLower(strings.Replace(strings.Trim(lv[0], " "), " ", "_", -1))
|
||||
unit := strings.Trim(lv[2], " ")
|
||||
name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1))
|
||||
unit := strings.TrimSpace(lv[2])
|
||||
if unit == "Volts" {
|
||||
unit = "Volts"
|
||||
} else if unit == "degrees C" {
|
||||
@@ -98,6 +112,17 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for command end
|
||||
if err := command.Wait(); err != nil {
|
||||
errMsg, _ := io.ReadAll(errBuf)
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
|
||||
fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", string(errMsg)),
|
||||
)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
|
||||
@@ -131,16 +156,16 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
|
||||
// Check if already initialized
|
||||
if !m.init {
|
||||
return
|
||||
}
|
||||
|
||||
if len(m.config.IpmitoolPath) > 0 {
|
||||
_, err := os.Stat(m.config.IpmitoolPath)
|
||||
if err == nil {
|
||||
m.readIpmiTool(m.config.IpmitoolPath, output)
|
||||
}
|
||||
m.readIpmiTool(m.config.IpmitoolPath, output)
|
||||
} else if len(m.config.IpmisensorsPath) > 0 {
|
||||
_, err := os.Stat(m.config.IpmisensorsPath)
|
||||
if err == nil {
|
||||
m.readIpmiSensors(m.config.IpmisensorsPath, output)
|
||||
}
|
||||
m.readIpmiSensors(m.config.IpmisensorsPath, output)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -8,9 +8,6 @@
|
||||
}
|
||||
```
|
||||
|
||||
The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`).
|
||||
The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`).
|
||||
|
||||
The metrics depend on the output of the underlying tools but contain temperature, power and energy metrics.
|
||||
|
||||
|
||||
|
||||
|
@@ -28,6 +28,7 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
|
||||
"github.com/NVIDIA/go-nvml/pkg/dl"
|
||||
"golang.design/x/thread"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -71,18 +72,20 @@ type LikwidCollectorConfig struct {
|
||||
|
||||
type LikwidCollector struct {
|
||||
metricCollector
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
initialized bool
|
||||
likwidGroups map[C.int]LikwidEventsetConfig
|
||||
lock sync.Mutex
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
initialized bool
|
||||
needs_reinit bool
|
||||
likwidGroups map[C.int]LikwidEventsetConfig
|
||||
lock sync.Mutex
|
||||
measureThread thread.Thread
|
||||
}
|
||||
|
||||
type LikwidMetric struct {
|
||||
@@ -92,6 +95,18 @@ type LikwidMetric struct {
|
||||
group_idx int
|
||||
}
|
||||
|
||||
// checkMetricType reports whether t is one of the metric types accepted by
// the LikwidCollector: "node", "socket", "hwthread", "core" or
// "memoryDomain". The comparison is exact (case-sensitive, no trimming).
func checkMetricType(t string) bool {
	// A switch over the constant names avoids allocating and filling a map
	// on every call just to answer a five-way membership question.
	switch t {
	case "node", "socket", "hwthread", "core", "memoryDomain":
		return true
	}
	return false
}
|
||||
|
||||
func eventsToEventStr(events map[string]string) string {
|
||||
elist := make([]string, 0)
|
||||
for k, v := range events {
|
||||
@@ -179,6 +194,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
m.name = "LikwidCollector"
|
||||
m.parallel = false
|
||||
m.initialized = false
|
||||
m.needs_reinit = true
|
||||
m.running = false
|
||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||
m.config.LibraryPath = LIKWID_LIB_NAME
|
||||
@@ -239,7 +255,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
for _, metric := range evset.Metrics {
|
||||
// Try to evaluate the metric
|
||||
if testLikwidMetricFormula(metric.Calc, params) {
|
||||
if testLikwidMetricFormula(metric.Calc, params) && checkMetricType(metric.Type) {
|
||||
// Add the computable metric to the parameter list for the global metrics
|
||||
globalParams = append(globalParams, metric.Name)
|
||||
totalMetrics++
|
||||
@@ -257,6 +273,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
if !testLikwidMetricFormula(metric.Calc, globalParams) {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
|
||||
metric.Calc = ""
|
||||
} else if !checkMetricType(metric.Type) {
|
||||
cclog.ComponentError(m.name, "Metric", metric.Name, "has invalid type")
|
||||
metric.Calc = ""
|
||||
} else {
|
||||
totalMetrics++
|
||||
}
|
||||
@@ -268,6 +287,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
m.measureThread = thread.New()
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -281,6 +301,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
var skip bool = false
|
||||
cclog.ComponentDebug(m.name, "Setup returns", ret)
|
||||
if ret == -37 {
|
||||
skip = true
|
||||
} else {
|
||||
@@ -289,6 +310,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = true
|
||||
ret = C.perfmon_startCounters()
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
@@ -301,7 +323,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = true
|
||||
ret = C.perfmon_readCounters()
|
||||
time.Sleep(interval)
|
||||
m.running = false
|
||||
ret = C.perfmon_stopCounters()
|
||||
@@ -316,6 +338,24 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = false
|
||||
runtime := float64(C.perfmon_getLastTimeOfGroup(evset.gid))
|
||||
// Go over events and get the results
|
||||
for eidx, counter := range evset.eorder {
|
||||
gctr := C.GoString(counter)
|
||||
for _, tid := range m.cpu2tid {
|
||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||
fres := float64(res)
|
||||
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
|
||||
cclog.ComponentDebug(m.name, "Sanitize", gctr, "to zero")
|
||||
fres = 0.0
|
||||
}
|
||||
evset.results[tid][gctr] = fres
|
||||
}
|
||||
}
|
||||
for _, tid := range m.cpu2tid {
|
||||
evset.results[tid]["time"] = runtime
|
||||
}
|
||||
}
|
||||
m.lock.Unlock()
|
||||
return false, nil
|
||||
@@ -325,19 +365,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
|
||||
invClock := float64(1.0 / m.basefreq)
|
||||
|
||||
// Go over events and get the results
|
||||
for eidx, counter := range evset.eorder {
|
||||
gctr := C.GoString(counter)
|
||||
for _, tid := range m.cpu2tid {
|
||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||
fres := float64(res)
|
||||
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
|
||||
fres = 0.0
|
||||
}
|
||||
evset.results[tid][gctr] = fres
|
||||
evset.results[tid]["time"] = interval.Seconds()
|
||||
evset.results[tid]["inverseClock"] = invClock
|
||||
}
|
||||
for _, tid := range m.cpu2tid {
|
||||
evset.results[tid]["inverseClock"] = invClock
|
||||
}
|
||||
|
||||
// Go over the event set metrics, derive the value out of the event:counter values and send it
|
||||
@@ -431,6 +460,28 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) ReInit() error {
|
||||
C.perfmon_finalize()
|
||||
ret := C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
return nil
|
||||
}
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
if len(evset.Events) > 0 {
|
||||
//skip := false
|
||||
likwidGroup := genLikwidEventSet(evset)
|
||||
gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
if gid >= 0 {
|
||||
likwidGroup.gid = gid
|
||||
likwidGroup.internal = i
|
||||
m.likwidGroups[gid] = likwidGroup
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) LateInit() error {
|
||||
var ret C.int
|
||||
if m.initialized {
|
||||
@@ -445,6 +496,9 @@ func (m *LikwidCollector) LateInit() error {
|
||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
||||
}
|
||||
C.HPMmode(1)
|
||||
for _, c := range m.cpulist {
|
||||
C.HPMaddThread(c)
|
||||
}
|
||||
}
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
|
||||
ret = C.topology_init()
|
||||
@@ -468,48 +522,53 @@ func (m *LikwidCollector) LateInit() error {
|
||||
m.basefreq = getBaseFreq()
|
||||
cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
|
||||
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
C.topology_finalize()
|
||||
if ret != -22 {
|
||||
err = errors.New("failed to initialize LIKWID perfmon")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
} else {
|
||||
err = errors.New("access to LIKWID perfmon locked")
|
||||
}
|
||||
return err
|
||||
if m.needs_reinit {
|
||||
m.ReInit()
|
||||
m.needs_reinit = false
|
||||
}
|
||||
|
||||
// While adding the events, we test the metrics whether they can be computed at all
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
if len(evset.Events) > 0 {
|
||||
skip := false
|
||||
likwidGroup := genLikwidEventSet(evset)
|
||||
for _, g := range m.likwidGroups {
|
||||
if likwidGroup.go_estr == g.go_estr {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
if gid >= 0 {
|
||||
likwidGroup.gid = gid
|
||||
likwidGroup.internal = i
|
||||
m.likwidGroups[gid] = likwidGroup
|
||||
}
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
continue
|
||||
}
|
||||
// cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
// ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
// if ret != 0 {
|
||||
// var err error = nil
|
||||
// C.topology_finalize()
|
||||
// if ret != -22 {
|
||||
// err = errors.New("failed to initialize LIKWID perfmon")
|
||||
// cclog.ComponentError(m.name, err.Error())
|
||||
// } else {
|
||||
// err = errors.New("access to LIKWID perfmon locked")
|
||||
// }
|
||||
// return err
|
||||
// }
|
||||
|
||||
}
|
||||
// // While adding the events, we test the metrics whether they can be computed at all
|
||||
// for i, evset := range m.config.Eventsets {
|
||||
// var gid C.int
|
||||
// if len(evset.Events) > 0 {
|
||||
// //skip := false
|
||||
// likwidGroup := genLikwidEventSet(evset)
|
||||
// // for _, g := range m.likwidGroups {
|
||||
// // if likwidGroup.go_estr == g.go_estr {
|
||||
// // skip = true
|
||||
// // break
|
||||
// // }
|
||||
// // }
|
||||
// // if skip {
|
||||
// // continue
|
||||
// // }
|
||||
// // Now we add the list of events to likwid
|
||||
// gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
// if gid >= 0 {
|
||||
// likwidGroup.gid = gid
|
||||
// likwidGroup.internal = i
|
||||
// m.likwidGroups[gid] = likwidGroup
|
||||
// }
|
||||
// } else {
|
||||
// cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
// continue
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// If no event set could be added, shut down LikwidCollector
|
||||
if len(m.likwidGroups) == 0 {
|
||||
@@ -540,38 +599,48 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
return
|
||||
}
|
||||
|
||||
if !m.initialized {
|
||||
m.lock.Lock()
|
||||
err = m.LateInit()
|
||||
if err != nil {
|
||||
m.measureThread.Call(func() {
|
||||
if !m.initialized {
|
||||
m.lock.Lock()
|
||||
err = m.LateInit()
|
||||
if err != nil {
|
||||
m.lock.Unlock()
|
||||
cclog.ComponentError(m.name, "lateinit failed")
|
||||
return
|
||||
}
|
||||
m.initialized = true
|
||||
m.lock.Unlock()
|
||||
return
|
||||
skip = true
|
||||
}
|
||||
m.initialized = true
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
if m.initialized && !skip {
|
||||
for _, evset := range m.likwidGroups {
|
||||
if !skip {
|
||||
// measure event set 'i' for 'interval' seconds
|
||||
skip, err = m.takeMeasurement(evset, interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
if m.initialized && !skip {
|
||||
time := interval
|
||||
for _, evset := range m.likwidGroups {
|
||||
if !skip {
|
||||
// measure event set 'i' for 'interval' seconds
|
||||
skip, err = m.takeMeasurement(evset, interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if !skip {
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(evset, time, output)
|
||||
}
|
||||
}
|
||||
|
||||
if !skip {
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(evset, interval, output)
|
||||
// use the event set metrics to derive the global metrics
|
||||
m.calcGlobalMetrics(time, output)
|
||||
}
|
||||
if skip {
|
||||
m.needs_reinit = true
|
||||
m.initialized = false
|
||||
}
|
||||
}
|
||||
if !skip {
|
||||
// use the event set metrics to derive the global metrics
|
||||
m.calcGlobalMetrics(interval, output)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) Close() {
|
||||
|
@@ -41,7 +41,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
|
||||
|
||||
The `likwid` configuration consists of two parts, the `eventsets` and `globalmetrics`:
|
||||
- An event set list itself has two parts, the `events` and a set of derivable `metrics`. Each of the `events` is a `counter:event` pair in LIKWID's syntax. The `metrics` are a list of formulas to derive the metric value from the measurements of the `events`' values. Each metric has a name, the formula, a type and a publish flag. There is an optional `unit` field. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for both events configured in the counters `PMC0` and `PMC1`. You can optionally use `time` for the measurement time and `inverseClock` for `1.0/baseCpuFrequency`. The type tells the LikwidCollector whether it is a metric for each hardware thread (`hwthread`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the LikwidCollector whether a metric should be sent to the router or is only used internally to compute a global metric.
|
||||
- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
|
||||
- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a type and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
|
||||
|
||||
Additional options:
|
||||
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
|
||||
@@ -50,20 +50,20 @@ Additional options:
|
||||
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD` (like `/usr/local/sbin`)
|
||||
- `liblikwid_path`: Location of `liblikwid.so` including file name like `/usr/local/lib/liblikwid.so`
|
||||
|
||||
### Available metric scopes
|
||||
### Available metric types
|
||||
|
||||
Hardware performance counters are scattered all over the system nowadays. A counter covers a specific part of the system. While there are hardware thread specific counters for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric.
|
||||
|
||||
- `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"`
|
||||
- `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"`
|
||||
|
||||
**Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
|
||||
**Note:** You cannot specify `socket` type for a metric that is measured at `hwthread` type, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the type of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
|
||||
|
||||
As a guideline:
|
||||
- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread`
|
||||
- All counters names containing `BOX` have the scope `socket`
|
||||
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope
|
||||
- All `DFCx` counters have scope `socket`
|
||||
- All counters `FIXCx`, `PMCy` and `TMAz` have the type `hwthread`
|
||||
- All counter names containing `BOX` have the type `socket`
|
||||
- All `PWRx` counters have type `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` type
|
||||
- All `DFCx` counters have type `socket`
|
||||
|
||||
### Help with the configuration
|
||||
|
||||
@@ -93,7 +93,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
|
||||
"name": "Runtime (RDTSC) [s]",
|
||||
"publish": true,
|
||||
"unit": "seconds"
|
||||
"scope": "hwthread"
|
||||
"type": "hwthread"
|
||||
},
|
||||
{
|
||||
"..." : "..."
|
||||
@@ -245,7 +245,7 @@ METRICS -> "metrics": [
|
||||
IPC PMC0/PMC1 -> {
|
||||
-> "name" : "IPC",
|
||||
-> "calc" : "PMC0/PMC1",
|
||||
-> "scope": "hwthread",
|
||||
-> "type": "hwthread",
|
||||
-> "publish": true
|
||||
-> }
|
||||
-> ]
|
||||
|
166
collectors/nfsiostatMetric.go
Normal file
166
collectors/nfsiostatMetric.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
// NfsIOStatCollectorConfig holds the options that are read from the
// collector's JSON configuration.
type NfsIOStatCollectorConfig struct {
	// ExcludeMetrics lists the metrics that are not collected.
	ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
	// ExcludeFilesystem lists the filesystems that are skipped. Entries are
	// compared against the mount point, or against the server address when
	// UseServerAddressAsSType is set.
	ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
	// UseServerAddressAsSType selects the server address instead of the
	// mount point as subtype ID.
	UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
}
|
||||
|
||||
// This contains all variables we need during execution and the variables
|
||||
// defined by metricCollector (name, init, ...)
|
||||
type NfsIOStatCollector struct {
|
||||
metricCollector
|
||||
config NfsIOStatCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
data map[string]map[string]int64 // data storage for difference calculation
|
||||
key string // which device info should be used as subtype ID? 'server' or 'mntpoint', see NfsIOStatCollectorConfig.UseServerAddressAsSType
|
||||
}
|
||||
|
||||
// Patterns for the lines of /proc/self/mountstats that the collector parses.
var (
	// deviceRegex matches the header line of an NFS mount and captures the
	// server, the mount point and the (possibly empty) NFS version.
	deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`)

	// bytesRegex matches the 'bytes:' line of a mount and captures its
	// eight counters.
	bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P<nread>[^ ]+) (?P<nwrite>[^ ]+) (?P<dread>[^ ]+) (?P<dwrite>[^ ]+) (?P<nfsread>[^ ]+) (?P<nfswrite>[^ ]+) (?P<pageread>[^ ]+) (?P<pagewrite>[^ ]+)`)
)
|
||||
|
||||
// resolve_regex_fields applies regex to s and returns the values of all named
// capture groups, keyed by group name. Unnamed groups are ignored. If regex
// matches several times, the values of the last match win. The returned map
// is empty when regex does not match.
func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string {
	names := regex.SubexpNames()
	result := map[string]string{}
	matches := regex.FindAllStringSubmatch(s, -1)
	for n := 0; n < len(matches); n++ {
		for idx, name := range names {
			if name == "" {
				// Whole match (index 0) or unnamed group
				continue
			}
			result[name] = matches[n][idx]
		}
	}
	return result
}
|
||||
|
||||
func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
||||
data := make(map[string]map[string]int64)
|
||||
filename := "/proc/self/mountstats"
|
||||
stats, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return data
|
||||
}
|
||||
|
||||
lines := strings.Split(string(stats), "\n")
|
||||
var current map[string]string = nil
|
||||
for _, l := range lines {
|
||||
// Is this a device line with mount point, remote target and NFS version?
|
||||
dev := resolve_regex_fields(l, deviceRegex)
|
||||
if len(dev) > 0 {
|
||||
if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok {
|
||||
current = dev
|
||||
if len(current["version"]) == 0 {
|
||||
current["version"] = "3"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(current) > 0 {
|
||||
// Byte line parsing (if found the device for it)
|
||||
bytes := resolve_regex_fields(l, bytesRegex)
|
||||
if len(bytes) > 0 {
|
||||
data[current[m.key]] = make(map[string]int64)
|
||||
for name, sval := range bytes {
|
||||
if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok {
|
||||
val, err := strconv.ParseInt(sval, 10, 64)
|
||||
if err == nil {
|
||||
data[current[m.key]][name] = val
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
current = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
m.name = "NfsIOStatCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
m.config.UseServerAddressAsSType = false
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.key = "mntpoint"
|
||||
if m.config.UseServerAddressAsSType {
|
||||
m.key = "server"
|
||||
}
|
||||
m.data = m.readNfsiostats()
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
timestamp := time.Now()
|
||||
|
||||
// Get the current values for all mountpoints
|
||||
newdata := m.readNfsiostats()
|
||||
|
||||
for mntpoint, values := range newdata {
|
||||
// Was the mount point already present in the last iteration
|
||||
if old, ok := m.data[mntpoint]; ok {
|
||||
// Calculate the difference of old and new values
|
||||
for i := range values {
|
||||
x := values[i] - old[i]
|
||||
y, err := lp.New(fmt.Sprintf("nfsio_%s", i), m.tags, m.meta, map[string]interface{}{"value": x}, timestamp)
|
||||
if err == nil {
|
||||
if strings.HasPrefix(i, "page") {
|
||||
y.AddMeta("unit", "4K_Pages")
|
||||
}
|
||||
y.AddTag("stype", "filesystem")
|
||||
y.AddTag("stype-id", mntpoint)
|
||||
// Send it to output channel
|
||||
output <- y
|
||||
}
|
||||
// Update old to the new value for the next iteration
|
||||
old[i] = values[i]
|
||||
}
|
||||
} else {
|
||||
// First time we see this mount point, store all values
|
||||
m.data[mntpoint] = values
|
||||
}
|
||||
}
|
||||
// Reset entries that do not exist anymore
|
||||
for mntpoint := range m.data {
|
||||
found := false
|
||||
for new := range newdata {
|
||||
if new == mntpoint {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
m.data[mntpoint] = nil
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (m *NfsIOStatCollector) Close() {
|
||||
// Unset flag
|
||||
m.init = false
|
||||
}
|
27
collectors/nfsiostatMetric.md
Normal file
27
collectors/nfsiostatMetric.md
Normal file
@@ -0,0 +1,27 @@
|
||||
## `nfsiostat` collector
|
||||
|
||||
```json
|
||||
"nfsiostat": {
|
||||
"exclude_metrics": [
|
||||
"dread"
|
||||
],
|
||||
"exclude_filesystem": [
|
||||
"/mnt"
|
||||
],
|
||||
"use_server_as_stype": false
|
||||
}
|
||||
```
|
||||
|
||||
The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful of **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from being forwarded to the sink.
|
||||
|
||||
Metrics:
|
||||
* `nfsio_nread`: Bytes transferred by normal `read()` calls
|
||||
* `nfsio_nwrite`: Bytes transferred by normal `write()` calls
|
||||
* `nfsio_dread`: Bytes transferred by `read()` calls with `O_DIRECT`
|
||||
* `nfsio_dwrite`: Bytes transferred by `write()` calls with `O_DIRECT`
|
||||
* `nfsio_pageread`: Pages transferred by `read()` calls
|
||||
* `nfsio_pagewrite`: Pages transferred by `write()` calls
|
||||
* `nfsio_nfsread`: Bytes transferred for reading from the server
|
||||
* `nfsio_nfswrite`: Bytes transferred for writing to the server
|
||||
|
||||
The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting.
|
@@ -14,29 +14,38 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
//
|
||||
// Numa policy hit/miss statistics
|
||||
// Non-Uniform Memory Access (NUMA) policy hit/miss statistics
|
||||
//
|
||||
// numa_hit:
|
||||
// A process wanted to allocate memory from this node, and succeeded.
|
||||
//
|
||||
// A process wanted to allocate memory from this node, and succeeded.
|
||||
//
|
||||
// numa_miss:
|
||||
// A process wanted to allocate memory from another node,
|
||||
// but ended up with memory from this node.
|
||||
//
|
||||
// A process wanted to allocate memory from another node,
|
||||
// but ended up with memory from this node.
|
||||
//
|
||||
// numa_foreign:
|
||||
// A process wanted to allocate on this node,
|
||||
// but ended up with memory from another node.
|
||||
//
|
||||
// A process wanted to allocate on this node,
|
||||
// but ended up with memory from another node.
|
||||
//
|
||||
// local_node:
|
||||
// A process ran on this node's CPU,
|
||||
// and got memory from this node.
|
||||
//
|
||||
// A process ran on this node's CPU,
|
||||
// and got memory from this node.
|
||||
//
|
||||
// other_node:
|
||||
// A process ran on a different node's CPU
|
||||
// and got memory from this node.
|
||||
//
|
||||
// A process ran on a different node's CPU
|
||||
// and got memory from this node.
|
||||
//
|
||||
// interleave_hit:
|
||||
// Interleaving wanted to allocate from this node
|
||||
// and succeeded.
|
||||
//
|
||||
// Interleaving wanted to allocate from this node
|
||||
// and succeeded.
|
||||
//
|
||||
// See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html
|
||||
//
|
||||
type NUMAStatsCollectorTopolgy struct {
|
||||
file string
|
||||
tagSet map[string]string
|
||||
@@ -82,6 +91,8 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
||||
})
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(m.name, "initialized", len(m.topology), "NUMA domains")
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
|
@@ -1,15 +1,17 @@
|
||||
|
||||
## `numastat` collector
|
||||
|
||||
```json
|
||||
"numastat": {}
|
||||
"numastats": {}
|
||||
```
|
||||
|
||||
The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful **memoryDomain** metrics. See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html
|
||||
The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful of **memoryDomain** metrics. See: <https://www.kernel.org/doc/html/latest/admin-guide/numastat.html>
|
||||
|
||||
Metrics:
|
||||
|
||||
* `numastats_numa_hit`: A process wanted to allocate memory from this node, and succeeded.
|
||||
* `numastats_numa_miss`: A process wanted to allocate memory from another node, but ended up with memory from this node.
|
||||
* `numastats_numa_foreign`: A process wanted to allocate on this node, but ended up with memory from another node.
|
||||
* `numastats_local_node`: A process ran on this node's CPU, and got memory from this node.
|
||||
* `numastats_other_node`: A process ran on a different node's CPU, and got memory from this node.
|
||||
* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded.
|
||||
* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded.
|
||||
|
262
collectors/raplMetric.go
Normal file
262
collectors/raplMetric.go
Normal file
@@ -0,0 +1,262 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
)
|
||||
|
||||
// RAPLZoneInfo holds the running average power limit (RAPL) monitoring
// attributes of a single zone or sub zone.
type RAPLZoneInfo struct {
	// tags describe the RAPL zone:
	// * id: e.g. 0:1 (zone 0 sub zone 1)
	// * zone_name, sub_zone_name: e.g. psys, dram, core, uncore, package-0
	tags map[string]string
	// energyFilepath is the path of the file containing the zone's current
	// energy counter in micro joules.
	energyFilepath string
	// energy is the last reading of the energy counter in micro joules.
	energy int64
	// energyTimestamp is the time at which energy was read.
	energyTimestamp time.Time
	// maxEnergyRange is the range of the energy counter in micro joules.
	maxEnergyRange int64
}
|
||||
|
||||
type RAPLCollector struct {
|
||||
metricCollector
|
||||
config struct {
|
||||
// Exclude IDs for RAPL zones, e.g.
|
||||
// * 0 for zone 0
|
||||
// * 0:1 for zone 0 subzone 1
|
||||
ExcludeByID []string `json:"exclude_device_by_id,omitempty"`
|
||||
// Exclude names for RAPL zones, e.g. psys, dram, core, uncore, package-0
|
||||
ExcludeByName []string `json:"exclude_device_by_name,omitempty"`
|
||||
}
|
||||
RAPLZoneInfo []RAPLZoneInfo
|
||||
meta map[string]string // default meta information
|
||||
}
|
||||
|
||||
// Init initializes the running average power limit (RAPL) collector
|
||||
func (m *RAPLCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Check if already initialized
|
||||
if m.init {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error = nil
|
||||
m.name = "RAPLCollector"
|
||||
m.setup()
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "energy",
|
||||
"unit": "Watt",
|
||||
}
|
||||
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Configure excluded RAPL zones
|
||||
isIDExcluded := make(map[string]bool)
|
||||
if m.config.ExcludeByID != nil {
|
||||
for _, ID := range m.config.ExcludeByID {
|
||||
isIDExcluded[ID] = true
|
||||
}
|
||||
}
|
||||
isNameExcluded := make(map[string]bool)
|
||||
if m.config.ExcludeByName != nil {
|
||||
for _, name := range m.config.ExcludeByName {
|
||||
isNameExcluded[name] = true
|
||||
}
|
||||
}
|
||||
|
||||
// readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath
|
||||
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
|
||||
readZoneInfo := func(zonePath string) (z struct {
|
||||
name string // zones name e.g. psys, dram, core, uncore, package-0
|
||||
energyFilepath string // path to a file containing the zones current energy counter in micro joules
|
||||
energy int64 // current reading of the energy counter in micro joules
|
||||
energyTimestamp time.Time // timestamp when energy counter was read
|
||||
maxEnergyRange int64 // Range of the above energy counter in micro-joules
|
||||
ok bool // Are all information available?
|
||||
}) {
|
||||
// zones name e.g. psys, dram, core, uncore, package-0
|
||||
foundName := false
|
||||
if v, err :=
|
||||
os.ReadFile(
|
||||
filepath.Join(zonePath, "name")); err == nil {
|
||||
foundName = true
|
||||
z.name = strings.TrimSpace(string(v))
|
||||
}
|
||||
|
||||
// path to a file containing the zones current energy counter in micro joules
|
||||
z.energyFilepath = filepath.Join(zonePath, "energy_uj")
|
||||
|
||||
// current reading of the energy counter in micro joules
|
||||
foundEnergy := false
|
||||
if v, err := os.ReadFile(z.energyFilepath); err == nil {
|
||||
// timestamp when energy counter was read
|
||||
z.energyTimestamp = time.Now()
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
foundEnergy = true
|
||||
z.energy = i
|
||||
}
|
||||
}
|
||||
|
||||
// Range of the above energy counter in micro-joules
|
||||
foundMaxEnergyRange := false
|
||||
if v, err :=
|
||||
os.ReadFile(
|
||||
filepath.Join(zonePath, "max_energy_range_uj")); err == nil {
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
foundMaxEnergyRange = true
|
||||
z.maxEnergyRange = i
|
||||
}
|
||||
}
|
||||
|
||||
// Are all information available?
|
||||
z.ok = foundName && foundEnergy && foundMaxEnergyRange
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
powerCapPrefix := "/sys/devices/virtual/powercap"
|
||||
controlType := "intel-rapl"
|
||||
controlTypePath := filepath.Join(powerCapPrefix, controlType)
|
||||
|
||||
// Find all RAPL zones
|
||||
zonePrefix := filepath.Join(controlTypePath, controlType+":")
|
||||
zonesPath, err := filepath.Glob(zonePrefix + "*")
|
||||
if err != nil || zonesPath == nil {
|
||||
return fmt.Errorf("unable to find any zones under %s", controlTypePath)
|
||||
}
|
||||
|
||||
for _, zonePath := range zonesPath {
|
||||
zoneID := strings.TrimPrefix(zonePath, zonePrefix)
|
||||
z := readZoneInfo(zonePath)
|
||||
if z.ok &&
|
||||
!isIDExcluded[zoneID] &&
|
||||
!isNameExcluded[z.name] {
|
||||
|
||||
// Add RAPL monitoring attributes for a zone
|
||||
m.RAPLZoneInfo =
|
||||
append(
|
||||
m.RAPLZoneInfo,
|
||||
RAPLZoneInfo{
|
||||
tags: map[string]string{
|
||||
"id": zoneID,
|
||||
"zone_name": z.name,
|
||||
},
|
||||
energyFilepath: z.energyFilepath,
|
||||
energy: z.energy,
|
||||
energyTimestamp: z.energyTimestamp,
|
||||
maxEnergyRange: z.maxEnergyRange,
|
||||
})
|
||||
}
|
||||
|
||||
// find all sub zones for the given zone
|
||||
subZonePrefix := filepath.Join(zonePath, controlType+":"+zoneID+":")
|
||||
subZonesPath, err := filepath.Glob(subZonePrefix + "*")
|
||||
if err != nil || subZonesPath == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, subZonePath := range subZonesPath {
|
||||
subZoneID := strings.TrimPrefix(subZonePath, subZonePrefix)
|
||||
sz := readZoneInfo(subZonePath)
|
||||
if len(zoneID) > 0 && len(z.name) > 0 &&
|
||||
sz.ok &&
|
||||
!isIDExcluded[zoneID+":"+subZoneID] &&
|
||||
!isNameExcluded[sz.name] {
|
||||
m.RAPLZoneInfo =
|
||||
append(
|
||||
m.RAPLZoneInfo,
|
||||
RAPLZoneInfo{
|
||||
tags: map[string]string{
|
||||
"id": zoneID + ":" + subZoneID,
|
||||
"zone_name": z.name,
|
||||
"sub_zone_name": sz.name,
|
||||
},
|
||||
energyFilepath: sz.energyFilepath,
|
||||
energy: sz.energy,
|
||||
energyTimestamp: sz.energyTimestamp,
|
||||
maxEnergyRange: sz.maxEnergyRange,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if m.RAPLZoneInfo == nil {
|
||||
return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath)
|
||||
|
||||
}
|
||||
|
||||
// Initialized
|
||||
cclog.ComponentDebug(
|
||||
m.name,
|
||||
"initialized",
|
||||
len(m.RAPLZoneInfo),
|
||||
"zones with running average power limit (RAPL) monitoring attributes")
|
||||
m.init = true
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Read reads running average power limit (RAPL) monitoring attributes for all initialized zones
|
||||
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
|
||||
func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
|
||||
for i := range m.RAPLZoneInfo {
|
||||
p := &m.RAPLZoneInfo[i]
|
||||
|
||||
// Read current value of the energy counter in micro joules
|
||||
if v, err := os.ReadFile(p.energyFilepath); err == nil {
|
||||
energyTimestamp := time.Now()
|
||||
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
|
||||
energy := i
|
||||
|
||||
// Compute average power (Δ energy / Δ time)
|
||||
energyDiff := energy - p.energy
|
||||
if energyDiff < 0 {
|
||||
// Handle overflow:
|
||||
// ( p.maxEnergyRange - p.energy ) + energy
|
||||
// = p.maxEnergyRange + ( energy - p.energy )
|
||||
// = p.maxEnergyRange + diffEnergy
|
||||
energyDiff += p.maxEnergyRange
|
||||
}
|
||||
timeDiff := energyTimestamp.Sub(p.energyTimestamp)
|
||||
averagePower := float64(energyDiff) / float64(timeDiff.Microseconds())
|
||||
|
||||
y, err := lp.New(
|
||||
"rapl_average_power",
|
||||
p.tags,
|
||||
m.meta,
|
||||
map[string]interface{}{"value": averagePower},
|
||||
energyTimestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
|
||||
// Save current energy counter state
|
||||
p.energy = energy
|
||||
p.energyTimestamp = energyTimestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes running average power limit (RAPL) metric collector
|
||||
func (m *RAPLCollector) Close() {
|
||||
// Unset flag
|
||||
m.init = false
|
||||
}
|
18
collectors/raplMetric.md
Normal file
18
collectors/raplMetric.md
Normal file
@@ -0,0 +1,18 @@
|
||||
# Running average power limit (RAPL) metric collector
|
||||
|
||||
This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
|
||||
|
||||
The Likwid metric collector provides similar functionality.
|
||||
|
||||
## Configuration
|
||||
|
||||
```json
|
||||
"rapl": {
|
||||
"exclude_device_by_id": ["0:1", "0:2"],
|
||||
"exclude_device_by_name": ["psys"]
|
||||
}
|
||||
```
|
||||
|
||||
## Metrics
|
||||
|
||||
* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
|
@@ -17,9 +17,9 @@ type SampleCollectorConfig struct {
|
||||
// defined by metricCollector (name, init, ...)
|
||||
type SampleCollector struct {
|
||||
metricCollector
|
||||
config SampleTimerCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
config SampleCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
}
|
||||
|
||||
// Functions to implement MetricCollector interface
|
||||
@@ -36,14 +36,14 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
// This is for later use, also call it early
|
||||
m.setup()
|
||||
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
||||
// or it should be run serially, mostly for collectors acutally doing measurements
|
||||
// or it should be run serially, mostly for collectors actually doing measurements
|
||||
// because they should not measure the execution of the other collectors
|
||||
m.parallel = true
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// die -> CPU die (requires CPU die ID as 'type-id' tag)
|
||||
|
@@ -38,7 +38,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
|
||||
@@ -60,7 +60,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
|
||||
// Storage for output channel
|
||||
m.output = nil
|
||||
// Mangement channel for the timer function.
|
||||
// Management channel for the timer function.
|
||||
m.done = make(chan bool)
|
||||
// Create the own ticker
|
||||
m.ticker = time.NewTicker(m.interval)
|
||||
@@ -94,7 +94,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
|
||||
|
||||
value := 1.0
|
||||
|
||||
// If you want to measure something for a specific amout of time, use interval
|
||||
// If you want to measure something for a specific amount of time, use interval
|
||||
// start := readState()
|
||||
// time.Sleep(interval)
|
||||
// stop := readState()
|
||||
|
Reference in New Issue
Block a user