Place wait group Add() and Done() near to each other

This commit is contained in:
Holger Obermaier 2022-01-27 20:45:22 +01:00
parent b9236dcc31
commit aea3e2c6b1
3 changed files with 57 additions and 41 deletions

View File

@ -102,18 +102,18 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
// Start starts the metric collector manager // Start starts the metric collector manager
func (cm *collectorManager) Start() { func (cm *collectorManager) Start() {
cm.wg.Add(1)
tick := make(chan time.Time) tick := make(chan time.Time)
cm.ticker.AddChannel(tick) cm.ticker.AddChannel(tick)
cm.wg.Add(1)
go func() { go func() {
defer cm.wg.Done()
// Collector manager is done // Collector manager is done
done := func() { done := func() {
// close all metric collectors // close all metric collectors
for _, c := range cm.collectors { for _, c := range cm.collectors {
c.Close() c.Close()
} }
cm.wg.Done()
cclog.ComponentDebug("CollectorManager", "DONE") cclog.ComponentDebug("CollectorManager", "DONE")
} }

View File

@ -80,7 +80,10 @@ func (r *metricRouter) StartTimer() {
m := make(chan time.Time) m := make(chan time.Time)
r.ticker.AddChannel(m) r.ticker.AddChannel(m)
r.timerdone = make(chan bool) r.timerdone = make(chan bool)
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done()
for { for {
select { select {
case <-r.timerdone: case <-r.timerdone:
@ -169,13 +172,11 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
// Start starts the metric router // Start starts the metric router
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
r.wg.Add(1)
r.timestamp = time.Now() r.timestamp = time.Now()
if r.config.IntervalStamp { if r.config.IntervalStamp {
r.StartTimer() r.StartTimer()
} }
done := func() { done := func() {
r.wg.Done()
cclog.ComponentDebug("MetricRouter", "DONE") cclog.ComponentDebug("MetricRouter", "DONE")
} }
forward := func(point lp.CCMetric) { forward := func(point lp.CCMetric) {
@ -186,7 +187,10 @@ func (r *metricRouter) Start() {
o <- point o <- point
} }
} }
r.wg.Add(1)
go func() { go func() {
defer r.wg.Done()
for { for {
// RouterLoop: // RouterLoop:
select { select {

View File

@ -51,25 +51,16 @@ type RuntimeConfig struct {
CliArgs map[string]string CliArgs map[string]string
ConfigFile CentralConfigFile ConfigFile CentralConfigFile
Router mr.MetricRouter MetricRouter mr.MetricRouter
CollectManager collectors.CollectorManager CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager ReceiveManager receivers.ReceiveManager
Ticker mct.MultiChanTicker MultiChanTicker mct.MultiChanTicker
Channels []chan lp.CCMetric Channels []chan lp.CCMetric
Sync sync.WaitGroup Sync sync.WaitGroup
} }
func prepare_runcfg() RuntimeConfig {
return RuntimeConfig{
Router: nil,
CollectManager: nil,
SinkManager: nil,
ReceiveManager: nil,
}
}
//// Structure of the configuration file //// Structure of the configuration file
//type GlobalConfig struct { //type GlobalConfig struct {
// Sink sinks.SinkConfig `json:"sink"` // Sink sinks.SinkConfig `json:"sink"`
@ -157,8 +148,9 @@ func ReadCli() map[string]string {
// General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler // General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler
func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
<-shutdownSignal defer config.Sync.Done()
<-shutdownSignal
// Remove shutdown handler // Remove shutdown handler
// every additional interrupt signal will stop without cleaning up // every additional interrupt signal will stop without cleaning up
signal.Stop(shutdownSignal) signal.Stop(shutdownSignal)
@ -166,7 +158,7 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
cclog.Info("Shutdown...") cclog.Info("Shutdown...")
cclog.Debug("Shutdown Ticker...") cclog.Debug("Shutdown Ticker...")
config.Ticker.Close() config.MultiChanTicker.Close()
if config.CollectManager != nil { if config.CollectManager != nil {
cclog.Debug("Shutdown CollectManager...") cclog.Debug("Shutdown CollectManager...")
@ -176,9 +168,9 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
cclog.Debug("Shutdown ReceiveManager...") cclog.Debug("Shutdown ReceiveManager...")
config.ReceiveManager.Close() config.ReceiveManager.Close()
} }
if config.Router != nil { if config.MetricRouter != nil {
cclog.Debug("Shutdown Router...") cclog.Debug("Shutdown Router...")
config.Router.Close() config.MetricRouter.Close()
} }
if config.SinkManager != nil { if config.SinkManager != nil {
cclog.Debug("Shutdown SinkManager...") cclog.Debug("Shutdown SinkManager...")
@ -189,15 +181,20 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
// RemovePidfile(pidfile) // RemovePidfile(pidfile)
// pidfile = config.CliArgs["pidfile"] // pidfile = config.CliArgs["pidfile"]
// RemovePidfile(pidfile) // RemovePidfile(pidfile)
config.Sync.Done()
} }
func mainFunc() int { func mainFunc() int {
var err error var err error
use_recv := false use_recv := false
rcfg := prepare_runcfg() // Initialize runtime configuration
rcfg.CliArgs = ReadCli() rcfg := RuntimeConfig{
MetricRouter: nil,
CollectManager: nil,
SinkManager: nil,
ReceiveManager: nil,
CliArgs: ReadCli(),
}
// Load and check configuration // Load and check configuration
err = LoadCentralConfiguration(rcfg.CliArgs["configfile"], &rcfg.ConfigFile) err = LoadCentralConfiguration(rcfg.CliArgs["configfile"], &rcfg.ConfigFile)
@ -225,61 +222,75 @@ func mainFunc() int {
rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0] rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0]
// err = CreatePidfile(rcfg.CliArgs["pidfile"]) // err = CreatePidfile(rcfg.CliArgs["pidfile"])
if rcfg.CliArgs["logfile"] != "stderr" { // Set log file
cclog.SetOutput(rcfg.CliArgs["logfile"]) if logfile := rcfg.CliArgs["logfile"]; logfile != "stderr" {
cclog.SetOutput(logfile)
} }
// err = SetLogging(rcfg.CliArgs["logfile"])
// if err != nil { // Creat new multi channel ticker
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval)
// return
// } // Create new metric router
rcfg.Ticker = mct.NewTicker(rcfg.Interval)
if len(rcfg.ConfigFile.RouterConfigFile) > 0 { if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
if err != nil { if err != nil {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
} }
// Create new sink
if len(rcfg.ConfigFile.SinkConfigFile) > 0 { if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
if err != nil { if err != nil {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
// Connect metric router to sink manager
RouterToSinksChannel := make(chan lp.CCMetric, 200) RouterToSinksChannel := make(chan lp.CCMetric, 200)
rcfg.SinkManager.AddInput(RouterToSinksChannel) rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.Router.AddOutput(RouterToSinksChannel) rcfg.MetricRouter.AddOutput(RouterToSinksChannel)
} }
// Create new collector manager
if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
if err != nil { if err != nil {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
// Connect collector manager to metric router
CollectToRouterChannel := make(chan lp.CCMetric, 200) CollectToRouterChannel := make(chan lp.CCMetric, 200)
rcfg.CollectManager.AddOutput(CollectToRouterChannel) rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.Router.AddCollectorInput(CollectToRouterChannel) rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel)
} }
// Create new receive manager
if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 {
rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile)
if err != nil { if err != nil {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
// Connect receive manager to metric router
ReceiveToRouterChannel := make(chan lp.CCMetric, 200) ReceiveToRouterChannel := make(chan lp.CCMetric, 200)
rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel)
rcfg.Router.AddReceiverInput(ReceiveToRouterChannel) rcfg.MetricRouter.AddReceiverInput(ReceiveToRouterChannel)
use_recv = true use_recv = true
} }
// Create shutdown handler
shutdownSignal := make(chan os.Signal, 1) shutdownSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt) signal.Notify(shutdownSignal, os.Interrupt)
signal.Notify(shutdownSignal, syscall.SIGTERM) signal.Notify(shutdownSignal, syscall.SIGTERM)
rcfg.Sync.Add(1)
go shutdownHandler(&rcfg, shutdownSignal) go shutdownHandler(&rcfg, shutdownSignal)
rcfg.Sync.Add(1) // Start the managers
rcfg.Router.Start() rcfg.MetricRouter.Start()
rcfg.SinkManager.Start() rcfg.SinkManager.Start()
rcfg.CollectManager.Start() rcfg.CollectManager.Start()
@ -294,8 +305,9 @@ func mainFunc() int {
shutdownSignal <- os.Interrupt shutdownSignal <- os.Interrupt
} }
// Wait until shutdownHandler is executed // Wait that all goroutines finish
rcfg.Sync.Wait() rcfg.Sync.Wait()
return 0 return 0
} }