Merge develop branch into main (#123)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Update README.md

Use right JSON type in configuration

* Update sink's README

* Test whether ipmitool or ipmi-sensors can be executed without errors

* Little fixes to the prometheus sink (#115)

* Add uint64 to float64 cast option

* Add prometheus sink to the list of available sinks

* Add aggregated counters by gpu for nvlink errors

---------

Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>

* Ccmessage migration (#119)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Switch to CCMessage for all files.

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Switch to ccmessage also for latest additions in nvidiaMetric

* New Message processor (#118)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* New message processor to check whether a message should be dropped or manipulate it in flight

* Create a copy of message before manipulation

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Update collector's Makefile and go.mod/sum files

* Use message processor in router, all sinks and all receivers

* Add support for credential file (NKEY) to NATS sink and receiver

* Fix JSON keys in message processor configuration

* Update docs for message processor, router and the default router config file

* Add link to expr syntax and fix regex matching docs

* Update sample collectors

* Minor style change in collector manager

* Some helpers for ccTopology

* LIKWID collector: write log owner change only once

* Fix for metrics without units and reduce debugging messages for messageProcessor

* Use shorted hostname for hostname added by router

* Define default port for NATS

* CPUstat collector: only add unit for applicable metrics

* Add precision option to all sinks using Influx's encoder

* Add message processor to all sink documentation

* Add units to documentation of cpustat collector

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Co-authored-by: oscarminus <me@oscarminus.de>
Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>
This commit is contained in:
Thomas Gruber
2024-12-19 23:00:14 +01:00
committed by GitHub
parent 21646e1edf
commit 7840de7b82
74 changed files with 1902 additions and 1017 deletions

View File

@@ -11,7 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/PaesslerAG/gval"
@@ -31,14 +31,14 @@ type metricAggregator struct {
functions []*MetricAggregatorIntervalConfig
constants map[string]interface{}
language gval.Language
output chan lp.CCMetric
output chan lp.CCMessage
}
type MetricAggregator interface {
AddAggregation(name, function, condition string, tags, meta map[string]string) error
DeleteAggregation(name string) error
Init(output chan lp.CCMetric) error
Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric)
Init(output chan lp.CCMessage) error
Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage)
}
var metricCacheLanguage = gval.NewLanguage(
@@ -74,7 +74,7 @@ var evaluables = struct {
mapping: make(map[string]gval.Evaluable),
}
func (c *metricAggregator) Init(output chan lp.CCMetric) error {
func (c *metricAggregator) Init(output chan lp.CCMessage) error {
c.output = output
c.functions = make([]*MetricAggregatorIntervalConfig, 0)
c.constants = make(map[string]interface{})
@@ -112,7 +112,7 @@ func (c *metricAggregator) Init(output chan lp.CCMetric) error {
return nil
}
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) {
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
vars := make(map[string]interface{})
for k, v := range c.constants {
vars[k] = v
@@ -127,7 +127,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
var valuesInt32 []int32
var valuesInt64 []int64
var valuesBool []bool
matches := make([]lp.CCMetric, 0)
matches := make([]lp.CCMessage, 0)
for _, m := range metrics {
vars["metric"] = m
//value, err := gval.Evaluate(f.Condition, vars, c.language)
@@ -216,7 +216,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
break
}
copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string {
copy_tags := func(tags map[string]string, metrics []lp.CCMessage) map[string]string {
out := make(map[string]string)
for key, value := range tags {
switch value {
@@ -233,7 +233,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
}
return out
}
copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string {
copy_meta := func(meta map[string]string, metrics []lp.CCMessage) map[string]string {
out := make(map[string]string)
for key, value := range meta {
switch value {
@@ -253,18 +253,18 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
tags := copy_tags(f.Tags, matches)
meta := copy_meta(f.Meta, matches)
var m lp.CCMetric
var m lp.CCMessage
switch t := value.(type) {
case float64:
m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
case float32:
m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
case int:
m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
case int64:
m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
case string:
m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
default:
cclog.ComponentError("MetricCache", "Gval returned invalid type", t, "skipping metric", f.Name)
}
@@ -389,7 +389,7 @@ func EvalFloat64Condition(condition string, params map[string]float64) (float64,
return value, err
}
func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) {
func NewAggregator(output chan lp.CCMessage) (MetricAggregator, error) {
a := new(metricAggregator)
err := a.Init(output)
if err != nil {

View File

@@ -1,15 +1,21 @@
# CC Metric Router
The CCMetric router sits in between the collectors and the sinks and can be used to add and remove tags to/from traversing [CCMetrics](../ccMetric/README.md).
The CCMetric router sits in between the collectors and the sinks and can be used to add and remove tags to/from traversing [CCMessages](https://pkg.go.dev/github.com/ClusterCockpit/cc-energy-manager@v0.0.0-20240919152819-92a17f2da4f7/pkg/cc-message.
# Configuration
**Note**: Use the [message processor configuration](../../pkg/messageProcessor/README.md) with option `process_messages`.
```json
{
"num_cache_intervals" : 1,
"interval_timestamp" : true,
"hostname_tag" : "hostname",
"max_forward" : 50,
"process_messages": {
"see": "pkg/messageProcessor/README.md"
},
"add_tags" : [
{
"key" : "cluster",
@@ -63,6 +69,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval.
**Note**: Use the [message processor configuration](../../pkg/messageProcessor/README.md) (option `process_messages`) instead of `add_tags`, `delete_tags`, `drop_metrics`, `drop_metrics_if`, `rename_metrics`, `normalize_units` and `change_unit_prefix`. These options are deprecated and will be removed in future versions. Until then, they are added to the message processor.
# Processing order in the router
- Add the `hostname_tag` tag (if sent by collectors or cache)
@@ -96,6 +104,8 @@ Every time the router receives a metric through any of the channels, it tries to
# The `rename_metrics` option
__deprecated__
In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, execuables and libraries, they might change from system to system (or installation to installtion, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`
```json
@@ -107,6 +117,8 @@ In the ClusterCockpit world we specified a set of standard metrics. Since some c
# Conditional manipulation of tags (`add_tags` and `del_tags`)
__deprecated__
Common config format:
```json
{
@@ -118,6 +130,8 @@ Common config format:
## The `del_tags` option
__deprecated__
The collectors are free to add whatever `key=value` pair to the metric tags (although the usage of tags should be minimized). If you want to delete a tag afterwards, you can do that. When the `if` condition matches on a metric, the `key` is removed from the metric's tags.
If you want to remove a tag for all metrics, use the condition wildcard `*`. The `value` field can be omitted in the `del_tags` case.
@@ -129,6 +143,8 @@ Never delete tags:
## The `add_tags` option
__deprecated__
In some cases, metrics should be tagged or an existing tag changed based on some condition. This can be done in the `add_tags` section. When the `if` condition evaluates to `true`, the tag `key` is added or gets changed to the new `value`.
If the CCMetric name is equal to `temp_package_id_0`, it adds an additional tag `test=testing` to the metric.
@@ -170,6 +186,8 @@ In some cases, you want to drop a metric and don't get it forwarded to the sinks
## The `drop_metrics` section
__deprecated__
The argument is a list of metric names. No futher checks are performed, only a comparison of the metric name
```json
@@ -185,6 +203,8 @@ The example drops all metrics with the name `drop_metric_1` and `drop_metric_2`.
## The `drop_metrics_if` section
__deprecated__
This option takes a list of evaluable conditions and performs them one after the other on **all** metrics incoming from the collectors and the metric cache (aka `interval_aggregates`).
```json
@@ -200,15 +220,22 @@ The first line is comparable with the example in `drop_metrics`, it drops all me
# Manipulating the metric units
## The `normalize_units` option
__deprecated__
The cc-metric-collector tries to read the data from the system as it is reported. If available, it tries to read the metric unit from the system as well (e.g. from `/proc/meminfo`). The problem is that, depending on the source, the metric units are named differently. Just think about `byte`, `Byte`, `B`, `bytes`, ...
The [cc-units](https://github.com/ClusterCockpit/cc-units) package provides us a normalization option to use the same metric unit name for all metrics. It this option is set to true, all `unit` meta tags are normalized.
## The `change_unit_prefix` section
__deprecated__
It is often the case that metrics are reported by the system using a rather outdated unit prefix (like `/proc/meminfo` still uses kByte despite current memory sizes are in the GByte range). If you want to change the prefix of a unit, you can do that with the help of [cc-units](https://github.com/ClusterCockpit/cc-units). The setting works on the metric name and requires the new prefix for the metric. The cc-units package determines the scaling factor.
# Aggregate metric values of the current interval with the `interval_aggregates` option
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0 and is **experimental**
In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but have the timestamp of the previous interval beginning.

View File

@@ -7,7 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
)
@@ -16,7 +16,7 @@ type metricCachePeriod struct {
stopstamp time.Time
numMetrics int
sizeMetrics int
metrics []lp.CCMetric
metrics []lp.CCMessage
}
// Metric cache data structure
@@ -29,21 +29,21 @@ type metricCache struct {
ticker mct.MultiChanTicker
tickchan chan time.Time
done chan bool
output chan lp.CCMetric
output chan lp.CCMessage
aggEngine agg.MetricAggregator
}
type MetricCache interface {
Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
Start()
Add(metric lp.CCMetric)
GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
Add(metric lp.CCMessage)
GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage)
AddAggregation(name, function, condition string, tags, meta map[string]string) error
DeleteAggregation(name string) error
Close()
}
func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
var err error = nil
c.done = make(chan bool)
c.wg = wg
@@ -55,7 +55,7 @@ func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker,
p := new(metricCachePeriod)
p.numMetrics = 0
p.sizeMetrics = 0
p.metrics = make([]lp.CCMetric, 0)
p.metrics = make([]lp.CCMessage, 0)
c.intervals = append(c.intervals, p)
}
@@ -124,7 +124,7 @@ func (c *metricCache) Start() {
// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start())
// The intervals list is used as round-robin buffer and the metric list grows dynamically and
// to avoid reallocations
func (c *metricCache) Add(metric lp.CCMetric) {
func (c *metricCache) Add(metric lp.CCMessage) {
if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
c.lock.Lock()
p := c.intervals[c.curPeriod]
@@ -153,10 +153,10 @@ func (c *metricCache) DeleteAggregation(name string) error {
// Get all metrics of a interval. The index is the difference to the current interval, so index=0
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
// is given (negative index, index larger than configured number of total intervals, ...)
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
var start time.Time = time.Now()
var stop time.Time = time.Now()
var metrics []lp.CCMetric
var metrics []lp.CCMessage
if index >= 0 && index < c.numPeriods {
pindex := c.curPeriod - index
if pindex < 0 {
@@ -168,10 +168,10 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
metrics = c.intervals[pindex].metrics
//return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
} else {
metrics = make([]lp.CCMetric, 0)
metrics = make([]lp.CCMessage, 0)
}
} else {
metrics = make([]lp.CCMetric, 0)
metrics = make([]lp.CCMessage, 0)
}
return start, stop, metrics
}
@@ -182,7 +182,7 @@ func (c *metricCache) Close() {
c.done <- true
}
func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
func NewCache(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
c := new(metricCache)
err := c.Init(output, ticker, wg, numPeriods)
if err != nil {

View File

@@ -2,6 +2,7 @@ package metricRouter
import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
@@ -9,10 +10,10 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
units "github.com/ClusterCockpit/cc-units"
)
const ROUTER_MAX_FORWARD = 50
@@ -38,16 +39,17 @@ type metricRouterConfig struct {
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
dropMetrics map[string]bool // Internal map for O(1) lookup
// dropMetrics map[string]bool // Internal map for O(1) lookup
MessageProcessor json.RawMessage `json:"process_message,omitempty"`
}
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
coll_input chan lp.CCMessage // Input channel from CollectorManager
recv_input chan lp.CCMessage // Input channel from ReceiveManager
cache_input chan lp.CCMessage // Input channel from MetricCache
outputs []chan lp.CCMessage // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
@@ -56,14 +58,15 @@ type metricRouter struct {
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
maxForward int // number of metrics to forward maximally in one iteration
mp mp.MessageProcessor
}
// MetricRouter access functions
type MetricRouter interface {
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
AddCollectorInput(input chan lp.CCMetric)
AddReceiverInput(input chan lp.CCMetric)
AddOutput(output chan lp.CCMetric)
AddCollectorInput(input chan lp.CCMessage)
AddReceiverInput(input chan lp.CCMessage)
AddOutput(output chan lp.CCMessage)
Start()
Close()
}
@@ -75,9 +78,9 @@ type MetricRouter interface {
// * ticker (from variable ticker)
// * configuration (read from config file in variable routerConfigFile)
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.outputs = make([]chan lp.CCMetric, 0)
r.outputs = make([]chan lp.CCMessage, 0)
r.done = make(chan bool)
r.cache_input = make(chan lp.CCMetric)
r.cache_input = make(chan lp.CCMessage)
r.wg = wg
r.ticker = ticker
r.config.MaxForward = ROUTER_MAX_FORWARD
@@ -119,14 +122,56 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
}
}
r.config.dropMetrics = make(map[string]bool)
for _, mname := range r.config.DropMetrics {
r.config.dropMetrics[mname] = true
p, err := mp.NewMessageProcessor()
if err != nil {
return fmt.Errorf("initialization of message processor failed: %v", err.Error())
}
r.mp = p
if len(r.config.MessageProcessor) > 0 {
err = r.mp.FromConfigJSON(r.config.MessageProcessor)
if err != nil {
return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
}
}
for _, mname := range r.config.DropMetrics {
r.mp.AddDropMessagesByName(mname)
}
for _, cond := range r.config.DropMetricsIf {
r.mp.AddDropMessagesByCondition(cond)
}
for _, data := range r.config.AddTags {
cond := data.Condition
if cond == "*" {
cond = "true"
}
r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
}
for _, data := range r.config.DelTags {
cond := data.Condition
if cond == "*" {
cond = "true"
}
r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
}
for oldname, newname := range r.config.RenameMetrics {
r.mp.AddRenameMetricByName(oldname, newname)
}
for metricName, prefix := range r.config.ChangeUnitPrefix {
r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
}
r.mp.SetNormalizeUnits(r.config.NormalizeUnits)
r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
// r.config.dropMetrics = make(map[string]bool)
// for _, mname := range r.config.DropMetrics {
// r.config.dropMetrics[mname] = true
// }
return nil
}
func getParamMap(point lp.CCMetric) map[string]interface{} {
func getParamMap(point lp.CCMessage) map[string]interface{} {
params := make(map[string]interface{})
params["metric"] = point
params["name"] = point.Name()
@@ -144,7 +189,7 @@ func getParamMap(point lp.CCMetric) map[string]interface{} {
}
// DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
func (r *metricRouter) DoAddTags(point lp.CCMessage) {
var conditionMatches bool
for _, m := range r.config.AddTags {
if m.Condition == "*" {
@@ -166,81 +211,81 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
}
// DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
var conditionMatches bool
for _, m := range r.config.DelTags {
if m.Condition == "*" {
// Condition is always matched
conditionMatches = true
} else {
// Evaluate condition
var err error
conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
}
}
if conditionMatches {
point.RemoveTag(m.Key)
}
}
}
// func (r *metricRouter) DoDelTags(point lp.CCMessage) {
// var conditionMatches bool
// for _, m := range r.config.DelTags {
// if m.Condition == "*" {
// // Condition is always matched
// conditionMatches = true
// } else {
// // Evaluate condition
// var err error
// conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
// if err != nil {
// cclog.ComponentError("MetricRouter", err.Error())
// conditionMatches = false
// }
// }
// if conditionMatches {
// point.RemoveTag(m.Key)
// }
// }
// }
// Conditional test whether a metric should be dropped
func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
// Simple drop check
if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok {
return conditionMatches
}
// func (r *metricRouter) dropMetric(point lp.CCMessage) bool {
// // Simple drop check
// if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok {
// return conditionMatches
// }
// Checking the dropping conditions
for _, m := range r.config.DropMetricsIf {
conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point))
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
}
if conditionMatches {
return conditionMatches
}
}
// // Checking the dropping conditions
// for _, m := range r.config.DropMetricsIf {
// conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point))
// if err != nil {
// cclog.ComponentError("MetricRouter", err.Error())
// conditionMatches = false
// }
// if conditionMatches {
// return conditionMatches
// }
// }
// No dropping condition met
return false
}
// // No dropping condition met
// return false
// }
func (r *metricRouter) prepareUnit(point lp.CCMetric) bool {
if r.config.NormalizeUnits {
if in_unit, ok := point.GetMeta("unit"); ok {
u := units.NewUnit(in_unit)
if u.Valid() {
point.AddMeta("unit", u.Short())
}
}
}
if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok {
// func (r *metricRouter) prepareUnit(point lp.CCMessage) bool {
// if r.config.NormalizeUnits {
// if in_unit, ok := point.GetMeta("unit"); ok {
// u := units.NewUnit(in_unit)
// if u.Valid() {
// point.AddMeta("unit", u.Short())
// }
// }
// }
// if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok {
newPrefix := units.NewPrefix(newP)
// newPrefix := units.NewPrefix(newP)
if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name())
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
if conv != nil && out_unit.Valid() {
if val, ok := point.GetField("value"); ok {
point.AddField("value", conv(val))
point.AddMeta("unit", out_unit.Short())
}
}
}
// if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
// u := units.NewUnit(in_unit)
// if u.Valid() {
// cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name())
// conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
// if conv != nil && out_unit.Valid() {
// if val, ok := point.GetField("value"); ok {
// point.AddField("value", conv(val))
// point.AddMeta("unit", out_unit.Short())
// }
// }
// }
}
}
// }
// }
return true
}
// return true
// }
// Start starts the metric router
func (r *metricRouter) Start() {
@@ -259,59 +304,75 @@ func (r *metricRouter) Start() {
// Forward takes a received metric, adds or deletes tags
// and forwards it to the output channels
forward := func(point lp.CCMetric) {
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
r.DoAddTags(point)
r.DoDelTags(point)
name := point.Name()
if new, ok := r.config.RenameMetrics[name]; ok {
point.SetName(new)
point.AddMeta("oldname", name)
r.DoAddTags(point)
r.DoDelTags(point)
}
// forward := func(point lp.CCMessage) {
// cclog.ComponentDebug("MetricRouter", "FORWARD", point)
// r.DoAddTags(point)
// r.DoDelTags(point)
// name := point.Name()
// if new, ok := r.config.RenameMetrics[name]; ok {
// point.SetName(new)
// point.AddMeta("oldname", name)
// r.DoAddTags(point)
// r.DoDelTags(point)
// }
r.prepareUnit(point)
// r.prepareUnit(point)
for _, o := range r.outputs {
o <- point
}
}
// for _, o := range r.outputs {
// o <- point
// }
// }
// Foward message received from collector channel
coll_forward := func(p lp.CCMetric) {
coll_forward := func(p lp.CCMessage) {
// receive from metric collector
p.AddTag(r.config.HostnameTagName, r.hostname)
//p.AddTag(r.config.HostnameTagName, r.hostname)
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
forward(p)
m, err := r.mp.ProcessMessage(p)
if err == nil && m != nil {
for _, o := range r.outputs {
o <- m
}
}
// if !r.dropMetric(p) {
// for _, o := range r.outputs {
// o <- point
// }
// }
// even if the metric is dropped, it is stored in the cache for
// aggregations
if r.config.NumCacheIntervals > 0 {
r.cache.Add(p)
r.cache.Add(m)
}
}
// Forward message received from receivers channel
recv_forward := func(p lp.CCMetric) {
recv_forward := func(p lp.CCMessage) {
// receive from receive manager
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
forward(p)
m, err := r.mp.ProcessMessage(p)
if err == nil && m != nil {
for _, o := range r.outputs {
o <- m
}
}
// if !r.dropMetric(p) {
// forward(p)
// }
}
// Forward message received from cache channel
cache_forward := func(p lp.CCMetric) {
cache_forward := func(p lp.CCMessage) {
// receive from metric collector
if !r.dropMetric(p) {
p.AddTag(r.config.HostnameTagName, r.hostname)
forward(p)
m, err := r.mp.ProcessMessage(p)
if err == nil && m != nil {
for _, o := range r.outputs {
o <- m
}
}
}
@@ -358,17 +419,17 @@ func (r *metricRouter) Start() {
}
// AddCollectorInput adds a channel between metric collector and metric router
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
func (r *metricRouter) AddCollectorInput(input chan lp.CCMessage) {
r.coll_input = input
}
// AddReceiverInput adds a channel between metric receiver and metric router
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
func (r *metricRouter) AddReceiverInput(input chan lp.CCMessage) {
r.recv_input = input
}
// AddOutput adds a output channel to the metric router
func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
func (r *metricRouter) AddOutput(output chan lp.CCMessage) {
r.outputs = append(r.outputs, output)
}