From b7fbd198ff37b8fef534a2be7b3c667bf5771f9d Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Mon, 20 Dec 2021 17:40:28 +0100 Subject: [PATCH] Use central timer for collectors and router. Add expressions to router --- collectors/collectorManager.go | 18 ++-- internal/ccMetric/ccMetric.go | 1 + internal/metricRouter/metricRouter.go | 113 ++++++++++++++++++-- internal/multiChanTicker/multiChanTicker.go | 39 +++++++ 4 files changed, 152 insertions(+), 19 deletions(-) create mode 100644 internal/multiChanTicker/multiChanTicker.go diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 1bd0d6f..cd4cc1f 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -7,6 +7,7 @@ import ( "log" "os" "encoding/json" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) @@ -31,26 +32,26 @@ type collectorManager struct { collectors []MetricCollector output chan lp.CCMetric done chan bool - interval time.Duration + ticker mct.MultiChanTicker duration time.Duration wg *sync.WaitGroup config map[string]json.RawMessage } type CollectorManager interface { - Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error + Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error AddOutput(output chan lp.CCMetric) Start() Close() } -func (cm *collectorManager) Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { +func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { cm.collectors = make([]MetricCollector, 0) cm.output = nil cm.done = make(chan bool) cm.wg = wg - cm.interval = interval + cm.ticker = ticker cm.duration = duration configFile, err := os.Open(collectConfigFile) if err != nil { @@ -84,7 +85,8 @@ func (cm *collectorManager) Init(interval time.Duration, duration time.Duration, func (cm *collectorManager) Start() { cm.wg.Add(1) - ticker := time.NewTicker(cm.interval) + tick := make(chan time.Time) + cm.ticker.AddChannel(tick) go func() { for { CollectorManagerLoop: @@ -96,7 +98,7 @@ CollectorManagerLoop: cm.wg.Done() log.Print("[CollectorManager] DONE\n") break CollectorManagerLoop - case t := <-ticker.C: + case t := <- tick: for _, c := range cm.collectors { CollectorManagerInputLoop: select { @@ -128,9 +130,9 @@ func (cm *collectorManager) Close() { log.Print("[CollectorManager] CLOSE") } -func New(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { +func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { cm := &collectorManager{} - err := cm.Init(interval, duration, wg, collectConfigFile) + err := cm.Init(ticker, duration, wg, collectConfigFile) if err != nil { return nil, err } diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 31c813c..c5864de 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -23,6 +23,7 @@ type CCMetric interface { lp.MutableMetric AddMeta(key, value string) MetaList() []*lp.Tag + RemoveTag(key string) } func (m *ccMetric) Meta() map[string]string { diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 7c4d6e3..9e2d4f9 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -6,6 +6,9 @@ import ( "log" "encoding/json" "os" + "time" + "gopkg.in/Knetic/govaluate.v2" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) type metricRounterTagConfig struct { @@ -25,29 +28,27 @@ type metricRouter struct { outputs []chan lp.CCMetric done chan bool wg *sync.WaitGroup + timestamp time.Time + ticker mct.MultiChanTicker config metricRouterConfig } type MetricRouter interface { - Init(routerDone chan bool, wg *sync.WaitGroup) error + Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error AddInput(input chan lp.CCMetric) AddOutput(output chan lp.CCMetric) - ReadConfig(filename string) error Start() Close() } -func (r *metricRouter) Init(routerDone chan bool, wg *sync.WaitGroup) error { +func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { r.inputs = make([]chan lp.CCMetric, 0) r.outputs = make([]chan lp.CCMetric, 0) - r.done = routerDone + r.done = make(chan bool) r.wg = wg - return nil -} - -func (r *metricRouter) ReadConfig(filename string) error { - configFile, err := os.Open(filename) + r.ticker = ticker + configFile, err := os.Open(routerConfigFile) if err != nil { log.Print(err.Error()) return err @@ -62,8 +63,93 @@ func (r *metricRouter) ReadConfig(filename string) error { return nil } +func (r *metricRouter) StartTimer() { + m := make(chan time.Time) + r.ticker.AddChannel(m) + go func() { + for { + select { + case t := <- m: + r.timestamp = t + } + } + }() +} + +func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error){ + expression, err := govaluate.NewEvaluableExpression(Cond) + if err != nil { + log.Print(Cond, " = ", err.Error()) + return false, err + } + params := make(map[string]interface{}) + params["name"] = point.Name() + for _,t := range point.TagList() { + params[t.Key] = t.Value + } + for _,m := range point.MetaList() { + params[m.Key] = m.Value + } + for _,f := range point.FieldList() { + params[f.Key] = f.Value + } + params["timestamp"] = point.Time() + + result, err := expression.Evaluate(params) + if err != nil { + log.Print(Cond, " = ", err.Error()) + return false, err + } + return bool(result.(bool)), err +} + +func (r *metricRouter) DoAddTags(point lp.CCMetric) { + for _, m := range r.config.AddTags { + var res bool + var err error + + if m.Condition == "*" { + res = true + err = nil + } else { + res, err = r.EvalCondition(m.Condition, point) + if err != nil { + log.Print(err.Error()) + res = false + } + } + if res == true { + point.AddTag(m.Key, m.Value) + } + } +} + +func (r *metricRouter) DoDelTags(point lp.CCMetric) { + for _, m := range r.config.DelTags { + var res bool + var err error + if m.Condition == "*" { + res = true + err = nil + } else { + res, err = r.EvalCondition(m.Condition, point) + if err != nil { + log.Print(err.Error()) + res = false + } + } + if res == true { + point.RemoveTag(m.Key) + } + } +} + func (r *metricRouter) Start() { r.wg.Add(1) + r.timestamp = time.Now() + if r.config.IntervalStamp == true { + r.StartTimer() + } go func() { for { RouterLoop: @@ -82,6 +168,11 @@ RouterInputLoop: break RouterInputLoop case p := <- c: log.Print("[MetricRouter] FORWARD ",p) + r.DoAddTags(p) + r.DoDelTags(p) + if r.config.IntervalStamp == true { + p.SetTime(r.timestamp) + } for _, o := range r.outputs { o <- p } @@ -108,9 +199,9 @@ func (r *metricRouter) Close() { log.Print("[MetricRouter] CLOSE\n") } -func New(done chan bool, wg *sync.WaitGroup) (MetricRouter, error) { +func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) { r := &metricRouter{} - err := r.Init(done, wg) + err := r.Init(ticker, wg, routerConfigFile) if err != nil { return nil, err } diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go new file mode 100644 index 0000000..f063af4 --- /dev/null +++ b/internal/multiChanTicker/multiChanTicker.go @@ -0,0 +1,39 @@ +package multiChanTicker + +import ( + "time" +) + +type multiChanTicker struct { + ticker *time.Ticker + channels []chan time.Time +} + +type MultiChanTicker interface { + Init(duration time.Duration) + AddChannel(chan time.Time) +} + +func (t *multiChanTicker) Init(duration time.Duration) { + t.ticker = time.NewTicker(duration) + go func() { + for { + select { + case ts := <-t.ticker.C: + for _, c := range t.channels { + c <- ts + } + } + } + }() +} + +func (t *multiChanTicker) AddChannel(channel chan time.Time) { + t.channels = append(t.channels, channel) +} + +func NewTicker(duration time.Duration) MultiChanTicker { + t := &multiChanTicker{} + t.Init(duration) + return t +}