diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 5fd55ba..dc7703a 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -2,38 +2,42 @@ package metricRouter import ( "encoding/json" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "sync" "time" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" "gopkg.in/Knetic/govaluate.v2" ) +// Metric router tag configuration type metricRouterTagConfig struct { - Key string `json:"key"` - Value string `json:"value"` - Condition string `json:"if"` + Key string `json:"key"` // Tag name + Value string `json:"value"` // Tag value + Condition string `json:"if"` // Condition for adding or removing corresponding tag } +// Metric router configuration type metricRouterConfig struct { - AddTags []metricRouterTagConfig `json:"add_tags"` - DelTags []metricRouterTagConfig `json:"delete_tags"` - IntervalStamp bool `json:"interval_timestamp"` + AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met + DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met + IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically? } type metricRouter struct { - inputs []chan lp.CCMetric - outputs []chan lp.CCMetric - done chan bool + inputs []chan lp.CCMetric // List of all input channels + outputs []chan lp.CCMetric // List of all output channels + done chan bool // channel to finish stop metric router wg *sync.WaitGroup - timestamp time.Time + timestamp time.Time // timestamp ticker mct.MultiChanTicker config metricRouterConfig } +// MetricRouter access functions type MetricRouter interface { Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error AddInput(input chan lp.CCMetric) @@ -42,6 +46,12 @@ type MetricRouter interface { Close() } +// Init initializes a metric router by setting up: +// * input and output channels +// * done channel +// * wait group synchronization (from variable wg) +// * ticker (from variable ticker) +// * configuration (read from config file in variable routerConfigFile) func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { r.inputs = make([]chan lp.CCMetric, 0) r.outputs = make([]chan lp.CCMetric, 0) @@ -63,25 +73,27 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout return nil } +// StartTimer starts a timer which updates timestamp periodically func (r *metricRouter) StartTimer() { m := make(chan time.Time) r.ticker.AddChannel(m) go func() { for { - select { - case t := <-m: - r.timestamp = t - } + t := <-m + r.timestamp = t } }() } +// EvalCondition evaluates condition Cond for metric data from point func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { expression, err := govaluate.NewEvaluableExpression(Cond) if err != nil { cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) return false, err } + + // Add metric name, tags, meta data, fields and timestamp to the parameter list params := make(map[string]interface{}) params["name"] = point.Name() for _, t := range point.TagList() { @@ -95,6 +107,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro } params["timestamp"] = point.Time() + // evaluate condition result, err := expression.Evaluate(params) if err != nil { cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) @@ -103,6 +116,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro return bool(result.(bool)), err } +// DoAddTags adds a tag when condition is fullfiled func (r *metricRouter) DoAddTags(point lp.CCMetric) { for _, m := range r.config.AddTags { var conditionMatches bool @@ -123,6 +137,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) { } } +// DoDelTags removes a tag when condition is fullfiled func (r *metricRouter) DoDelTags(point lp.CCMetric) { for _, m := range r.config.DelTags { var conditionMatches bool @@ -143,6 +158,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { } } +// Start starts the metric router func (r *metricRouter) Start() { r.wg.Add(1) r.timestamp = time.Now()