Add collectors for perf command and perf_event_open system call

This commit is contained in:
Thomas Roehl 2024-12-28 03:33:12 +01:00
parent 7b343d0bab
commit 1edddc3dc2
5 changed files with 965 additions and 0 deletions


@@ -41,6 +41,8 @@ var AvailableCollectors = map[string]MetricCollector{
"self": new(SelfCollector), "self": new(SelfCollector),
"schedstat": new(SchedstatCollector), "schedstat": new(SchedstatCollector),
"nfsiostat": new(NfsIOStatCollector), "nfsiostat": new(NfsIOStatCollector),
"perf_event": new(PerfEventCollector),
"perf_cmd": new(PerfCmdCollector),
} }
// Metric collector manager data structure // Metric collector manager data structure

collectors/perfCmdMetric.go Normal file

@@ -0,0 +1,384 @@
package collectors
import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
)
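// perf may print fractional numbers with a locale-dependent decimal comma
// (e.g. "12,34"); perf_number_regex is used to rewrite them to "12.34" before
// JSON parsing. "<not counted>" and "(null)" are the literal strings perf
// prints for events that could not be counted and for missing units.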
var perf_number_regex = regexp.MustCompile(`(\d+),(\d+)`)
const PERF_NOT_COUNTED = "<not counted>"
const PERF_UNIT_NULL = "(null)"
var VALID_METRIC_TYPES = []string{
"hwthread",
"core",
"llc",
"socket",
"die",
"node",
"memoryDomain",
}
type PerfCmdCollectorEventConfig struct {
Metric string `json:"metric"` // metric name
Event string `json:"event"` // perf event configuration
Type string `json:"type"` // Metric type (aka node, socket, hwthread, ...)
Tags map[string]string `json:"tags,omitempty"` // extra tags for the metric
Meta map[string]string `json:"meta,omitempty"` // extra meta information for the metric
Unit string `json:"unit,omitempty"` // unit of metric (if any)
UsePerfUnit bool `json:"use_perf_unit,omitempty"` // use the unit reported by perf as meta information (if any)
TypeAgg string `json:"type_aggregation,omitempty"` // how to aggregate cpu-data to metric type
Publish bool `json:"publish,omitempty"`
//lastCounterValue float64
//lastMetricValue float64
collectorTags *map[string]string
collectorMeta *map[string]string
useCpus map[int][]int
}
type PerfCmdCollectorExpression struct {
Metric string `json:"metric"` // metric name
Expression string `json:"expression"` // expression based on metrics
Type string `json:"type"` // Metric type (aka node, socket, hwthread, ...)
TypeAgg string `json:"type_aggregation,omitempty"` // how to aggregate cpu-data to metric type
Publish bool `json:"publish,omitempty"`
}
// These are the fields we read from the JSON configuration
type PerfCmdCollectorConfig struct {
Metrics []PerfCmdCollectorEventConfig `json:"metrics"`
Expressions []PerfCmdCollectorExpression `json:"expressions"`
PerfCmd string `json:"perf_command,omitempty"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type PerfCmdCollector struct {
metricCollector
config PerfCmdCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
metrics map[string]*PerfCmdCollectorEventConfig // list of events for internal data
perfEventString string
}
// Functions to implement MetricCollector interface
// Init(...), Read(...), Close()
// See: metricCollector.go
// Init initializes the perf command collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *PerfCmdCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "PerfCmdCollector"
// This is for later use, also call it early
m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// die -> CPU die (requires CPU die ID as 'type-id' tag)
// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag)
// llc -> Last level cache (requires last level cache ID as 'type-id' tag)
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> An accelerator device like a GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
m.config.PerfCmd = "perf"
if len(m.config.PerfCmd) > 0 {
_, err := os.Stat(m.config.PerfCmd)
if err != nil {
abs, err := exec.LookPath(m.config.PerfCmd)
if err != nil {
cclog.ComponentError(m.name, "Error looking up perf command", m.config.PerfCmd, ":", err.Error())
return err
}
m.config.PerfCmd = abs
}
}
// Set up everything that the collector requires during the Read() execution
// Check files required, test execution of some commands, create data structure
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
valid_metrics := make([]*PerfCmdCollectorEventConfig, 0)
valid_events := make([]string, 0)
test_type := func(Type string) bool {
for _, t := range VALID_METRIC_TYPES {
if Type == t {
return true
}
}
return false
}
for i, metric := range m.config.Metrics {
if !test_type(metric.Type) {
cclog.ComponentError(m.name, "Metric", metric.Metric, "has an invalid type")
continue
}
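// Check that perf accepts the event by running it on a short dummy command ('hostname')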
cmd := exec.Command(m.config.PerfCmd, "stat", "--null", "-e", metric.Event, "hostname")
cclog.ComponentDebug(m.name, "Running", cmd.String())
err := cmd.Run()
if err != nil {
cclog.ComponentError(m.name, "Event", metric.Event, "not available in perf", err.Error())
} else {
valid_metrics = append(valid_metrics, &m.config.Metrics[i])
}
}
if len(valid_metrics) == 0 {
return errors.New("no configured metric available through perf")
}
IntToStringList := func(ilist []int) []string {
list := make([]string, 0)
for _, i := range ilist {
list = append(list, fmt.Sprintf("%v", i))
}
return list
}
m.metrics = make(map[string]*PerfCmdCollectorEventConfig, 0)
for _, metric := range valid_metrics {
metric.collectorMeta = &m.meta
metric.collectorTags = &m.tags
metric.useCpus = make(map[int][]int)
tlist := topo.GetTypeList(metric.Type)
cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, ":", strings.Join(IntToStringList(tlist), ","))
for _, t := range tlist {
metric.useCpus[t] = topo.GetTypeHwthreads(metric.Type, t)
cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, "and ID", t, ":", strings.Join(IntToStringList(metric.useCpus[t]), ","))
}
m.metrics[metric.Event] = metric
valid_events = append(valid_events, metric.Event)
}
m.perfEventString = strings.Join(valid_events, ",")
cclog.ComponentDebug(m.name, "perfEventString", m.perfEventString)
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
type PerfEventJson struct {
CounterValue string `json:"counter-value"`
counterValue float64
MetricValue string `json:"metric-value"`
metricValue float64
CounterUnit string `json:"unit"`
counterUnit string
MetricUnit string `json:"metric-unit"`
metricUnit string
Cpu string `json:"cpu,omitempty"`
cpu int
Event string `json:"event"`
Runtime uint64 `json:"event-runtime"`
PcntRunning float64 `json:"pcnt-running"`
metrictypeid string
metrictype string
metricname string
publish bool
}
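// parseEvent parses a single JSON line as printed by 'perf stat -j', e.g.
//   {"cpu": "0", "counter-value": "12345,678900", "event": "cycles", ...}
// (illustrative line; the exact fields vary between perf versions). Decimal
// commas are normalized to dots, and values of partially scheduled events
// (pcnt-running < 100) are extrapolated to the full runtime.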
func parseEvent(line string) (*PerfEventJson, error) {
data := PerfEventJson{}
tmp := perf_number_regex.ReplaceAllString(line, `$1.$2`)
err := json.Unmarshal([]byte(tmp), &data)
if err != nil {
return nil, err
}
if len(data.CounterValue) > 0 && data.CounterValue != PERF_NOT_COUNTED {
val, err := strconv.ParseFloat(data.CounterValue, 64)
if err == nil {
if data.PcntRunning != 100.0 {
val = (val / data.PcntRunning) * 100
}
data.counterValue = val
}
}
if len(data.MetricValue) > 0 && data.MetricValue != PERF_NOT_COUNTED {
val, err := strconv.ParseFloat(data.MetricValue, 64)
if err == nil {
if data.PcntRunning != 100.0 {
val = (val / data.PcntRunning) * 100
}
data.metricValue = val
}
}
if len(data.CounterUnit) > 0 && data.CounterUnit != PERF_UNIT_NULL {
data.counterUnit = data.CounterUnit
}
if len(data.MetricUnit) > 0 && data.MetricUnit != PERF_UNIT_NULL {
data.metricUnit = data.MetricUnit
}
if len(data.Cpu) > 0 {
val, err := strconv.ParseInt(data.Cpu, 10, 64)
if err == nil {
data.cpu = int(val)
}
}
return &data, nil
}
func perfdataToMetric(data *PerfEventJson, config *PerfCmdCollectorEventConfig, timestamp time.Time) (lp.CCMetric, error) {
metric, err := lp.NewMetric(config.Metric, *config.collectorTags, *config.collectorMeta, data.counterValue, timestamp)
if err == nil {
metric.AddTag("type", data.metrictype)
if data.metrictype != "node" {
metric.AddTag("type-id", data.metrictypeid)
}
for k, v := range config.Tags {
metric.AddTag(k, v)
}
for k, v := range config.Meta {
metric.AddMeta(k, v)
}
if len(config.Unit) > 0 {
metric.AddMeta("unit", config.Unit)
}
if config.UsePerfUnit && (!metric.HasMeta("unit")) && (!metric.HasTag("unit")) {
var unit string = ""
if len(data.counterUnit) > 0 {
unit = data.counterUnit
} else if len(data.metricUnit) > 0 {
unit = data.metricUnit
}
if len(unit) > 0 {
metric.AddMeta("unit", unit)
}
}
return metric, nil
}
return nil, err
}
// Read collects all metrics belonging to this collector
// and sends them through the output channel to the collector manager
func (m *PerfCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) {
perfdata := make([]*PerfEventJson, 0)
// Timestamp with which all metrics of this read cycle are reported
timestamp := time.Now()
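// Run 'perf stat' system-wide (-a), without aggregation across CPUs (-A) and
// with JSON output (-j), measuring 'sleep' for one interval as dummy workload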
cmd := exec.Command(m.config.PerfCmd, "stat", "-A", "-a", "-j", "-e", m.perfEventString, "/usr/bin/sleep", fmt.Sprintf("%d", int(interval.Seconds())))
cclog.ComponentDebug(m.name, "Running", cmd.String())
out, err := cmd.CombinedOutput()
if err == nil {
sout := strings.TrimSpace(string(out))
for _, l := range strings.Split(sout, "\n") {
d, err := parseEvent(l)
if err == nil {
perfdata = append(perfdata, d)
}
}
} else {
cclog.ComponentError(m.name, "Execution of", cmd.String(), "failed with", err.Error())
}
metricData := make([]*PerfEventJson, 0)
for _, metricTmp := range m.config.Metrics {
metricConfig := m.metrics[metricTmp.Event]
for t, clist := range metricConfig.useCpus {
val := float64(0)
sum := float64(0)
min := math.MaxFloat64
max := float64(0)
count := 0
cunit := ""
munit := ""
for _, c := range clist {
for _, d := range perfdata {
if strings.HasPrefix(d.Event, metricConfig.Event) && d.cpu == c {
//cclog.ComponentDebug(m.name, "do calc on CPU", c, ":", d.counterValue)
sum += d.counterValue
if d.counterValue < min {
min = d.counterValue
}
if d.counterValue > max {
max = d.counterValue
}
count++
cunit = d.counterUnit
munit = d.metricUnit
}
}
}
if metricConfig.TypeAgg == "sum" {
val = sum
} else if metricConfig.TypeAgg == "min" {
val = min
} else if metricConfig.TypeAgg == "max" {
val = max
} else if metricConfig.TypeAgg == "avg" || metricConfig.TypeAgg == "mean" {
val = sum / float64(count)
} else {
val = sum
}
//cclog.ComponentDebug(m.name, "Metric", metricConfig.Metric, "type", metricConfig.Type, "ID", t, ":", val)
metricData = append(metricData, &PerfEventJson{
Event: metricConfig.Event,
metricname: metricConfig.Metric,
metrictype: metricConfig.Type,
metrictypeid: fmt.Sprintf("%v", t),
counterValue: val,
metricValue: 0,
metricUnit: munit,
counterUnit: cunit,
publish: metricConfig.Publish,
})
}
}
for _, d := range metricData {
if d.publish {
metric, err := perfdataToMetric(d, m.metrics[d.Event], timestamp)
if err == nil {
output <- metric
}
}
}
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *PerfCmdCollector) Close() {
// Unset flag
m.init = false
}


@@ -0,0 +1,54 @@
# PerfCmdMetric collector
## Configuration
```json
{
"perf_command": "perf",
"metrics" : [
{
"name": "cpu_cycles",
"event": "cycles",
"unit": "Hz",
"type": "hwthread",
"publish": true,
"use_perf_unit": false,
"type_aggregation": "socket",
"tags": {
"tags_just" : "for_the_event"
},
"meta": {
"meta_info_just" : "for_the_event"
}
}
],
"expressions": [
{
"metric": "avg_cycles_per_second",
"expression": "cpu_cycles / time",
"type": "node",
"type_aggregation": "avg",
"publish": true
}
]
}
```
- `perf_command`: Path to the `perf` command. If it is not an absolute path, the command is looked up in `$PATH`.
- `metrics`: List of metrics to measure
- `metric`: Name of the metric for output and expressions
- `event`: Event as supplied to `perf stat -e <event>` like `cycles` or `uncore_imc_0/event=0x01,umask=0x00/`
- `unit` : Unit for the metric. It is added as meta information, similar to adding `"meta" : {"unit": "myunit"}`.
- `type`: Perform the measurements at this level (`hwthread` and `socket` are the most common ones).
- `publish`: Publish the metric or use it only for expressions.
- `use_perf_unit`: For some events, `perf` outputs a unit. With this switch, the unit provided by `perf` is added as meta information.
- `type_aggregation`: How to aggregate the per-hwthread values to the given `type`: `sum`, `min`, `max` or `avg`/`mean` (default: `sum`). See the note after this list.
- `tags`: Tags just for this metric
- `meta`: Meta information just for this metric
- `expressions`: Calculate metrics out of multiple measurements
- `metric`: Name of metric for output
- `expression`: What should be calculated
- `type`: Aggregate the expression results to this level
- `type_aggregation`: Aggregate the expression results with `sum`, `min`, `max`, `avg` or `mean`
- `publish`: Publish metric
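
On each read cycle, the collector executes `perf stat -A -a -j -e <event1>,<event2>,... /usr/bin/sleep <interval>` and parses the per-CPU JSON lines printed by `perf`. The per-hwthread values are then aggregated to each metric's `type` using its `type_aggregation`: with `"type": "socket"` and `"type_aggregation": "sum"`, for example, one value per socket is published, tagged with `type=socket` and the socket ID as `type-id`.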


@@ -0,0 +1,481 @@
package collectors
/*
#cgo CFLAGS: -I/usr/include
#cgo LDFLAGS: -Wl,--unresolved-symbols=ignore-in-object-files
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <linux/perf_event.h>
#include <linux/hw_breakpoint.h>
#include <sys/ioctl.h>
#include <syscall.h>
#include <string.h>
#include <errno.h>
typedef enum {
PERF_EVENT_WITH_CONFIG1 = (1<<0),
PERF_EVENT_WITH_CONFIG2 = (1<<1),
PERF_EVENT_WITH_EXCLUDE_KERNEL = (1<<2),
PERF_EVENT_WITH_EXCLUDE_HV = (1<<3),
} PERF_EVENT_FLAG;
int perf_event_open(int type, uint64_t config, int cpu, uint64_t config1, uint64_t config2, int uncore)
{
int ret;
struct perf_event_attr attr;
memset(&attr, 0, sizeof(struct perf_event_attr));
attr.size = sizeof(struct perf_event_attr);
attr.type = type;
attr.config = config;
attr.config1 = config1;
attr.config2 = config2;
if (!uncore) {
attr.exclude_kernel = 1;
attr.exclude_hv = 1;
}
// measure all tasks (pid = -1) on the given CPU; returns the event fd
ret = syscall(__NR_perf_event_open, &attr, -1, cpu, -1, 0);
if (ret < 0)
{
return -errno;
}
return ret;
}
int perf_event_stop(int fd)
{
return ioctl(fd, PERF_EVENT_IOC_DISABLE, 0);
}
int perf_event_start(int fd)
{
return ioctl(fd, PERF_EVENT_IOC_ENABLE, 0);
}
int perf_event_reset(int fd)
{
return ioctl(fd, PERF_EVENT_IOC_RESET, 0);
}
int perf_event_read(int fd, uint64_t *data)
{
int ret = 0;
ret = read(fd, data, sizeof(uint64_t));
if (ret != sizeof(uint64_t))
{
return -errno;
}
return 0;
}
int perf_event_close(int fd)
{
return close(fd);
}
*/
import "C"
import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
"github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
)
const SYSFS_PERF_EVENT_PATH = `/sys/devices`
type PerfEventCollectorEventConfig struct {
Name string `json:"name"`
Unit string `json:"unit,omitempty"`
unitType int
Config string `json:"config"`
config C.uint64_t
Config1 string `json:"config1,omitempty"`
config1 C.uint64_t
Config2 string `json:"config2,omitempty"`
config2 C.uint64_t
ExcludeKernel bool `json:"exclude_kernel,omitempty"`
ExcludeHypervisor bool `json:"exclude_hypervisor,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Meta map[string]string `json:"meta,omitempty"`
PerHwthread bool `json:"per_hwthread,omitempty"`
PerSocket bool `json:"per_socket,omitempty"`
ScaleFile string `json:"scale_file,omitempty"`
scaling_factor float64
flags uint64
valid bool
cpumask []int
}
type PerfEventCollectorEventData struct {
fd C.int
last uint64
last_diff uint64
idx int
}
type PerfEventCollectorConfig struct {
Events []PerfEventCollectorEventConfig `json:"events"`
events []PerfEventCollectorEventConfig
}
type PerfEventCollector struct {
metricCollector
config PerfEventCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
events map[int]map[int]PerfEventCollectorEventData
}
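// UpdateEventConfig resolves an event configuration against sysfs: the PMU type
// ID is read from /sys/devices/<unit>/type, the hex config values are parsed,
// an optional .scale file is read, and the unit's cpumask file (comma-separated
// entries like "4" or "0-7") limits the hardware threads the event is opened on.
// Without a cpumask file, all hardware threads are used.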
func UpdateEventConfig(event *PerfEventCollectorEventConfig) error {
parseHexNumber := func(number string) (uint64, error) {
snum := strings.Trim(number, "\n")
snum = strings.Replace(snum, "0x", "", -1)
snum = strings.Replace(snum, "0X", "", -1)
return strconv.ParseUint(snum, 16, 64)
}
if len(event.Unit) == 0 {
event.Unit = "cpu"
}
unitpath := path.Join(SYSFS_PERF_EVENT_PATH, event.Unit)
if _, err := os.Stat(unitpath); err != nil {
return err
}
typefile := path.Join(unitpath, "type")
if _, err := os.Stat(typefile); err != nil {
return err
}
typebytes, err := os.ReadFile(typefile)
if err != nil {
return err
}
typestring := string(typebytes)
ut, err := strconv.ParseUint(strings.Trim(typestring, "\n"), 10, 64)
if err != nil {
return err
}
event.unitType = int(ut)
if len(event.Config) > 0 {
x, err := parseHexNumber(event.Config)
if err != nil {
return err
}
event.config = C.uint64_t(x)
}
if len(event.Config1) > 0 {
x, err := parseHexNumber(event.Config1)
if err != nil {
return err
}
event.config1 = C.uint64_t(x)
}
if len(event.Config2) > 0 {
x, err := parseHexNumber(event.Config2)
if err != nil {
return err
}
event.config2 = C.uint64_t(x)
}
if len(event.ScaleFile) > 0 {
if _, err := os.Stat(event.ScaleFile); err != nil {
return err
}
scalebytes, err := os.ReadFile(event.ScaleFile)
if err != nil {
return err
}
x, err := strconv.ParseFloat(strings.TrimSpace(string(scalebytes)), 64)
if err != nil {
return err
}
event.scaling_factor = x
}
event.cpumask = make([]int, 0)
cpumaskfile := path.Join(unitpath, "cpumask")
if _, err := os.Stat(cpumaskfile); err == nil {
cpumaskbytes, err := os.ReadFile(cpumaskfile)
if err != nil {
return err
}
cpumaskstring := strings.Trim(string(cpumaskbytes), "\n")
cclog.Debug("cpumask", cpumaskstring)
for _, part := range strings.Split(cpumaskstring, ",") {
start := 0
end := 0
count, _ := fmt.Sscanf(part, "%d-%d", &start, &end)
cclog.Debug("scanf", count, " s ", start, " e ", end)
if count == 1 {
cclog.Debug("adding ", start)
event.cpumask = append(event.cpumask, start)
} else if count == 2 {
for i := start; i <= end; i++ {
cclog.Debug("adding ", i)
event.cpumask = append(event.cpumask, i)
}
}
}
} else {
event.cpumask = append(event.cpumask, ccTopology.CpuList()...)
}
event.valid = true
return nil
}
func (m *PerfEventCollector) Init(config json.RawMessage) error {
var err error = nil
m.name = "PerfEventCollector"
m.setup()
m.parallel = false
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
m.tags = map[string]string{"type": "node"}
cpudata := ccTopology.CpuData()
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
for i, e := range m.config.Events {
err = UpdateEventConfig(&e)
if err != nil {
cclog.ComponentError(m.name, "Checks for event unit", e.Name, "failed:", err.Error())
}
m.config.Events[i] = e
}
total := 0
m.events = make(map[int]map[int]PerfEventCollectorEventData)
for _, hwt := range cpudata {
cclog.ComponentDebug(m.name, "Adding events for cpuid", hwt.CpuID)
hwt_events := make(map[int]PerfEventCollectorEventData)
for j, e := range m.config.Events {
if e.valid {
if _, ok := intArrayContains(e.cpumask, hwt.CpuID); ok {
cclog.ComponentDebug(m.name, "Adding event", e.Name, fmt.Sprintf("(cpuid %d unit %s(%d) config %s config1 %s config2 %s)",
hwt.CpuID,
e.Unit,
e.unitType,
e.Config,
e.Config1,
e.Config2,
))
// (int type, uint64_t config, int cpu, uint64_t config1, uint64_t config2, int uncore)
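// note: uncore is fixed to 1 here, so the wrapper never sets exclude_kernel /
// exclude_hv; the 'exclude_kernel' and 'exclude_hypervisor' options are not
// applied yet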
fd := C.perf_event_open(C.int(e.unitType), e.config, C.int(hwt.CpuID), e.config1, e.config2, C.int(1))
if fd < 0 {
cclog.ComponentError(m.name, "Failed to create event", e.Name, ":", fd)
continue
}
hwt_events[j] = PerfEventCollectorEventData{
idx: j,
fd: fd,
last: 0,
}
total++
} else {
cclog.ComponentDebug(m.name, "Cpu not in cpumask of unit", e.cpumask)
hwt_events[j] = PerfEventCollectorEventData{
idx: j,
fd: -1,
last: 0,
}
}
} else {
cclog.ComponentError(m.name, "Event", e.Name, "not valid")
}
}
cclog.ComponentDebug(m.name, "Adding", len(hwt_events), "events for cpuid", hwt.CpuID)
m.events[hwt.CpuID] = hwt_events
}
if total == 0 {
cclog.ComponentError(m.name, "Failed to add events")
return errors.New("failed to add events")
}
m.init = true
return err
}
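// CalcSocketData accumulates the last per-hwthread differences per socket,
// applying the configured scaling factor where present. It is not yet called
// from Read() ('per_socket' handling is not implemented there).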
func (m *PerfEventCollector) CalcSocketData() map[int]map[int]interface{} {
out := make(map[int]map[int]interface{})
for cpuid, cpudata := range m.events {
for i, eventdata := range cpudata {
eventconfig := m.config.Events[i]
sid := ccTopology.GetHwthreadSocket(cpuid)
if _, ok := out[sid]; !ok {
out[sid] = make(map[int]interface{})
for j := range cpudata {
// initialize with the value type used for accumulation below;
// a plain 0 (int) would make the type assertions panic
if m.config.Events[j].scaling_factor != 0 {
out[sid][j] = float64(0)
} else {
out[sid][j] = uint64(0)
}
}
}
if eventconfig.scaling_factor != 0 {
out[sid][i] = out[sid][i].(float64) + (float64(eventdata.last_diff) * eventconfig.scaling_factor)
} else {
out[sid][i] = out[sid][i].(uint64) + eventdata.last_diff
}
}
}
return out
}
func (m *PerfEventCollector) Read(interval time.Duration, output chan lp.CCMessage) {
timestamp := time.Now()
var wg sync.WaitGroup
for cpuid := range m.events {
wg.Add(1)
go func(cpuid int, data map[int]map[int]PerfEventCollectorEventData, wg *sync.WaitGroup) {
var err error = nil
var events map[int]PerfEventCollectorEventData = data[cpuid]
for i, e := range events {
var data C.uint64_t = 0
if e.fd < 0 {
continue
}
ret := C.perf_event_read(e.fd, &data)
if ret < 0 {
event := m.config.Events[i]
cclog.ComponentError(m.name, "Failed to read event", event.Name, ":", ret)
continue
}
if e.last == 0 {
cclog.ComponentDebug(m.name, "Updating last value on first iteration")
e.last = uint64(data)
} else {
var metric lp.CCMetric
event := m.config.Events[i]
value := uint64(data) - e.last
cclog.ComponentDebug(m.name, "Calculating difference", uint64(data), "-", e.last, "=", uint64(data)-e.last)
e.last = uint64(data)
e.last_diff = value
if event.scaling_factor == 0 {
metric, err = lp.NewMetric(event.Name, m.tags, m.meta, value, timestamp)
} else {
var f64_value float64 = float64(value) * event.scaling_factor
metric, err = lp.NewMetric(event.Name, m.tags, m.meta, f64_value, timestamp)
}
//if event.PerHwthread {
if err == nil {
metric.AddTag("type", "hwthread")
metric.AddTag("type-id", fmt.Sprintf("%d", cpuid))
for k, v := range event.Tags {
metric.AddTag(k, v)
}
for k, v := range event.Meta {
metric.AddMeta(k, v)
}
output <- metric
} else {
cclog.ComponentError(m.name, "Failed to create CCMetric for event", event.Name)
}
//}
}
events[i] = e
}
// note: 'events' is the per-cpu inner map, so the updates via events[i] = e
// above already persist; re-assigning data[cpuid] from multiple goroutines
// would be a concurrent write to the shared outer map
wg.Done()
}(cpuid, m.events, &wg)
}
wg.Wait()
}
func (m *PerfEventCollector) Close() {
for _, events := range m.events {
for _, e := range events {
C.perf_event_close(e.fd)
}
}
m.init = false
}


@@ -0,0 +1,44 @@
# `perf_event` collector
This collector directly uses the `perf_event_open` system call to measure events. There is no name-to-event translation, so the configuration has to be as low-level as the system call requires. Measurements can be aggregated to topological entities like the socket or the whole node.
## Configuration
```json
{
"events" : [
{
"name" : "instructions",
"unit" : "uncore_imc_0",
"config": "0x01",
"scale_file" : "/sys/devices/<unit>/events/<event>.scale",
"per_hwthread": true,
"per_socket": true,
"exclude_kernel": true,
"exclude_hypervisor": true,
"tags": {
"tags": "just_for_the_event"
},
"meta": {
"meta_info": "just_for_the_event"
},
"config1": "0x00",
"config2": "0x00",
}
]
}
```
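The available units correspond to the PMU directories under `/sys/devices`. Each unit directory contains the `type` file the collector reads, and many uncore units also provide a `cpumask` file that restricts on which hardware threads the event is opened.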
- `events`: List of events to measure
- `name`: Name for the metric
- `unit`: Unit of the event or `cpu` if not given. The unit type ID is resolved by reading the file `/sys/devices/<unit>/type`. The unit type ID is then written to the `perf_event_attr` struct member `type`.
- `config`: Hex value written to the `perf_event_attr` struct member `config`.
- `config1`: Hex value written to the `perf_event_attr` struct member `config1` (optional).
- `config2`: Hex value written to the `perf_event_attr` struct member `config2` (optional).
- `scale_file`: If a measurement requires scaling, like the `power` unit aka RAPL, it is provided by the kernel in a `.scale` file at `/sys/devices/<unit>/events/<event>.scale`.
- `exclude_kernel`: Exclude the kernel from measurements (default: `true`). It sets the `perf_event_attr` struct member `exclude_kernel`.
- `exclude_hypervisor`: Exclude the hypervisor from measurements (default: `true`). It sets the `perf_event_attr` struct member `exclude_hv`.
- `per_hwthread`: Generate metrics per hardware thread (default: `false`)
- `per_socket`: Generate metrics per socket (default: `false`)
- `tags`: Tags just for the event.
- `meta`: Meta information just for the event, often a `unit`
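
For illustration, the open/read cycle that the collector implements via cgo can be sketched in pure Go with `golang.org/x/sys/unix` (a minimal sketch, not the collector's actual code path; the hardware-cycles event and CPU 0 are example choices):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"unsafe"

	"golang.org/x/sys/unix"
)

func main() {
	attr := unix.PerfEventAttr{
		Type:   unix.PERF_TYPE_HARDWARE,       // corresponds to /sys/devices/<unit>/type
		Config: unix.PERF_COUNT_HW_CPU_CYCLES, // corresponds to the 'config' option
		Size:   uint32(unsafe.Sizeof(unix.PerfEventAttr{})),
	}
	// pid = -1, cpu = 0: count for all tasks on CPU 0, as the collector does
	fd, err := unix.PerfEventOpen(&attr, -1, 0, -1, unix.PERF_FLAG_FD_CLOEXEC)
	if err != nil {
		panic(err)
	}
	defer unix.Close(fd)

	read := func() uint64 { // a plain counter delivers one uint64 per read
		buf := make([]byte, 8)
		if _, err := unix.Read(fd, buf); err != nil {
			panic(err)
		}
		return binary.LittleEndian.Uint64(buf)
	}
	before := read()
	// ... workload or one measurement interval ...
	after := read()
	fmt.Println("cycles:", after-before) // difference of two reads, as in Read()
}
```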