Add Cache and Aggregator to MetricRouter (#21)

* Add Cache and Aggregator to MetricRouter

* Close done channel in MetricCache
This commit is contained in:
Thomas Gruber
2022-01-30 15:03:21 +01:00
committed by GitHub
parent 11844d9d5d
commit cf810b1c0c
4 changed files with 885 additions and 13 deletions

View File

@@ -23,23 +23,28 @@ type metricRouterTagConfig struct {
// Metric router configuration
type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
}
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
}
// MetricRouter access functions
@@ -61,6 +66,7 @@ type MetricRouter interface {
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.outputs = make([]chan lp.CCMetric, 0)
r.done = make(chan bool)
r.cache_input = make(chan lp.CCMetric)
r.wg = wg
r.ticker = ticker
@@ -86,6 +92,18 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
cclog.ComponentError("MetricRouter", err.Error())
return err
}
numIntervals := r.config.NumCacheIntervals
if numIntervals <= 0 {
numIntervals = 1
}
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals)
if err != nil {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
return err
}
for _, agg := range r.config.IntervalAgg {
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
}
return nil
}
@@ -211,6 +229,9 @@ func (r *metricRouter) Start() {
}
}
// Start Metric Cache
r.cache.Start()
r.wg.Add(1)
go func() {
defer r.wg.Done()
@@ -227,6 +248,7 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
forward(p)
r.cache.Add(p)
case p := <-r.recv_input:
// receive from receive manager
@@ -234,6 +256,11 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
forward(p)
case p := <-r.cache_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
forward(p)
}
}
}()
@@ -267,6 +294,8 @@ func (r *metricRouter) Close() {
// wait for close of channel r.timerdone
<-r.timerdone
}
r.cache.Close()
r.cachewg.Wait()
}
// New creates a new initialized metric router