Collectors in parallel (#74)

* Provide info to CollectorManager whether the collector can be executed in parallel with others

* Split serial and parallel collectors. Read in parallel first
This commit is contained in:
Thomas Gruber 2022-05-13 14:10:39 +02:00 committed by GitHub
parent 1db5f3b29a
commit 5c34805918
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 109 additions and 40 deletions

View File

@ -55,6 +55,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
m.name = "BeegfsMetaCollector" m.name = "BeegfsMetaCollector"
m.setup() m.setup()
m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
m.config.Beegfs = DEFAULT_BEEGFS_CMD m.config.Beegfs = DEFAULT_BEEGFS_CMD

View File

@ -48,6 +48,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
m.name = "BeegfsStorageCollector" m.name = "BeegfsStorageCollector"
m.setup() m.setup()
m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
m.config.Beegfs = DEFAULT_BEEGFS_CMD m.config.Beegfs = DEFAULT_BEEGFS_CMD

View File

@ -14,39 +14,42 @@ import (
// Map of all available metric collectors // Map of all available metric collectors
var AvailableCollectors = map[string]MetricCollector{ var AvailableCollectors = map[string]MetricCollector{
"likwid": new(LikwidCollector), "likwid": new(LikwidCollector),
"loadavg": new(LoadavgCollector), "loadavg": new(LoadavgCollector),
"memstat": new(MemstatCollector), "memstat": new(MemstatCollector),
"netstat": new(NetstatCollector), "netstat": new(NetstatCollector),
"ibstat": new(InfinibandCollector), "ibstat": new(InfinibandCollector),
"lustrestat": new(LustreCollector), "lustrestat": new(LustreCollector),
"cpustat": new(CpustatCollector), "cpustat": new(CpustatCollector),
"topprocs": new(TopProcsCollector), "topprocs": new(TopProcsCollector),
"nvidia": new(NvidiaCollector), "nvidia": new(NvidiaCollector),
"customcmd": new(CustomCmdCollector), "customcmd": new(CustomCmdCollector),
"iostat": new(IOstatCollector), "iostat": new(IOstatCollector),
"diskstat": new(DiskstatCollector), "diskstat": new(DiskstatCollector),
"tempstat": new(TempCollector), "tempstat": new(TempCollector),
"ipmistat": new(IpmiCollector), "ipmistat": new(IpmiCollector),
"gpfs": new(GpfsCollector), "gpfs": new(GpfsCollector),
"cpufreq": new(CPUFreqCollector), "cpufreq": new(CPUFreqCollector),
"cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector),
"nfs3stat": new(Nfs3Collector), "nfs3stat": new(Nfs3Collector),
"nfs4stat": new(Nfs4Collector), "nfs4stat": new(Nfs4Collector),
"numastats": new(NUMAStatsCollector), "numastats": new(NUMAStatsCollector),
"beegfs_meta": new(BeegfsMetaCollector), "beegfs_meta": new(BeegfsMetaCollector),
"beegfs_storage": new(BeegfsStorageCollector), "beegfs_storage": new(BeegfsStorageCollector),
} }
// Metric collector manager data structure // Metric collector manager data structure
type collectorManager struct { type collectorManager struct {
collectors []MetricCollector // List of metric collectors to use collectors []MetricCollector // List of metric collectors to read in parallel
output chan lp.CCMetric // Output channels serial []MetricCollector // List of metric collectors to read serially
done chan bool // channel to finish / stop metric collector manager output chan lp.CCMetric // Output channels
ticker mct.MultiChanTicker // periodically ticking once each interval done chan bool // channel to finish / stop metric collector manager
duration time.Duration // duration (for metrics that measure over a given duration) ticker mct.MultiChanTicker // periodically ticking once each interval
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector duration time.Duration // duration (for metrics that measure over a given duration)
config map[string]json.RawMessage // json encoded config for collector manager wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config map[string]json.RawMessage // json encoded config for collector manager
collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector
parallel_run bool // Flag whether the collectors are currently read in parallel
} }
// Metric collector manager access functions // Metric collector manager access functions
@ -66,6 +69,7 @@ type CollectorManager interface {
// Initialization is done for all configured collectors // Initialization is done for all configured collectors
func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
cm.collectors = make([]MetricCollector, 0) cm.collectors = make([]MetricCollector, 0)
cm.serial = make([]MetricCollector, 0)
cm.output = nil cm.output = nil
cm.done = make(chan bool) cm.done = make(chan bool)
cm.wg = wg cm.wg = wg
@ -100,7 +104,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
continue continue
} }
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
cm.collectors = append(cm.collectors, collector) if collector.Parallel() {
cm.collectors = append(cm.collectors, collector)
} else {
cm.serial = append(cm.serial, collector)
}
} }
return nil return nil
} }
@ -116,6 +124,10 @@ func (cm *collectorManager) Start() {
// Collector manager is done // Collector manager is done
done := func() { done := func() {
// close all metric collectors // close all metric collectors
if cm.parallel_run {
cm.collector_wg.Wait()
cm.parallel_run = false
}
for _, c := range cm.collectors { for _, c := range cm.collectors {
c.Close() c.Close()
} }
@ -130,7 +142,26 @@ func (cm *collectorManager) Start() {
done() done()
return return
case t := <-tick: case t := <-tick:
cm.parallel_run = true
for _, c := range cm.collectors { for _, c := range cm.collectors {
// Wait for done signal or execute the collector
select {
case <-cm.done:
done()
return
default:
// Read metrics from collector c via goroutine
cclog.ComponentDebug("CollectorManager", c.Name(), t)
cm.collector_wg.Add(1)
go func(myc MetricCollector) {
myc.Read(cm.duration, cm.output)
cm.collector_wg.Done()
}(c)
}
}
cm.collector_wg.Wait()
cm.parallel_run = false
for _, c := range cm.serial {
// Wait for done signal or execute the collector // Wait for done signal or execute the collector
select { select {
case <-cm.done: case <-cm.done:

View File

@ -48,6 +48,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
m.setup() m.setup()
m.name = "CPUFreqCpuInfoCollector" m.name = "CPUFreqCpuInfoCollector"
m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "CPU", "group": "CPU",

View File

@ -53,6 +53,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
m.name = "CPUFreqCollector" m.name = "CPUFreqCollector"
m.setup() m.setup()
m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {

View File

@ -30,6 +30,7 @@ type CpustatCollector struct {
func (m *CpustatCollector) Init(config json.RawMessage) error { func (m *CpustatCollector) Init(config json.RawMessage) error {
m.name = "CpustatCollector" m.name = "CpustatCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"} m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"}
m.nodetags = map[string]string{"type": "node"} m.nodetags = map[string]string{"type": "node"}
if len(config) > 0 { if len(config) > 0 {

View File

@ -33,6 +33,7 @@ type CustomCmdCollector struct {
func (m *CustomCmdCollector) Init(config json.RawMessage) error { func (m *CustomCmdCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "CustomCmdCollector" m.name = "CustomCmdCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Custom"} m.meta = map[string]string{"source": m.name, "group": "Custom"}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)

View File

@ -29,6 +29,7 @@ type DiskstatCollector struct {
func (m *DiskstatCollector) Init(config json.RawMessage) error { func (m *DiskstatCollector) Init(config json.RawMessage) error {
m.name = "DiskstatCollector" m.name = "DiskstatCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
@ -77,7 +78,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
continue continue
} }
path := strings.Replace(linefields[1], `\040`, " ", -1) path := strings.Replace(linefields[1], `\040`, " ", -1)
stat := syscall.Statfs_t{} stat := syscall.Statfs_t{
Blocks: 0,
Bsize: 0,
Bfree: 0,
}
err := syscall.Statfs(path, &stat) err := syscall.Statfs(path, &stat)
if err != nil { if err != nil {
continue continue
@ -98,9 +103,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y.AddMeta("unit", "GBytes") y.AddMeta("unit", "GBytes")
output <- y output <- y
} }
perc := (100 * (total - free)) / total if total > 0 {
if perc > part_max_used { perc := (100 * (total - free)) / total
part_max_used = perc if perc > part_max_used {
part_max_used = perc
}
} }
} }
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())

View File

@ -46,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "GpfsCollector" m.name = "GpfsCollector"
m.setup() m.setup()
m.parallel = true
// Set default mmpmon binary // Set default mmpmon binary
m.config.Mmpmon = DEFAULT_GPFS_CMD m.config.Mmpmon = DEFAULT_GPFS_CMD

View File

@ -54,6 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "InfinibandCollector" m.name = "InfinibandCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "Network", "group": "Network",

View File

@ -37,6 +37,7 @@ type IOstatCollector struct {
func (m *IOstatCollector) Init(config json.RawMessage) error { func (m *IOstatCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "IOstatCollector" m.name = "IOstatCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {

View File

@ -34,6 +34,7 @@ type IpmiCollector struct {
func (m *IpmiCollector) Init(config json.RawMessage) error { func (m *IpmiCollector) Init(config json.RawMessage) error {
m.name = "IpmiCollector" m.name = "IpmiCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "IPMI"} m.meta = map[string]string{"source": m.name, "group": "IPMI"}
m.config.IpmitoolPath = string(IPMITOOL_PATH) m.config.IpmitoolPath = string(IPMITOOL_PATH)
m.config.IpmisensorsPath = string(IPMISENSORS_PATH) m.config.IpmisensorsPath = string(IPMISENSORS_PATH)

View File

@ -177,6 +177,7 @@ func getBaseFreq() float64 {
func (m *LikwidCollector) Init(config json.RawMessage) error { func (m *LikwidCollector) Init(config json.RawMessage) error {
m.name = "LikwidCollector" m.name = "LikwidCollector"
m.parallel = false
m.initialized = false m.initialized = false
m.running = false m.running = false
m.config.AccessMode = LIKWID_DEF_ACCESSMODE m.config.AccessMode = LIKWID_DEF_ACCESSMODE

View File

@ -36,6 +36,7 @@ type LoadavgCollector struct {
func (m *LoadavgCollector) Init(config json.RawMessage) error { func (m *LoadavgCollector) Init(config json.RawMessage) error {
m.name = "LoadavgCollector" m.name = "LoadavgCollector"
m.parallel = true
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)

View File

@ -288,6 +288,7 @@ var LustreDeriveMetrics = []LustreMetricDefinition{
func (m *LustreCollector) Init(config json.RawMessage) error { func (m *LustreCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "LustreCollector" m.name = "LustreCollector"
m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {

View File

@ -81,6 +81,7 @@ func getStats(filename string) map[string]MemstatStats {
func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "MemstatCollector" m.name = "MemstatCollector"
m.parallel = true
m.config.NodeStats = true m.config.NodeStats = true
m.config.NumaStats = false m.config.NumaStats = false
if len(config) > 0 { if len(config) > 0 {
@ -159,6 +160,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init { if !m.init {
cclog.ComponentPrint(m.name, "Here")
return return
} }

View File

@ -13,17 +13,19 @@ import (
) )
type MetricCollector interface { type MetricCollector interface {
Name() string // Name of the metric collector Name() string // Name of the metric collector
Init(config json.RawMessage) error // Initialize metric collector Init(config json.RawMessage) error // Initialize metric collector
Initialized() bool // Is metric collector initialized? Initialized() bool // Is metric collector initialized?
Parallel() bool
Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector
Close() // Close / finish metric collector Close() // Close / finish metric collector
} }
type metricCollector struct { type metricCollector struct {
name string // name of the metric name string // name of the metric
init bool // is metric collector initialized? init bool // is metric collector initialized?
meta map[string]string // static meta data tags parallel bool // can the metric collector be executed in parallel with others
meta map[string]string // static meta data tags
} }
// Name returns the name of the metric collector // Name returns the name of the metric collector
@ -31,6 +33,11 @@ func (c *metricCollector) Name() string {
return c.name return c.name
} }
// Parallel returns whether the metric collector can be executed in parallel with others
func (c *metricCollector) Parallel() bool {
return c.parallel
}
// Setup is for future use // Setup is for future use
func (c *metricCollector) setup() error { func (c *metricCollector) setup() error {
return nil return nil

View File

@ -39,6 +39,7 @@ type NetstatCollector struct {
func (m *NetstatCollector) Init(config json.RawMessage) error { func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector" m.name = "NetstatCollector"
m.parallel = true
m.setup() m.setup()
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()

View File

@ -114,6 +114,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
m.data = make(map[string]NfsCollectorData) m.data = make(map[string]NfsCollectorData)
m.initStats() m.initStats()
m.init = true m.init = true
m.parallel = true
return nil return nil
} }

View File

@ -54,6 +54,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
} }
m.name = "NUMAStatsCollector" m.name = "NUMAStatsCollector"
m.parallel = true
m.setup() m.setup()
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,

View File

@ -35,6 +35,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
m.name = "InternalCollector" m.name = "InternalCollector"
// This is for later use, also call it early // This is for later use, also call it early
m.setup() m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}

View File

@ -50,6 +50,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
} }
m.name = "TempCollector" m.name = "TempCollector"
m.parallel = true
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)

View File

@ -28,6 +28,7 @@ type TopProcsCollector struct {
func (m *TopProcsCollector) Init(config json.RawMessage) error { func (m *TopProcsCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "TopProcsCollector" m.name = "TopProcsCollector"
m.parallel = true
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source": m.name, "group": "TopProcs"} m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
if len(config) > 0 { if len(config) > 0 {