Handle shutdown sequentially

This commit is contained in:
Holger Obermaier 2022-01-27 17:43:00 +01:00
parent e1d0aacd1e
commit b9236dcc31
7 changed files with 37 additions and 42 deletions

View File

@ -151,11 +151,8 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
// Close finishes / stops the metric collector manager // Close finishes / stops the metric collector manager
func (cm *collectorManager) Close() { func (cm *collectorManager) Close() {
select {
case cm.done <- true:
default:
}
cclog.ComponentDebug("CollectorManager", "CLOSE") cclog.ComponentDebug("CollectorManager", "CLOSE")
cm.done <- true
} }
// New creates a new initialized metric collector manager // New creates a new initialized metric collector manager

View File

@ -10,6 +10,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )

View File

@ -225,19 +225,13 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
// Close finishes / stops the metric router // Close finishes / stops the metric router
func (r *metricRouter) Close() { func (r *metricRouter) Close() {
select { cclog.ComponentDebug("MetricRouter", "CLOSE")
case r.done <- true: r.done <- true
default:
}
if r.config.IntervalStamp { if r.config.IntervalStamp {
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
select { r.timerdone <- true
case r.timerdone <- true:
default:
} }
} }
cclog.ComponentDebug("MetricRouter", "CLOSE")
}
// New creates a new initialized metric router // New creates a new initialized metric router
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) { func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {

View File

@ -1,8 +1,9 @@
package multiChanTicker package multiChanTicker
import ( import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
) )
type multiChanTicker struct { type multiChanTicker struct {
@ -49,8 +50,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) {
} }
func (t *multiChanTicker) Close() { func (t *multiChanTicker) Close() {
t.done <- true
cclog.ComponentDebug("MultiChanTicker", "CLOSE") cclog.ComponentDebug("MultiChanTicker", "CLOSE")
t.done <- true
} }
func NewTicker(duration time.Duration) MultiChanTicker { func NewTicker(duration time.Duration) MultiChanTicker {

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"strings" "strings"
"syscall"
"github.com/ClusterCockpit/cc-metric-collector/collectors" "github.com/ClusterCockpit/cc-metric-collector/collectors"
"github.com/ClusterCockpit/cc-metric-collector/receivers" "github.com/ClusterCockpit/cc-metric-collector/receivers"
@ -154,10 +155,19 @@ func ReadCli() map[string]string {
// return nil // return nil
//} //}
// General shutdown function that gets executed in case of interrupt or graceful shutdown // General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler
func shutdown(config *RuntimeConfig) { func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
<-shutdownSignal
// Remove shutdown handler
// every additional interrupt signal will stop without cleaning up
signal.Stop(shutdownSignal)
cclog.Info("Shutdown...") cclog.Info("Shutdown...")
cclog.Debug("Shutdown Ticker...")
config.Ticker.Close() config.Ticker.Close()
if config.CollectManager != nil { if config.CollectManager != nil {
cclog.Debug("Shutdown CollectManager...") cclog.Debug("Shutdown CollectManager...")
config.CollectManager.Close() config.CollectManager.Close()
@ -182,18 +192,6 @@ func shutdown(config *RuntimeConfig) {
config.Sync.Done() config.Sync.Done()
} }
// Register an interrupt handler for Ctrl+C and similar. At signal,
// all collectors are closed
func prepare_shutdown(config *RuntimeConfig) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
go func(config *RuntimeConfig) {
<-sigs
shutdown(config)
}(config)
}
func mainFunc() int { func mainFunc() int {
var err error var err error
use_recv := false use_recv := false
@ -249,7 +247,7 @@ func mainFunc() int {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
RouterToSinksChannel := make(chan lp.CCMetric) RouterToSinksChannel := make(chan lp.CCMetric, 200)
rcfg.SinkManager.AddInput(RouterToSinksChannel) rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.Router.AddOutput(RouterToSinksChannel) rcfg.Router.AddOutput(RouterToSinksChannel)
} }
@ -259,7 +257,7 @@ func mainFunc() int {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
CollectToRouterChannel := make(chan lp.CCMetric) CollectToRouterChannel := make(chan lp.CCMetric, 200)
rcfg.CollectManager.AddOutput(CollectToRouterChannel) rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.Router.AddCollectorInput(CollectToRouterChannel) rcfg.Router.AddCollectorInput(CollectToRouterChannel)
} }
@ -269,12 +267,17 @@ func mainFunc() int {
cclog.Error(err.Error()) cclog.Error(err.Error())
return 1 return 1
} }
ReceiveToRouterChannel := make(chan lp.CCMetric) ReceiveToRouterChannel := make(chan lp.CCMetric, 200)
rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel)
rcfg.Router.AddReceiverInput(ReceiveToRouterChannel) rcfg.Router.AddReceiverInput(ReceiveToRouterChannel)
use_recv = true use_recv = true
} }
prepare_shutdown(&rcfg)
shutdownSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt)
signal.Notify(shutdownSignal, syscall.SIGTERM)
go shutdownHandler(&rcfg, shutdownSignal)
rcfg.Sync.Add(1) rcfg.Sync.Add(1)
rcfg.Router.Start() rcfg.Router.Start()
rcfg.SinkManager.Start() rcfg.SinkManager.Start()
@ -288,10 +291,10 @@ func mainFunc() int {
if rcfg.CliArgs["once"] == "true" { if rcfg.CliArgs["once"] == "true" {
x := 1.2 * float64(rcfg.ConfigFile.Interval) x := 1.2 * float64(rcfg.ConfigFile.Interval)
time.Sleep(time.Duration(int(x)) * time.Second) time.Sleep(time.Duration(int(x)) * time.Second)
shutdown(&rcfg) shutdownSignal <- os.Interrupt
} }
// Wait until receiving an interrupt // Wait until shutdownHandler is executed
rcfg.Sync.Wait() rcfg.Sync.Wait()
return 0 return 0
} }

View File

@ -2,10 +2,11 @@ package receivers
import ( import (
"encoding/json" "encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
"os" "os"
"sync" "sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
var AvailableReceivers = map[string]Receiver{ var AvailableReceivers = map[string]Receiver{

View File

@ -68,10 +68,11 @@ func (sm *sinkManager) Start() {
go func() { go func() {
done := func() { done := func() {
for _, s := range sm.outputs { for _, s := range sm.outputs {
s.Flush()
s.Close() s.Close()
} }
cclog.ComponentDebug("SinkManager", "DONE")
sm.wg.Done() sm.wg.Done()
cclog.ComponentDebug("SinkManager", "DONE")
} }
for { for {
select { select {
@ -128,11 +129,8 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
} }
func (sm *sinkManager) Close() { func (sm *sinkManager) Close() {
select {
case sm.done <- true:
default:
}
cclog.ComponentDebug("SinkManager", "CLOSE") cclog.ComponentDebug("SinkManager", "CLOSE")
sm.done <- true
} }
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {