Split serial and parallel collectors. Read in parallel first

This commit is contained in:
Thomas Roehl 2022-05-13 12:23:34 +02:00
parent 7b804ad7a2
commit 1d1315c08b

View File

@ -40,13 +40,16 @@ var AvailableCollectors = map[string]MetricCollector{
// Metric collector manager data structure // Metric collector manager data structure
type collectorManager struct { type collectorManager struct {
collectors []MetricCollector // List of metric collectors to use collectors []MetricCollector // List of metric collectors to read in parallel
serial []MetricCollector // List of metric collectors to read serially
output chan lp.CCMetric // Output channels output chan lp.CCMetric // Output channels
done chan bool // channel to finish / stop metric collector manager done chan bool // channel to finish / stop metric collector manager
ticker mct.MultiChanTicker // periodically ticking once each interval ticker mct.MultiChanTicker // periodically ticking once each interval
duration time.Duration // duration (for metrics that measure over a given duration) duration time.Duration // duration (for metrics that measure over a given duration)
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config map[string]json.RawMessage // json encoded config for collector manager config map[string]json.RawMessage // json encoded config for collector manager
collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector
parallel_run bool // Flag whether the collectors are currently read in parallel
} }
// Metric collector manager access functions // Metric collector manager access functions
@ -66,6 +69,7 @@ type CollectorManager interface {
// Initialization is done for all configured collectors // Initialization is done for all configured collectors
func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
cm.collectors = make([]MetricCollector, 0) cm.collectors = make([]MetricCollector, 0)
cm.serial = make([]MetricCollector, 0)
cm.output = nil cm.output = nil
cm.done = make(chan bool) cm.done = make(chan bool)
cm.wg = wg cm.wg = wg
@ -100,7 +104,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
continue continue
} }
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
if collector.Parallel() {
cm.collectors = append(cm.collectors, collector) cm.collectors = append(cm.collectors, collector)
} else {
cm.serial = append(cm.serial, collector)
}
} }
return nil return nil
} }
@ -116,6 +124,10 @@ func (cm *collectorManager) Start() {
// Collector manager is done // Collector manager is done
done := func() { done := func() {
// close all metric collectors // close all metric collectors
if cm.parallel_run {
cm.collector_wg.Wait()
cm.parallel_run = false
}
for _, c := range cm.collectors { for _, c := range cm.collectors {
c.Close() c.Close()
} }
@ -130,7 +142,26 @@ func (cm *collectorManager) Start() {
done() done()
return return
case t := <-tick: case t := <-tick:
cm.parallel_run = true
for _, c := range cm.collectors { for _, c := range cm.collectors {
// Wait for done signal or execute the collector
select {
case <-cm.done:
done()
return
default:
// Read metrics from collector c via goroutine
cclog.ComponentDebug("CollectorManager", c.Name(), t)
cm.collector_wg.Add(1)
go func(myc MetricCollector) {
myc.Read(cm.duration, cm.output)
cm.collector_wg.Done()
}(c)
}
}
cm.collector_wg.Wait()
cm.parallel_run = false
for _, c := range cm.serial {
// Wait for done signal or execute the collector // Wait for done signal or execute the collector
select { select {
case <-cm.done: case <-cm.done: