From 5c348059188d031bfbaf9b8357db8250843d40ab Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Fri, 13 May 2022 14:10:39 +0200 Subject: [PATCH] Collectors in parallel (#74) * Provide info to CollectorManager whether the collector can be executed in parallel with others * Split serial and parallel collectors. Read in parallel first --- collectors/beegfsmetaMetric.go | 1 + collectors/beegfsstorageMetric.go | 1 + collectors/collectorManager.go | 91 ++++++++++++++++++++---------- collectors/cpufreqCpuinfoMetric.go | 1 + collectors/cpufreqMetric.go | 1 + collectors/cpustatMetric.go | 1 + collectors/customCmdMetric.go | 1 + collectors/diskstatMetric.go | 15 +++-- collectors/gpfsMetric.go | 1 + collectors/infinibandMetric.go | 1 + collectors/iostatMetric.go | 1 + collectors/ipmiMetric.go | 1 + collectors/likwidMetric.go | 1 + collectors/loadavgMetric.go | 1 + collectors/lustreMetric.go | 1 + collectors/memstatMetric.go | 2 + collectors/metricCollector.go | 19 +++++-- collectors/netstatMetric.go | 1 + collectors/nfsMetric.go | 1 + collectors/numastatsMetric.go | 1 + collectors/sampleMetric.go | 4 ++ collectors/tempMetric.go | 1 + collectors/topprocsMetric.go | 1 + 23 files changed, 109 insertions(+), 40 deletions(-) diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index 57b1e39..a27faf2 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -55,6 +55,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { m.name = "BeegfsMetaCollector" m.setup() + m.parallel = true // Set default beegfs-ctl binary m.config.Beegfs = DEFAULT_BEEGFS_CMD diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index cbc8314..1160664 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -48,6 +48,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { m.name = "BeegfsStorageCollector" m.setup() + m.parallel = true // Set default beegfs-ctl binary m.config.Beegfs = DEFAULT_BEEGFS_CMD diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index e9ccfe7..62f6220 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -14,39 +14,42 @@ import ( // Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": new(LikwidCollector), - "loadavg": new(LoadavgCollector), - "memstat": new(MemstatCollector), - "netstat": new(NetstatCollector), - "ibstat": new(InfinibandCollector), - "lustrestat": new(LustreCollector), - "cpustat": new(CpustatCollector), - "topprocs": new(TopProcsCollector), - "nvidia": new(NvidiaCollector), - "customcmd": new(CustomCmdCollector), - "iostat": new(IOstatCollector), - "diskstat": new(DiskstatCollector), - "tempstat": new(TempCollector), - "ipmistat": new(IpmiCollector), - "gpfs": new(GpfsCollector), - "cpufreq": new(CPUFreqCollector), - "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), - "nfs3stat": new(Nfs3Collector), - "nfs4stat": new(Nfs4Collector), - "numastats": new(NUMAStatsCollector), - "beegfs_meta": new(BeegfsMetaCollector), - "beegfs_storage": new(BeegfsStorageCollector), + "likwid": new(LikwidCollector), + "loadavg": new(LoadavgCollector), + "memstat": new(MemstatCollector), + "netstat": new(NetstatCollector), + "ibstat": new(InfinibandCollector), + "lustrestat": new(LustreCollector), + "cpustat": new(CpustatCollector), + "topprocs": new(TopProcsCollector), + "nvidia": new(NvidiaCollector), + "customcmd": new(CustomCmdCollector), + "iostat": new(IOstatCollector), + "diskstat": new(DiskstatCollector), + "tempstat": new(TempCollector), + "ipmistat": new(IpmiCollector), + "gpfs": new(GpfsCollector), + "cpufreq": new(CPUFreqCollector), + "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), + "nfs3stat": new(Nfs3Collector), + "nfs4stat": new(Nfs4Collector), + "numastats": new(NUMAStatsCollector), + "beegfs_meta": new(BeegfsMetaCollector), + "beegfs_storage": new(BeegfsStorageCollector), } // Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector // List of metric collectors to use - output chan lp.CCMetric // Output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker // periodically ticking once each interval - duration time.Duration // duration (for metrics that measure over a given duration) - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config map[string]json.RawMessage // json encoded config for collector manager + collectors []MetricCollector // List of metric collectors to read in parallel + serial []MetricCollector // List of metric collectors to read serially + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager + collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector + parallel_run bool // Flag whether the collectors are currently read in parallel } // Metric collector manager access functions @@ -66,6 +69,7 @@ type CollectorManager interface { // Initialization is done for all configured collectors func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { cm.collectors = make([]MetricCollector, 0) + cm.serial = make([]MetricCollector, 0) cm.output = nil cm.done = make(chan bool) cm.wg = wg @@ -100,7 +104,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat continue } cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) - cm.collectors = append(cm.collectors, collector) + if collector.Parallel() { + cm.collectors = append(cm.collectors, collector) + } else { + cm.serial = append(cm.serial, collector) + } } return nil } @@ -116,6 +124,10 @@ func (cm *collectorManager) Start() { // Collector manager is done done := func() { // close all metric collectors + if cm.parallel_run { + cm.collector_wg.Wait() + cm.parallel_run = false + } for _, c := range cm.collectors { c.Close() } @@ -130,7 +142,26 @@ func (cm *collectorManager) Start() { done() return case t := <-tick: + cm.parallel_run = true for _, c := range cm.collectors { + // Wait for done signal or execute the collector + select { + case <-cm.done: + done() + return + default: + // Read metrics from collector c via goroutine + cclog.ComponentDebug("CollectorManager", c.Name(), t) + cm.collector_wg.Add(1) + go func(myc MetricCollector) { + myc.Read(cm.duration, cm.output) + cm.collector_wg.Done() + }(c) + } + } + cm.collector_wg.Wait() + cm.parallel_run = false + for _, c := range cm.serial { // Wait for done signal or execute the collector select { case <-cm.done: diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 96ee9c5..80732ff 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -48,6 +48,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { m.setup() m.name = "CPUFreqCpuInfoCollector" + m.parallel = true m.meta = map[string]string{ "source": m.name, "group": "CPU", diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 076fdf5..3099900 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -53,6 +53,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { m.name = "CPUFreqCollector" m.setup() + m.parallel = true if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 2f2b084..c0dcf13 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -30,6 +30,7 @@ type CpustatCollector struct { func (m *CpustatCollector) Init(config json.RawMessage) error { m.name = "CpustatCollector" m.setup() + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"} m.nodetags = map[string]string{"type": "node"} if len(config) > 0 { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index ec2109b..492dd48 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -33,6 +33,7 @@ type CustomCmdCollector struct { func (m *CustomCmdCollector) Init(config json.RawMessage) error { var err error m.name = "CustomCmdCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Custom"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 3e7af01..69ffe07 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -29,6 +29,7 @@ type DiskstatCollector struct { func (m *DiskstatCollector) Init(config json.RawMessage) error { m.name = "DiskstatCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} m.setup() if len(config) > 0 { @@ -77,7 +78,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric continue } path := strings.Replace(linefields[1], `\040`, " ", -1) - stat := syscall.Statfs_t{} + stat := syscall.Statfs_t{ + Blocks: 0, + Bsize: 0, + Bfree: 0, + } err := syscall.Statfs(path, &stat) if err != nil { continue @@ -98,9 +103,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric y.AddMeta("unit", "GBytes") output <- y } - perc := (100 * (total - free)) / total - if perc > part_max_used { - part_max_used = perc + if total > 0 { + perc := (100 * (total - free)) / total + if perc > part_max_used { + part_max_used = perc + } } } y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index ed63201..ca9affe 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -46,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { var err error m.name = "GpfsCollector" m.setup() + m.parallel = true // Set default mmpmon binary m.config.Mmpmon = DEFAULT_GPFS_CMD diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 274e669..92ea911 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -54,6 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { var err error m.name = "InfinibandCollector" m.setup() + m.parallel = true m.meta = map[string]string{ "source": m.name, "group": "Network", diff --git a/collectors/iostatMetric.go b/collectors/iostatMetric.go index ca7f33c..19b4157 100644 --- a/collectors/iostatMetric.go +++ b/collectors/iostatMetric.go @@ -37,6 +37,7 @@ type IOstatCollector struct { func (m *IOstatCollector) Init(config json.RawMessage) error { var err error m.name = "IOstatCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} m.setup() if len(config) > 0 { diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 16b08ef..50605ac 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -34,6 +34,7 @@ type IpmiCollector struct { func (m *IpmiCollector) Init(config json.RawMessage) error { m.name = "IpmiCollector" m.setup() + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "IPMI"} m.config.IpmitoolPath = string(IPMITOOL_PATH) m.config.IpmisensorsPath = string(IPMISENSORS_PATH) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index f2229d1..ae44392 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -177,6 +177,7 @@ func getBaseFreq() float64 { func (m *LikwidCollector) Init(config json.RawMessage) error { m.name = "LikwidCollector" + m.parallel = false m.initialized = false m.running = false m.config.AccessMode = LIKWID_DEF_ACCESSMODE diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 3859721..58fb102 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -36,6 +36,7 @@ type LoadavgCollector struct { func (m *LoadavgCollector) Init(config json.RawMessage) error { m.name = "LoadavgCollector" + m.parallel = true m.setup() if len(config) > 0 { err := json.Unmarshal(config, &m.config) diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index d5a96e4..eade2ca 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -288,6 +288,7 @@ var LustreDeriveMetrics = []LustreMetricDefinition{ func (m *LustreCollector) Init(config json.RawMessage) error { var err error m.name = "LustreCollector" + m.parallel = true if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index c6c7f34..9841a01 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -81,6 +81,7 @@ func getStats(filename string) map[string]MemstatStats { func (m *MemstatCollector) Init(config json.RawMessage) error { var err error m.name = "MemstatCollector" + m.parallel = true m.config.NodeStats = true m.config.NumaStats = false if len(config) > 0 { @@ -159,6 +160,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { + cclog.ComponentPrint(m.name, "Here") return } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 7c04e90..13f8f2a 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -13,17 +13,19 @@ import ( ) type MetricCollector interface { - Name() string // Name of the metric collector - Init(config json.RawMessage) error // Initialize metric collector - Initialized() bool // Is metric collector initialized? + Name() string // Name of the metric collector + Init(config json.RawMessage) error // Initialize metric collector + Initialized() bool // Is metric collector initialized? + Parallel() bool Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector Close() // Close / finish metric collector } type metricCollector struct { - name string // name of the metric - init bool // is metric collector initialized? - meta map[string]string // static meta data tags + name string // name of the metric + init bool // is metric collector initialized? + parallel bool // can the metric collector be executed in parallel with others + meta map[string]string // static meta data tags } // Name returns the name of the metric collector @@ -31,6 +33,11 @@ func (c *metricCollector) Name() string { return c.name } +// Name returns the name of the metric collector +func (c *metricCollector) Parallel() bool { + return c.parallel +} + // Setup is for future use func (c *metricCollector) setup() error { return nil diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index d171d4b..8cfb34e 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -39,6 +39,7 @@ type NetstatCollector struct { func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" + m.parallel = true m.setup() m.lastTimestamp = time.Now() diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index c511b0d..6b15784 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -114,6 +114,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { m.data = make(map[string]NfsCollectorData) m.initStats() m.init = true + m.parallel = true return nil } diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index 52a2638..f65a019 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -54,6 +54,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { } m.name = "NUMAStatsCollector" + m.parallel = true m.setup() m.meta = map[string]string{ "source": m.name, diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index bea4df0..47ec296 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -35,6 +35,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error { m.name = "InternalCollector" // This is for later use, also call it early m.setup() + // Tell whether the collector should be run in parallel with others (reading files, ...) + // or it should be run serially, mostly for collectors acutally doing measurements + // because they should not measure the execution of the other collectors + m.parallel = true // Define meta information sent with each metric // (Can also be dynamic or this is the basic set with extension through AddMeta()) m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index e007099..af9d7fd 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -50,6 +50,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { } m.name = "TempCollector" + m.parallel = true m.setup() if len(config) > 0 { err := json.Unmarshal(config, &m.config) diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 408c3cc..1f4aaca 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -28,6 +28,7 @@ type TopProcsCollector struct { func (m *TopProcsCollector) Init(config json.RawMessage) error { var err error m.name = "TopProcsCollector" + m.parallel = true m.tags = map[string]string{"type": "node"} m.meta = map[string]string{"source": m.name, "group": "TopProcs"} if len(config) > 0 {