mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-17 10:25:56 +02:00
Provide info to CollectorManager whether the collector can be executed in parallel with others
This commit is contained in:
parent
ee4bd558f1
commit
7b804ad7a2
@ -55,6 +55,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.name = "BeegfsMetaCollector"
|
m.name = "BeegfsMetaCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
// Set default beegfs-ctl binary
|
// Set default beegfs-ctl binary
|
||||||
|
|
||||||
m.config.Beegfs = DEFAULT_BEEGFS_CMD
|
m.config.Beegfs = DEFAULT_BEEGFS_CMD
|
||||||
|
@ -48,6 +48,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.name = "BeegfsStorageCollector"
|
m.name = "BeegfsStorageCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
// Set default beegfs-ctl binary
|
// Set default beegfs-ctl binary
|
||||||
|
|
||||||
m.config.Beegfs = DEFAULT_BEEGFS_CMD
|
m.config.Beegfs = DEFAULT_BEEGFS_CMD
|
||||||
|
@ -48,6 +48,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
|||||||
m.setup()
|
m.setup()
|
||||||
|
|
||||||
m.name = "CPUFreqCpuInfoCollector"
|
m.name = "CPUFreqCpuInfoCollector"
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
"group": "CPU",
|
"group": "CPU",
|
||||||
|
@ -53,6 +53,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.name = "CPUFreqCollector"
|
m.name = "CPUFreqCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -30,6 +30,7 @@ type CpustatCollector struct {
|
|||||||
func (m *CpustatCollector) Init(config json.RawMessage) error {
|
func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "CpustatCollector"
|
m.name = "CpustatCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"}
|
m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"}
|
||||||
m.nodetags = map[string]string{"type": "node"}
|
m.nodetags = map[string]string{"type": "node"}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
|
@ -33,6 +33,7 @@ type CustomCmdCollector struct {
|
|||||||
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "CustomCmdCollector"
|
m.name = "CustomCmdCollector"
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Custom"}
|
m.meta = map[string]string{"source": m.name, "group": "Custom"}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err = json.Unmarshal(config, &m.config)
|
||||||
|
@ -29,6 +29,7 @@ type DiskstatCollector struct {
|
|||||||
|
|
||||||
func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "DiskstatCollector"
|
m.name = "DiskstatCollector"
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||||
m.setup()
|
m.setup()
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
@ -77,11 +78,18 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
path := strings.Replace(linefields[1], `\040`, " ", -1)
|
path := strings.Replace(linefields[1], `\040`, " ", -1)
|
||||||
stat := syscall.Statfs_t{}
|
stat := syscall.Statfs_t{
|
||||||
|
Blocks: 0,
|
||||||
|
Bsize: 0,
|
||||||
|
Bfree: 0,
|
||||||
|
}
|
||||||
err := syscall.Statfs(path, &stat)
|
err := syscall.Statfs(path, &stat)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if stat.Blocks == 0 || stat.Bsize == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
tags := map[string]string{"type": "node", "device": linefields[0]}
|
tags := map[string]string{"type": "node", "device": linefields[0]}
|
||||||
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
|
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
|
||||||
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
|
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
|
||||||
@ -95,11 +103,13 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
|
|||||||
y.AddMeta("unit", "GBytes")
|
y.AddMeta("unit", "GBytes")
|
||||||
output <- y
|
output <- y
|
||||||
}
|
}
|
||||||
|
if total > 0 {
|
||||||
perc := (100 * (total - free)) / total
|
perc := (100 * (total - free)) / total
|
||||||
if perc > part_max_used {
|
if perc > part_max_used {
|
||||||
part_max_used = perc
|
part_max_used = perc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
y.AddMeta("unit", "percent")
|
y.AddMeta("unit", "percent")
|
||||||
|
@ -46,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
var err error
|
var err error
|
||||||
m.name = "GpfsCollector"
|
m.name = "GpfsCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
|
|
||||||
// Set default mmpmon binary
|
// Set default mmpmon binary
|
||||||
m.config.Mmpmon = DEFAULT_GPFS_CMD
|
m.config.Mmpmon = DEFAULT_GPFS_CMD
|
||||||
|
@ -54,6 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
|
|||||||
var err error
|
var err error
|
||||||
m.name = "InfinibandCollector"
|
m.name = "InfinibandCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
"group": "Network",
|
"group": "Network",
|
||||||
|
@ -37,6 +37,7 @@ type IOstatCollector struct {
|
|||||||
func (m *IOstatCollector) Init(config json.RawMessage) error {
|
func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "IOstatCollector"
|
m.name = "IOstatCollector"
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||||
m.setup()
|
m.setup()
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
|
@ -34,6 +34,7 @@ type IpmiCollector struct {
|
|||||||
func (m *IpmiCollector) Init(config json.RawMessage) error {
|
func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "IpmiCollector"
|
m.name = "IpmiCollector"
|
||||||
m.setup()
|
m.setup()
|
||||||
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "IPMI"}
|
m.meta = map[string]string{"source": m.name, "group": "IPMI"}
|
||||||
m.config.IpmitoolPath = string(IPMITOOL_PATH)
|
m.config.IpmitoolPath = string(IPMITOOL_PATH)
|
||||||
m.config.IpmisensorsPath = string(IPMISENSORS_PATH)
|
m.config.IpmisensorsPath = string(IPMISENSORS_PATH)
|
||||||
|
@ -177,6 +177,7 @@ func getBaseFreq() float64 {
|
|||||||
|
|
||||||
func (m *LikwidCollector) Init(config json.RawMessage) error {
|
func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "LikwidCollector"
|
m.name = "LikwidCollector"
|
||||||
|
m.parallel = false
|
||||||
m.initialized = false
|
m.initialized = false
|
||||||
m.running = false
|
m.running = false
|
||||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||||
|
@ -36,6 +36,7 @@ type LoadavgCollector struct {
|
|||||||
|
|
||||||
func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "LoadavgCollector"
|
m.name = "LoadavgCollector"
|
||||||
|
m.parallel = true
|
||||||
m.setup()
|
m.setup()
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
|
@ -288,6 +288,7 @@ var LustreDeriveMetrics = []LustreMetricDefinition{
|
|||||||
func (m *LustreCollector) Init(config json.RawMessage) error {
|
func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "LustreCollector"
|
m.name = "LustreCollector"
|
||||||
|
m.parallel = true
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err = json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -81,6 +81,7 @@ func getStats(filename string) map[string]MemstatStats {
|
|||||||
func (m *MemstatCollector) Init(config json.RawMessage) error {
|
func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "MemstatCollector"
|
m.name = "MemstatCollector"
|
||||||
|
m.parallel = true
|
||||||
m.config.NodeStats = true
|
m.config.NodeStats = true
|
||||||
m.config.NumaStats = false
|
m.config.NumaStats = false
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
@ -159,8 +160,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||||
if !m.init {
|
if !m.init {
|
||||||
|
cclog.ComponentPrint(m.name, "Here")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
cclog.ComponentPrint(m.name, time.Now())
|
||||||
|
|
||||||
sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
|
sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
|
||||||
for match, name := range m.matches {
|
for match, name := range m.matches {
|
||||||
|
@ -16,6 +16,7 @@ type MetricCollector interface {
|
|||||||
Name() string // Name of the metric collector
|
Name() string // Name of the metric collector
|
||||||
Init(config json.RawMessage) error // Initialize metric collector
|
Init(config json.RawMessage) error // Initialize metric collector
|
||||||
Initialized() bool // Is metric collector initialized?
|
Initialized() bool // Is metric collector initialized?
|
||||||
|
Parallel() bool
|
||||||
Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector
|
Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector
|
||||||
Close() // Close / finish metric collector
|
Close() // Close / finish metric collector
|
||||||
}
|
}
|
||||||
@ -23,6 +24,7 @@ type MetricCollector interface {
|
|||||||
type metricCollector struct {
|
type metricCollector struct {
|
||||||
name string // name of the metric
|
name string // name of the metric
|
||||||
init bool // is metric collector initialized?
|
init bool // is metric collector initialized?
|
||||||
|
parallel bool // can the metric collector be executed in parallel with others
|
||||||
meta map[string]string // static meta data tags
|
meta map[string]string // static meta data tags
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,6 +33,11 @@ func (c *metricCollector) Name() string {
|
|||||||
return c.name
|
return c.name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name returns the name of the metric collector
|
||||||
|
func (c *metricCollector) Parallel() bool {
|
||||||
|
return c.parallel
|
||||||
|
}
|
||||||
|
|
||||||
// Setup is for future use
|
// Setup is for future use
|
||||||
func (c *metricCollector) setup() error {
|
func (c *metricCollector) setup() error {
|
||||||
return nil
|
return nil
|
||||||
|
@ -39,6 +39,7 @@ type NetstatCollector struct {
|
|||||||
|
|
||||||
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "NetstatCollector"
|
m.name = "NetstatCollector"
|
||||||
|
m.parallel = true
|
||||||
m.setup()
|
m.setup()
|
||||||
m.lastTimestamp = time.Now()
|
m.lastTimestamp = time.Now()
|
||||||
|
|
||||||
|
@ -114,6 +114,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
|
|||||||
m.data = make(map[string]NfsCollectorData)
|
m.data = make(map[string]NfsCollectorData)
|
||||||
m.initStats()
|
m.initStats()
|
||||||
m.init = true
|
m.init = true
|
||||||
|
m.parallel = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.name = "NUMAStatsCollector"
|
m.name = "NUMAStatsCollector"
|
||||||
|
m.parallel = true
|
||||||
m.setup()
|
m.setup()
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
|
@ -35,6 +35,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
|||||||
m.name = "InternalCollector"
|
m.name = "InternalCollector"
|
||||||
// This is for later use, also call it early
|
// This is for later use, also call it early
|
||||||
m.setup()
|
m.setup()
|
||||||
|
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
||||||
|
// or it should be run serially, mostly for collectors acutally doing measurements
|
||||||
|
// because they should not measure the execution of the other collectors
|
||||||
|
m.parallel = true
|
||||||
// Define meta information sent with each metric
|
// Define meta information sent with each metric
|
||||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||||
|
@ -50,6 +50,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.name = "TempCollector"
|
m.name = "TempCollector"
|
||||||
|
m.parallel = true
|
||||||
m.setup()
|
m.setup()
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
@ -116,6 +117,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Sensor file
|
// Sensor file
|
||||||
|
_, err = ioutil.ReadFile(file)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
sensor.file = file
|
sensor.file = file
|
||||||
|
|
||||||
// Sensor tags
|
// Sensor tags
|
||||||
|
@ -28,6 +28,7 @@ type TopProcsCollector struct {
|
|||||||
func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "TopProcsCollector"
|
m.name = "TopProcsCollector"
|
||||||
|
m.parallel = true
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{"type": "node"}
|
||||||
m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
|
m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user