Add time-based derivatived (e.g. bandwidth) to some collectors

This commit is contained in:
Thomas Roehl 2022-03-07 18:53:01 +01:00
parent 9b66080e5e
commit 24e163d3d2
4 changed files with 143 additions and 49 deletions

View File

@ -19,14 +19,22 @@ import (
const DEFAULT_GPFS_CMD = `mmpmon`
type GpfsCollectorLastValues struct {
read int64
write int64
}
type GpfsCollector struct {
metricCollector
tags map[string]string
config struct {
Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
SendBandwidths bool `json:"send_bandwidths"`
}
skipFS map[string]struct{}
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastValues map[string]GpfsCollectorLastValues
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
@ -38,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
var err error
m.name = "GpfsCollector"
m.setup()
m.lastTimestamp = time.Now()
// Set default mmpmon binary
m.config.Mmpmon = string(DEFAULT_GPFS_CMD)
@ -89,6 +98,9 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
return
}
now := time.Now()
tdiff := now.Sub(m.lastTimestamp)
m.lastTimestamp = now
// mmpmon:
// -p: generate output that can be parsed
// -s: suppress the prompt on input
@ -148,6 +160,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
m.tags["filesystem"] = filesystem
if _, ok := m.lastValues[filesystem]; !ok {
m.lastValues[filesystem] = GpfsCollectorLastValues{
read: 0,
write: 0,
}
}
// return code
rc, err := strconv.Atoi(key_value["_rc_"])
@ -191,6 +209,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
}
if m.config.SendBandwidths {
lastVal := m.lastValues[filesystem]
diff := bytesRead - lastVal.read
lastVal.read = bytesRead
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil {
output <- y
}
m.lastValues[filesystem] = lastVal
}
// bytes written
bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)
@ -203,6 +230,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y
}
if m.config.SendBandwidths {
lastVal := m.lastValues[filesystem]
diff := bytesWritten - lastVal.write
lastVal.write = bytesWritten
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil {
output <- y
}
m.lastValues[filesystem] = lastVal
}
// number of opens
numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)

View File

