Merge branch 'develop' into use_metric_pointers

This commit is contained in:
Thomas Gruber 2022-02-02 15:53:26 +01:00 committed by GitHub
commit 2611b1e301
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 996 additions and 405 deletions

View File

@ -1,5 +1,8 @@
name: Run RPM Build name: Run RPM Build
on: push on:
push:
tags:
- '**'
jobs: jobs:
build-centos8: build-centos8:

View File

@ -2,7 +2,7 @@ name: Run Test
on: push on: push
jobs: jobs:
build: build-1-17:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
@ -18,3 +18,19 @@ jobs:
- name: Run MetricCollector - name: Run MetricCollector
run: ./cc-metric-collector --once --config .github/ci-config.json run: ./cc-metric-collector --once --config .github/ci-config.json
build-1-16:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v2.1.5
with:
go-version: '^1.16.7' # The version AlmaLinux 8.5 uses
- name: Build MetricCollector
run: make
- name: Run MetricCollector
run: ./cc-metric-collector --once --config .github/ci-config.json

View File

@ -13,41 +13,75 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"math" "math"
"os" "os"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"unsafe" "unsafe"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"gopkg.in/Knetic/govaluate.v2" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
) )
type MetricScope int type MetricScope string
const ( const (
METRIC_SCOPE_HWTHREAD = iota METRIC_SCOPE_HWTHREAD = iota
METRIC_SCOPE_SOCKET METRIC_SCOPE_CORE
METRIC_SCOPE_LLC
METRIC_SCOPE_NUMA METRIC_SCOPE_NUMA
METRIC_SCOPE_DIE
METRIC_SCOPE_SOCKET
METRIC_SCOPE_NODE METRIC_SCOPE_NODE
) )
func (ms MetricScope) String() string { func (ms MetricScope) String() string {
return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] return string(ms)
}
func (ms MetricScope) Likwid() string {
LikwidDomains := map[string]string{
"hwthread": "",
"core": "",
"llc": "C",
"numadomain": "M",
"die": "D",
"socket": "S",
"node": "N",
}
return LikwidDomains[string(ms)]
}
func (ms MetricScope) Granularity() int {
for i, g := range GetAllMetricScopes() {
if ms == g {
return i
}
}
return -1
}
func GetAllMetricScopes() []MetricScope {
return []MetricScope{"hwthread" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"}
} }
type LikwidCollectorMetricConfig struct { type LikwidCollectorMetricConfig struct {
Name string `json:"name"` Name string `json:"name"` // Name of the metric
Calc string `json:"calc"` Calc string `json:"calc"` // Calculation for the metric using
Scope MetricScope `json:"socket_scope"` Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median)
Publish bool `json:"publish"` Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function
Publish bool `json:"publish"`
granulatity MetricScope
} }
type LikwidCollectorEventsetConfig struct { type LikwidCollectorEventsetConfig struct {
Events map[string]string `json:"events"` Events map[string]string `json:"events"`
Metrics []LikwidCollectorMetricConfig `json:"metrics"` granulatity map[string]MetricScope
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
} }
type LikwidCollectorConfig struct { type LikwidCollectorConfig struct {
@ -59,22 +93,25 @@ type LikwidCollectorConfig struct {
type LikwidCollector struct { type LikwidCollector struct {
metricCollector metricCollector
cpulist []C.int cpulist []C.int
sock2tid map[int]int cpu2tid map[int]int
metrics map[C.int]map[string]int sock2tid map[int]int
groups []C.int scopeRespTids map[MetricScope]map[int]int
config LikwidCollectorConfig metrics map[C.int]map[string]int
results map[int]map[int]map[string]interface{} groups []C.int
mresults map[int]map[int]map[string]float64 config LikwidCollectorConfig
gmresults map[int]map[string]float64 results map[int]map[int]map[string]interface{}
basefreq float64 mresults map[int]map[int]map[string]float64
gmresults map[int]map[string]float64
basefreq float64
running bool
} }
type LikwidMetric struct { type LikwidMetric struct {
name string name string
search string search string
socket_scope bool scope MetricScope
group_idx int group_idx int
} }
func eventsToEventStr(events map[string]string) string { func eventsToEventStr(events map[string]string) string {
@ -85,6 +122,21 @@ func eventsToEventStr(events map[string]string) string {
return strings.Join(elist, ",") return strings.Join(elist, ",")
} }
func getGranularity(counter, event string) MetricScope {
if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") {
return "hwthread"
} else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") {
return "socket"
} else if strings.HasPrefix(counter, "PWR") {
if event == "RAPL_CORE_ENERGY" {
return "hwthread"
} else {
return "socket"
}
}
return "unknown"
}
func getBaseFreq() float64 { func getBaseFreq() float64 {
var freq float64 = math.NaN() var freq float64 = math.NaN()
C.power_init(0) C.power_init(0)
@ -104,18 +156,98 @@ func getBaseFreq() float64 {
return freq return freq
} }
func getSocketCpus() map[C.int]int { func (m *LikwidCollector) initGranularity() {
slist := SocketList() splitRegex := regexp.MustCompile("[+-/*()]")
var cpu C.int for _, evset := range m.config.Eventsets {
outmap := make(map[C.int]int) evset.granulatity = make(map[string]MetricScope)
for _, s := range slist { for counter, event := range evset.Events {
t := C.CString(fmt.Sprintf("S%d", s)) gran := getGranularity(counter, event)
clen := C.cpustr_to_cpulist(t, &cpu, 1) if gran.Granularity() >= 0 {
if int(clen) == 1 { evset.granulatity[counter] = gran
outmap[cpu] = s }
}
for i, metric := range evset.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("hwthread")
evset.Metrics[i].granulatity = gran
for _, x := range s {
if _, ok := evset.Events[x]; ok {
if evset.granulatity[x].Granularity() > gran.Granularity() {
gran = evset.granulatity[x]
}
}
}
evset.Metrics[i].granulatity = gran
} }
} }
return outmap for i, metric := range m.config.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("hwthread")
m.config.Metrics[i].granulatity = gran
for _, x := range s {
for _, evset := range m.config.Eventsets {
for _, m := range evset.Metrics {
if m.Name == x && m.granulatity.Granularity() > gran.Granularity() {
gran = m.granulatity
}
}
}
}
m.config.Metrics[i].granulatity = gran
}
}
type TopoResolveFunc func(cpuid int) int
func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int {
get_cpus := func(scope MetricScope) map[int]int {
var slist []int
var cpu C.int
var input func(index int) string
switch scope {
case "node":
slist = []int{0}
input = func(index int) string { return "N:0" }
case "socket":
input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
slist = topo.SocketList()
// case "numadomain":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.NumaNodeList()
// cclog.Debug(scope, " ", input(0), " ", slist)
// case "die":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.DieList()
// case "llc":
// input = fmt.Sprintf("%s%d:0", scope.Likwid(), s)
// slist = topo.LLCacheList()
case "hwthread":
input = func(index int) string { return fmt.Sprintf("%d", index) }
slist = topo.CpuList()
}
outmap := make(map[int]int)
for _, s := range slist {
t := C.CString(input(s))
clen := C.cpustr_to_cpulist(t, &cpu, 1)
if int(clen) == 1 {
outmap[s] = m.cpu2tid[int(cpu)]
} else {
cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s)))
outmap[s] = -1
}
C.free(unsafe.Pointer(t))
}
return outmap
}
scopes := GetAllMetricScopes()
complete := make(map[MetricScope]map[int]int)
for _, s := range scopes {
cclog.Debug("Start ", s)
complete[s] = get_cpus(s)
cclog.Debug("End ", s)
}
return complete
} }
func (m *LikwidCollector) Init(config json.RawMessage) error { func (m *LikwidCollector) Init(config json.RawMessage) error {
@ -127,38 +259,74 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
return err return err
} }
} }
if m.config.ForceOverwrite {
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
os.Setenv("LIKWID_FORCE", "1")
}
m.setup() m.setup()
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
cpulist := CpuList()
m.cpulist = make([]C.int, len(cpulist))
slist := getSocketCpus()
m.sock2tid = make(map[int]int) m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
// m.numa2tid = make(map[int]int) cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
cpulist := topo.CpuList()
m.cpulist = make([]C.int, len(cpulist))
m.cpu2tid = make(map[int]int)
for i, c := range cpulist { for i, c := range cpulist {
m.cpulist[i] = C.int(c) m.cpulist[i] = C.int(c)
if sid, found := slist[m.cpulist[i]]; found { m.cpu2tid[c] = i
m.sock2tid[sid] = i
}
} }
m.results = make(map[int]map[int]map[string]interface{}) m.results = make(map[int]map[int]map[string]interface{})
m.mresults = make(map[int]map[int]map[string]float64) m.mresults = make(map[int]map[int]map[string]float64)
m.gmresults = make(map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64)
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
ret = C.topology_init() ret = C.topology_init()
if ret != 0 { if ret != 0 {
return errors.New("Failed to initialize LIKWID topology") err := errors.New("failed to initialize LIKWID topology")
} cclog.ComponentError(m.name, err.Error())
if m.config.ForceOverwrite { return err
os.Setenv("LIKWID_FORCE", "1")
} }
// Determine which counter works at which level. PMC*: hwthread, *BOX*: socket, ...
m.initGranularity()
// Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist)
m.scopeRespTids = m.getResponsiblities()
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
if ret != 0 { if ret != 0 {
C.topology_finalize() C.topology_finalize()
return errors.New("Failed to initialize LIKWID topology") err := errors.New("failed to initialize LIKWID topology")
cclog.ComponentError(m.name, err.Error())
return err
} }
// This is for the global metrics computation test
globalParams := make(map[string]interface{})
globalParams["time"] = float64(1.0)
globalParams["inverseClock"] = float64(1.0)
// While adding the events, we test the metrics whether they can be computed at all
for i, evset := range m.config.Eventsets { for i, evset := range m.config.Eventsets {
estr := eventsToEventStr(evset.Events) estr := eventsToEventStr(evset.Events)
// Generate parameter list for the metric computing test
params := make(map[string]interface{})
params["time"] = float64(1.0)
params["inverseClock"] = float64(1.0)
for counter := range evset.Events {
params[counter] = float64(1.0)
}
for _, metric := range evset.Metrics {
// Try to evaluate the metric
_, err := mr.EvalFloat64Condition(metric.Calc, params)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
}
// If the metric is not in the parameter list for the global metrics, add it
if _, ok := globalParams[metric.Name]; !ok {
globalParams[metric.Name] = float64(1.0)
}
}
// Now we add the list of events to likwid
cstr := C.CString(estr) cstr := C.CString(estr)
gid := C.perfmon_addEventSet(cstr) gid := C.perfmon_addEventSet(cstr)
if gid >= 0 { if gid >= 0 {
@ -170,161 +338,191 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
for tid := range m.cpulist { for tid := range m.cpulist {
m.results[i][tid] = make(map[string]interface{}) m.results[i][tid] = make(map[string]interface{})
m.mresults[i][tid] = make(map[string]float64) m.mresults[i][tid] = make(map[string]float64)
m.gmresults[tid] = make(map[string]float64) if i == 0 {
m.gmresults[tid] = make(map[string]float64)
}
}
}
for _, metric := range m.config.Metrics {
// Try to evaluate the global metric
_, err := mr.EvalFloat64Condition(metric.Calc, globalParams)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
} }
} }
// If no event set could be added, shut down LikwidCollector
if len(m.groups) == 0 { if len(m.groups) == 0 {
C.perfmon_finalize() C.perfmon_finalize()
C.topology_finalize() C.topology_finalize()
return errors.New("No LIKWID performance group initialized") err := errors.New("no LIKWID performance group initialized")
cclog.ComponentError(m.name, err.Error())
return err
} }
m.basefreq = getBaseFreq() m.basefreq = getBaseFreq()
m.init = true m.init = true
return nil return nil
} }
func (m *LikwidCollector) Read(interval time.Duration, output chan *lp.CCMetric) {
// take a measurement for 'interval' seconds of event set index 'group'
func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error {
var ret C.int
gid := m.groups[group]
ret = C.perfmon_setupCounters(gid)
if ret != 0 {
gctr := C.GoString(C.perfmon_getGroupName(gid))
err := fmt.Errorf("failed to setup performance group %s", gctr)
cclog.ComponentError(m.name, err.Error())
return err
}
ret = C.perfmon_startCounters()
if ret != 0 {
gctr := C.GoString(C.perfmon_getGroupName(gid))
err := fmt.Errorf("failed to start performance group %s", gctr)
cclog.ComponentError(m.name, err.Error())
return err
}
m.running = true
time.Sleep(interval)
m.running = false
ret = C.perfmon_stopCounters()
if ret != 0 {
gctr := C.GoString(C.perfmon_getGroupName(gid))
err := fmt.Errorf("failed to stop performance group %s", gctr)
cclog.ComponentError(m.name, err.Error())
return err
}
return nil
}
// Get all measurement results for an event set, derive the metric values out of the measurement results and send it
func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error {
var eidx C.int
evset := m.config.Eventsets[group]
gid := m.groups[group]
// Go over events and get the results
for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
ctr := C.perfmon_getCounterName(gid, eidx)
ev := C.perfmon_getEventName(gid, eidx)
gctr := C.GoString(ctr)
gev := C.GoString(ev)
// MetricScope for the counter (and if needed the event)
scope := getGranularity(gctr, gev)
// Get the map scope-id -> tids
// This way we read less counters like only the responsible hardware thread for a socket
scopemap := m.scopeRespTids[scope]
for _, tid := range scopemap {
if tid >= 0 {
m.results[group][tid]["time"] = interval.Seconds()
m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq)
res := C.perfmon_getLastResult(gid, eidx, C.int(tid))
m.results[group][tid][gctr] = float64(res)
}
}
}
// Go over the event set metrics, derive the value out of the event:counter values and send it
for _, metric := range evset.Metrics {
// The metric scope is determined in the Init() function
// Get the map scope-id -> tids
scopemap := m.scopeRespTids[metric.Scope]
for domain, tid := range scopemap {
if tid >= 0 {
value, err := mr.EvalFloat64Condition(metric.Calc, m.results[group][tid])
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
}
m.mresults[group][tid][metric.Name] = value
// Now we have the result, send it with the proper tags
tags := map[string]string{"type": metric.Scope.String()}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
if err == nil {
output <- y
}
}
}
}
return nil
}
// Go over the global metrics, derive the value out of the event sets' metric values and send it
func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error {
for _, metric := range m.config.Metrics {
scopemap := m.scopeRespTids[metric.Scope]
for domain, tid := range scopemap {
if tid >= 0 {
// Here we generate parameter list
params := make(map[string]interface{})
for j := range m.groups {
for mname, mres := range m.mresults[j][tid] {
params[mname] = mres
}
}
// Evaluate the metric
value, err := mr.EvalFloat64Condition(metric.Calc, params)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue
}
m.gmresults[tid][metric.Name] = value
// Now we have the result, send it with the proper tags
tags := map[string]string{"type": metric.Scope.String()}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
if err == nil {
output <- y
}
}
}
}
return nil
}
// main read function taking multiple measurement rounds, each 'interval' seconds long
func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init { if !m.init {
return return
} }
var ret C.int
for i, gid := range m.groups { for i, _ := range m.groups {
evset := m.config.Eventsets[i] // measure event set 'i' for 'interval' seconds
ret = C.perfmon_setupCounters(gid) err := m.takeMeasurement(i, interval)
if ret != 0 { if err != nil {
log.Print("Failed to setup performance group ", C.perfmon_getGroupName(gid)) cclog.ComponentError(m.name, err.Error())
continue continue
} }
ret = C.perfmon_startCounters() // read measurements and derive event set metrics
if ret != 0 { m.calcEventsetMetrics(i, interval, output)
log.Print("Failed to start performance group ", C.perfmon_getGroupName(gid))
continue
}
time.Sleep(interval)
ret = C.perfmon_stopCounters()
if ret != 0 {
log.Print("Failed to stop performance group ", C.perfmon_getGroupName(gid))
continue
}
var eidx C.int
for tid := range m.cpulist {
for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
ctr := C.perfmon_getCounterName(gid, eidx)
gctr := C.GoString(ctr)
res := C.perfmon_getLastResult(gid, eidx, C.int(tid))
m.results[i][tid][gctr] = float64(res)
}
m.results[i][tid]["time"] = interval.Seconds()
m.results[i][tid]["inverseClock"] = float64(1.0 / m.basefreq)
for _, metric := range evset.Metrics {
expression, err := govaluate.NewEvaluableExpression(metric.Calc)
if err != nil {
log.Print(err.Error())
continue
}
result, err := expression.Evaluate(m.results[i][tid])
if err != nil {
log.Print(err.Error())
continue
}
m.mresults[i][tid][metric.Name] = float64(result.(float64))
}
}
}
for _, metric := range m.config.Metrics {
for tid := range m.cpulist {
var params map[string]interface{}
expression, err := govaluate.NewEvaluableExpression(metric.Calc)
if err != nil {
log.Print(err.Error())
continue
}
params = make(map[string]interface{})
for j := range m.groups {
for mname, mres := range m.mresults[j][tid] {
params[mname] = mres
}
}
result, err := expression.Evaluate(params)
if err != nil {
log.Print(err.Error())
continue
}
m.gmresults[tid][metric.Name] = float64(result.(float64))
}
}
for i := range m.groups {
evset := m.config.Eventsets[i]
for _, metric := range evset.Metrics {
_, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name)
if metric.Publish && !skip {
if metric.Scope.String() == "socket" {
for sid, tid := range m.sock2tid {
y, err := lp.New(metric.Name,
map[string]string{"type": "socket",
"type-id": fmt.Sprintf("%d", int(sid))},
m.meta,
map[string]interface{}{"value": m.mresults[i][tid][metric.Name]},
time.Now())
if err == nil {
output <- &y
}
}
} else if metric.Scope.String() == "hwthread" {
for tid, cpu := range m.cpulist {
y, err := lp.New(metric.Name,
map[string]string{"type": "cpu",
"type-id": fmt.Sprintf("%d", int(cpu))},
m.meta,
map[string]interface{}{"value": m.mresults[i][tid][metric.Name]},
time.Now())
if err == nil {
output <- &y
}
}
}
}
}
}
for _, metric := range m.config.Metrics {
_, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name)
if metric.Publish && !skip {
if metric.Scope.String() == "socket" {
for sid, tid := range m.sock2tid {
y, err := lp.New(metric.Name,
map[string]string{"type": "socket",
"type-id": fmt.Sprintf("%d", int(sid))},
m.meta,
map[string]interface{}{"value": m.gmresults[tid][metric.Name]},
time.Now())
if err == nil {
output <- &y
}
}
} else {
for tid, cpu := range m.cpulist {
y, err := lp.New(metric.Name,
map[string]string{"type": "cpu",
"type-id": fmt.Sprintf("%d", int(cpu))},
m.meta,
map[string]interface{}{"value": m.gmresults[tid][metric.Name]},
time.Now())
if err == nil {
output <- &y
}
}
}
}
} }
// use the event set metrics to derive the global metrics
m.calcGlobalMetrics(interval, output)
} }
func (m *LikwidCollector) Close() { func (m *LikwidCollector) Close() {
if m.init { if m.init {
cclog.ComponentDebug(m.name, "Closing ...")
m.init = false m.init = false
if m.running {
cclog.ComponentDebug(m.name, "Stopping counters")
C.perfmon_stopCounters()
}
cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
C.perfmon_finalize() C.perfmon_finalize()
cclog.ComponentDebug(m.name, "Finalize LIKWID topology module")
C.topology_finalize() C.topology_finalize()
cclog.ComponentDebug(m.name, "Closing done")
} }
} }

