// cc-metric-collector/collectors/perfCmdMetric.go

package collectors

import (
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"os"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
)
// perf may print counter values with a locale-dependent decimal comma;
// this regex is used to rewrite "<digits>,<digits>" to "<digits>.<digits>"
// before JSON parsing.
var perf_number_regex = regexp.MustCompile(`(\d+),(\d+)`)

const PERF_NOT_COUNTED = "<not counted>"
const PERF_UNIT_NULL = "(null)"

var VALID_METRIC_TYPES = []string{
	"hwthread",
	"core",
	"llc",
	"socket",
	"die",
	"node",
	"memoryDomain",
}
type PerfCmdCollectorEventConfig struct {
	Metric      string            `json:"metric"`                     // metric name
	Event       string            `json:"event"`                      // perf event configuration
	Type        string            `json:"type"`                       // metric type (aka node, socket, hwthread, ...)
	Tags        map[string]string `json:"tags,omitempty"`             // extra tags for the metric
	Meta        map[string]string `json:"meta,omitempty"`             // extra meta information for the metric
	Unit        string            `json:"unit,omitempty"`             // unit of the metric (if any)
	UsePerfUnit bool              `json:"use_perf_unit,omitempty"`    // use the unit reported by perf if no unit is configured
	TypeAgg     string            `json:"type_aggregation,omitempty"` // how to aggregate CPU data to the metric type
	Publish     bool              `json:"publish,omitempty"`
	//lastCounterValue float64
	//lastMetricValue float64
	collectorTags *map[string]string
	collectorMeta *map[string]string
	useCpus       map[int][]int
}
type PerfCmdCollectorExpression struct {
	Metric     string `json:"metric"`                     // metric name
	Expression string `json:"expression"`                 // expression based on metrics
	Type       string `json:"type"`                       // metric type (aka node, socket, hwthread, ...)
	TypeAgg    string `json:"type_aggregation,omitempty"` // how to aggregate CPU data to the metric type
	Publish    bool   `json:"publish,omitempty"`
}
// These are the fields we read from the JSON configuration
type PerfCmdCollectorConfig struct {
	Metrics     []PerfCmdCollectorEventConfig `json:"metrics"`
	Expressions []PerfCmdCollectorExpression  `json:"expressions"`
	PerfCmd     string                        `json:"perf_command,omitempty"`
}
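
// A minimal example configuration (a sketch: the event name, type, and perf
// path are hypothetical and must fit the target system; only the structure
// follows the JSON tags above):
//
//	{
//	  "perf_command": "/usr/bin/perf",
//	  "metrics": [
//	    {
//	      "metric": "instructions",
//	      "event": "instructions",
//	      "type": "hwthread",
//	      "type_aggregation": "sum",
//	      "publish": true
//	    }
//	  ]
//	}
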
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type PerfCmdCollector struct {
	metricCollector
	config          PerfCmdCollectorConfig                  // the configuration structure
	meta            map[string]string                       // default meta information
	tags            map[string]string                       // default tags
	metrics         map[string]*PerfCmdCollectorEventConfig // list of events for internal data
	perfEventString string
}
// Functions to implement the MetricCollector interface
// Init(...), Read(...), Close()
// See: metricCollector.go

// Init initializes the perf command collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *PerfCmdCollector) Init(config json.RawMessage) error {
	var err error
	// Always set the name early in Init() so it can be used in cclog.Component* functions
	m.name = "PerfCmdCollector"
	// This is for later use, also call it early
	m.setup()
	// Tell whether the collector should be run in parallel with others (reading files, ...)
	// or serially, mostly for collectors actually doing measurements,
	// because they should not measure the execution of the other collectors
	m.parallel = true
	// Define meta information sent with each metric
	// (can also be dynamic, or this is the basic set extended through AddMeta())
	m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
	// Define tags sent with each metric
	// The 'type' tag is always needed, it defines the granularity of the metric
	// node         -> whole system
	// socket       -> CPU socket (requires socket ID as 'type-id' tag)
	// die          -> CPU die (requires CPU die ID as 'type-id' tag)
	// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag)
	// llc          -> last level cache (requires last level cache ID as 'type-id' tag)
	// core         -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
	// hwthread     -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
	// accelerator  -> an accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
	m.tags = map[string]string{"type": "node"}
	// Read in the JSON configuration
	if len(config) > 0 {
		err = json.Unmarshal(config, &m.config)
		if err != nil {
			cclog.ComponentError(m.name, "Error reading config:", err.Error())
			return err
		}
	}
	// Fall back to 'perf' from PATH if no command was configured
	if len(m.config.PerfCmd) == 0 {
		m.config.PerfCmd = "perf"
	}
	// Resolve the command to an absolute path if it is not a file already
	if _, err := os.Stat(m.config.PerfCmd); err != nil {
		abs, err := exec.LookPath(m.config.PerfCmd)
		if err != nil {
			cclog.ComponentError(m.name, "Error looking up perf command", m.config.PerfCmd, ":", err.Error())
			return err
		}
		m.config.PerfCmd = abs
	}
	// Set up everything that the collector requires during the Read() execution
	// Check files required, test execution of some commands, create data structures
	// for all topological entities (sockets, NUMA domains, ...)
	// Return some useful error message in case of any failures
	valid_metrics := make([]*PerfCmdCollectorEventConfig, 0)
	valid_events := make([]string, 0)
	test_type := func(Type string) bool {
		for _, t := range VALID_METRIC_TYPES {
			if Type == t {
				return true
			}
		}
		return false
	}
	for i, metric := range m.config.Metrics {
		if !test_type(metric.Type) {
			cclog.ComponentError(m.name, "Metric", metric.Metric, "has an invalid type")
			continue
		}
		// Test each configured event with a short perf run before accepting it
		cmd := exec.Command(m.config.PerfCmd, "stat", "--null", "-e", metric.Event, "hostname")
		cclog.ComponentDebug(m.name, "Running", cmd.String())
		err := cmd.Run()
		if err != nil {
			cclog.ComponentError(m.name, "Event", metric.Event, "not available in perf:", err.Error())
		} else {
			valid_metrics = append(valid_metrics, &m.config.Metrics[i])
		}
	}
	if len(valid_metrics) == 0 {
		return errors.New("no configured metric available through perf")
	}
	IntToStringList := func(ilist []int) []string {
		list := make([]string, 0)
		for _, i := range ilist {
			list = append(list, fmt.Sprintf("%v", i))
		}
		return list
	}
	m.metrics = make(map[string]*PerfCmdCollectorEventConfig)
	for _, metric := range valid_metrics {
		metric.collectorMeta = &m.meta
		metric.collectorTags = &m.tags
		metric.useCpus = make(map[int][]int)
		tlist := topo.GetTypeList(metric.Type)
		cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, ":", strings.Join(IntToStringList(tlist), ","))
		for _, t := range tlist {
			metric.useCpus[t] = topo.GetTypeHwthreads(metric.Type, t)
			cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, "and ID", t, ":", strings.Join(IntToStringList(metric.useCpus[t]), ","))
		}
		m.metrics[metric.Event] = metric
		valid_events = append(valid_events, metric.Event)
	}
	m.perfEventString = strings.Join(valid_events, ",")
	cclog.ComponentDebug(m.name, "perfEventString", m.perfEventString)

	// Set this flag only if everything is initialized properly, all required files exist, ...
	m.init = true
	return err
}
// PerfEventJson holds one line of 'perf stat -j' output. The exported fields
// are filled by the JSON decoder, the unexported ones hold the parsed and
// derived values used internally.
type PerfEventJson struct {
	CounterValue string  `json:"counter-value"`
	counterValue float64
	MetricValue  string  `json:"metric-value"`
	metricValue  float64
	CounterUnit  string  `json:"unit"`
	counterUnit  string
	MetricUnit   string  `json:"metric-unit"`
	metricUnit   string
	Cpu          string  `json:"cpu,omitempty"`
	cpu          int
	Event        string  `json:"event"`
	Runtime      uint64  `json:"event-runtime"`
	PcntRunning  float64 `json:"pcnt-running"`
	metrictypeid string
	metrictype   string
	metricname   string
	publish      bool
}
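
// parseEvent decodes one output line of 'perf stat -j'. The exact format
// differs between perf versions; a line is assumed to look roughly like
// (values are hypothetical):
//
//	{"cpu": "0", "counter-value": "1234,567890", "unit": "(null)", "event": "instructions", "event-runtime": 999999, "pcnt-running": 100.00}
//
// Locale-dependent decimal commas are normalized to dots before decoding.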
func parseEvent(line string) (*PerfEventJson, error) {
	data := PerfEventJson{}
	tmp := perf_number_regex.ReplaceAllString(line, `$1.$2`)
	err := json.Unmarshal([]byte(tmp), &data)
	if err != nil {
		return nil, err
	}
	if len(data.CounterValue) > 0 && data.CounterValue != PERF_NOT_COUNTED {
		val, err := strconv.ParseFloat(data.CounterValue, 64)
		if err == nil {
			// Extrapolate the raw value if the event was multiplexed and
			// therefore counted for only a fraction of the runtime
			if data.PcntRunning > 0 && data.PcntRunning != 100.0 {
				val = (val / data.PcntRunning) * 100
			}
			data.counterValue = val
		}
	}
	if len(data.MetricValue) > 0 && data.MetricValue != PERF_NOT_COUNTED {
		val, err := strconv.ParseFloat(data.MetricValue, 64)
		if err == nil {
			if data.PcntRunning > 0 && data.PcntRunning != 100.0 {
				val = (val / data.PcntRunning) * 100
			}
			data.metricValue = val
		}
	}
	if len(data.CounterUnit) > 0 && data.CounterUnit != PERF_UNIT_NULL {
		data.counterUnit = data.CounterUnit
	}
	if len(data.MetricUnit) > 0 && data.MetricUnit != PERF_UNIT_NULL {
		data.metricUnit = data.MetricUnit
	}
	if len(data.Cpu) > 0 {
		val, err := strconv.ParseInt(data.Cpu, 10, 64)
		if err == nil {
			data.cpu = int(val)
		}
	}
	return &data, nil
}
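
// A worked example of the extrapolation above (input line is hypothetical):
//
//	ev, _ := parseEvent(`{"cpu": "0", "counter-value": "100,5", "event": "cycles", "pcnt-running": 50.0}`)
//	// "100,5" is normalized to 100.5; with 50% runtime the value is
//	// extrapolated to (100.5 / 50) * 100 = 201.0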
func perfdataToMetric(data *PerfEventJson, config *PerfCmdCollectorEventConfig, timestamp time.Time) (lp.CCMetric, error) {
	metric, err := lp.NewMetric(config.Metric, *config.collectorTags, *config.collectorMeta, data.counterValue, timestamp)
	if err != nil {
		return nil, err
	}
	metric.AddTag("type", data.metrictype)
	if data.metrictype != "node" {
		metric.AddTag("type-id", data.metrictypeid)
	}
	for k, v := range config.Tags {
		metric.AddTag(k, v)
	}
	for k, v := range config.Meta {
		metric.AddMeta(k, v)
	}
	if len(config.Unit) > 0 {
		metric.AddMeta("unit", config.Unit)
	}
	// Fall back to the unit reported by perf if requested and no unit is set yet
	if config.UsePerfUnit && !metric.HasMeta("unit") && !metric.HasTag("unit") {
		unit := ""
		if len(data.counterUnit) > 0 {
			unit = data.counterUnit
		} else if len(data.metricUnit) > 0 {
			unit = data.metricUnit
		}
		if len(unit) > 0 {
			metric.AddMeta("unit", unit)
		}
	}
	return metric, nil
}
// Read collects all metrics of the perf command collector
// and sends them through the output channel to the collector manager.
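//
// Each call spans the whole interval with a single perf invocation of the
// form (event list and interval length are illustrative):
//
//	perf stat -A -a -j -e instructions,cycles /usr/bin/sleep 10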
func (m *PerfCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) {
	perfdata := make([]*PerfEventJson, 0)
	timestamp := time.Now()
	// Run perf for the whole interval; it prints one JSON object per line
	cmd := exec.Command(m.config.PerfCmd, "stat", "-A", "-a", "-j", "-e", m.perfEventString, "/usr/bin/sleep", fmt.Sprintf("%d", int(interval.Seconds())))
	cclog.ComponentDebug(m.name, "Running", cmd.String())
	out, err := cmd.CombinedOutput()
	if err == nil {
		sout := strings.TrimSpace(string(out))
		for _, l := range strings.Split(sout, "\n") {
			d, err := parseEvent(l)
			if err == nil {
				perfdata = append(perfdata, d)
			}
		}
	} else {
		cclog.ComponentError(m.name, "Execution of", cmd.String(), "failed with", err.Error())
	}
	metricData := make([]*PerfEventJson, 0)
	for _, metricTmp := range m.config.Metrics {
		// Events that failed validation in Init() are not in m.metrics
		metricConfig, ok := m.metrics[metricTmp.Event]
		if !ok {
			continue
		}
		for t, clist := range metricConfig.useCpus {
			val := float64(0)
			sum := float64(0)
			min := math.MaxFloat64
			max := float64(0)
			count := 0
			cunit := ""
			munit := ""
			for _, c := range clist {
				for _, d := range perfdata {
					if strings.HasPrefix(d.Event, metricConfig.Event) && d.cpu == c {
						//cclog.ComponentDebug(m.name, "do calc on CPU", c, ":", d.counterValue)
						sum += d.counterValue
						if d.counterValue < min {
							min = d.counterValue
						}
						if d.counterValue > max {
							max = d.counterValue
						}
						count++
						cunit = d.counterUnit
						munit = d.metricUnit
					}
				}
			}
			// Skip this entity if perf did not report any data for it
			if count == 0 {
				continue
			}
			switch metricConfig.TypeAgg {
			case "sum":
				val = sum
			case "min":
				val = min
			case "max":
				val = max
			case "avg", "mean":
				val = sum / float64(count)
			default:
				val = sum
			}
			//cclog.ComponentDebug(m.name, "Metric", metricConfig.Metric, "type", metricConfig.Type, "ID", t, ":", val)
			metricData = append(metricData, &PerfEventJson{
				Event:        metricConfig.Event,
				metricname:   metricConfig.Metric,
				metrictype:   metricConfig.Type,
				metrictypeid: fmt.Sprintf("%v", t),
				counterValue: val,
				metricValue:  0,
				metricUnit:   munit,
				counterUnit:  cunit,
				publish:      metricConfig.Publish,
			})
		}
	}
	for _, d := range metricData {
		if d.publish {
			msg, err := perfdataToMetric(d, m.metrics[d.Event], timestamp)
			if err == nil {
				output <- msg
			}
		}
	}
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *PerfCmdCollector) Close() {
	// Unset flag
	m.init = false
}