@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct {
port string // IB device port
portCounterFiles map[string]string // mapping counter name -> sysfs file
tagSet map[string]string // corresponding tag list
stats map[string]int64
}
type InfinibandCollector struct {
metricCollector
config struct {
ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
}
info []*InfinibandCollectorInfo
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@ -49,6 +53,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"source": m.name,
"group": "Network",
}
m.lastTimestamp = time.Now()
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
@ -60,10 +67,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
ibDirs, err := filepath.Glob(globPattern)
if err != nil {
return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err)
return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err)
}
if ibDirs == nil {
return fmt.Errorf("Unable to find any directories with pattern %s", globPattern)
return fmt.Errorf("unable to find any directories with pattern %s", globPattern)
}
for _, path := range ibDirs {
@ -106,7 +113,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
for _, counterFile := range portCounterFiles {
err := unix.Access(counterFile, unix.R_OK)
if err != nil {
return fmt.Errorf("Unable to access %s: %v", counterFile, err)
return fmt.Errorf("unable to access %s: %v", counterFile, err)
}
}
@ -122,11 +129,17 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"port": port,
"lid": LID,
},
stats: map[string]int64{
"ib_recv": 0,
"ib_xmit": 0,
"ib_recv_pkts": 0,
"ib_xmit_pkts": 0,
},
})
}
if len(m.info) == 0 {
return fmt.Errorf("Found no IB devices")
return fmt.Errorf("found no IB devices")
}
m.init = true
@ -142,6 +155,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
}
now := time.Now()
tdiff := now.Sub(m.lastTimestamp)
for _, info := range m.info {
for counterName, counterFile := range info.portCounterFiles {
line, err := ioutil.ReadFile(counterFile)
@ -159,12 +173,22 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err))
continue
}
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
if m.config.SendAbsoluteValues {
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
}
}
if m.config.SendDerivedValues {
diff := float64((v - info.stats[counterName])) / tdiff.Seconds()
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": diff}, now); err == nil {
output <- y
}
info.stats[counterName] = v
}
}
}
m.lastTimestamp = now
}
func (m *InfinibandCollector) Close() {

View File

@ -19,20 +19,23 @@ const LCTL_CMD = `lctl`
const LCTL_OPTION = `get_param`
type LustreCollectorConfig struct {
LCtlCommand string `json:"lctl_command"`
ExcludeMetrics []string `json:"exclude_metrics"`
SendAllMetrics bool `json:"send_all_metrics"`
Sudo bool `json:"use_sudo"`
LCtlCommand string `json:"lctl_command"`
ExcludeMetrics []string `json:"exclude_metrics"`
SendAllMetrics bool `json:"send_all_metrics"`
Sudo bool `json:"use_sudo"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
}
type LustreCollector struct {
metricCollector
tags map[string]string
matches map[string]map[string]int
stats map[string]map[string]int64
config LustreCollectorConfig
lctl string
sudoCmd string
tags map[string]string
matches map[string]map[string]int
stats map[string]map[string]int64
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
}
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@ -165,6 +168,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
}
m.lastTimestamp = time.Now()
m.init = true
return nil
}
@ -173,6 +177,8 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init {
return
}
now := time.Now()
tdiff := now.Sub(m.lastTimestamp)
for device, devData := range m.stats {
stats := m.getDeviceDataCommand(device)
processed := []string{}
@ -183,23 +189,35 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if fields, ok := m.matches[lf[0]]; ok {
for name, idx := range fields {
x, err := strconv.ParseInt(lf[idx], 0, 64)
if err != nil {
continue
}
value := x - devData[name]
devData[name] = x
if value < 0 {
value = 0
}
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
y.AddTag("device", device)
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
value := x - devData[name]
devData[name] = x
if value < 0 {
value = 0
}
output <- y
if m.config.SendAllMetrics {
processed = append(processed, name)
if m.config.SendAbsoluteValues {
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
y.AddTag("device", device)
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
}
output <- y
if m.config.SendAllMetrics {
processed = append(processed, name)
}
}
}
if m.config.SendDerivedValues && strings.Contains(name, "bytes") {
y, err := lp.New(name+"_bw", m.tags, m.meta, map[string]interface{}{"value": float64(value) / tdiff.Seconds()}, time.Now())
if err == nil {
y.AddTag("device", device)
y.AddMeta("unit", "Bytes/sec")
output <- y
if m.config.SendAllMetrics {
processed = append(processed, name)
}
}
}
}
}
@ -221,6 +239,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
}
}
m.lastTimestamp = now
}
func (m *LustreCollector) Close() {

View File

@ -16,7 +16,9 @@ import (
const NETSTATFILE = `/proc/net/dev`
type NetstatCollectorConfig struct {
IncludeDevices []string `json:"include_devices"`
IncludeDevices []string `json:"include_devices"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
}
type NetstatCollectorMetric struct {
@ -111,21 +113,34 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
for name, data := range devmetrics {
v, err := strconv.ParseFloat(f[data.index], 64)
if err == nil {
vdiff := v - data.lastValue
value := vdiff / tdiff.Seconds()
if data.lastValue == 0 {
value = 0
}
data.lastValue = v
y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now)
if err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "bytes/sec")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "packets/sec")
if m.config.SendAbsoluteValues {
if y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": v}, now); err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "bytes")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "packets")
}
output <- y
}
}
if m.config.SendDerivedValues {
vdiff := v - data.lastValue
value := vdiff / tdiff.Seconds()
if data.lastValue == 0 {
value = 0
}
data.lastValue = v
if y, err := lp.New(name+"_bw", m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now); err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "bytes/sec")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "packets/sec")
}
output <- y
}
output <- y
}
devmetrics[name] = data
}