View File

@ -11,59 +11,84 @@ import (
// Most functions are derived from github.com/influxdata/line-protocol/metric.go // Most functions are derived from github.com/influxdata/line-protocol/metric.go
// The metric type is extended with an extra meta information list re-using the Tag // The metric type is extended with an extra meta information list re-using the Tag
// type. // type.
//
// See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
type ccMetric struct { type ccMetric struct {
name string name string // Measurement name
tags []*lp.Tag meta map[string]string // map of meta data tags
fields []*lp.Field tags map[string]string // map of of tags
tm time.Time fields []*lp.Field // unordered list of of fields
meta []*lp.Tag tm time.Time // timestamp
} }
// ccmetric access functions
type CCMetric interface { type CCMetric interface {
lp.MutableMetric lp.Metric // Time(), Name(), TagList(), FieldList()
AddMeta(key, value string)
MetaList() []*lp.Tag SetName(name string)
RemoveTag(key string) SetTime(t time.Time)
GetTag(key string) (string, bool)
GetMeta(key string) (string, bool) Meta() map[string]string // Map of meta data tags
GetField(key string) (interface{}, bool) MetaList() []*lp.Tag // Ordered list of meta data
HasField(key string) bool AddMeta(key, value string) // Add a meta data tag
RemoveField(key string) GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key
Tags() map[string]string // Map of tags
AddTag(key, value string) // Add a tag
GetTag(key string) (string, bool) // Get a tag by its key
RemoveTag(key string) // Remove a tag by its key
GetField(key string) (interface{}, bool) // Get a field addressed by its key
HasField(key string) bool // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key
} }
// Meta returns the meta data tags as key-value mapping
func (m *ccMetric) Meta() map[string]string { func (m *ccMetric) Meta() map[string]string {
meta := make(map[string]string, len(m.meta))
for _, m := range m.meta {
meta[m.Key] = m.Value
}
return meta
}
func (m *ccMetric) MetaList() []*lp.Tag {
return m.meta return m.meta
} }
func (m *ccMetric) String() string { // MetaList returns the the list of meta data tags as sorted list of key value tags
return fmt.Sprintf("%s %v %v %v %d", m.name, m.Tags(), m.Meta(), m.Fields(), m.tm.UnixNano()) func (m *ccMetric) MetaList() []*lp.Tag {
ml := make([]*lp.Tag, 0, len(m.meta))
for key, value := range m.meta {
ml = append(ml, &lp.Tag{Key: key, Value: value})
}
sort.Slice(ml, func(i, j int) bool { return ml[i].Key < ml[j].Key })
return ml
} }
// String implements the stringer interface for data type ccMetric
func (m *ccMetric) String() string {
return fmt.Sprintf("%s %v %v %v %d", m.name, m.tags, m.meta, m.Fields(), m.tm.UnixNano())
}
// Name returns the measurement name
func (m *ccMetric) Name() string { func (m *ccMetric) Name() string {
return m.name return m.name
} }
func (m *ccMetric) Tags() map[string]string { func (m *ccMetric) SetName(name string) {
tags := make(map[string]string, len(m.tags)) m.name = name
for _, tag := range m.tags {
tags[tag.Key] = tag.Value
}
return tags
} }
func (m *ccMetric) TagList() []*lp.Tag { // Tags returns the the list of tags as key-value-mapping
func (m *ccMetric) Tags() map[string]string {
return m.tags return m.tags
} }
// TagList returns the the list of tags as sorted list of key value tags
func (m *ccMetric) TagList() []*lp.Tag {
tl := make([]*lp.Tag, 0, len(m.tags))
for key, value := range m.tags {
tl = append(tl, &lp.Tag{Key: key, Value: value})
}
sort.Slice(tl, func(i, j int) bool { return tl[i].Key < tl[j].Key })
return tl
}
// Fields returns the list of fields as key-value-mapping
func (m *ccMetric) Fields() map[string]interface{} { func (m *ccMetric) Fields() map[string]interface{} {
fields := make(map[string]interface{}, len(m.fields)) fields := make(map[string]interface{}, len(m.fields))
for _, field := range m.fields { for _, field := range m.fields {
@ -73,116 +98,70 @@ func (m *ccMetric) Fields() map[string]interface{} {
return fields return fields
} }
// FieldList returns the list of fields
func (m *ccMetric) FieldList() []*lp.Field { func (m *ccMetric) FieldList() []*lp.Field {
return m.fields return m.fields
} }
// Time returns timestamp
func (m *ccMetric) Time() time.Time { func (m *ccMetric) Time() time.Time {
return m.tm return m.tm
} }
// SetTime sets the timestamp
func (m *ccMetric) SetTime(t time.Time) { func (m *ccMetric) SetTime(t time.Time) {
m.tm = t m.tm = t
} }
// HasTag checks if a tag with key equal to <key> is present in the list of tags
func (m *ccMetric) HasTag(key string) bool { func (m *ccMetric) HasTag(key string) bool {
for _, tag := range m.tags { _, ok := m.tags[key]
if tag.Key == key { return ok
return true
}
}
return false
} }
// GetTag returns the tag with tag's key equal to <key>
func (m *ccMetric) GetTag(key string) (string, bool) { func (m *ccMetric) GetTag(key string) (string, bool) {
for _, tag := range m.tags { value, ok := m.tags[key]
if tag.Key == key { return value, ok
return tag.Value, true
}
}
return "", false
} }
// RemoveTag removes the tag with tag's key equal to <key>
// and keeps the tag list ordered by the keys
func (m *ccMetric) RemoveTag(key string) { func (m *ccMetric) RemoveTag(key string) {
for i, tag := range m.tags { delete(m.tags, key)
if tag.Key == key {
copy(m.tags[i:], m.tags[i+1:])
m.tags[len(m.tags)-1] = nil
m.tags = m.tags[:len(m.tags)-1]
return
}
}
} }
// AddTag adds a tag (consisting of key and value)
// and keeps the tag list ordered by the keys
func (m *ccMetric) AddTag(key, value string) { func (m *ccMetric) AddTag(key, value string) {
for i, tag := range m.tags { m.tags[key] = value
if key > tag.Key {
continue
}
if key == tag.Key {
tag.Value = value
return
}
m.tags = append(m.tags, nil)
copy(m.tags[i+1:], m.tags[i:])
m.tags[i] = &lp.Tag{Key: key, Value: value}
return
}
m.tags = append(m.tags, &lp.Tag{Key: key, Value: value})
} }
// HasTag checks if a meta data tag with meta data's key equal to <key> is present in the list of meta data tags
func (m *ccMetric) HasMeta(key string) bool { func (m *ccMetric) HasMeta(key string) bool {
for _, tag := range m.meta { _, ok := m.meta[key]
if tag.Key == key { return ok
return true
}
}
return false
} }
// GetMeta returns the meta data tag with meta data's key equal to <key>
func (m *ccMetric) GetMeta(key string) (string, bool) { func (m *ccMetric) GetMeta(key string) (string, bool) {
for _, tag := range m.meta { value, ok := m.meta[key]
if tag.Key == key { return value, ok
return tag.Value, true
}
}
return "", false
} }
// RemoveMeta removes the meta data tag with tag's key equal to <key>
// and keeps the meta data tag list ordered by the keys
func (m *ccMetric) RemoveMeta(key string) { func (m *ccMetric) RemoveMeta(key string) {
for i, tag := range m.meta { delete(m.meta, key)
if tag.Key == key {
copy(m.meta[i:], m.meta[i+1:])
m.meta[len(m.meta)-1] = nil
m.meta = m.meta[:len(m.meta)-1]
return
}
}
} }
// AddMeta adds a meta data tag (consisting of key and value)
// and keeps the meta data list ordered by the keys
func (m *ccMetric) AddMeta(key, value string) { func (m *ccMetric) AddMeta(key, value string) {
for i, tag := range m.meta { m.meta[key] = value
if key > tag.Key {
continue
}
if key == tag.Key {
tag.Value = value
return
}
m.meta = append(m.meta, nil)
copy(m.meta[i+1:], m.meta[i:])
m.meta[i] = &lp.Tag{Key: key, Value: value}
return
}
m.meta = append(m.meta, &lp.Tag{Key: key, Value: value})
} }
// AddField adds a field (consisting of key and value) to the unordered list of fields
func (m *ccMetric) AddField(key string, value interface{}) { func (m *ccMetric) AddField(key string, value interface{}) {
for i, field := range m.fields { for i, field := range m.fields {
if key == field.Key { if key == field.Key {
@ -193,6 +172,7 @@ func (m *ccMetric) AddField(key string, value interface{}) {
m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)})
} }
// GetField returns the field with field's key equal to <key>
func (m *ccMetric) GetField(key string) (interface{}, bool) { func (m *ccMetric) GetField(key string) (interface{}, bool) {
for _, field := range m.fields { for _, field := range m.fields {
if field.Key == key { if field.Key == key {
@ -202,6 +182,7 @@ func (m *ccMetric) GetField(key string) (interface{}, bool) {
return "", false return "", false
} }
// HasField checks if a field with field's key equal to <key> is present in the list of fields
func (m *ccMetric) HasField(key string) bool { func (m *ccMetric) HasField(key string) bool {
for _, field := range m.fields { for _, field := range m.fields {
if field.Key == key { if field.Key == key {
@ -211,6 +192,8 @@ func (m *ccMetric) HasField(key string) bool {
return false return false
} }
// RemoveField removes the field with field's key equal to <key>
// from the unordered list of fields
func (m *ccMetric) RemoveField(key string) { func (m *ccMetric) RemoveField(key string) {
for i, field := range m.fields { for i, field := range m.fields {
if field.Key == key { if field.Key == key {
@ -222,6 +205,7 @@ func (m *ccMetric) RemoveField(key string) {
} }
} }
// New creates a new measurement point
func New( func New(
name string, name string,
tags map[string]string, tags map[string]string,
@ -231,58 +215,49 @@ func New(
) (CCMetric, error) { ) (CCMetric, error) {
m := &ccMetric{ m := &ccMetric{
name: name, name: name,
tags: nil, tags: make(map[string]string, len(tags)),
fields: nil, meta: make(map[string]string, len(meta)),
fields: make([]*lp.Field, 0, len(fields)),
tm: tm, tm: tm,
meta: nil,
} }
if len(tags) > 0 { // deep copy tags
m.tags = make([]*lp.Tag, 0, len(tags)) for k, v := range tags {
for k, v := range tags { m.tags[k] = v
m.tags = append(m.tags,
&lp.Tag{Key: k, Value: v})
}
sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key })
} }
if len(meta) > 0 { // deep copy meta data tags
m.meta = make([]*lp.Tag, 0, len(meta)) for k, v := range meta {
for k, v := range meta { m.meta[k] = v
m.meta = append(m.meta,
&lp.Tag{Key: k, Value: v})
}
sort.Slice(m.meta, func(i, j int) bool { return m.meta[i].Key < m.meta[j].Key })
} }
if len(fields) > 0 { // Unsorted list of fields
m.fields = make([]*lp.Field, 0, len(fields)) for k, v := range fields {
for k, v := range fields { v := convertField(v)
v := convertField(v) if v == nil {
if v == nil { continue
continue
}
m.AddField(k, v)
} }
m.AddField(k, v)
} }
return m, nil return m, nil
} }
func FromMetric(other CCMetric) CCMetric { // FromMetric copies the metric <other>
func FromMetric(other ccMetric) CCMetric {
m := &ccMetric{ m := &ccMetric{
name: other.Name(), name: other.Name(),
tags: make([]*lp.Tag, len(other.TagList())), tags: make(map[string]string),
fields: make([]*lp.Field, len(other.FieldList())), fields: make([]*lp.Field, len(other.FieldList())),
meta: make([]*lp.Tag, len(other.MetaList())), meta: make(map[string]string),
tm: other.Time(), tm: other.Time(),
} }
for i, tag := range other.TagList() { for key, value := range other.Tags() {
m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} m.tags[key] = value
} }
for i, s := range other.MetaList() { for key, value := range other.Meta() {
m.meta[i] = &lp.Tag{Key: s.Key, Value: s.Value} m.meta[key] = value
} }
for i, field := range other.FieldList() { for i, field := range other.FieldList() {
@ -291,25 +266,35 @@ func FromMetric(other CCMetric) CCMetric {
return m return m
} }
// FromInfluxMetric copies the influxDB line protocol metric <other>
func FromInfluxMetric(other lp.Metric) CCMetric { func FromInfluxMetric(other lp.Metric) CCMetric {
m := &ccMetric{ m := &ccMetric{
name: other.Name(), name: other.Name(),
tags: make([]*lp.Tag, len(other.TagList())), tags: make(map[string]string),
fields: make([]*lp.Field, len(other.FieldList())), fields: make([]*lp.Field, len(other.FieldList())),
meta: make([]*lp.Tag, 0), meta: make(map[string]string),
tm: other.Time(), tm: other.Time(),
} }
for i, tag := range other.TagList() { for _, otherTag := range other.TagList() {
m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} m.tags[otherTag.Key] = otherTag.Value
} }
for i, field := range other.FieldList() { for i, otherField := range other.FieldList() {
m.fields[i] = &lp.Field{Key: field.Key, Value: field.Value} m.fields[i] = &lp.Field{
Key: otherField.Key,
Value: otherField.Value,
}
} }
return m return m
} }
// convertField converts data types of fields by the following schemata:
// *float32, *float64, float32, float64 -> float64
// *int, *int8, *int16, *int32, *int64, int, int8, int16, int32, int64 -> int64
// *uint, *uint8, *uint16, *uint32, *uint64, uint, uint8, uint16, uint32, uint64 -> uint64
// *[]byte, *string, []byte, string -> string
// *bool, bool -> bool
func convertField(v interface{}) interface{} { func convertField(v interface{}) interface{} {
switch v := v.(type) { switch v := v.(type) {
case float64: case float64:

View File

@ -24,17 +24,23 @@ func intArrayContains(array []int, str int) (int, bool) {
return -1, false return -1, false
} }
// stringArrayContains scans an array of strings if the value str is present in the array func fileToInt(path string) int {
// If the specified value is found, the corresponding array index is returned. buffer, err := ioutil.ReadFile(path)
// The bool value is used to signal success or failure if err != nil {
// func stringArrayContains(array []string, str string) (int, bool) { log.Print(err)
// for i, a := range array { cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error())
// if a == str { return -1
// return i, true }
// } sbuffer := strings.Replace(string(buffer), "\n", "", -1)
// } var id int64
// return -1, false //_, err = fmt.Scanf("%d", sbuffer, &id)
// } id, err = strconv.ParseInt(sbuffer, 10, 32)
if err != nil {
cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error())
return -1
}
return int(id)
}
func SocketList() []int { func SocketList() []int {
buffer, err := ioutil.ReadFile("/proc/cpuinfo") buffer, err := ioutil.ReadFile("/proc/cpuinfo")
@ -68,7 +74,7 @@ func CpuList() []int {
return nil return nil
} }
ll := strings.Split(string(buffer), "\n") ll := strings.Split(string(buffer), "\n")
var cpulist []int cpulist := make([]int, 0)
for _, line := range ll { for _, line := range ll {
if strings.HasPrefix(line, "processor") { if strings.HasPrefix(line, "processor") {
lv := strings.Fields(line) lv := strings.Fields(line)
@ -86,6 +92,67 @@ func CpuList() []int {
return cpulist return cpulist
} }
// CoreList reads /proc/cpuinfo and returns the list of distinct
// physical core IDs (the "core id" entries). On a read error it
// returns nil; on a parse error it returns the IDs collected so far.
func CoreList() []int {
	corelist := make([]int, 0)
	buffer, err := ioutil.ReadFile("/proc/cpuinfo")
	if err != nil {
		log.Print(err)
		return nil
	}
	for _, line := range strings.Split(string(buffer), "\n") {
		if !strings.HasPrefix(line, "core id") {
			continue
		}
		// line format: "core id\t\t: N" -> fields[3] holds N
		fields := strings.Fields(line)
		id, err := strconv.ParseInt(fields[3], 10, 32)
		if err != nil {
			log.Print(err)
			return corelist
		}
		if _, found := intArrayContains(corelist, int(id)); !found {
			corelist = append(corelist, int(id))
		}
	}
	return corelist
}
// NumaNodeList returns the list of NUMA domain IDs found as
// /sys/devices/system/node/node<N> directories (or symlinks).
// An empty list is returned when the sysfs tree cannot be read.
func NumaNodeList() []int {
	numalist := make([]int, 0)
	files, err := filepath.Glob("/sys/devices/system/node/node*")
	if err != nil {
		log.Print(err)
	}
	for _, f := range files {
		finfo, err := os.Lstat(f)
		if err == nil && (finfo.IsDir() || finfo.Mode()&os.ModeSymlink != 0) {
			var id int
			parts := strings.Split(f, "/")
			// Parse the numeric ID out of the directory name ("nodeN").
			// The original code called fmt.Scanf here, which reads from
			// stdin and ignores the directory name entirely, so no ID
			// was ever extracted.
			_, err = fmt.Sscanf(parts[len(parts)-1], "node%d", &id)
			if err == nil {
				if _, found := intArrayContains(numalist, id); !found {
					numalist = append(numalist, id)
				}
			}
		}
	}
	return numalist
}
// DieList returns the list of distinct die IDs by reading the
// topology die_id sysfs file of every hardware thread.
func DieList() []int {
	dielist := make([]int, 0)
	for _, cpu := range CpuList() {
		path := fmt.Sprintf("/sys/devices/system/cpu/cpu%d/topology/die_id", cpu)
		dieid := fileToInt(path)
		if _, found := intArrayContains(dielist, dieid); !found {
			dielist = append(dielist, dieid)
		}
	}
	return dielist
}
type CpuEntry struct { type CpuEntry struct {
Cpuid int Cpuid int
SMT int SMT int
@ -203,6 +270,7 @@ type CpuInformation struct {
SMTWidth int SMTWidth int
NumSockets int NumSockets int
NumDies int NumDies int
NumCores int
NumNumaDomains int NumNumaDomains int
} }
@ -213,6 +281,7 @@ func CpuInfo() CpuInformation {
numa := 0 numa := 0
die := 0 die := 0
socket := 0 socket := 0
core := 0
cdata := CpuData() cdata := CpuData()
for _, d := range cdata { for _, d := range cdata {
if d.SMT > smt { if d.SMT > smt {
@ -227,10 +296,14 @@ func CpuInfo() CpuInformation {
if d.Socket > socket { if d.Socket > socket {
socket = d.Socket socket = d.Socket
} }
if d.Core > core {
core = d.Core
}
} }
c.NumNumaDomains = numa + 1 c.NumNumaDomains = numa + 1
c.SMTWidth = smt + 1 c.SMTWidth = smt + 1
c.NumDies = die + 1 c.NumDies = die + 1
c.NumCores = core + 1
c.NumSockets = socket + 1 c.NumSockets = socket + 1
c.NumHWthreads = len(cdata) c.NumHWthreads = len(cdata)
return c return c
@ -275,3 +348,47 @@ func GetCpuCore(cpuid int) int {
} }
return -1 return -1
} }
// GetSocketCpus returns the hardware thread IDs that reside on the
// given CPU socket.
func GetSocketCpus(socket int) []int {
	cpus := make([]int, 0)
	for _, entry := range CpuData() {
		if entry.Socket == socket {
			cpus = append(cpus, entry.Cpuid)
		}
	}
	return cpus
}
// GetNumaDomainCpus returns the hardware thread IDs that belong to
// the given NUMA domain.
func GetNumaDomainCpus(domain int) []int {
	cpus := make([]int, 0)
	for _, entry := range CpuData() {
		if entry.Numadomain == domain {
			cpus = append(cpus, entry.Cpuid)
		}
	}
	return cpus
}
// GetDieCpus returns the hardware thread IDs that reside on the
// given CPU die.
func GetDieCpus(die int) []int {
	cpus := make([]int, 0)
	for _, entry := range CpuData() {
		if entry.Die == die {
			cpus = append(cpus, entry.Cpuid)
		}
	}
	return cpus
}
// GetCoreCpus returns the hardware thread IDs that belong to the
// given physical core (i.e. its SMT siblings).
func GetCoreCpus(core int) []int {
	cpus := make([]int, 0)
	for _, entry := range CpuData() {
		if entry.Core == core {
			cpus = append(cpus, entry.Cpuid)
		}
	}
	return cpus
}

View File

@ -6,6 +6,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
```json ```json
{ {
"num_cache_intervals" : 1,
"interval_timestamp" : true,
"add_tags" : [ "add_tags" : [
{ {
"key" : "cluster", "key" : "cluster",
@ -25,16 +27,58 @@ The CCMetric router sits in between the collectors and the sinks and can be used
"if" : "*" "if" : "*"
} }
], ],
"interval_timestamp" : true "interval_aggregates" : [
{
"name" : "temp_cores_avg",
"if" : "match('temp_core_%d+', metric.Name())",
"function" : "avg(values)",
"tags" : {
"type" : "node"
},
"meta" : {
"group": "IPMI",
"unit": "degC",
"source": "TempCollector"
}
}
],
"drop_metrics" : [
"not_interesting_metric_at_all"
],
"drop_metrics_if" : [
"match('temp_core_%d+', metric.Name())"
],
"rename_metrics" : {
"metric_12345" : "mymetric"
}
} }
``` ```
There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval. There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval.
# The `interval_timestamp` option
# Conditional manipulation of tags The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time.
The `if` setting allows conditional testing of a single metric like in the example: # The `num_cache_intervals` option
If the MetricRouter should buffer metrics of intervals in a MetricCache, this option specifies the number of past intervals that should be kept. If `num_cache_intervals = 0`, the cache is disabled. With `num_cache_intervals = 1`, only the metrics of the last interval are buffered.
A `num_cache_intervals > 0` is required to use the `interval_aggregates` option.
# The `rename_metrics` option
In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, they might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`
```json
{
"oldname" : "newname",
"clock_mhz" : "clock"
}
```
# Conditional manipulation of tags (`add_tags` and `del_tags`)
Common config format:
```json ```json
{ {
"key" : "test", "key" : "test",
@ -43,8 +87,131 @@ The `if` setting allows conditional testing of a single metric like in the examp
} }
``` ```
If the CCMetric name is equal to 'temp_package_id_0', it adds an additional tag `test=testing` to the metric. ## The `del_tags` option
In order to match all metrics, you can use `*`, so in order to add a flag per default, like the `cluster=testcluster` tag in the example. The collectors are free to add whatever `key=value` pair to the metric tags (although the usage of tags should be minimized). If you want to delete a tag afterwards, you can do that. When the `if` condition matches on a metric, the `key` is removed from the metric's tags.
If you want to remove a tag for all metrics, use the condition wildcard `*`. The `value` field can be omitted in the `del_tags` case.
Never delete tags:
- `hostname`
- `type`
- `type-id`
## The `add_tags` option
In some cases, metrics should be tagged or an existing tag changed based on some condition. This can be done in the `add_tags` section. When the `if` condition evaluates to `true`, the tag `key` is added or gets changed to the new `value`.
If the CCMetric name is equal to `temp_package_id_0`, it adds an additional tag `test=testing` to the metric.
For this metric, a more useful example would be:
```json
[
{
"key" : "type",
"value" : "socket",
"if" : "name == 'temp_package_id_0'"
},
{
"key" : "type-id",
"value" : "0",
"if" : "name == 'temp_package_id_0'"
},
]
```
The metric `temp_package_id_0` corresponds to the temperature of the first CPU socket (=package). With the above configuration, the tags would reflect that because commonly the [TempCollector](../../collectors/tempMetric.md) submits only `node` metrics.
In order to match all metrics, you can use the wildcard `*` as the condition, so that a tag is added by default. This is useful to attach system-specific tags like `cluster=testcluster`:
```json
{
"key" : "cluster",
"value" : "testcluster",
"if" : "*"
}
```
# Dropping metrics
In some cases, you want to drop a metric and don't get it forwarded to the sinks. There are two options based on the required specification:
- Based only on the metric name -> `drop_metrics` section
- An evaluable condition with more overhead -> `drop_metrics_if` section
## The `drop_metrics` section
The argument is a list of metric names. No further checks are performed, only a comparison of the metric name.
```json
{
"drop_metrics" : [
"drop_metric_1",
"drop_metric_2"
]
}
```
The example drops all metrics with the name `drop_metric_1` and `drop_metric_2`.
## The `drop_metrics_if` section
This option takes a list of evaluable conditions and performs them one after the other on **all** metrics incoming from the collectors and the metric cache (aka `interval_aggregates`).
```json
{
"drop_metrics_if" : [
"match('drop_metric_%d+', name)",
"match('cpu', type) && type-id == 0"
]
}
```
The first line is comparable with the example in `drop_metrics`, it drops all metrics starting with `drop_metric_` and ending with a number. The second line drops all metrics of the first hardware thread (**not** recommended)
# Aggregate metric values of the current interval with the `interval_aggregates` option
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0
In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but have the timestamp of the previous interval beginning.
```json
"interval_aggregates" : [
{
"name" : "new_metric_name",
"if" : "match('sub_metric_%d+', metric.Name())",
"function" : "avg(values)",
"tags" : {
"key" : "value",
"type" : "node"
},
"meta" : {
"key" : "value",
"group": "IPMI",
"unit": "<copy>",
}
}
]
```
The above configuration, collects all metric values for metrics evaluating `if` to `true`. Afterwards it calculates the average `avg` of the `values` (list of all metrics' field `value`) and creates a new CCMetric with the name `new_metric_name` and adds the tags in `tags` and the meta information in `meta`. The special value `<copy>` searches the input metrics and copies the value of the first match of `key` to the new CCMetric.
If you are not interested in the input metrics `sub_metric_%d+` at all, you can add the same condition used here to the `drop_metrics_if` section to drop them.
Use cases for `interval_aggregates`:
- Combine multiple metrics of a collector into a new one, like the [MemstatCollector](../../collectors/memstatMetric.md) does for `mem_used`:
```json
{
"name" : "mem_used",
"if" : "source == 'MemstatCollector'",
"function" : "sum(mem_total) - (sum(mem_free) + sum(mem_buffers) + sum(mem_cached))",
"tags" : {
"type" : "node"
},
"meta" : {
"group": "<copy>",
"unit": "<copy>",
"source": "<copy>"
}
}
```

View File

@ -3,6 +3,7 @@ package metricRouter
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"os" "os"
"strings" "strings"
"time" "time"
@ -84,7 +85,7 @@ func (c *metricAggregator) Init(output chan *lp.CCMetric) error {
c.constants["smtWidth"] = cinfo.SMTWidth c.constants["smtWidth"] = cinfo.SMTWidth
c.language = gval.NewLanguage( c.language = gval.NewLanguage(
gval.Base(), gval.Full(),
metricCacheLanguage, metricCacheLanguage,
) )
@ -283,6 +284,86 @@ func (c *metricAggregator) AddFunction(name string, function func(args ...interf
c.language = gval.NewLanguage(c.language, gval.Function(name, function)) c.language = gval.NewLanguage(c.language, gval.Function(name, function))
} }
// EvalBoolCondition evaluates condition against params and coerces the
// result to a bool. Conditions use single quotes for strings and '%' as
// the regex escape character; both are rewritten to gval syntax first.
// Numeric results are treated as true when non-zero; any other result
// type yields an error.
func EvalBoolCondition(condition string, params map[string]interface{}) (bool, error) {
	// translate the condition into gval's expected syntax
	cond := strings.ReplaceAll(condition, "'", "\"")
	cond = strings.ReplaceAll(cond, "%", "\\")
	language := gval.NewLanguage(
		gval.Full(),
		metricCacheLanguage,
	)
	value, err := gval.Evaluate(cond, params, language)
	if err != nil {
		return false, err
	}
	result := false
	switch v := value.(type) {
	case bool:
		result = v
	case float64:
		result = v != 0.0
	case float32:
		result = v != 0.0
	case int:
		result = v != 0
	case int64:
		result = v != 0
	case int32:
		result = v != 0
	default:
		return false, fmt.Errorf("cannot evaluate '%s' to bool", cond)
	}
	return result, nil
}
// EvalFloat64Condition evaluates condition against params and coerces
// the result to a float64 (NaN on failure). Single quotes and '%' are
// rewritten to gval syntax before evaluation; bools map to 1.0/0.0 and
// any non-numeric result yields an error.
func EvalFloat64Condition(condition string, params map[string]interface{}) (float64, error) {
	// translate the condition into gval's expected syntax
	cond := strings.ReplaceAll(condition, "'", "\"")
	cond = strings.ReplaceAll(cond, "%", "\\")
	language := gval.NewLanguage(
		gval.Full(),
		metricCacheLanguage,
	)
	value, err := gval.Evaluate(cond, params, language)
	if err != nil {
		cclog.ComponentDebug("MetricRouter", condition, " = ", err.Error())
		return math.NaN(), err
	}
	switch v := value.(type) {
	case bool:
		if v {
			return 1.0, nil
		}
		return 0.0, nil
	case float64:
		return v, nil
	case float32:
		return float64(v), nil
	case int:
		return float64(v), nil
	case int64:
		return float64(v), nil
	case int32:
		return float64(v), nil
	}
	return math.NaN(), fmt.Errorf("cannot evaluate '%s' to float64", cond)
}
func NewAggregator(output chan *lp.CCMetric) (MetricAggregator, error) { func NewAggregator(output chan *lp.CCMetric) (MetricAggregator, error) {
a := new(metricAggregator) a := new(metricAggregator)
err := a.Init(output) err := a.Init(output)

View File

@ -11,7 +11,6 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"gopkg.in/Knetic/govaluate.v2"
) )
// Metric router tag configuration // Metric router tag configuration
@ -26,8 +25,12 @@ type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if
DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics
RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
dropMetrics map[string]bool // Internal map for O(1) lookup
} }
// Metric router data structure // Metric router data structure
@ -92,17 +95,19 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
return err return err
} }
numIntervals := r.config.NumCacheIntervals if r.config.NumCacheIntervals >= 0 {
if numIntervals <= 0 { r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
numIntervals = 1 if err != nil {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
return err
}
for _, agg := range r.config.IntervalAgg {
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
}
} }
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals) r.config.dropMetrics = make(map[string]bool)
if err != nil { for _, mname := range r.config.DropMetrics {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error()) r.config.dropMetrics[mname] = true
return err
}
for _, agg := range r.config.IntervalAgg {
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
} }
return nil return nil
} }
@ -130,48 +135,34 @@ func (r *metricRouter) StartTimer() {
cclog.ComponentDebug("MetricRouter", "TIMER START") cclog.ComponentDebug("MetricRouter", "TIMER START")
} }
// EvalCondition evaluates condition cond for metric data from point
func (r *metricRouter) EvalCondition(cond string, pptr *lp.CCMetric) (bool, error) {
point := *pptr
expression, err := govaluate.NewEvaluableExpression(cond)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
}
// Add metric name, tags, meta data, fields and timestamp to the parameter list func getParamMap(point lp.CCMetric) map[string]interface{} {
params := make(map[string]interface{}) params := make(map[string]interface{})
params["metric"] = point
params["name"] = point.Name() params["name"] = point.Name()
for _, t := range point.TagList() { for key, value := range point.Tags() {
params[t.Key] = t.Value params[key] = value
} }
for _, m := range point.MetaList() { for key, value := range point.Meta() {
params[m.Key] = m.Value params[key] = value
} }
for _, f := range point.FieldList() { for _, f := range point.FieldList() {
params[f.Key] = f.Value params[f.Key] = f.Value
} }
params["timestamp"] = point.Time() params["timestamp"] = point.Time()
return params
// evaluate condition
result, err := expression.Evaluate(params)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
}
return bool(result.(bool)), err
} }
// DoAddTags adds a tag when condition is fullfiled // DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point *lp.CCMetric) { func (r *metricRouter) DoAddTags(point *lp.CCMetric) {
for _, m := range r.config.AddTags { for _, m := range r.config.AddTags {
var conditionMatches bool var conditionMatches bool = false
if m.Condition == "*" { if m.Condition == "*" {
conditionMatches = true conditionMatches = true
} else { } else {
var err error var err error
conditionMatches, err = r.EvalCondition(m.Condition, point) conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false conditionMatches = false
@ -186,13 +177,13 @@ func (r *metricRouter) DoAddTags(point *lp.CCMetric) {
// DoDelTags removes a tag when condition is fullfiled // DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point *lp.CCMetric) { func (r *metricRouter) DoDelTags(point *lp.CCMetric) {
for _, m := range r.config.DelTags { for _, m := range r.config.DelTags {
var conditionMatches bool var conditionMatches bool = false
if m.Condition == "*" { if m.Condition == "*" {
conditionMatches = true conditionMatches = true
} else { } else {
var err error var err error
conditionMatches, err = r.EvalCondition(m.Condition, point) conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false conditionMatches = false
@ -204,9 +195,24 @@ func (r *metricRouter) DoDelTags(point *lp.CCMetric) {
} }
} }
// dropMetric reports whether the metric should be discarded: either
// its name is listed in the drop_metrics config (O(1) map lookup) or
// one of the evaluable drop_metrics_if conditions matches. A condition
// that fails to evaluate also drops the metric.
func (r *metricRouter) dropMetric(point *lp.CCMetric) bool {
	// Simple drop check by name; dropMetrics holds only true values,
	// so a direct lookup suffices (missing key yields false).
	if r.config.dropMetrics[(*point).Name()] {
		return true
	}
	if len(r.config.DropMetricsIf) == 0 {
		return false
	}
	// Build the evaluation parameters once instead of per condition
	// (the original rebuilt the map on every loop iteration).
	params := getParamMap(*point)
	for _, cond := range r.config.DropMetricsIf {
		conditionMatches, err := EvalBoolCondition(cond, params)
		if conditionMatches || err != nil {
			return true
		}
	}
	return false
}
// Start starts the metric router // Start starts the metric router
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
// start timer if configured // start timer if configured
r.timestamp = time.Now() r.timestamp = time.Now()
if r.config.IntervalStamp { if r.config.IntervalStamp {
@ -225,13 +231,21 @@ func (r *metricRouter) Start() {
cclog.ComponentDebug("MetricRouter", "FORWARD", *point) cclog.ComponentDebug("MetricRouter", "FORWARD", *point)
r.DoAddTags(point) r.DoAddTags(point)
r.DoDelTags(point) r.DoDelTags(point)
if new, ok := r.config.RenameMetrics[point.Name()]; ok {
point.SetName(new)
}
r.DoAddTags(point)
r.DoDelTags(point)
for _, o := range r.outputs { for _, o := range r.outputs {
o <- point o <- point
} }
} }
// Start Metric Cache // Start Metric Cache
r.cache.Start() if r.config.NumCacheIntervals > 0 {
r.cache.Start()
}
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
@ -248,20 +262,30 @@ func (r *metricRouter) Start() {
if r.config.IntervalStamp { if r.config.IntervalStamp {
(*p).SetTime(r.timestamp) (*p).SetTime(r.timestamp)
} }
forward(p) if !r.dropMetric(p) {
r.cache.Add(p) forward(p)
}
// even if the metric is dropped, it is stored in the cache for
// aggregations
if r.config.NumCacheIntervals > 0 {
r.cache.Add(p)
}
case p := <-r.recv_input: case p := <-r.recv_input:
// receive from receive manager // receive from receive manager
if r.config.IntervalStamp { if r.config.IntervalStamp {
(*p).SetTime(r.timestamp) (*p).SetTime(r.timestamp)
} }
forward(p) if !r.dropMetric(p) {
forward(p)
}
case p := <-r.cache_input: case p := <-r.cache_input:
// receive from metric cache and aggregator // receive from metric collector
(*p).AddTag("hostname", r.hostname) if !r.dropMetric(p) {
forward(p) (*p).AddTag("hostname", r.hostname)
forward(p)
}
} }
} }
}() }()
@ -295,8 +319,10 @@ func (r *metricRouter) Close() {
// wait for close of channel r.timerdone // wait for close of channel r.timerdone
<-r.timerdone <-r.timerdone
} }
r.cache.Close() if r.config.NumCacheIntervals > 0 {
r.cachewg.Wait() r.cache.Close()
r.cachewg.Wait()
}
} }
// New creates a new initialized metric router // New creates a new initialized metric router

View File

@ -26,28 +26,27 @@ func (s *GangliaSink) Init(config sinkConfig) error {
return err return err
} }
func (s *GangliaSink) Write(pptr *lp.CCMetric) error { func (s *GangliaSink) Write(point *lp.CCMetric) error {
var err error = nil var err error = nil
var tagsstr []string var tagsstr []string
var argstr []string var argstr []string
point := *pptr for key, value := range (*point).Tags() {
for _, t := range point.TagList() { switch key {
switch t.Key {
case "cluster": case "cluster":
argstr = append(argstr, fmt.Sprintf("--cluster=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--cluster=%s", value))
case "unit": case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group": case "group":
argstr = append(argstr, fmt.Sprintf("--group=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--group=%s", value))
default: default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
} }
} }
if len(tagsstr) > 0 { if len(tagsstr) > 0 {
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
} }
argstr = append(argstr, fmt.Sprintf("--name=%s", point.Name())) argstr = append(argstr, fmt.Sprintf("--name=%s", (*point).Name()))
for _, f := range point.FieldList() { for _, f := range (*point).FieldList() {
if f.Key == "value" { if f.Key == "value" {
switch f.Value.(type) { switch f.Value.(type) {
case float64: case float64:

View File

@ -62,19 +62,18 @@ func (s *InfluxSink) Init(config sinkConfig) error {
func (s *InfluxSink) Write(point *lp.CCMetric) error { func (s *InfluxSink) Write(point *lp.CCMetric) error {
tags := map[string]string{} tags := map[string]string{}
fields := map[string]interface{}{} fields := map[string]interface{}{}
p := *point for key, value := range (*point).Tags() {
for _, t := range p.TagList() { tags[key] = value
tags[t.Key] = t.Value
} }
if s.meta_as_tags { if s.meta_as_tags {
for _, m := range p.MetaList() { for key, value := range (*point).Meta() {
tags[m.Key] = m.Value tags[key] = value
} }
} }
for _, f := range p.FieldList() { for _, f := range (*point).FieldList() {
fields[f.Key] = f.Value fields[f.Key] = f.Value
} }
x := influxdb2.NewPoint(p.Name(), tags, fields, p.Time()) x := influxdb2.NewPoint((*point).Name(), tags, fields, (*point).Time())
err := s.writeApi.WritePoint(context.Background(), x) err := s.writeApi.WritePoint(context.Background(), x)
return err return err
} }