Update entry points

This commit is contained in:
Thomas Roehl 2021-12-20 17:44:10 +01:00
parent 8366f69a89
commit 9eac94d7e7

View File

@ -15,6 +15,7 @@ import (
"time" "time"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
) )
// List of provided collectors. Which collector should be run can be // List of provided collectors. Which collector should be run can be
@ -76,13 +77,10 @@ type RuntimeConfig struct {
ConfigFile CentralConfigFile ConfigFile CentralConfigFile
Router mr.MetricRouter Router mr.MetricRouter
RouterDone chan bool
CollectManager collectors.CollectorManager CollectManager collectors.CollectorManager
CollectManagerDone chan bool
SinkManager sinks.SinkManager SinkManager sinks.SinkManager
SinkManagerDone chan bool
ReceiveManager receivers.ReceiveManager ReceiveManager receivers.ReceiveManager
ReceiveManagerDone chan bool Ticker mct.MultiChanTicker
Channels []chan lp.CCMetric Channels []chan lp.CCMetric
Sync sync.WaitGroup Sync sync.WaitGroup
@ -252,40 +250,30 @@ func main() {
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) // log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname)
// return // return
// } // }
rcfg.Ticker = mct.NewTicker(rcfg.Interval)
if len(rcfg.ConfigFile.RouterConfigFile) > 0 { if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
rcfg.RouterDone = make(chan bool) rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
rcfg.Router, err = mr.New(rcfg.RouterDone, &rcfg.Sync)
if err != nil { if err != nil {
log.Print(err.Error()) log.Print(err.Error())
return return
} }
rcfg.Router.ReadConfig(rcfg.ConfigFile.RouterConfigFile)
} }
if len(rcfg.ConfigFile.SinkConfigFile) > 0 { if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
rcfg.SinkManagerDone = make(chan bool)
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
if err != nil { if err != nil {
log.Print(err.Error()) log.Print(err.Error())
return return
} }
// rcfg.SinkManager.ReadConfig(rcfg.ConfigFile.SinkConfigFile)
RouterToSinksChannel := make(chan lp.CCMetric) RouterToSinksChannel := make(chan lp.CCMetric)
rcfg.SinkManager.AddInput(RouterToSinksChannel) rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.Router.AddOutput(RouterToSinksChannel) rcfg.Router.AddOutput(RouterToSinksChannel)
} }
if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
// rcfg.CollectManagerDone = make(chan bool) rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
rcfg.CollectManager, err = collectors.New(rcfg.Interval, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
if err != nil { if err != nil {
log.Print(err.Error()) log.Print(err.Error())
return return
} }
// rcfg.CollectManager.ReadConfig(rcfg.ConfigFile.CollectorConfigFile)
// if err != nil {
// log.Print(err.Error())
// return
// }
CollectToRouterChannel := make(chan lp.CCMetric) CollectToRouterChannel := make(chan lp.CCMetric)
rcfg.CollectManager.AddOutput(CollectToRouterChannel) rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.Router.AddInput(CollectToRouterChannel) rcfg.Router.AddInput(CollectToRouterChannel)
@ -306,6 +294,7 @@ func main() {
rcfg.Router.Start() rcfg.Router.Start()
rcfg.SinkManager.Start() rcfg.SinkManager.Start()
rcfg.CollectManager.Start() rcfg.CollectManager.Start()
if use_recv { if use_recv {
rcfg.ReceiveManager.Start() rcfg.ReceiveManager.Start()
} }