From 1d1315c08bc105a97a621589256cb432c562ced0 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 13 May 2022 12:23:34 +0200 Subject: [PATCH] Split serial and parallel collectors. Read in parallel first --- collectors/collectorManager.go | 91 +++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index e9ccfe7..62f6220 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -14,39 +14,42 @@ import ( // Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": new(LikwidCollector), - "loadavg": new(LoadavgCollector), - "memstat": new(MemstatCollector), - "netstat": new(NetstatCollector), - "ibstat": new(InfinibandCollector), - "lustrestat": new(LustreCollector), - "cpustat": new(CpustatCollector), - "topprocs": new(TopProcsCollector), - "nvidia": new(NvidiaCollector), - "customcmd": new(CustomCmdCollector), - "iostat": new(IOstatCollector), - "diskstat": new(DiskstatCollector), - "tempstat": new(TempCollector), - "ipmistat": new(IpmiCollector), - "gpfs": new(GpfsCollector), - "cpufreq": new(CPUFreqCollector), - "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), - "nfs3stat": new(Nfs3Collector), - "nfs4stat": new(Nfs4Collector), - "numastats": new(NUMAStatsCollector), - "beegfs_meta": new(BeegfsMetaCollector), - "beegfs_storage": new(BeegfsStorageCollector), + "likwid": new(LikwidCollector), + "loadavg": new(LoadavgCollector), + "memstat": new(MemstatCollector), + "netstat": new(NetstatCollector), + "ibstat": new(InfinibandCollector), + "lustrestat": new(LustreCollector), + "cpustat": new(CpustatCollector), + "topprocs": new(TopProcsCollector), + "nvidia": new(NvidiaCollector), + "customcmd": new(CustomCmdCollector), + "iostat": new(IOstatCollector), + "diskstat": new(DiskstatCollector), + "tempstat": new(TempCollector), + "ipmistat": new(IpmiCollector), + "gpfs": new(GpfsCollector), + "cpufreq": new(CPUFreqCollector), + "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), + "nfs3stat": new(Nfs3Collector), + "nfs4stat": new(Nfs4Collector), + "numastats": new(NUMAStatsCollector), + "beegfs_meta": new(BeegfsMetaCollector), + "beegfs_storage": new(BeegfsStorageCollector), } // Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector // List of metric collectors to use - output chan lp.CCMetric // Output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker // periodically ticking once each interval - duration time.Duration // duration (for metrics that measure over a given duration) - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config map[string]json.RawMessage // json encoded config for collector manager + collectors []MetricCollector // List of metric collectors to read in parallel + serial []MetricCollector // List of metric collectors to read serially + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager + collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector + parallel_run bool // Flag whether the collectors are currently read in parallel } // Metric collector manager access functions @@ -66,6 +69,7 @@ type CollectorManager interface { // Initialization is done for all configured collectors func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { cm.collectors = make([]MetricCollector, 0) + cm.serial = make([]MetricCollector, 0) cm.output = nil cm.done = make(chan bool) cm.wg = wg @@ -100,7 +104,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat continue } cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) - cm.collectors = append(cm.collectors, collector) + if collector.Parallel() { + cm.collectors = append(cm.collectors, collector) + } else { + cm.serial = append(cm.serial, collector) + } } return nil } @@ -116,6 +124,10 @@ func (cm *collectorManager) Start() { // Collector manager is done done := func() { // close all metric collectors + if cm.parallel_run { + cm.collector_wg.Wait() + cm.parallel_run = false + } for _, c := range cm.collectors { c.Close() } @@ -130,7 +142,26 @@ func (cm *collectorManager) Start() { done() return case t := <-tick: + cm.parallel_run = true for _, c := range cm.collectors { + // Wait for done signal or execute the collector + select { + case <-cm.done: + done() + return + default: + // Read metrics from collector c via goroutine + cclog.ComponentDebug("CollectorManager", c.Name(), t) + cm.collector_wg.Add(1) + go func(myc MetricCollector) { + myc.Read(cm.duration, cm.output) + cm.collector_wg.Done() + }(c) + } + } + cm.collector_wg.Wait() + cm.parallel_run = false + for _, c := range cm.serial { // Wait for done signal or execute the collector select { case <-cm.done: