Use non-blocking send at close, use common done function and remove default case

This commit is contained in:
Thomas Roehl 2022-01-26 16:54:51 +01:00
parent babd7a9af8
commit 3fd77e6887

View File

@ -2,12 +2,11 @@ package sinks
import ( import (
"encoding/json" "encoding/json"
// "log"
"os" "os"
"sync" "sync"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
var AvailableSinks = map[string]Sink{ var AvailableSinks = map[string]Sink{
@ -67,16 +66,18 @@ func (sm *sinkManager) Start() {
sm.wg.Add(1) sm.wg.Add(1)
batchcount := 20 batchcount := 20
go func() { go func() {
done := func() {
for _, s := range sm.outputs {
s.Close()
}
cclog.ComponentDebug("SinkManager", "DONE")
sm.wg.Done()
}
for { for {
SinkManagerLoop:
select { select {
case <-sm.done: case <-sm.done:
for _, s := range sm.outputs { done()
s.Close() return
}
cclog.ComponentDebug("SinkManager", "DONE")
sm.wg.Done()
break SinkManagerLoop
case p := <-sm.input: case p := <-sm.input:
cclog.ComponentDebug("SinkManager", "WRITE", p) cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.outputs { for _, s := range sm.outputs {
@ -90,7 +91,6 @@ func (sm *sinkManager) Start() {
batchcount = 20 batchcount = 20
} }
batchcount-- batchcount--
default:
} }
} }
}() }()
@ -128,7 +128,10 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
} }
func (sm *sinkManager) Close() { func (sm *sinkManager) Close() {
sm.done <- true select {
case sm.done <- true:
default:
}
cclog.ComponentDebug("SinkManager", "CLOSE") cclog.ComponentDebug("SinkManager", "CLOSE")
} }