diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index 453704c..98894b3 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -19,14 +19,22 @@ import ( const DEFAULT_GPFS_CMD = `mmpmon` +type GpfsCollectorLastValues struct { + read int64 + write int64 +} + type GpfsCollector struct { metricCollector tags map[string]string config struct { Mmpmon string `json:"mmpmon_path,omitempty"` ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` + SendBandwidths bool `json:"send_bandwidths"` } - skipFS map[string]struct{} + skipFS map[string]struct{} + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + lastValues map[string]GpfsCollectorLastValues } func (m *GpfsCollector) Init(config json.RawMessage) error { @@ -38,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { var err error m.name = "GpfsCollector" m.setup() + m.lastTimestamp = time.Now() // Set default mmpmon binary m.config.Mmpmon = string(DEFAULT_GPFS_CMD) @@ -89,6 +98,9 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { return } + now := time.Now() + tdiff := now.Sub(m.lastTimestamp) + m.lastTimestamp = now // mmpmon: // -p: generate output that can be parsed // -s: suppress the prompt on input @@ -148,6 +160,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } m.tags["filesystem"] = filesystem + if _, ok := m.lastValues[filesystem]; !ok { + m.lastValues[filesystem] = GpfsCollectorLastValues{ + read: 0, + write: 0, + } + } // return code rc, err := strconv.Atoi(key_value["_rc_"]) @@ -191,6 +209,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { output <- y } + if m.config.SendBandwidths { + lastVal := m.lastValues[filesystem] + diff := bytesRead - lastVal.read + lastVal.read = bytesRead + if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil { + output <- y + } + m.lastValues[filesystem] = lastVal + } // bytes written bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64) @@ -203,6 +230,15 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { output <- y } + if m.config.SendBandwidths { + lastVal := m.lastValues[filesystem] + diff := bytesWritten - lastVal.write + lastVal.write = bytesWritten + if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil { + output <- y + } + m.lastValues[filesystem] = lastVal + } // number of opens numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64) diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index ac79e0a..7f3326d 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct { port string // IB device port portCounterFiles map[string]string // mapping counter name -> sysfs file tagSet map[string]string // corresponding tag list + stats map[string]int64 } type InfinibandCollector struct { metricCollector config struct { - ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0 + ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0 + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } - info []*InfinibandCollectorInfo + info []*InfinibandCollectorInfo + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths } // Init initializes the Infiniband collector by walking through files below IB_BASEPATH @@ -49,6 +53,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { "source": m.name, "group": "Network", } + m.lastTimestamp = time.Now() + m.config.SendAbsoluteValues = true + m.config.SendDerivedValues = false if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -60,10 +67,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*") ibDirs, err := filepath.Glob(globPattern) if err != nil { - return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err) + return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err) } if ibDirs == nil { - return fmt.Errorf("Unable to find any directories with pattern %s", globPattern) + return fmt.Errorf("unable to find any directories with pattern %s", globPattern) } for _, path := range ibDirs { @@ -106,7 +113,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { for _, counterFile := range portCounterFiles { err := unix.Access(counterFile, unix.R_OK) if err != nil { - return fmt.Errorf("Unable to access %s: %v", counterFile, err) + return fmt.Errorf("unable to access %s: %v", counterFile, err) } } @@ -122,11 +129,17 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { "port": port, "lid": LID, }, + stats: map[string]int64{ + "ib_recv": 0, + "ib_xmit": 0, + "ib_recv_pkts": 0, + "ib_xmit_pkts": 0, + }, }) } if len(m.info) == 0 { - return fmt.Errorf("Found no IB devices") + return fmt.Errorf("found no IB devices") } m.init = true @@ -142,6 +155,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr } now := time.Now() + tdiff := now.Sub(m.lastTimestamp) for _, info := range m.info { for counterName, counterFile := range info.portCounterFiles { line, err := ioutil.ReadFile(counterFile) @@ -159,12 +173,22 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err)) continue } - if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { - output <- y + if m.config.SendAbsoluteValues { + if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { + output <- y + } + } + if m.config.SendDerivedValues { + diff := float64((v - info.stats[counterName])) / tdiff.Seconds() + if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": diff}, now); err == nil { + output <- y + } + info.stats[counterName] = v } } } + m.lastTimestamp = now } func (m *InfinibandCollector) Close() { diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index 66fd3fd..67efd7a 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -19,20 +19,23 @@ const LCTL_CMD = `lctl` const LCTL_OPTION = `get_param` type LustreCollectorConfig struct { - LCtlCommand string `json:"lctl_command"` - ExcludeMetrics []string `json:"exclude_metrics"` - SendAllMetrics bool `json:"send_all_metrics"` - Sudo bool `json:"use_sudo"` + LCtlCommand string `json:"lctl_command"` + ExcludeMetrics []string `json:"exclude_metrics"` + SendAllMetrics bool `json:"send_all_metrics"` + Sudo bool `json:"use_sudo"` + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } type LustreCollector struct { metricCollector - tags map[string]string - matches map[string]map[string]int - stats map[string]map[string]int64 - config LustreCollectorConfig - lctl string - sudoCmd string + tags map[string]string + matches map[string]map[string]int + stats map[string]map[string]int64 + config LustreCollectorConfig + lctl string + sudoCmd string + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths } func (m *LustreCollector) getDeviceDataCommand(device string) []string { @@ -165,6 +168,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { } } } + m.lastTimestamp = time.Now() m.init = true return nil } @@ -173,6 +177,8 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if !m.init { return } + now := time.Now() + tdiff := now.Sub(m.lastTimestamp) for device, devData := range m.stats { stats := m.getDeviceDataCommand(device) processed := []string{} @@ -183,23 +189,35 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if fields, ok := m.matches[lf[0]]; ok { for name, idx := range fields { x, err := strconv.ParseInt(lf[idx], 0, 64) - if err != nil { - continue - } - value := x - devData[name] - devData[name] = x - if value < 0 { - value = 0 - } - y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) if err == nil { - y.AddTag("device", device) - if strings.Contains(name, "byte") { - y.AddMeta("unit", "Byte") + value := x - devData[name] + devData[name] = x + if value < 0 { + value = 0 } - output <- y - if m.config.SendAllMetrics { - processed = append(processed, name) + if m.config.SendAbsoluteValues { + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) + if err == nil { + y.AddTag("device", device) + if strings.Contains(name, "byte") { + y.AddMeta("unit", "Byte") + } + output <- y + if m.config.SendAllMetrics { + processed = append(processed, name) + } + } + } + if m.config.SendDerivedValues && strings.Contains(name, "bytes") { + y, err := lp.New(name+"_bw", m.tags, m.meta, map[string]interface{}{"value": float64(value) / tdiff.Seconds()}, time.Now()) + if err == nil { + y.AddTag("device", device) + y.AddMeta("unit", "Bytes/sec") + output <- y + if m.config.SendAllMetrics { + processed = append(processed, name) + } + } } } } @@ -221,6 +239,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) } } } + m.lastTimestamp = now } func (m *LustreCollector) Close() { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 7eaa3cf..d499337 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -16,7 +16,9 @@ import ( const NETSTATFILE = `/proc/net/dev` type NetstatCollectorConfig struct { - IncludeDevices []string `json:"include_devices"` + IncludeDevices []string `json:"include_devices"` + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } type NetstatCollectorMetric struct { @@ -111,21 +113,34 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) for name, data := range devmetrics { v, err := strconv.ParseFloat(f[data.index], 64) if err == nil { - vdiff := v - data.lastValue - value := vdiff / tdiff.Seconds() - if data.lastValue == 0 { - value = 0 - } - data.lastValue = v - y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now) - if err == nil { - switch { - case strings.Contains(name, "byte"): - y.AddMeta("unit", "bytes/sec") - case strings.Contains(name, "pkt"): - y.AddMeta("unit", "packets/sec") + if m.config.SendAbsoluteValues { + if y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": v}, now); err == nil { + switch { + case strings.Contains(name, "byte"): + y.AddMeta("unit", "bytes") + case strings.Contains(name, "pkt"): + y.AddMeta("unit", "packets") + } + output <- y + } + } + if m.config.SendDerivedValues { + + vdiff := v - data.lastValue + value := vdiff / tdiff.Seconds() + if data.lastValue == 0 { + value = 0 + } + data.lastValue = v + if y, err := lp.New(name+"_bw", m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now); err == nil { + switch { + case strings.Contains(name, "byte"): + y.AddMeta("unit", "bytes/sec") + case strings.Contains(name, "pkt"): + y.AddMeta("unit", "packets/sec") + } + output <- y } - output <- y } devmetrics[name] = data }