diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 0b2dfcc..7b0a9b7 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -102,18 +102,18 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat // Start starts the metric collector manager func (cm *collectorManager) Start() { - cm.wg.Add(1) tick := make(chan time.Time) cm.ticker.AddChannel(tick) + cm.wg.Add(1) go func() { + defer cm.wg.Done() // Collector manager is done done := func() { // close all metric collectors for _, c := range cm.collectors { c.Close() } - cm.wg.Done() cclog.ComponentDebug("CollectorManager", "DONE") } diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 6327d95..a321aae 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -80,7 +80,10 @@ func (r *metricRouter) StartTimer() { m := make(chan time.Time) r.ticker.AddChannel(m) r.timerdone = make(chan bool) + + r.wg.Add(1) go func() { + defer r.wg.Done() for { select { case <-r.timerdone: @@ -169,13 +172,11 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { // Start starts the metric router func (r *metricRouter) Start() { - r.wg.Add(1) r.timestamp = time.Now() if r.config.IntervalStamp { r.StartTimer() } done := func() { - r.wg.Done() cclog.ComponentDebug("MetricRouter", "DONE") } forward := func(point lp.CCMetric) { @@ -186,7 +187,10 @@ func (r *metricRouter) Start() { o <- point } } + + r.wg.Add(1) go func() { + defer r.wg.Done() for { // RouterLoop: select { diff --git a/metric-collector.go b/metric-collector.go index 0cd368a..3975b62 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -51,25 +51,16 @@ type RuntimeConfig struct { CliArgs map[string]string ConfigFile CentralConfigFile - Router mr.MetricRouter - CollectManager collectors.CollectorManager - SinkManager sinks.SinkManager - ReceiveManager receivers.ReceiveManager - Ticker mct.MultiChanTicker + MetricRouter mr.MetricRouter + CollectManager collectors.CollectorManager + SinkManager sinks.SinkManager + ReceiveManager receivers.ReceiveManager + MultiChanTicker mct.MultiChanTicker Channels []chan lp.CCMetric Sync sync.WaitGroup } -func prepare_runcfg() RuntimeConfig { - return RuntimeConfig{ - Router: nil, - CollectManager: nil, - SinkManager: nil, - ReceiveManager: nil, - } -} - //// Structure of the configuration file //type GlobalConfig struct { // Sink sinks.SinkConfig `json:"sink"` @@ -157,8 +148,9 @@ func ReadCli() map[string]string { // General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { - <-shutdownSignal + defer config.Sync.Done() + <-shutdownSignal // Remove shutdown handler // every additional interrupt signal will stop without cleaning up signal.Stop(shutdownSignal) @@ -166,7 +158,7 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { cclog.Info("Shutdown...") cclog.Debug("Shutdown Ticker...") - config.Ticker.Close() + config.MultiChanTicker.Close() if config.CollectManager != nil { cclog.Debug("Shutdown CollectManager...") @@ -176,9 +168,9 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { cclog.Debug("Shutdown ReceiveManager...") config.ReceiveManager.Close() } - if config.Router != nil { + if config.MetricRouter != nil { cclog.Debug("Shutdown Router...") - config.Router.Close() + config.MetricRouter.Close() } if config.SinkManager != nil { cclog.Debug("Shutdown SinkManager...") @@ -189,15 +181,20 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { // RemovePidfile(pidfile) // pidfile = config.CliArgs["pidfile"] // RemovePidfile(pidfile) - config.Sync.Done() } func mainFunc() int { var err error use_recv := false - rcfg := prepare_runcfg() - rcfg.CliArgs = ReadCli() + // Initialize runtime configuration + rcfg := RuntimeConfig{ + MetricRouter: nil, + CollectManager: nil, + SinkManager: nil, + ReceiveManager: nil, + CliArgs: ReadCli(), + } // Load and check configuration err = LoadCentralConfiguration(rcfg.CliArgs["configfile"], &rcfg.ConfigFile) @@ -225,61 +222,75 @@ func mainFunc() int { rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0] // err = CreatePidfile(rcfg.CliArgs["pidfile"]) - if rcfg.CliArgs["logfile"] != "stderr" { - cclog.SetOutput(rcfg.CliArgs["logfile"]) + // Set log file + if logfile := rcfg.CliArgs["logfile"]; logfile != "stderr" { + cclog.SetOutput(logfile) } - // err = SetLogging(rcfg.CliArgs["logfile"]) - // if err != nil { - // log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) - // return - // } - rcfg.Ticker = mct.NewTicker(rcfg.Interval) + + // Creat new multi channel ticker + rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval) + + // Create new metric router if len(rcfg.ConfigFile.RouterConfigFile) > 0 { - rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) + rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) if err != nil { cclog.Error(err.Error()) return 1 } } + + // Create new sink if len(rcfg.ConfigFile.SinkConfigFile) > 0 { rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) if err != nil { cclog.Error(err.Error()) return 1 } + + // Connect metric router to sink manager RouterToSinksChannel := make(chan lp.CCMetric, 200) rcfg.SinkManager.AddInput(RouterToSinksChannel) - rcfg.Router.AddOutput(RouterToSinksChannel) + rcfg.MetricRouter.AddOutput(RouterToSinksChannel) } + + // Create new collector manager if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { - rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) + rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) if err != nil { cclog.Error(err.Error()) return 1 } + + // Connect collector manager to metric router CollectToRouterChannel := make(chan lp.CCMetric, 200) rcfg.CollectManager.AddOutput(CollectToRouterChannel) - rcfg.Router.AddCollectorInput(CollectToRouterChannel) + rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) } + + // Create new receive manager if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) if err != nil { cclog.Error(err.Error()) return 1 } + + // Connect receive manager to metric router ReceiveToRouterChannel := make(chan lp.CCMetric, 200) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) - rcfg.Router.AddReceiverInput(ReceiveToRouterChannel) + rcfg.MetricRouter.AddReceiverInput(ReceiveToRouterChannel) use_recv = true } + // Create shutdown handler shutdownSignal := make(chan os.Signal, 1) signal.Notify(shutdownSignal, os.Interrupt) signal.Notify(shutdownSignal, syscall.SIGTERM) + rcfg.Sync.Add(1) go shutdownHandler(&rcfg, shutdownSignal) - rcfg.Sync.Add(1) - rcfg.Router.Start() + // Start the managers + rcfg.MetricRouter.Start() rcfg.SinkManager.Start() rcfg.CollectManager.Start() @@ -294,8 +305,9 @@ func mainFunc() int { shutdownSignal <- os.Interrupt } - // Wait until shutdownHandler is executed + // Wait that all goroutines finish rcfg.Sync.Wait() + return 0 }