Use central timer for collectors and router. Add expressions to router

This commit is contained in:
Thomas Roehl
2021-12-20 17:40:28 +01:00
parent 44d8b0c979
commit b7fbd198ff
4 changed files with 152 additions and 19 deletions

View File

@@ -7,6 +7,7 @@ import (
"log"
"os"
"encoding/json"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
)
@@ -31,26 +32,26 @@ type collectorManager struct {
collectors []MetricCollector
output chan lp.CCMetric
done chan bool
interval time.Duration
ticker mct.MultiChanTicker
duration time.Duration
wg *sync.WaitGroup
config map[string]json.RawMessage
}
type CollectorManager interface {
Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
AddOutput(output chan lp.CCMetric)
Start()
Close()
}
func (cm *collectorManager) Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
cm.collectors = make([]MetricCollector, 0)
cm.output = nil
cm.done = make(chan bool)
cm.wg = wg
cm.interval = interval
cm.ticker = ticker
cm.duration = duration
configFile, err := os.Open(collectConfigFile)
if err != nil {
@@ -84,7 +85,8 @@ func (cm *collectorManager) Init(interval time.Duration, duration time.Duration,
func (cm *collectorManager) Start() {
cm.wg.Add(1)
ticker := time.NewTicker(cm.interval)
tick := make(chan time.Time)
cm.ticker.AddChannel(tick)
go func() {
for {
CollectorManagerLoop:
@@ -96,7 +98,7 @@ CollectorManagerLoop:
cm.wg.Done()
log.Print("[CollectorManager] DONE\n")
break CollectorManagerLoop
case t := <-ticker.C:
case t := <- tick:
for _, c := range cm.collectors {
CollectorManagerInputLoop:
select {
@@ -128,9 +130,9 @@ func (cm *collectorManager) Close() {
log.Print("[CollectorManager] CLOSE")
}
func New(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
cm := &collectorManager{}
err := cm.Init(interval, duration, wg, collectConfigFile)
err := cm.Init(ticker, duration, wg, collectConfigFile)
if err != nil {
return nil, err
}