Add time-based derivatives (e.g. bandwidth) to some collectors

Repository: https://github.com/ClusterCockpit/cc-metric-collector.git
Commit: 96dc243c08 (parent: c0e600269a)
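All four collectors touched by this commit apply the same pattern: remember the counter value and the time stamp of the previous Read() tick, then divide the counter difference by the elapsed seconds. The following self-contained Go sketch only illustrates that pattern; the RateDeriver type and its Update method are hypothetical names chosen for this example and are not code from the repository.

package main

import (
    "fmt"
    "time"
)

// RateDeriver turns a monotonically increasing counter into a rate.
// Hypothetical helper, only illustrating the pattern used in this commit.
type RateDeriver struct {
    lastTimestamp time.Time // time stamp of the previous tick
    lastValue     int64     // counter value seen at the previous tick
}

// Update returns the average rate in units per second since the previous
// call and stores the current counter value and time stamp for the next tick.
func (r *RateDeriver) Update(current int64, now time.Time) float64 {
    tdiff := now.Sub(r.lastTimestamp)
    diff := current - r.lastValue
    r.lastTimestamp = now
    r.lastValue = current
    if tdiff <= 0 {
        return 0
    }
    return float64(diff) / tdiff.Seconds()
}

func main() {
    r := RateDeriver{lastTimestamp: time.Now()}
    time.Sleep(2 * time.Second)
    // The counter advanced by 1048576 bytes in roughly 2 s -> about 524288 bytes/sec.
    fmt.Printf("%.0f bytes/sec\n", r.Update(1048576, time.Now()))
}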
GPFS collector:

@@ -19,14 +19,22 @@ import (
 
 const DEFAULT_GPFS_CMD = `mmpmon`
 
+type GpfsCollectorLastValues struct {
+    read  int64
+    write int64
+}
+
 type GpfsCollector struct {
     metricCollector
     tags   map[string]string
     config struct {
         Mmpmon            string   `json:"mmpmon_path,omitempty"`
         ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
+        SendBandwidths    bool     `json:"send_bandwidths"`
     }
     skipFS map[string]struct{}
+    lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
+    lastValues    map[string]GpfsCollectorLastValues
 }
 
 func (m *GpfsCollector) Init(config json.RawMessage) error {

@@ -38,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
     var err error
     m.name = "GpfsCollector"
     m.setup()
+    m.lastTimestamp = time.Now()
 
     // Set default mmpmon binary
     m.config.Mmpmon = string(DEFAULT_GPFS_CMD)

@@ -89,6 +98,9 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
         return
     }
 
+    now := time.Now()
+    tdiff := now.Sub(m.lastTimestamp)
+    m.lastTimestamp = now
     // mmpmon:
     //  -p: generate output that can be parsed
     //  -s: suppress the prompt on input

@@ -148,6 +160,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
         }
 
         m.tags["filesystem"] = filesystem
+        if _, ok := m.lastValues[filesystem]; !ok {
+            m.lastValues[filesystem] = GpfsCollectorLastValues{
+                read:  0,
+                write: 0,
+            }
+        }
 
         // return code
         rc, err := strconv.Atoi(key_value["_rc_"])

@@ -191,6 +209,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
         if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
             output <- y
         }
+        if m.config.SendBandwidths {
+            lastVal := m.lastValues[filesystem]
+            diff := bytesRead - lastVal.read
+            lastVal.read = bytesRead
+            if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil {
+                output <- y
+            }
+            m.lastValues[filesystem] = lastVal
+        }
 
         // bytes written
         bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)

@@ -203,6 +230,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
         if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
             output <- y
         }
+        if m.config.SendBandwidths {
+            lastVal := m.lastValues[filesystem]
+            diff := bytesWritten - lastVal.write
+            lastVal.write = bytesWritten
+            if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil {
+                output <- y
+            }
+            m.lastValues[filesystem] = lastVal
+        }
 
         // number of opens
         numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)
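As a quick sanity check of the new gpfs_bw_read value (the numbers below are illustrative, not taken from the source): if the _br_ counter reported by mmpmon grew by 1,073,741,824 bytes (1 GiB) since the previous tick and tdiff is 10 s, the emitted value is float64(1073741824) / 10 ≈ 1.07e8 bytes/sec, i.e. roughly 107 MB/s.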
InfiniBand collector:

@@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct {
     port             string            // IB device port
     portCounterFiles map[string]string // mapping counter name -> sysfs file
     tagSet           map[string]string // corresponding tag list
+    stats            map[string]int64
 }
 
 type InfinibandCollector struct {
     metricCollector
     config struct {
         ExcludeDevices     []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
+        SendAbsoluteValues bool     `json:"send_abs_values"`
+        SendDerivedValues  bool     `json:"send_derived_values"`
     }
     info []*InfinibandCollectorInfo
+    lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
 }
 
 // Init initializes the Infiniband collector by walking through files below IB_BASEPATH

@@ -49,6 +53,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
         "source": m.name,
         "group":  "Network",
     }
+    m.lastTimestamp = time.Now()
+    m.config.SendAbsoluteValues = true
+    m.config.SendDerivedValues = false
     if len(config) > 0 {
         err = json.Unmarshal(config, &m.config)
         if err != nil {

@@ -60,10 +67,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
     globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
     ibDirs, err := filepath.Glob(globPattern)
     if err != nil {
-        return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err)
+        return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err)
     }
     if ibDirs == nil {
-        return fmt.Errorf("Unable to find any directories with pattern %s", globPattern)
+        return fmt.Errorf("unable to find any directories with pattern %s", globPattern)
     }
 
     for _, path := range ibDirs {

@@ -106,7 +113,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
     for _, counterFile := range portCounterFiles {
         err := unix.Access(counterFile, unix.R_OK)
         if err != nil {
-            return fmt.Errorf("Unable to access %s: %v", counterFile, err)
+            return fmt.Errorf("unable to access %s: %v", counterFile, err)
         }
     }
 

@@ -122,11 +129,17 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
             "port": port,
             "lid":  LID,
         },
+        stats: map[string]int64{
+            "ib_recv":      0,
+            "ib_xmit":      0,
+            "ib_recv_pkts": 0,
+            "ib_xmit_pkts": 0,
+        },
     })
 }
 
 if len(m.info) == 0 {
-    return fmt.Errorf("Found no IB devices")
+    return fmt.Errorf("found no IB devices")
 }
 
 m.init = true

@@ -142,6 +155,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetric) {
     }
 
     now := time.Now()
+    tdiff := now.Sub(m.lastTimestamp)
     for _, info := range m.info {
         for counterName, counterFile := range info.portCounterFiles {
             line, err := ioutil.ReadFile(counterFile)

@@ -159,12 +173,22 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetric) {
                     fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err))
                 continue
             }
-            if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
-                output <- y
+            if m.config.SendAbsoluteValues {
+                if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
+                    output <- y
+                }
+            }
+            if m.config.SendDerivedValues {
+                diff := float64((v - info.stats[counterName])) / tdiff.Seconds()
+                if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": diff}, now); err == nil {
+                    output <- y
+                }
+                info.stats[counterName] = v
             }
         }
 
     }
+    m.lastTimestamp = now
 }
 
 func (m *InfinibandCollector) Close() {
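Note on the derived InfiniBand metrics: with send_derived_values enabled, every counter tracked in the new stats map gains a companion metric suffixed _bw (ib_recv_bw, ib_xmit_bw, ib_recv_pkts_bw, ib_xmit_pkts_bw), computed as the counter difference divided by the seconds elapsed since the previous Read() tick. The absolute counters keep their original names and, per the defaults set in Init, are still sent unless send_abs_values is explicitly set to false.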
Lustre collector:

@@ -19,20 +19,23 @@ const LCTL_CMD = `lctl`
 const LCTL_OPTION = `get_param`
 
 type LustreCollectorConfig struct {
     LCtlCommand        string   `json:"lctl_command"`
     ExcludeMetrics     []string `json:"exclude_metrics"`
     SendAllMetrics     bool     `json:"send_all_metrics"`
     Sudo               bool     `json:"use_sudo"`
+    SendAbsoluteValues bool     `json:"send_abs_values"`
+    SendDerivedValues  bool     `json:"send_derived_values"`
 }
 
 type LustreCollector struct {
     metricCollector
     tags    map[string]string
     matches map[string]map[string]int
     stats   map[string]map[string]int64
     config  LustreCollectorConfig
     lctl    string
     sudoCmd string
+    lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
 }
 
 func (m *LustreCollector) getDeviceDataCommand(device string) []string {

@@ -165,6 +168,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
             }
         }
     }
+    m.lastTimestamp = time.Now()
     m.init = true
     return nil
 }

@@ -173,6 +177,8 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) {
     if !m.init {
         return
     }
+    now := time.Now()
+    tdiff := now.Sub(m.lastTimestamp)
     for device, devData := range m.stats {
         stats := m.getDeviceDataCommand(device)
         processed := []string{}

@@ -183,23 +189,35 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) {
             if fields, ok := m.matches[lf[0]]; ok {
                 for name, idx := range fields {
                     x, err := strconv.ParseInt(lf[idx], 0, 64)
-                    if err != nil {
-                        continue
-                    }
-                    value := x - devData[name]
-                    devData[name] = x
-                    if value < 0 {
-                        value = 0
-                    }
-                    y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
                     if err == nil {
-                        y.AddTag("device", device)
-                        if strings.Contains(name, "byte") {
-                            y.AddMeta("unit", "Byte")
+                        value := x - devData[name]
+                        devData[name] = x
+                        if value < 0 {
+                            value = 0
                         }
-                        output <- y
-                        if m.config.SendAllMetrics {
-                            processed = append(processed, name)
+                        if m.config.SendAbsoluteValues {
+                            y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
+                            if err == nil {
+                                y.AddTag("device", device)
+                                if strings.Contains(name, "byte") {
+                                    y.AddMeta("unit", "Byte")
+                                }
+                                output <- y
+                                if m.config.SendAllMetrics {
+                                    processed = append(processed, name)
+                                }
+                            }
+                        }
+                        if m.config.SendDerivedValues && strings.Contains(name, "bytes") {
+                            y, err := lp.New(name+"_bw", m.tags, m.meta, map[string]interface{}{"value": float64(value) / tdiff.Seconds()}, time.Now())
+                            if err == nil {
+                                y.AddTag("device", device)
+                                y.AddMeta("unit", "Bytes/sec")
+                                output <- y
+                                if m.config.SendAllMetrics {
+                                    processed = append(processed, name)
+                                }
+                            }
                         }
                     }
                 }
             }

@@ -221,6 +239,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) {
             }
         }
     }
+    m.lastTimestamp = now
 }
 
 func (m *LustreCollector) Close() {
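Note on the Lustre changes: only counters whose name contains "bytes" get a derived companion. The send_derived_values path emits <name>_bw with unit Bytes/sec, while send_abs_values gates the existing per-interval counters (unit Byte for byte counters). For example, a counter named read_bytes would be accompanied by read_bytes_bw; that name is illustrative and not taken from this diff.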
Netstat collector:

@@ -16,7 +16,9 @@ import (
 const NETSTATFILE = `/proc/net/dev`
 
 type NetstatCollectorConfig struct {
     IncludeDevices     []string `json:"include_devices"`
+    SendAbsoluteValues bool     `json:"send_abs_values"`
+    SendDerivedValues  bool     `json:"send_derived_values"`
 }
 
 type NetstatCollectorMetric struct {

@@ -111,21 +113,34 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
             for name, data := range devmetrics {
                 v, err := strconv.ParseFloat(f[data.index], 64)
                 if err == nil {
-                    vdiff := v - data.lastValue
-                    value := vdiff / tdiff.Seconds()
-                    if data.lastValue == 0 {
-                        value = 0
-                    }
-                    data.lastValue = v
-                    y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now)
-                    if err == nil {
-                        switch {
-                        case strings.Contains(name, "byte"):
-                            y.AddMeta("unit", "bytes/sec")
-                        case strings.Contains(name, "pkt"):
-                            y.AddMeta("unit", "packets/sec")
+                    if m.config.SendAbsoluteValues {
+                        if y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": v}, now); err == nil {
+                            switch {
+                            case strings.Contains(name, "byte"):
+                                y.AddMeta("unit", "bytes")
+                            case strings.Contains(name, "pkt"):
+                                y.AddMeta("unit", "packets")
+                            }
+                            output <- y
+                        }
+                    }
+                    if m.config.SendDerivedValues {
+                        vdiff := v - data.lastValue
+                        value := vdiff / tdiff.Seconds()
+                        if data.lastValue == 0 {
+                            value = 0
+                        }
+                        data.lastValue = v
+                        if y, err := lp.New(name+"_bw", m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now); err == nil {
+                            switch {
+                            case strings.Contains(name, "byte"):
+                                y.AddMeta("unit", "bytes/sec")
+                            case strings.Contains(name, "pkt"):
+                                y.AddMeta("unit", "packets/sec")
+                            }
+                            output <- y
                         }
-                        output <- y
                     }
                     devmetrics[name] = data
                 }
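Note on the netstat changes: previously the metric published under the counter name already carried a rate (unit bytes/sec or packets/sec). After this change the original name carries the absolute counter value (unit bytes or packets, gated by send_abs_values), and the rate moves to a separate metric suffixed _bw (gated by send_derived_values).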