From b9236dcc31d56d432038c10480a69171695a4ace Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Thu, 27 Jan 2022 17:43:00 +0100 Subject: [PATCH] Handle shutdown sequentially --- collectors/collectorManager.go | 5 +-- collectors/cpufreqMetric.go | 1 + internal/metricRouter/metricRouter.go | 12 ++---- internal/multiChanTicker/multiChanTicker.go | 5 ++- metric-collector.go | 43 +++++++++++---------- receivers/receiveManager.go | 5 ++- sinks/sinkManager.go | 8 ++-- 7 files changed, 37 insertions(+), 42 deletions(-) diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 98b6115..0b2dfcc 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -151,11 +151,8 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { // Close finishes / stops the metric collector manager func (cm *collectorManager) Close() { - select { - case cm.done <- true: - default: - } cclog.ComponentDebug("CollectorManager", "CLOSE") + cm.done <- true } // New creates a new initialized metric collector manager diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 5febed9..f3309ff 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "golang.org/x/sys/unix" ) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 57ba708..6327d95 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -225,18 +225,12 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { // Close finishes / stops the metric router func (r *metricRouter) Close() { - select { - case r.done <- true: - default: - } + cclog.ComponentDebug("MetricRouter", "CLOSE") + r.done <- true if r.config.IntervalStamp { cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") - select { - case r.timerdone <- true: - default: - } + r.timerdone <- true } - cclog.ComponentDebug("MetricRouter", "CLOSE") } // New creates a new initialized metric router diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index 37778ad..a9394ab 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -1,8 +1,9 @@ package multiChanTicker import ( - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" ) type multiChanTicker struct { @@ -49,8 +50,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) { } func (t *multiChanTicker) Close() { - t.done <- true cclog.ComponentDebug("MultiChanTicker", "CLOSE") + t.done <- true } func NewTicker(duration time.Duration) MultiChanTicker { diff --git a/metric-collector.go b/metric-collector.go index 6a6c1b3..0cd368a 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "strings" + "syscall" "github.com/ClusterCockpit/cc-metric-collector/collectors" "github.com/ClusterCockpit/cc-metric-collector/receivers" @@ -154,10 +155,19 @@ func ReadCli() map[string]string { // return nil //} -// General shutdown function that gets executed in case of interrupt or graceful shutdown -func shutdown(config *RuntimeConfig) { +// General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler +func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { + <-shutdownSignal + + // Remove shutdown handler + // every additional interrupt signal will stop without cleaning up + signal.Stop(shutdownSignal) + cclog.Info("Shutdown...") + + cclog.Debug("Shutdown Ticker...") config.Ticker.Close() + if config.CollectManager != nil { cclog.Debug("Shutdown CollectManager...") config.CollectManager.Close() @@ -182,18 +192,6 @@ func shutdown(config *RuntimeConfig) { config.Sync.Done() } -// Register an interrupt handler for Ctrl+C and similar. At signal, -// all collectors are closed -func prepare_shutdown(config *RuntimeConfig) { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt) - - go func(config *RuntimeConfig) { - <-sigs - shutdown(config) - }(config) -} - func mainFunc() int { var err error use_recv := false @@ -249,7 +247,7 @@ func mainFunc() int { cclog.Error(err.Error()) return 1 } - RouterToSinksChannel := make(chan lp.CCMetric) + RouterToSinksChannel := make(chan lp.CCMetric, 200) rcfg.SinkManager.AddInput(RouterToSinksChannel) rcfg.Router.AddOutput(RouterToSinksChannel) } @@ -259,7 +257,7 @@ func mainFunc() int { cclog.Error(err.Error()) return 1 } - CollectToRouterChannel := make(chan lp.CCMetric) + CollectToRouterChannel := make(chan lp.CCMetric, 200) rcfg.CollectManager.AddOutput(CollectToRouterChannel) rcfg.Router.AddCollectorInput(CollectToRouterChannel) } @@ -269,12 +267,17 @@ func mainFunc() int { cclog.Error(err.Error()) return 1 } - ReceiveToRouterChannel := make(chan lp.CCMetric) + ReceiveToRouterChannel := make(chan lp.CCMetric, 200) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) rcfg.Router.AddReceiverInput(ReceiveToRouterChannel) use_recv = true } - prepare_shutdown(&rcfg) + + shutdownSignal := make(chan os.Signal, 1) + signal.Notify(shutdownSignal, os.Interrupt) + signal.Notify(shutdownSignal, syscall.SIGTERM) + go shutdownHandler(&rcfg, shutdownSignal) + rcfg.Sync.Add(1) rcfg.Router.Start() rcfg.SinkManager.Start() @@ -288,10 +291,10 @@ func mainFunc() int { if rcfg.CliArgs["once"] == "true" { x := 1.2 * float64(rcfg.ConfigFile.Interval) time.Sleep(time.Duration(int(x)) * time.Second) - shutdown(&rcfg) + shutdownSignal <- os.Interrupt } - // Wait until receiving an interrupt + // Wait until shutdownHandler is executed rcfg.Sync.Wait() return 0 } diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index e6a2eee..c570aa4 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -2,10 +2,11 @@ package receivers import ( "encoding/json" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "sync" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) var AvailableReceivers = map[string]Receiver{ diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index efcb5a0..4be8313 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -68,10 +68,11 @@ func (sm *sinkManager) Start() { go func() { done := func() { for _, s := range sm.outputs { + s.Flush() s.Close() } - cclog.ComponentDebug("SinkManager", "DONE") sm.wg.Done() + cclog.ComponentDebug("SinkManager", "DONE") } for { select { @@ -128,11 +129,8 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { } func (sm *sinkManager) Close() { - select { - case sm.done <- true: - default: - } cclog.ComponentDebug("SinkManager", "CLOSE") + sm.done <- true } func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {