Thomas Gruber cf810b1c0c
Add Cache and Aggregator to MetricRouter (#21)
* Add Cache and Aggregator to MetricRouter

* Close done channel in MetricCache
2022-01-30 15:03:21 +01:00

310 lines
8.7 KiB

package metricRouter
import (
cclog ""
lp ""
mct ""
// Metric router tag configuration
type metricRouterTagConfig struct {
Key string `json:"key"` // Tag name
Value string `json:"value"` // Tag value
Condition string `json:"if"` // Condition for adding or removing corresponding tag
// Metric router configuration
type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
// MetricRouter access functions
type MetricRouter interface {
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
AddCollectorInput(input chan lp.CCMetric)
AddReceiverInput(input chan lp.CCMetric)
AddOutput(output chan lp.CCMetric)
// Init initializes a metric router by setting up:
// * input and output channels
// * done channel
// * wait group synchronization (from variable wg)
// * ticker (from variable ticker)
// * configuration (read from config file in variable routerConfigFile)
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.outputs = make([]chan lp.CCMetric, 0)
r.done = make(chan bool)
r.cache_input = make(chan lp.CCMetric)
r.wg = wg
r.ticker = ticker
// Set hostname
hostname, err := os.Hostname()
if err != nil {
return err
// Drop domain part of host name
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
// Read metric router config file
configFile, err := os.Open(routerConfigFile)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
return err
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
err = jsonParser.Decode(&r.config)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
return err
numIntervals := r.config.NumCacheIntervals
if numIntervals <= 0 {
numIntervals = 1
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals)
if err != nil {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
return err
for _, agg := range r.config.IntervalAgg {
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
return nil
// StartTimer starts a timer which updates timestamp periodically
func (r *metricRouter) StartTimer() {
m := make(chan time.Time)
r.timerdone = make(chan bool)
go func() {
defer r.wg.Done()
for {
select {
case <-r.timerdone:
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
case t := <-m:
r.timestamp = t
cclog.ComponentDebug("MetricRouter", "TIMER START")
// EvalCondition evaluates condition cond for metric data from point
func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
expression, err := govaluate.NewEvaluableExpression(cond)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
// Add metric name, tags, meta data, fields and timestamp to the parameter list
params := make(map[string]interface{})
params["name"] = point.Name()
for _, t := range point.TagList() {
params[t.Key] = t.Value
for _, m := range point.MetaList() {
params[m.Key] = m.Value
for _, f := range point.FieldList() {
params[f.Key] = f.Value
params["timestamp"] = point.Time()
// evaluate condition
result, err := expression.Evaluate(params)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err
return bool(result.(bool)), err
// DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
for _, m := range r.config.AddTags {
var conditionMatches bool
if m.Condition == "*" {
conditionMatches = true
} else {
var err error
conditionMatches, err = r.EvalCondition(m.Condition, point)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
if conditionMatches {
point.AddTag(m.Key, m.Value)
// DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
for _, m := range r.config.DelTags {
var conditionMatches bool
if m.Condition == "*" {
conditionMatches = true
} else {
var err error
conditionMatches, err = r.EvalCondition(m.Condition, point)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
conditionMatches = false
if conditionMatches {
// Start starts the metric router
func (r *metricRouter) Start() {
// start timer if configured
r.timestamp = time.Now()
if r.config.IntervalStamp {
// Router manager is done
done := func() {
cclog.ComponentDebug("MetricRouter", "DONE")
// Forward takes a received metric, adds or deletes tags
// and forwards it to the output channels
forward := func(point lp.CCMetric) {
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
for _, o := range r.outputs {
o <- point
// Start Metric Cache
go func() {
defer r.wg.Done()
for {
select {
case <-r.done:
case p := <-r.coll_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
if r.config.IntervalStamp {
case p := <-r.recv_input:
// receive from receive manager
if r.config.IntervalStamp {
case p := <-r.cache_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
cclog.ComponentDebug("MetricRouter", "STARTED")
// AddCollectorInput adds a channel between metric collector and metric router
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
r.coll_input = input
// AddReceiverInput adds a channel between metric receiver and metric router
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
r.recv_input = input
// AddOutput adds a output channel to the metric router
func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
r.outputs = append(r.outputs, output)
// Close finishes / stops the metric router
func (r *metricRouter) Close() {
cclog.ComponentDebug("MetricRouter", "CLOSE")
r.done <- true
// wait for close of channel r.done
if r.config.IntervalStamp {
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
r.timerdone <- true
// wait for close of channel r.timerdone
// New creates a new initialized metric router
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {
r := new(metricRouter)
err := r.Init(ticker, wg, routerConfigFile)
if err != nil {
return nil, err
return r, err