diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index f76c31f..57ba708 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -28,19 +28,22 @@ type metricRouterConfig struct { } type metricRouter struct { - inputs []chan lp.CCMetric // List of all input channels - outputs []chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric router - wg *sync.WaitGroup - timestamp time.Time // timestamp - ticker mct.MultiChanTicker - config metricRouterConfig + coll_input chan lp.CCMetric // Input channel from CollectorManager + recv_input chan lp.CCMetric // Input channel from ReceiveManager + outputs []chan lp.CCMetric // List of all output channels + done chan bool // channel to finish / stop metric router + wg *sync.WaitGroup + timestamp time.Time // timestamp + timerdone chan bool // channel to finish / stop timestamp updater + ticker mct.MultiChanTicker + config metricRouterConfig } // MetricRouter access functions type MetricRouter interface { Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error - AddInput(input chan lp.CCMetric) + AddCollectorInput(input chan lp.CCMetric) + AddReceiverInput(input chan lp.CCMetric) AddOutput(output chan lp.CCMetric) Start() Close() @@ -53,7 +56,6 @@ type MetricRouter interface { // * ticker (from variable ticker) // * configuration (read from config file in variable routerConfigFile) func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { - r.inputs = make([]chan lp.CCMetric, 0) r.outputs = make([]chan lp.CCMetric, 0) r.done = make(chan bool) r.wg = wg @@ -77,12 +79,19 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout func (r *metricRouter) StartTimer() { m := make(chan time.Time) r.ticker.AddChannel(m) + r.timerdone = make(chan bool) go func() { for { - t := <-m - r.timestamp = t + select { + case <-r.timerdone: + cclog.ComponentDebug("MetricRouter", "TIMER DONE") + return + case t := <-m: + r.timestamp = t + } } }() + cclog.ComponentDebug("MetricRouter", "TIMER START") } // EvalCondition evaluates condition Cond for metric data from point @@ -165,35 +174,35 @@ func (r *metricRouter) Start() { if r.config.IntervalStamp { r.StartTimer() } + done := func() { + r.wg.Done() + cclog.ComponentDebug("MetricRouter", "DONE") + } + forward := func(point lp.CCMetric) { + cclog.ComponentDebug("MetricRouter", "FORWARD", point) + r.DoAddTags(point) + r.DoDelTags(point) + for _, o := range r.outputs { + o <- point + } + } go func() { for { - RouterLoop: + // RouterLoop: select { case <-r.done: - cclog.ComponentDebug("MetricRouter", "DONE") - r.wg.Done() - break RouterLoop - default: - for _, c := range r.inputs { - RouterInputLoop: - select { - case <-r.done: - cclog.ComponentDebug("MetricRouter", "DONE") - r.wg.Done() - break RouterInputLoop - case p := <-c: - cclog.ComponentDebug("MetricRouter", "FORWARD", p) - r.DoAddTags(p) - r.DoDelTags(p) - if r.config.IntervalStamp { - p.SetTime(r.timestamp) - } - for _, o := range r.outputs { - o <- p - } - default: - } + done() + return + case p := <-r.coll_input: + if r.config.IntervalStamp { + p.SetTime(r.timestamp) } + forward(p) + case p := <-r.recv_input: + if r.config.IntervalStamp { + p.SetTime(r.timestamp) + } + forward(p) } } }() @@ -201,8 +210,12 @@ func (r *metricRouter) Start() { } // AddInput adds a input channel to the metric router -func (r *metricRouter) AddInput(input chan lp.CCMetric) { - r.inputs = append(r.inputs, input) +func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { + r.coll_input = input +} + +func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { + r.recv_input = input } // AddOutput adds a output channel to the metric router @@ -212,7 +225,17 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { // Close finishes / stops the metric router func (r *metricRouter) Close() { - r.done <- true + select { + case r.done <- true: + default: + } + if r.config.IntervalStamp { + cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") + select { + case r.timerdone <- true: + default: + } + } cclog.ComponentDebug("MetricRouter", "CLOSE") } diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index f8139fa..37778ad 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -1,27 +1,43 @@ package multiChanTicker import ( + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "time" ) type multiChanTicker struct { ticker *time.Ticker channels []chan time.Time + done chan bool } type MultiChanTicker interface { Init(duration time.Duration) AddChannel(chan time.Time) + Close() } func (t *multiChanTicker) Init(duration time.Duration) { t.ticker = time.NewTicker(duration) + t.done = make(chan bool) go func() { + done := func() { + cclog.ComponentDebug("MultiChanTicker", "DONE") + } for { select { + case <-t.done: + done() + return case ts := <-t.ticker.C: + cclog.ComponentDebug("MultiChanTicker", "Tick", ts) for _, c := range t.channels { - c <- ts + select { + case <-t.done: + done() + return + case c <- ts: + } } } } @@ -32,6 +48,11 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) { t.channels = append(t.channels, channel) } +func (t *multiChanTicker) Close() { + t.done <- true + cclog.ComponentDebug("MultiChanTicker", "CLOSE") +} + func NewTicker(duration time.Duration) MultiChanTicker { t := &multiChanTicker{} t.Init(duration) diff --git a/metric-collector.go b/metric-collector.go index 25989ed..6a6c1b3 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "flag" -// "log" "os" "os/signal" "strings" @@ -158,6 +157,7 @@ func ReadCli() map[string]string { // General shutdown function that gets executed in case of interrupt or graceful shutdown func shutdown(config *RuntimeConfig) { cclog.Info("Shutdown...") + config.Ticker.Close() if config.CollectManager != nil { cclog.Debug("Shutdown CollectManager...") config.CollectManager.Close() @@ -228,7 +228,7 @@ func mainFunc() int { // err = CreatePidfile(rcfg.CliArgs["pidfile"]) if rcfg.CliArgs["logfile"] != "stderr" { - cclog.SetOutput(rcfg.CliArgs["logfile"]) + cclog.SetOutput(rcfg.CliArgs["logfile"]) } // err = SetLogging(rcfg.CliArgs["logfile"]) // if err != nil { @@ -261,7 +261,7 @@ func mainFunc() int { } CollectToRouterChannel := make(chan lp.CCMetric) rcfg.CollectManager.AddOutput(CollectToRouterChannel) - rcfg.Router.AddInput(CollectToRouterChannel) + rcfg.Router.AddCollectorInput(CollectToRouterChannel) } if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) @@ -271,7 +271,7 @@ func mainFunc() int { } ReceiveToRouterChannel := make(chan lp.CCMetric) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) - rcfg.Router.AddInput(ReceiveToRouterChannel) + rcfg.Router.AddReceiverInput(ReceiveToRouterChannel) use_recv = true } prepare_shutdown(&rcfg)