Add documentation

This commit is contained in:
Holger Obermaier 2022-01-26 11:38:43 +01:00
parent 7f77cad056
commit 9bd8a3a90b

View File

@ -2,38 +2,42 @@ package metricRouter
import ( import (
"encoding/json" "encoding/json"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
"os" "os"
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"gopkg.in/Knetic/govaluate.v2" "gopkg.in/Knetic/govaluate.v2"
) )
// Metric router tag configuration
type metricRouterTagConfig struct { type metricRouterTagConfig struct {
Key string `json:"key"` Key string `json:"key"` // Tag name
Value string `json:"value"` Value string `json:"value"` // Tag value
Condition string `json:"if"` Condition string `json:"if"` // Condition for adding or removing corresponding tag
} }
// Metric router configuration
type metricRouterConfig struct { type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalStamp bool `json:"interval_timestamp"` IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically?
} }
type metricRouter struct { type metricRouter struct {
inputs []chan lp.CCMetric inputs []chan lp.CCMetric // List of all input channels
outputs []chan lp.CCMetric outputs []chan lp.CCMetric // List of all output channels
done chan bool done chan bool // channel to finish stop metric router
wg *sync.WaitGroup wg *sync.WaitGroup
timestamp time.Time timestamp time.Time // timestamp
ticker mct.MultiChanTicker ticker mct.MultiChanTicker
config metricRouterConfig config metricRouterConfig
} }
// MetricRouter access functions
type MetricRouter interface { type MetricRouter interface {
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
AddInput(input chan lp.CCMetric) AddInput(input chan lp.CCMetric)
@ -42,6 +46,12 @@ type MetricRouter interface {
Close() Close()
} }
// Init initializes a metric router by setting up:
// * input and output channels
// * done channel
// * wait group synchronization (from variable wg)
// * ticker (from variable ticker)
// * configuration (read from config file in variable routerConfigFile)
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.inputs = make([]chan lp.CCMetric, 0) r.inputs = make([]chan lp.CCMetric, 0)
r.outputs = make([]chan lp.CCMetric, 0) r.outputs = make([]chan lp.CCMetric, 0)
@ -63,25 +73,27 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
return nil return nil
} }
// StartTimer starts a timer which updates timestamp periodically
func (r *metricRouter) StartTimer() { func (r *metricRouter) StartTimer() {
m := make(chan time.Time) m := make(chan time.Time)
r.ticker.AddChannel(m) r.ticker.AddChannel(m)
go func() { go func() {
for { for {
select { t := <-m
case t := <-m:
r.timestamp = t r.timestamp = t
} }
}
}() }()
} }
// EvalCondition evaluates condition Cond for metric data from point
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
expression, err := govaluate.NewEvaluableExpression(Cond) expression, err := govaluate.NewEvaluableExpression(Cond)
if err != nil { if err != nil {
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
return false, err return false, err
} }
// Add metric name, tags, meta data, fields and timestamp to the parameter list
params := make(map[string]interface{}) params := make(map[string]interface{})
params["name"] = point.Name() params["name"] = point.Name()
for _, t := range point.TagList() { for _, t := range point.TagList() {
@ -95,6 +107,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
} }
params["timestamp"] = point.Time() params["timestamp"] = point.Time()
// evaluate condition
result, err := expression.Evaluate(params) result, err := expression.Evaluate(params)
if err != nil { if err != nil {
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
@ -103,6 +116,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
return bool(result.(bool)), err return bool(result.(bool)), err
} }
// DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point lp.CCMetric) { func (r *metricRouter) DoAddTags(point lp.CCMetric) {
for _, m := range r.config.AddTags { for _, m := range r.config.AddTags {
var conditionMatches bool var conditionMatches bool
@ -123,6 +137,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
} }
} }
// DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point lp.CCMetric) { func (r *metricRouter) DoDelTags(point lp.CCMetric) {
for _, m := range r.config.DelTags { for _, m := range r.config.DelTags {
var conditionMatches bool var conditionMatches bool
@ -143,6 +158,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
} }
} }
// Start starts the metric router
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
r.wg.Add(1) r.wg.Add(1)
r.timestamp = time.Now() r.timestamp = time.Now()