Merge branch 'develop' into main

Thomas Röhl 2022-03-31 11:47:02 +02:00
commit 16e898ecca
5 changed files with 328 additions and 167 deletions

View File

@@ -15,8 +15,11 @@ import (
     "io/ioutil"
     "math"
     "os"
+    "os/signal"
     "strconv"
     "strings"
+    "sync"
+    "syscall"
     "time"
     "unsafe"
@@ -46,6 +49,15 @@ type LikwidCollectorEventsetConfig struct {
     Metrics []LikwidCollectorMetricConfig `json:"metrics"`
 }
+
+type LikwidEventsetConfig struct {
+    internal int
+    gid      C.int
+    eorder   []*C.char
+    estr     *C.char
+    results  map[int]map[string]interface{}
+    metrics  map[int]map[string]float64
+}
 type LikwidCollectorConfig struct {
     Eventsets []LikwidCollectorEventsetConfig `json:"eventsets"`
     Metrics   []LikwidCollectorMetricConfig   `json:"globalmetrics,omitempty"`
@@ -64,11 +76,12 @@ type LikwidCollector struct {
     metrics   map[C.int]map[string]int
     groups    []C.int
     config    LikwidCollectorConfig
-    results   map[int]map[int]map[string]interface{}
-    mresults  map[int]map[int]map[string]float64
     gmresults map[int]map[string]float64
     basefreq  float64
     running   bool
+    initialized  bool
+    likwidGroups map[C.int]LikwidEventsetConfig
+    lock         sync.Mutex
 }
 type LikwidMetric struct {
@@ -86,14 +99,53 @@ func eventsToEventStr(events map[string]string) string {
     return strings.Join(elist, ",")
 }
+
+func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig {
+    tmplist := make([]string, 0)
+    elist := make([]*C.char, 0)
+    for k, v := range input.Events {
+        tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k))
+        c_counter := C.CString(k)
+        elist = append(elist, c_counter)
+    }
+    estr := strings.Join(tmplist, ",")
+    res := make(map[int]map[string]interface{})
+    met := make(map[int]map[string]float64)
+    for _, i := range topo.CpuList() {
+        res[i] = make(map[string]interface{})
+        for k := range input.Events {
+            res[i][k] = 0.0
+        }
+        met[i] = make(map[string]float64)
+        for _, v := range input.Metrics {
+            res[i][v.Name] = 0.0
+        }
+    }
+    return LikwidEventsetConfig{
+        gid:     -1,
+        eorder:  elist,
+        estr:    C.CString(estr),
+        results: res,
+        metrics: met,
+    }
+}
+
+func testLikwidMetricFormula(formula string, params []string) bool {
+    myparams := make(map[string]interface{})
+    for _, p := range params {
+        myparams[p] = float64(1.0)
+    }
+    _, err := agg.EvalFloat64Condition(formula, myparams)
+    return err == nil
+}
+
 func getBaseFreq() float64 {
+    files := []string{
+        "/sys/devices/system/cpu/cpu0/cpufreq/bios_limit",
+        "/sys/devices/system/cpu/cpu0/cpufreq/base_frequency",
+    }
     var freq float64 = math.NaN()
-    C.power_init(0)
-    info := C.get_powerInfo()
-    if float64(info.baseFrequency) != 0 {
-        freq = float64(info.baseFrequency) * 1e6
-    } else {
-        buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
+    for _, f := range files {
+        buffer, err := ioutil.ReadFile(f)
         if err == nil {
             data := strings.Replace(string(buffer), "\n", "", -1)
             x, err := strconv.ParseInt(data, 0, 64)
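
Note: the new `testLikwidMetricFormula` helper above validates a metric formula by evaluating it with every parameter set to 1.0 via `agg.EvalFloat64Condition`. As a rough, self-contained illustration of the same idea (not the collector's actual implementation), one could instead verify that a formula only references known counter and parameter names. The `formulaIsComputable` helper below is hypothetical and uses Go's own expression parser, so it only covers formulas that happen to be valid Go expressions:

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
)

// formulaIsComputable reports whether expr references only names from params.
// This is an illustrative stand-in for testLikwidMetricFormula, which instead
// evaluates the expression with dummy values through the agg package.
func formulaIsComputable(expr string, params []string) bool {
	known := make(map[string]bool, len(params))
	for _, p := range params {
		known[p] = true
	}
	node, err := parser.ParseExpr(expr)
	if err != nil {
		return false
	}
	ok := true
	ast.Inspect(node, func(n ast.Node) bool {
		// Every identifier must be a configured counter or parameter name.
		if id, isIdent := n.(*ast.Ident); isIdent && !known[id.Name] {
			ok = false
		}
		return true
	})
	return ok
}

func main() {
	params := []string{"time", "inverseClock", "PMC0", "PMC1"}
	fmt.Println(formulaIsComputable("(PMC0+PMC1)/time", params))   // true
	fmt.Println(formulaIsComputable("FIXC0*inverseClock", params)) // false: FIXC0 not configured
}
```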
@@ -102,12 +154,22 @@ func getBaseFreq() float64 {
             }
         }
     }
+    if math.IsNaN(freq) {
+        C.power_init(0)
+        info := C.get_powerInfo()
+        if float64(info.baseFrequency) != 0 {
+            freq = float64(info.baseFrequency) * 1e6
+        }
+        C.power_finalize()
+    }
     return freq
 }
 func (m *LikwidCollector) Init(config json.RawMessage) error {
-    var ret C.int
     m.name = "LikwidCollector"
+    m.initialized = false
+    m.running = false
     m.config.AccessMode = LIKWID_DEF_ACCESSMODE
     m.config.LibraryPath = LIKWID_LIB_NAME
     if len(config) > 0 {
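
Note: the reworked `getBaseFreq` above now prefers the cpufreq sysfs files and only falls back to LIKWID's power module when neither file yields a value. A minimal, cgo-free sketch of that lookup order (the kHz-to-Hz conversion and the early `break` are assumptions of this sketch, not taken from the diff):

```go
package main

import (
	"fmt"
	"math"
	"os"
	"strconv"
	"strings"
)

// baseFreqFromSysfs mirrors the sysfs-first part of getBaseFreq.
func baseFreqFromSysfs() float64 {
	files := []string{
		"/sys/devices/system/cpu/cpu0/cpufreq/bios_limit",
		"/sys/devices/system/cpu/cpu0/cpufreq/base_frequency",
	}
	freq := math.NaN()
	for _, f := range files {
		buffer, err := os.ReadFile(f)
		if err != nil {
			continue
		}
		data := strings.TrimSpace(string(buffer))
		if x, err := strconv.ParseInt(data, 10, 64); err == nil {
			freq = float64(x) * 1e3 // sysfs reports kHz (assumption), convert to Hz
			break
		}
	}
	// The real collector falls back to C.power_init()/C.get_powerInfo()
	// here when freq is still NaN.
	return freq
}

func main() {
	fmt.Println("base frequency [Hz]:", baseFreqFromSysfs())
}
```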
@@ -140,172 +202,132 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
         m.cpulist[i] = C.int(c)
         m.cpu2tid[c] = i
     }
-    m.sock2tid = make(map[int]int)
-    tmp := make([]C.int, 1)
-    for _, sid := range topo.SocketList() {
-        cstr := C.CString(fmt.Sprintf("S%d:0", sid))
-        ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
-        if ret > 0 {
-            m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
-        }
-        C.free(unsafe.Pointer(cstr))
-    }
-    m.results = make(map[int]map[int]map[string]interface{})
-    m.mresults = make(map[int]map[int]map[string]float64)
+    m.likwidGroups = make(map[C.int]LikwidEventsetConfig)
+    // m.results = make(map[int]map[int]map[string]interface{})
+    // m.mresults = make(map[int]map[int]map[string]float64)
     m.gmresults = make(map[int]map[string]float64)
-    cclog.ComponentDebug(m.name, "initialize LIKWID topology")
-    ret = C.topology_init()
-    if ret != 0 {
-        err := errors.New("failed to initialize LIKWID topology")
-        cclog.ComponentError(m.name, err.Error())
-        return err
-    }
-    switch m.config.AccessMode {
-    case "direct":
-        C.HPMmode(0)
-    case "accessdaemon":
-        if len(m.config.DaemonPath) > 0 {
-            p := os.Getenv("PATH")
-            os.Setenv("PATH", m.config.DaemonPath+":"+p)
-        }
-        C.HPMmode(1)
-    }
-    cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
-    ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
-    if ret != 0 {
-        C.topology_finalize()
-        err := errors.New("failed to initialize LIKWID topology")
-        cclog.ComponentError(m.name, err.Error())
-        return err
-    }
+    for _, tid := range m.cpu2tid {
+        m.gmresults[tid] = make(map[string]float64)
+    }
     // This is for the global metrics computation test
-    globalParams := make(map[string]interface{})
-    globalParams["time"] = float64(1.0)
-    globalParams["inverseClock"] = float64(1.0)
-    // While adding the events, we test the metrics whether they can be computed at all
-    for i, evset := range m.config.Eventsets {
-        var gid C.int
-        var cstr *C.char
-        if len(evset.Events) > 0 {
-            estr := eventsToEventStr(evset.Events)
-            // Generate parameter list for the metric computing test
-            params := make(map[string]interface{})
-            params["time"] = float64(1.0)
-            params["inverseClock"] = float64(1.0)
+    totalMetrics := 0
+    // Generate parameter list for the metric computing test
+    params := make([]string, 0)
+    params = append(params, "time", "inverseClock")
+    // Generate parameter list for the global metric computing test
+    globalParams := make([]string, 0)
+    globalParams = append(globalParams, "time", "inverseClock")
+    // We test the eventset metrics whether they can be computed at all
+    for _, evset := range m.config.Eventsets {
+        if len(evset.Events) > 0 {
+            params = params[:2]
             for counter := range evset.Events {
-                params[counter] = float64(1.0)
+                params = append(params, counter)
             }
             for _, metric := range evset.Metrics {
                 // Try to evaluate the metric
-                _, err := agg.EvalFloat64Condition(metric.Calc, params)
-                if err != nil {
-                    cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-                    continue
-                }
-                // If the metric is not in the parameter list for the global metrics, add it
-                if _, ok := globalParams[metric.Name]; !ok {
-                    globalParams[metric.Name] = float64(1.0)
+                if testLikwidMetricFormula(metric.Calc, params) {
+                    // Add the computable metric to the parameter list for the global metrics
+                    globalParams = append(globalParams, metric.Name)
+                    totalMetrics++
+                } else {
+                    metric.Calc = ""
                 }
             }
-            // Now we add the list of events to likwid
-            cstr = C.CString(estr)
-            gid = C.perfmon_addEventSet(cstr)
         } else {
             cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
             continue
         }
-        if gid >= 0 {
-            m.groups = append(m.groups, gid)
-        }
-        C.free(unsafe.Pointer(cstr))
-        m.results[i] = make(map[int]map[string]interface{})
-        m.mresults[i] = make(map[int]map[string]float64)
-        for tid := range m.cpulist {
-            m.results[i][tid] = make(map[string]interface{})
-            m.mresults[i][tid] = make(map[string]float64)
-            if i == 0 {
-                m.gmresults[tid] = make(map[string]float64)
-            }
-        }
     }
     for _, metric := range m.config.Metrics {
         // Try to evaluate the global metric
-        _, err := agg.EvalFloat64Condition(metric.Calc, globalParams)
-        if err != nil {
-            cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-            continue
+        if !testLikwidMetricFormula(metric.Calc, globalParams) {
+            cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
+            metric.Calc = ""
+        } else {
+            totalMetrics++
         }
     }
     // If no event set could be added, shut down LikwidCollector
-    if len(m.groups) == 0 {
-        C.perfmon_finalize()
-        C.topology_finalize()
-        err := errors.New("no LIKWID performance group initialized")
+    if totalMetrics == 0 {
+        err := errors.New("no LIKWID eventset or metric usable")
         cclog.ComponentError(m.name, err.Error())
         return err
     }
-    m.basefreq = getBaseFreq()
-    cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
     m.init = true
     return nil
 }
 // take a measurement for 'interval' seconds of event set index 'group'
-func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error {
+func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
     var ret C.int
-    gid := m.groups[group]
-    ret = C.perfmon_setupCounters(gid)
-    if ret != 0 {
-        gctr := C.GoString(C.perfmon_getGroupName(gid))
-        err := fmt.Errorf("failed to setup performance group %d (%s)", gid, gctr)
-        return err
-    }
-    ret = C.perfmon_startCounters()
-    if ret != 0 {
-        gctr := C.GoString(C.perfmon_getGroupName(gid))
-        err := fmt.Errorf("failed to start performance group %d (%s)", gid, gctr)
-        return err
-    }
-    m.running = true
-    time.Sleep(interval)
-    m.running = false
-    ret = C.perfmon_stopCounters()
-    if ret != 0 {
-        gctr := C.GoString(C.perfmon_getGroupName(gid))
-        err := fmt.Errorf("failed to stop performance group %d (%s)", gid, gctr)
-        return err
-    }
-    return nil
+    m.lock.Lock()
+    if m.initialized {
+        ret = C.perfmon_setupCounters(evset.gid)
+        if ret != 0 {
+            var err error = nil
+            var skip bool = false
+            if ret == -37 {
+                skip = true
+            } else {
+                err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+            }
+            m.lock.Unlock()
+            return skip, err
+        }
+        ret = C.perfmon_startCounters()
+        if ret != 0 {
+            var err error = nil
+            var skip bool = false
+            if ret == -37 {
+                skip = true
+            } else {
+                err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+            }
+            m.lock.Unlock()
+            return skip, err
+        }
+        m.running = true
+        time.Sleep(interval)
+        m.running = false
+        ret = C.perfmon_stopCounters()
+        if ret != 0 {
+            var err error = nil
+            var skip bool = false
+            if ret == -37 {
+                skip = true
+            } else {
+                err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+            }
+            m.lock.Unlock()
+            return skip, err
+        }
+    }
+    m.lock.Unlock()
+    return false, nil
 }
 // Get all measurement results for an event set, derive the metric values out of the measurement results and send it
-func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error {
-    var eidx C.int
-    evset := m.config.Eventsets[group]
-    gid := m.groups[group]
+func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
     invClock := float64(1.0 / m.basefreq)
     // Go over events and get the results
-    for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
-        ctr := C.perfmon_getCounterName(gid, eidx)
-        gctr := C.GoString(ctr)
+    for eidx, counter := range evset.eorder {
+        gctr := C.GoString(counter)
         for _, tid := range m.cpu2tid {
-            if tid >= 0 {
-                m.results[group][tid]["time"] = interval.Seconds()
-                m.results[group][tid]["inverseClock"] = invClock
-                res := C.perfmon_getLastResult(gid, eidx, C.int(tid))
-                m.results[group][tid][gctr] = float64(res)
-            }
+            res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
+            evset.results[tid][gctr] = float64(res)
+            evset.results[tid]["time"] = interval.Seconds()
+            evset.results[tid]["inverseClock"] = invClock
         }
     }
     // Go over the event set metrics, derive the value out of the event:counter values and send it
-    for _, metric := range evset.Metrics {
+    for _, metric := range m.config.Eventsets[evset.internal].Metrics {
         // The metric scope is determined in the Init() function
         // Get the map scope-id -> tids
         scopemap := m.cpu2tid
@@ -313,13 +335,13 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
             scopemap = m.sock2tid
         }
         for domain, tid := range scopemap {
-            if tid >= 0 {
-                value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
+            if tid >= 0 && len(metric.Calc) > 0 {
+                value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
                 if err != nil {
                     cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
                     continue
                 }
-                m.mresults[group][tid][metric.Name] = value
+                evset.metrics[tid][metric.Name] = value
                 if m.config.InvalidToZero && math.IsNaN(value) {
                     value = 0.0
                 }
@@ -360,8 +382,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
         if tid >= 0 {
             // Here we generate parameter list
             params := make(map[string]interface{})
-            for j := range m.groups {
-                for mname, mres := range m.mresults[j][tid] {
+            for _, evset := range m.likwidGroups {
+                for mname, mres := range evset.metrics[tid] {
                     params[mname] = mres
                 }
             }
@@ -401,38 +423,145 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
     return nil
 }
+
+func (m *LikwidCollector) LateInit() error {
+    var ret C.int
+    switch m.config.AccessMode {
+    case "direct":
+        C.HPMmode(0)
+    case "accessdaemon":
+        if len(m.config.DaemonPath) > 0 {
+            p := os.Getenv("PATH")
+            os.Setenv("PATH", m.config.DaemonPath+":"+p)
+        }
+        C.HPMmode(1)
+    }
+    cclog.ComponentDebug(m.name, "initialize LIKWID topology")
+    ret = C.topology_init()
+    if ret != 0 {
+        err := errors.New("failed to initialize LIKWID topology")
+        cclog.ComponentError(m.name, err.Error())
+        return err
+    }
+    m.sock2tid = make(map[int]int)
+    tmp := make([]C.int, 1)
+    for _, sid := range topo.SocketList() {
+        cstr := C.CString(fmt.Sprintf("S%d:0", sid))
+        ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
+        if ret > 0 {
+            m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
+        }
+        C.free(unsafe.Pointer(cstr))
+    }
+    m.basefreq = getBaseFreq()
+    cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
+    cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
+    ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
+    if ret != 0 {
+        var err error = nil
+        C.topology_finalize()
+        if ret != -22 {
+            err = errors.New("failed to initialize LIKWID perfmon")
+            cclog.ComponentError(m.name, err.Error())
+        } else {
+            err = errors.New("access to LIKWID perfmon locked")
+        }
+        return err
+    }
+    // While adding the events, we test the metrics whether they can be computed at all
+    for i, evset := range m.config.Eventsets {
+        var gid C.int
+        if len(evset.Events) > 0 {
+            likwidGroup := genLikwidEventSet(evset)
+            // Now we add the list of events to likwid
+            gid = C.perfmon_addEventSet(likwidGroup.estr)
+            if gid >= 0 {
+                likwidGroup.gid = gid
+                likwidGroup.internal = i
+                m.likwidGroups[gid] = likwidGroup
+            }
+        } else {
+            cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
+            continue
+        }
+    }
+    // If no event set could be added, shut down LikwidCollector
+    if len(m.likwidGroups) == 0 {
+        C.perfmon_finalize()
+        C.topology_finalize()
+        err := errors.New("no LIKWID performance group initialized")
+        cclog.ComponentError(m.name, err.Error())
+        return err
+    }
+    sigchan := make(chan os.Signal, 1)
+    signal.Notify(sigchan, syscall.SIGCHLD)
+    signal.Notify(sigchan, os.Interrupt)
+    go func() {
+        <-sigchan
+        signal.Stop(sigchan)
+        m.initialized = false
+    }()
+    m.initialized = true
+    return nil
+}
 // main read function taking multiple measurement rounds, each 'interval' seconds long
 func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) {
+    var skip bool = false
+    var err error
     if !m.init {
         return
     }
-    for i := range m.groups {
-        // measure event set 'i' for 'interval' seconds
-        err := m.takeMeasurement(i, interval)
-        if err != nil {
-            cclog.ComponentError(m.name, err.Error())
-            return
-        }
-        // read measurements and derive event set metrics
-        m.calcEventsetMetrics(i, interval, output)
-    }
-    // use the event set metrics to derive the global metrics
-    m.calcGlobalMetrics(interval, output)
+    if !m.initialized {
+        if m.LateInit() != nil {
+            return
+        }
+    }
+    if m.initialized && !skip {
+        for _, evset := range m.likwidGroups {
+            if !skip {
+                // measure event set 'i' for 'interval' seconds
+                skip, err = m.takeMeasurement(evset, interval)
+                if err != nil {
+                    cclog.ComponentError(m.name, err.Error())
+                    return
+                }
+            }
+            if !skip {
+                // read measurements and derive event set metrics
+                m.calcEventsetMetrics(evset, interval, output)
+            }
+        }
+        if !skip {
+            // use the event set metrics to derive the global metrics
+            m.calcGlobalMetrics(interval, output)
+        }
+    }
 }
 func (m *LikwidCollector) Close() {
     if m.init {
-        cclog.ComponentDebug(m.name, "Closing ...")
         m.init = false
-        if m.running {
-            cclog.ComponentDebug(m.name, "Stopping counters")
-            C.perfmon_stopCounters()
-        }
-        cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
-        C.perfmon_finalize()
+        cclog.ComponentDebug(m.name, "Closing ...")
+        m.lock.Lock()
+        if m.initialized {
+            cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
+            C.perfmon_finalize()
+            m.initialized = false
+        }
+        m.lock.Unlock()
         cclog.ComponentDebug(m.name, "Finalize LIKWID topology module")
         C.topology_finalize()
         cclog.ComponentDebug(m.name, "Closing done")
     }
 }
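
Note: taken together, the changes in this file defer all LIKWID setup to the first `Read()` call (`LateInit`), guard measurements with a mutex against a concurrent `Close()`, and use a `(skip, error)` pair plus a SIGCHLD handler so a vanished access daemon leads to re-initialization instead of a hard failure. Below is a condensed, cgo-free sketch of that control flow with all LIKWID calls and metric handling stubbed out; names like `collector` and `lateInit` are placeholders, not the collector's real API:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type collector struct {
	init        bool // configuration was parsed
	initialized bool // LIKWID (stubbed here) is set up
	lock        sync.Mutex
}

func (c *collector) lateInit() error {
	// Stands in for topology_init/perfmon_init/perfmon_addEventSet.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGCHLD, os.Interrupt)
	go func() {
		<-sigchan
		signal.Stop(sigchan)
		c.initialized = false // force re-initialization on the next Read
	}()
	c.initialized = true
	return nil
}

// takeMeasurement returns (skip, err): skip means "no error, just try again later".
func (c *collector) takeMeasurement(interval time.Duration) (bool, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !c.initialized {
		return true, nil
	}
	time.Sleep(interval) // counters would be started/stopped around this sleep
	return false, nil
}

func (c *collector) Read(interval time.Duration) {
	if !c.init {
		return
	}
	if !c.initialized {
		if c.lateInit() != nil {
			return
		}
	}
	if skip, err := c.takeMeasurement(interval); err != nil {
		fmt.Println("measurement failed:", err)
	} else if !skip {
		fmt.Println("derive and send metrics here")
	}
}

func main() {
	c := &collector{init: true}
	c.Read(10 * time.Millisecond)
}
```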

View File

@@ -36,7 +36,7 @@ type nfsCollector struct {
 }
 
 func (m *nfsCollector) initStats() error {
-    cmd := exec.Command(m.config.Nfsstats, `-l`)
+    cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
     cmd.Wait()
     buffer, err := cmd.Output()
     if err == nil {
@@ -52,7 +52,7 @@ func (m *nfsCollector) initStats() error {
             if err == nil {
                 x := m.data[name]
                 x.current = value
-                x.last = 0
+                x.last = value
                 m.data[name] = x
             }
         }
@@ -63,7 +63,7 @@ func (m *nfsCollector) initStats() error {
 }
 
 func (m *nfsCollector) updateStats() error {
-    cmd := exec.Command(m.config.Nfsstats, `-l`)
+    cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
     cmd.Wait()
     buffer, err := cmd.Output()
     if err == nil {
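
Note: the `x.last = value` change matters because the NFS collector (in code not shown in this hunk) appears to derive per-interval values from the difference between the current and the previous counter reading; seeding `last` with 0 would turn the very first interval into a huge bogus value. A tiny illustration of that assumption:

```go
package main

import "fmt"

type nfsStat struct{ current, last int64 }

func main() {
	// Old behaviour: last seeded with 0, so the first interval reports the
	// counter's absolute lifetime value as if it all happened just now.
	a := nfsStat{current: 123456, last: 0}
	// New behaviour: last seeded with the current value, so the first
	// interval correctly reports 0 until real deltas accumulate.
	b := nfsStat{current: 123456, last: 123456}
	fmt.Println(a.current-a.last, b.current-b.last) // 123456 vs 0
}
```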

View File

@@ -169,7 +169,10 @@ func DieList() []int {
             }
         }
     }
-    return dielist
+    if len(dielist) > 0 {
+        return dielist
+    }
+    return SocketList()
 }
 
 type CpuEntry struct {
@@ -261,7 +264,7 @@ func CpuData() []CpuEntry {
     for _, c := range CpuList() {
         clist = append(clist, CpuEntry{Cpuid: c})
     }
-    for _, centry := range clist {
+    for i, centry := range clist {
         centry.Socket = -1
         centry.Numadomain = -1
         centry.Die = -1
@@ -289,6 +292,8 @@ func CpuData() []CpuEntry {
         // Lookup NUMA domain id
         centry.Numadomain = getNumaDomain(base)
+        // Update values in output list
+        clist[i] = centry
     }
     return clist
 }
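
Note: the `CpuData()` change fixes a common Go pitfall: `range` yields a copy of each slice element, so assignments to the loop variable never reach the slice; writing the copy back through the index does. A short standalone demonstration (not the actual `ccTopology` code):

```go
package main

import "fmt"

type CpuEntry struct{ Cpuid, Socket int }

func main() {
	clist := []CpuEntry{{Cpuid: 0}, {Cpuid: 1}}

	for _, centry := range clist {
		centry.Socket = 7 // modifies only the copy, clist stays untouched
	}
	fmt.Println(clist) // [{0 0} {1 0}]

	for i, centry := range clist {
		centry.Socket = 7
		clist[i] = centry // write the updated copy back into the slice
	}
	fmt.Println(clist) // [{0 7} {1 7}]
}
```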

View File

@@ -8,6 +8,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
 {
     "num_cache_intervals" : 1,
     "interval_timestamp" : true,
+    "hostname_tag" : "hostname",
+    "max_forward" : 50,
     "add_tags" : [
         {
             "key" : "cluster",
@@ -55,6 +57,20 @@ The CCMetric router sits in between the collectors and the sinks and can be used
 ```
 
 There are three main options: `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval.
 
+# Processing order in the router
+
+- Add the `hostname_tag` tag (if sent by collectors or cache)
+- If `interval_timestamp == true`, change the time of the metrics
+- Check whether the metric should be dropped (`drop_metrics` and `drop_metrics_if`)
+- Add tags from `add_tags`
+- Delete tags from `del_tags`
+- Rename the metric based on `rename_metrics` and store the old name as `oldname` in the meta information
+- Add tags from `add_tags` (if you used the new name in the `if` condition)
+- Delete tags from `del_tags` (if you used the new name in the `if` condition)
+- Send to the sinks
+- Move to the cache (if `num_cache_intervals > 0`)
+
 # The `interval_timestamp` option
 
 The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time.
@@ -65,6 +81,14 @@ If the MetricRouter should buffer metrics of intervals in a MetricCache, this op
 
 A `num_cache_intervals > 0` is required to use the `interval_aggregates` option.
 
+# The `hostname_tag` option
+
+By default, the router tags all locally created metrics with the hostname. The default tag name is `hostname`, but it can be changed if your organization wants anything else.
+
+# The `max_forward` option
+
+Every time the router receives a metric through any of the channels, it tries to directly read up to `max_forward` metrics from the same channel. This was done because the router thread would otherwise go to sleep and wake up with every arriving metric. The default is `50` metrics at once and `max_forward` needs to be greater than `1`.
+
 # The `rename_metrics` option
 
 In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, they might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`.
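
Note: the `max_forward` option documented above exists so the router does not wake up once per metric. A sketch of the underlying batching idea (not the router's actual code; `forwardBatch` is a made-up helper): block for the first message, then drain up to `max_forward - 1` more with a non-blocking `select` before going back to sleep.

```go
package main

import "fmt"

// forwardBatch blocks for one metric, then opportunistically drains up to
// maxForward-1 more from the same channel without blocking.
func forwardBatch(in <-chan string, maxForward int, handle func(string)) {
	m := <-in // block until at least one metric arrives
	handle(m)
	for i := 1; i < maxForward; i++ {
		select {
		case m := <-in:
			handle(m)
		default:
			return // channel drained, go back to waiting
		}
	}
}

func main() {
	in := make(chan string, 10)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("metric%d", i)
	}
	forwardBatch(in, 50, func(m string) { fmt.Println("forward", m) })
}
```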

View File

@@ -103,7 +103,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
         cclog.ComponentError("MetricRouter", err.Error())
         return err
     }
-    r.maxForward = r.config.MaxForward
+    r.maxForward = 1
+    if r.config.MaxForward > r.maxForward {
+        r.maxForward = r.config.MaxForward
+    }
     if r.config.NumCacheIntervals > 0 {
         r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
         if err != nil {