Add comments

This commit is contained in:
Holger Obermaier 2022-02-09 10:09:03 +01:00
parent 7f78a5baf2
commit b4d7643c25

View File

@ -37,20 +37,28 @@ type SinkManager interface {
Close() Close()
} }
// Init initializes the sink manager by:
// * Reading its configuration file
// * Adding the configured sinks and providing them with the corresponding config
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil sm.input = nil
sm.done = make(chan bool) sm.done = make(chan bool)
sm.wg = wg sm.wg = wg
sm.sinks = make(map[string]Sink, 0) sm.sinks = make(map[string]Sink, 0)
if len(sinkConfigFile) == 0 {
return nil
}
// Read sink config file // Read sink config file
if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile) configFile, err := os.Open(sinkConfigFile)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", err.Error()) cclog.ComponentError("SinkManager", err.Error())
return err return err
} }
defer configFile.Close() defer configFile.Close()
// Parse config
jsonParser := json.NewDecoder(configFile) jsonParser := json.NewDecoder(configFile)
var rawConfigs map[string]json.RawMessage var rawConfigs map[string]json.RawMessage
err = jsonParser.Decode(&rawConfigs) err = jsonParser.Decode(&rawConfigs)
@ -58,6 +66,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
cclog.ComponentError("SinkManager", err.Error()) cclog.ComponentError("SinkManager", err.Error())
return err return err
} }
// Start sinks
for name, raw := range rawConfigs { for name, raw := range rawConfigs {
err = sm.AddOutput(name, raw) err = sm.AddOutput(name, raw)
if err != nil { if err != nil {
@ -65,10 +75,12 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
continue continue
} }
} }
}
return nil return nil
} }
// Start starts the sink managers background task, which
// distributes received metrics to the sinks
func (sm *sinkManager) Start() { func (sm *sinkManager) Start() {
batchcount := 20 batchcount := 20
@ -156,7 +168,7 @@ func (sm *sinkManager) Close() {
// New creates a new initialized sink manager // New creates a new initialized sink manager
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
sm := &sinkManager{} sm := new(sinkManager)
err := sm.Init(wg, sinkConfigFile) err := sm.Init(wg, sinkConfigFile)
if err != nil { if err != nil {
return nil, err return nil, err