Split MetricRouter and MetricAggregator

This commit is contained in:
Thomas Roehl 2022-02-02 17:03:10 +01:00
parent e59852be03
commit b79567623d
5 changed files with 28 additions and 27 deletions

View File

@ -24,7 +24,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
) )
type MetricScope string type MetricScope string
@ -72,7 +72,7 @@ func GetAllMetricScopes() []MetricScope {
type LikwidCollectorMetricConfig struct { type LikwidCollectorMetricConfig struct {
Name string `json:"name"` // Name of the metric Name string `json:"name"` // Name of the metric
Calc string `json:"calc"` // Calculation for the metric using Calc string `json:"calc"` // Calculation for the metric using
Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) //Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median)
Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function
Publish bool `json:"publish"` Publish bool `json:"publish"`
granulatity MetricScope granulatity MetricScope
@ -316,7 +316,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
} }
for _, metric := range evset.Metrics { for _, metric := range evset.Metrics {
// Try to evaluate the metric // Try to evaluate the metric
_, err := mr.EvalFloat64Condition(metric.Calc, params) _, err := agg.EvalFloat64Condition(metric.Calc, params)
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue continue
@ -345,7 +345,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
} }
for _, metric := range m.config.Metrics { for _, metric := range m.config.Metrics {
// Try to evaluate the global metric // Try to evaluate the global metric
_, err := mr.EvalFloat64Condition(metric.Calc, globalParams) _, err := agg.EvalFloat64Condition(metric.Calc, globalParams)
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue continue
@ -430,7 +430,7 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
scopemap := m.scopeRespTids[metric.Scope] scopemap := m.scopeRespTids[metric.Scope]
for domain, tid := range scopemap { for domain, tid := range scopemap {
if tid >= 0 { if tid >= 0 {
value, err := mr.EvalFloat64Condition(metric.Calc, m.results[group][tid]) value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue continue
@ -467,7 +467,7 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
} }
} }
// Evaluate the metric // Evaluate the metric
value, err := mr.EvalFloat64Condition(metric.Calc, params) value, err := agg.EvalFloat64Condition(metric.Calc, params)
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
continue continue

View File

@ -1,4 +1,4 @@
package metricRouter package metricAggregator
import ( import (
"context" "context"
@ -16,7 +16,7 @@ import (
"github.com/PaesslerAG/gval" "github.com/PaesslerAG/gval"
) )
type metricAggregatorIntervalConfig struct { type MetricAggregatorIntervalConfig struct {
Name string `json:"name"` // Metric name for the new metric Name string `json:"name"` // Metric name for the new metric
Function string `json:"function"` // Function to apply on the metric Function string `json:"function"` // Function to apply on the metric
Condition string `json:"if"` // Condition for applying function Condition string `json:"if"` // Condition for applying function
@ -27,7 +27,7 @@ type metricAggregatorIntervalConfig struct {
} }
type metricAggregator struct { type metricAggregator struct {
functions []*metricAggregatorIntervalConfig functions []*MetricAggregatorIntervalConfig
constants map[string]interface{} constants map[string]interface{}
language gval.Language language gval.Language
output chan lp.CCMetric output chan lp.CCMetric
@ -65,7 +65,7 @@ var metricCacheLanguage = gval.NewLanguage(
func (c *metricAggregator) Init(output chan lp.CCMetric) error { func (c *metricAggregator) Init(output chan lp.CCMetric) error {
c.output = output c.output = output
c.functions = make([]*metricAggregatorIntervalConfig, 0) c.functions = make([]*MetricAggregatorIntervalConfig, 0)
c.constants = make(map[string]interface{}) c.constants = make(map[string]interface{})
// add constants like hostname, numSockets, ... to constants list // add constants like hostname, numSockets, ... to constants list
@ -246,7 +246,7 @@ func (c *metricAggregator) AddAggregation(name, function, condition string, tags
return nil return nil
} }
} }
var agg metricAggregatorIntervalConfig var agg MetricAggregatorIntervalConfig
agg.Name = name agg.Name = name
agg.Condition = newcond agg.Condition = newcond
agg.gvalCond = gvalCond agg.gvalCond = gvalCond

View File

@ -1,4 +1,4 @@
package metricRouter package metricAggregator
import ( import (
"errors" "errors"

View File

@ -10,6 +10,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
) )
@ -24,7 +25,7 @@ type metricRouterTagConfig struct {
type metricRouterConfig struct { type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if
DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics
RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value
@ -161,7 +162,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
conditionMatches = true conditionMatches = true
} else { } else {
var err error var err error
conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false conditionMatches = false
@ -182,7 +183,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
conditionMatches = true conditionMatches = true
} else { } else {
var err error var err error
conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false conditionMatches = false
@ -202,7 +203,7 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
} }
// Checking the dropping conditions // Checking the dropping conditions
for _, m := range r.config.DropMetricsIf { for _, m := range r.config.DropMetricsIf {
conditionMatches, err := EvalBoolCondition(m, getParamMap(point)) conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point))
if conditionMatches || err != nil { if conditionMatches || err != nil {
return true return true
} }