Configuration option to disable MetricCache completely

Thomas Roehl 2022-02-02 15:30:14 +01:00
parent 2c13cecf13
commit 1222f7a32f
2 changed files with 28 additions and 15 deletions


@@ -6,6 +6,7 @@ The CCMetric router sits in between the collectors and the sinks and can be used
```json
{
    "num_cache_intervals" : 1,
    "interval_timestamp" : true,
    "add_tags" : [
        {
@@ -58,6 +59,12 @@ There are three main options `add_tags`, `delete_tags` and `interval_timestamp`.
The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time.
# The `num_cache_intervals` option
If the MetricRouter should buffer metrics of intervals in a MetricCache, this option specifies the number of past intervals that should be kept. If `num_cache_intervals = 0`, the cache is disabled. With `num_cache_intervals = 1`, only the metrics of the last interval are buffered.
A `num_cache_intervals > 0` is required to use the `interval_aggregates` option.
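For example, to disable the cache completely, set the option to `0`. A minimal sketch (only `num_cache_intervals` matters here; the other keys follow the example at the top of this file):

```json
{
    "num_cache_intervals" : 0,
    "interval_timestamp" : true,
    "add_tags" : []
}
```

With the cache disabled, the `interval_aggregates` option described below has no effect.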
# The `rename_metrics` option
In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, they might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`.
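The exact JSON layout of `rename_metrics` is not part of this excerpt; assuming a plain old-name-to-new-name mapping and using made-up metric names, a sketch could look like this:

```json
"rename_metrics" : {
    "cpu_metric_123" : "cpu_load"
}
```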
@@ -164,6 +171,8 @@ The first line is comparable with the example in `drop_metrics`, it drops all me
# Aggregate metric values of the current interval with the `interval_aggregates` option
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0
In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but carry the timestamp of the beginning of the previous interval.
```json


@@ -95,17 +95,15 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 		cclog.ComponentError("MetricRouter", err.Error())
 		return err
 	}
-	numIntervals := r.config.NumCacheIntervals
-	if numIntervals <= 0 {
-		numIntervals = 1
-	}
-	r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals)
-	if err != nil {
-		cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
-		return err
-	}
-	for _, agg := range r.config.IntervalAgg {
-		r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+	if r.config.NumCacheIntervals >= 0 {
+		r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
+		if err != nil {
+			cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
+			return err
+		}
+		for _, agg := range r.config.IntervalAgg {
+			r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+		}
 	}
 	r.config.dropMetrics = make(map[string]bool)
 	for _, mname := range r.config.DropMetrics {
@@ -244,7 +242,9 @@ func (r *metricRouter) Start() {
 	}
 	// Start Metric Cache
-	r.cache.Start()
+	if r.config.NumCacheIntervals > 0 {
+		r.cache.Start()
+	}
 	r.wg.Add(1)
 	go func() {
@@ -266,7 +266,9 @@ func (r *metricRouter) Start() {
 			}
 			// even if the metric is dropped, it is stored in the cache for
 			// aggregations
-			r.cache.Add(p)
+			if r.config.NumCacheIntervals > 0 {
+				r.cache.Add(p)
+			}
 		case p := <-r.recv_input:
 			// receive from receive manager
@@ -316,8 +318,10 @@ func (r *metricRouter) Close() {
 		// wait for close of channel r.timerdone
 		<-r.timerdone
 	}
-	r.cache.Close()
-	r.cachewg.Wait()
+	if r.config.NumCacheIntervals > 0 {
+		r.cache.Close()
+		r.cachewg.Wait()
+	}
 }
 // New creates a new initialized metric router