Use the MetricAggregator for all calculations in the MetricRouter

This commit is contained in:
Thomas Roehl
2022-02-01 18:27:59 +01:00
parent 64a12b80bb
commit a4bd141786
2 changed files with 130 additions and 28 deletions

View File

@@ -11,7 +11,6 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"gopkg.in/Knetic/govaluate.v2"
)
// Metric router tag configuration
@@ -26,8 +25,12 @@ type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if
DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics
RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
dropMetrics map[string]bool // Internal map for O(1) lookup
}
// Metric router data structure
@@ -104,6 +107,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
for _, agg := range r.config.IntervalAgg {
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
}
r.config.dropMetrics = make(map[string]bool)
for _, mname := range r.config.DropMetrics {
r.config.dropMetrics[mname] = true
}
return nil
}
@@ -130,16 +137,9 @@ func (r *metricRouter) StartTimer() {
cclog.ComponentDebug("MetricRouter", "TIMER START")
}
// EvalCondition evaluates condition cond for metric data from point
func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
expression, err := govaluate.NewEvaluableExpression(cond)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
}
// Add metric name, tags, meta data, fields and timestamp to the parameter list
func getParamMap(point lp.CCMetric) map[string]interface{} {
params := make(map[string]interface{})
params["metric"] = point
params["name"] = point.Name()
for key, value := range point.Tags() {
params[key] = value
@@ -151,26 +151,19 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro
params[f.Key] = f.Value
}
params["timestamp"] = point.Time()
// evaluate condition
result, err := expression.Evaluate(params)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
}
return bool(result.(bool)), err
return params
}
// DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
for _, m := range r.config.AddTags {
var conditionMatches bool
var conditionMatches bool = false
if m.Condition == "*" {
conditionMatches = true
} else {
var err error
conditionMatches, err = r.EvalCondition(m.Condition, point)
conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
@@ -185,13 +178,13 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
// DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
for _, m := range r.config.DelTags {
var conditionMatches bool
var conditionMatches bool = false
if m.Condition == "*" {
conditionMatches = true
} else {
var err error
conditionMatches, err = r.EvalCondition(m.Condition, point)
conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
@@ -203,9 +196,24 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
}
}
// Conditional test whether a metric should be dropped
func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
// Simple drop check
if _, ok := r.config.dropMetrics[point.Name()]; ok {
return true
}
// Checking the dropping conditions
for _, m := range r.config.DropMetricsIf {
conditionMatches, err := EvalBoolCondition(m, getParamMap(point))
if conditionMatches || err != nil {
return true
}
}
return false
}
// Start starts the metric router
func (r *metricRouter) Start() {
// start timer if configured
r.timestamp = time.Now()
if r.config.IntervalStamp {
@@ -224,6 +232,12 @@ func (r *metricRouter) Start() {
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
r.DoAddTags(point)
r.DoDelTags(point)
if new, ok := r.config.RenameMetrics[point.Name()]; ok {
point.SetName(new)
}
r.DoAddTags(point)
r.DoDelTags(point)
for _, o := range r.outputs {
o <- point
}
@@ -247,7 +261,11 @@ func (r *metricRouter) Start() {
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
}
forward(p)
if !r.dropMetric(p) {
forward(p)
}
// even if the metric is dropped, it is stored in the cache for
// aggregations
r.cache.Add(p)
case p := <-r.recv_input:
@@ -255,12 +273,16 @@ func (r *metricRouter) Start() {
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
}
forward(p)
if !r.dropMetric(p) {
forward(p)
}
case p := <-r.cache_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
forward(p)
if !r.dropMetric(p) {
p.AddTag("hostname", r.hostname)
forward(p)
}
}
}
}()