diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 182cb72..957c4aa 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -24,7 +24,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" - mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" + agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" ) type MetricScope string @@ -70,10 +70,10 @@ func GetAllMetricScopes() []MetricScope { } type LikwidCollectorMetricConfig struct { - Name string `json:"name"` // Name of the metric - Calc string `json:"calc"` // Calculation for the metric using - Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) - Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function + Name string `json:"name"` // Name of the metric + Calc string `json:"calc"` // Calculation for the metric using + //Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) + Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function Publish bool `json:"publish"` granulatity MetricScope } @@ -314,7 +314,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } for _, metric := range evset.Metrics { // Try to evaluate the metric - _, err := mr.EvalFloat64Condition(metric.Calc, params) + _, err := agg.EvalFloat64Condition(metric.Calc, params) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue @@ -343,7 +343,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } for _, metric := range m.config.Metrics { // Try to evaluate the global metric - _, err := mr.EvalFloat64Condition(metric.Calc, globalParams) + _, err := agg.EvalFloat64Condition(metric.Calc, globalParams) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue @@ -428,7 +428,7 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, scopemap := m.scopeRespTids[metric.Scope] for domain, tid := range scopemap { if tid >= 0 { - value, err := mr.EvalFloat64Condition(metric.Calc, m.results[group][tid]) + value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid]) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue @@ -465,7 +465,7 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan } } // Evaluate the metric - value, err := mr.EvalFloat64Condition(metric.Calc, params) + value, err := agg.EvalFloat64Condition(metric.Calc, params) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue diff --git a/internal/metricAggregator/README.md b/internal/metricAggregator/README.md new file mode 100644 index 0000000..bc07663 --- /dev/null +++ b/internal/metricAggregator/README.md @@ -0,0 +1,38 @@ +# The MetricAggregator + +In some cases, further combination of metrics or raw values is required. For that strings like `foo + 1` with runtime dependent `foo` need to be evaluated. The MetricAggregator relies on the [`gval`](https://github.com/PaesslerAG/gval) Golang package to perform all expression evaluation. The `gval` package provides the basic arithmetic operations but the MetricAggregator defines additional ones. + +**Note**: To get an impression which expressions can be handled by `gval`, see its [README](https://github.com/PaesslerAG/gval/blob/master/README.md) + +## Simple expression evaluation + +For simple expression evaluation, the MetricAggregator provides two function for different use-cases: +- `EvalBoolCondition(expression string, params map[string]interface{}`: Used by the MetricRouter to match metrics like `metric.Name() == 'mymetric'` +- `EvalFloat64Condition(expression string, params map[string]interface{})`: Used by the MetricRouter and LikwidCollector to derive new values like `(PMC0+PMC1)/PMC3` + +## MetricAggregator extensions for `gval` + +The MetricAggregator provides these functions additional to the `Full` language in `gval`: +- `sum(array)`: Sum up values in an array like `sum(values)` +- `min(array)`: Get the minimum value in an array like `min(values)` +- `avg(array)`: Get the mean value in an array like `avg(values)` +- `mean(array)`: Get the mean value in an array like `mean(values)` +- `max(array)`: Get the maximum value in an array like `max(values)` +- `len(array)`: Get the length of an array like `len(values)` +- `median(array)`: Get the median value in an array like `mean(values)` +- `in`: Check existence in an array like `0 in getCpuList()` to check whether there is an entry `0`. Also substring matching works like `temp in metric.Name()` +- `match`: Regular-expression matching like `match('temp_cores_%d+', metric.Name())`. **Note** all `\` in an regex has to be replaced with `%` +- `getCpuCore(cpuid)`: For a CPU id, the the corresponding CPU core id like `getCpuCore(0)` +- `getCpuSocket(cpuid)`: For a CPU id, the the corresponding CPU socket id +- `getCpuNuma(cpuid)`: For a CPU id, the the corresponding NUMA domain id +- `getCpuDie(cpuid)`: For a CPU id, the the corresponding CPU die id +- `getSockCpuList(sockid)`: For a given CPU socket id, the list of CPU ids is returned like the CPUs on socket 1 `getSockCpuList(1)` +- `getNumaCpuList(numaid)`: For a given NUMA node id, the list of CPU ids is returned +- `getDieCpuList(dieid)`: For a given CPU die id, the list of CPU ids is returned +- `getCoreCpuList(coreid)`: For a given CPU core id, the list of CPU ids is returned +- `getCpuList`: Get the list of all CPUs + +## Limitations + +- Since the metrics are written in JSON files which do not allow `""` without proper escaping inside of JSON strings, you have to use `''` for strings. +- Since `\` is interpreted by JSON as escape character, it cannot be used in metrics. But it is required to write regular expressions. So instead of `/`, use `%` and the MetricAggregator replaces them after reading the JSON file. \ No newline at end of file diff --git a/internal/metricRouter/metricAggregator.go b/internal/metricAggregator/metricAggregator.go similarity index 98% rename from internal/metricRouter/metricAggregator.go rename to internal/metricAggregator/metricAggregator.go index e3303e4..a05f061 100644 --- a/internal/metricRouter/metricAggregator.go +++ b/internal/metricAggregator/metricAggregator.go @@ -1,4 +1,4 @@ -package metricRouter +package metricAggregator import ( "context" @@ -16,7 +16,7 @@ import ( "github.com/PaesslerAG/gval" ) -type metricAggregatorIntervalConfig struct { +type MetricAggregatorIntervalConfig struct { Name string `json:"name"` // Metric name for the new metric Function string `json:"function"` // Function to apply on the metric Condition string `json:"if"` // Condition for applying function @@ -27,7 +27,7 @@ type metricAggregatorIntervalConfig struct { } type metricAggregator struct { - functions []*metricAggregatorIntervalConfig + functions []*MetricAggregatorIntervalConfig constants map[string]interface{} language gval.Language output chan lp.CCMetric @@ -65,7 +65,7 @@ var metricCacheLanguage = gval.NewLanguage( func (c *metricAggregator) Init(output chan lp.CCMetric) error { c.output = output - c.functions = make([]*metricAggregatorIntervalConfig, 0) + c.functions = make([]*MetricAggregatorIntervalConfig, 0) c.constants = make(map[string]interface{}) // add constants like hostname, numSockets, ... to constants list @@ -246,7 +246,7 @@ func (c *metricAggregator) AddAggregation(name, function, condition string, tags return nil } } - var agg metricAggregatorIntervalConfig + var agg MetricAggregatorIntervalConfig agg.Name = name agg.Condition = newcond agg.gvalCond = gvalCond diff --git a/internal/metricRouter/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go similarity index 99% rename from internal/metricRouter/metricAggregatorFunctions.go rename to internal/metricAggregator/metricAggregatorFunctions.go index f00479d..1fbef65 100644 --- a/internal/metricRouter/metricAggregatorFunctions.go +++ b/internal/metricAggregator/metricAggregatorFunctions.go @@ -1,4 +1,4 @@ -package metricRouter +package metricAggregator import ( "errors" diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go index 1cfd8c3..67522c9 100644 --- a/internal/metricRouter/metricCache.go +++ b/internal/metricRouter/metricCache.go @@ -7,6 +7,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) @@ -28,7 +29,7 @@ type metricCache struct { tickchan chan time.Time done chan bool output chan lp.CCMetric - aggEngine MetricAggregator + aggEngine agg.MetricAggregator } type MetricCache interface { @@ -59,7 +60,7 @@ func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, // Create a new aggregation engine. No separate goroutine at the moment // The code is executed by the MetricCache goroutine - c.aggEngine, err = NewAggregator(c.output) + c.aggEngine, err = agg.NewAggregator(c.output) if err != nil { cclog.ComponentError("MetricCache", "Cannot create aggregator") return err diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 6d63e15..a31f2a6 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -10,6 +10,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) @@ -22,15 +23,15 @@ type metricRouterTagConfig struct { // Metric router configuration type metricRouterConfig struct { - AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met - DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met - IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval - DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if - DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics - RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value - IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? - NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation - dropMetrics map[string]bool // Internal map for O(1) lookup + AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met + DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met + IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval + DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if + DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics + RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value + IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? + NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation + dropMetrics map[string]bool // Internal map for O(1) lookup } // Metric router data structure @@ -161,7 +162,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) { conditionMatches = true } else { var err error - conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) + conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point)) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false @@ -182,7 +183,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { conditionMatches = true } else { var err error - conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) + conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point)) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false @@ -202,7 +203,7 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool { } // Checking the dropping conditions for _, m := range r.config.DropMetricsIf { - conditionMatches, err := EvalBoolCondition(m, getParamMap(point)) + conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point)) if conditionMatches || err != nil { return true }