diff --git a/collectors.json b/collectors.json index 27ef822..be3eea7 100644 --- a/collectors.json +++ b/collectors.json @@ -12,6 +12,12 @@ "proc_total" ] }, + "netstat": { + "include_devices": [ + "enp5s0" + ], + "send_derived_values": true + }, "numastats": {}, "nvidia": {}, "tempstat": { diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index 453704c..26fc723 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -17,7 +17,12 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -const DEFAULT_GPFS_CMD = `mmpmon` +const DEFAULT_GPFS_CMD = "mmpmon" + +type GpfsCollectorLastState struct { + bytesRead int64 + bytesWritten int64 +} type GpfsCollector struct { metricCollector @@ -25,8 +30,11 @@ type GpfsCollector struct { config struct { Mmpmon string `json:"mmpmon_path,omitempty"` ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` + SendBandwidths bool `json:"send_bandwidths"` } - skipFS map[string]struct{} + skipFS map[string]struct{} + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + lastState map[string]GpfsCollectorLastState } func (m *GpfsCollector) Init(config json.RawMessage) error { @@ -40,7 +48,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { m.setup() // Set default mmpmon binary - m.config.Mmpmon = string(DEFAULT_GPFS_CMD) + m.config.Mmpmon = DEFAULT_GPFS_CMD // Read JSON configuration if len(config) > 0 { @@ -89,6 +97,13 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { return } + // Current time stamp + now := time.Now() + // time difference to last time stamp + timeDiff := now.Sub(m.lastTimestamp).Seconds() + // Save current timestamp + m.lastTimestamp = now + // mmpmon: // -p: generate output that can be parsed // -s: suppress the prompt on input @@ -148,6 +163,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } m.tags["filesystem"] = filesystem + if _, ok := 
m.lastState[filesystem]; !ok { + m.lastState[filesystem] = GpfsCollectorLastState{ + bytesRead: -1, + bytesWritten: -1, + } + } // return code rc, err := strconv.Atoi(key_value["_rc_"]) @@ -191,6 +212,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { output <- y } + if m.config.SendBandwidths { + if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { + bwRead := float64(bytesRead-lastBytesRead) / timeDiff + if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { + output <- y + } + } + } // bytes written bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64) @@ -203,6 +232,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { output <- y } + if m.config.SendBandwidths { + if lastBytesWritten := m.lastState[filesystem].bytesWritten; lastBytesWritten >= 0 { + bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff + if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { + output <- y + } + } + } + + if m.config.SendBandwidths { + m.lastState[filesystem] = GpfsCollectorLastState{ + bytesRead: bytesRead, + bytesWritten: bytesWritten, + } + } // number of opens numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64) diff --git a/collectors/gpfsMetric.md b/collectors/gpfsMetric.md index 4f2c897..30a5a40 100644 --- a/collectors/gpfsMetric.md +++ b/collectors/gpfsMetric.md @@ -5,7 +5,8 @@ "mmpmon_path": "/path/to/mmpmon", "exclude_filesystem": [ "fs1" - ] + ], + "send_bandwidths" : true } ``` @@ -18,13 +19,16 @@ in the configuration. 
The path to the `mmpmon` command can be configured with the `mmpmon_path` option in the configuration. If nothing is set, the collector searches in `$PATH` for `mmpmon`. + Metrics: -* `bytes_read` +* `gpfs_bytes_read` * `gpfs_bytes_written` * `gpfs_num_opens` * `gpfs_num_closes` * `gpfs_num_reads` * `gpfs_num_readdirs` * `gpfs_num_inode_updates` +* `gpfs_bw_read` (if `send_bandwidths == true`) +* `gpfs_bw_write` (if `send_bandwidths == true`) The collector adds a `filesystem` tag to all metrics diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index ac79e0a..5be095d 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -16,7 +16,7 @@ import ( "time" ) -const IB_BASEPATH = `/sys/class/infiniband/` +const IB_BASEPATH = "/sys/class/infiniband/" type InfinibandCollectorInfo struct { LID string // IB local Identifier (LID) @@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct { port string // IB device port portCounterFiles map[string]string // mapping counter name -> sysfs file tagSet map[string]string // corresponding tag list + lastState map[string]int64 // State from last measurement } type InfinibandCollector struct { metricCollector config struct { - ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0 + ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0 + SendAbsoluteValues bool `json:"send_abs_values"` // Send absolute values as read from sys filesystem + SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. 
rates } - info []*InfinibandCollectorInfo + info []*InfinibandCollectorInfo + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths } // Init initializes the Infiniband collector by walking through files below IB_BASEPATH @@ -49,6 +53,11 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { "source": m.name, "group": "Network", } + + // Set default configuration, + m.config.SendAbsoluteValues = true + m.config.SendDerivedValues = false + // Read configuration file, allow overwriting default config if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -60,10 +69,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*") ibDirs, err := filepath.Glob(globPattern) if err != nil { - return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err) + return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err) } if ibDirs == nil { - return fmt.Errorf("Unable to find any directories with pattern %s", globPattern) + return fmt.Errorf("unable to find any directories with pattern %s", globPattern) } for _, path := range ibDirs { @@ -106,10 +115,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { for _, counterFile := range portCounterFiles { err := unix.Access(counterFile, unix.R_OK) if err != nil { - return fmt.Errorf("Unable to access %s: %v", counterFile, err) + return fmt.Errorf("unable to access %s: %v", counterFile, err) } } + // Initialize last state + lastState := make(map[string]int64) + for counter := range portCounterFiles { + lastState[counter] = -1 + } + m.info = append(m.info, &InfinibandCollectorInfo{ LID: LID, @@ -122,11 +137,12 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { "port": port, "lid": LID, }, + lastState: lastState, }) } if len(m.info) == 0 { - return fmt.Errorf("Found no IB devices") + return fmt.Errorf("found no IB devices") } 
m.init = true @@ -141,9 +157,17 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr return } + // Current time stamp now := time.Now() + // time difference to last time stamp + timeDiff := now.Sub(m.lastTimestamp).Seconds() + // Save current timestamp + m.lastTimestamp = now + for _, info := range m.info { for counterName, counterFile := range info.portCounterFiles { + + // Read counter file line, err := ioutil.ReadFile(counterFile) if err != nil { cclog.ComponentError( @@ -152,6 +176,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr continue } data := strings.TrimSpace(string(line)) + + // convert counter to int64 v, err := strconv.ParseInt(data, 10, 64) if err != nil { cclog.ComponentError( @@ -159,8 +185,24 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err)) continue } - if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { - output <- y + + // Send absolute values + if m.config.SendAbsoluteValues { + if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { + output <- y + } + } + + // Send derived values + if m.config.SendDerivedValues { + if info.lastState[counterName] >= 0 { + rate := float64((v - info.lastState[counterName])) / timeDiff + if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil { + output <- y + } + } + // Save current state + info.lastState[counterName] = v } } diff --git a/collectors/infinibandMetric.md b/collectors/infinibandMetric.md index 579ed77..f129aad 100644 --- a/collectors/infinibandMetric.md +++ b/collectors/infinibandMetric.md @@ -5,7 +5,9 @@ "ibstat": { "exclude_devices": [ "mlx4" - ] + ], + "send_abs_values": true, + "send_derived_values": true } ``` @@ -22,5 
+24,9 @@ Metrics: * `ib_xmit` * `ib_recv_pkts` * `ib_xmit_pkts` +* `ib_recv_bw` (if `send_derived_values == true`) +* `ib_xmit_bw` (if `send_derived_values == true`) +* `ib_recv_pkts_bw` (if `send_derived_values == true`) +* `ib_xmit_pkts_bw` (if `send_derived_values == true`) The collector adds a `device` tag to all metrics diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index 66fd3fd..67efd7a 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -19,20 +19,23 @@ const LCTL_CMD = `lctl` const LCTL_OPTION = `get_param` type LustreCollectorConfig struct { - LCtlCommand string `json:"lctl_command"` - ExcludeMetrics []string `json:"exclude_metrics"` - SendAllMetrics bool `json:"send_all_metrics"` - Sudo bool `json:"use_sudo"` + LCtlCommand string `json:"lctl_command"` + ExcludeMetrics []string `json:"exclude_metrics"` + SendAllMetrics bool `json:"send_all_metrics"` + Sudo bool `json:"use_sudo"` + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } type LustreCollector struct { metricCollector - tags map[string]string - matches map[string]map[string]int - stats map[string]map[string]int64 - config LustreCollectorConfig - lctl string - sudoCmd string + tags map[string]string + matches map[string]map[string]int + stats map[string]map[string]int64 + config LustreCollectorConfig + lctl string + sudoCmd string + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths } func (m *LustreCollector) getDeviceDataCommand(device string) []string { @@ -165,6 +168,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { } } } + m.lastTimestamp = time.Now() m.init = true return nil } @@ -173,6 +177,8 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if !m.init { return } + now := time.Now() + tdiff := now.Sub(m.lastTimestamp) for device, devData := range m.stats { stats := m.getDeviceDataCommand(device) processed := 
[]string{} @@ -183,23 +189,35 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if fields, ok := m.matches[lf[0]]; ok { for name, idx := range fields { x, err := strconv.ParseInt(lf[idx], 0, 64) - if err != nil { - continue - } - value := x - devData[name] - devData[name] = x - if value < 0 { - value = 0 - } - y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) if err == nil { - y.AddTag("device", device) - if strings.Contains(name, "byte") { - y.AddMeta("unit", "Byte") + value := x - devData[name] + devData[name] = x + if value < 0 { + value = 0 } - output <- y - if m.config.SendAllMetrics { - processed = append(processed, name) + if m.config.SendAbsoluteValues { + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) + if err == nil { + y.AddTag("device", device) + if strings.Contains(name, "byte") { + y.AddMeta("unit", "Byte") + } + output <- y + if m.config.SendAllMetrics { + processed = append(processed, name) + } + } + } + if m.config.SendDerivedValues && strings.Contains(name, "bytes") { + y, err := lp.New(name+"_bw", m.tags, m.meta, map[string]interface{}{"value": float64(value) / tdiff.Seconds()}, time.Now()) + if err == nil { + y.AddTag("device", device) + y.AddMeta("unit", "Bytes/sec") + output <- y + if m.config.SendAllMetrics { + processed = append(processed, name) + } + } } } } @@ -221,6 +239,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) } } } + m.lastTimestamp = now } func (m *LustreCollector) Close() { diff --git a/collectors/lustreMetric.md b/collectors/lustreMetric.md index 0cb9fc8..de4ed60 100644 --- a/collectors/lustreMetric.md +++ b/collectors/lustreMetric.md @@ -9,21 +9,26 @@ "exclude_metrics": [ "setattr", "getattr" - ] + ], + "send_abs_values" : true, + "send_derived_values" : true } ``` The `lustrestat` collector reads from the procfs stat files for Lustre like 
`/proc/fs/lustre/llite/lnec-XXXXXX/stats`. Metrics: -* `read_bytes` -* `read_requests` -* `write_bytes` -* `write_requests` -* `open` -* `close` -* `getattr` -* `setattr` -* `statfs` -* `inode_permission` +* `lustre_read_bytes` +* `lustre_read_requests` +* `lustre_write_bytes` +* `lustre_write_requests` +* `lustre_open` +* `lustre_close` +* `lustre_getattr` +* `lustre_setattr` +* `lustre_statfs` +* `lustre_inode_permission` +* `lustre_read_bytes_bw` (if `send_derived_values == true`) +* `lustre_write_bytes_bw` (if `send_derived_values == true`) +This collector adds a `device` tag. \ No newline at end of file diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 7eaa3cf..4bfc7ab 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -13,22 +13,26 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -const NETSTATFILE = `/proc/net/dev` +const NETSTATFILE = "/proc/net/dev" type NetstatCollectorConfig struct { - IncludeDevices []string `json:"include_devices"` + IncludeDevices []string `json:"include_devices"` + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } type NetstatCollectorMetric struct { + name string index int - lastValue float64 + tags map[string]string + rate_tags map[string]string + lastValue int64 } type NetstatCollector struct { metricCollector config NetstatCollectorConfig - matches map[string]map[string]NetstatCollectorMetric - devtags map[string]map[string]string + matches map[string][]NetstatCollectorMetric lastTimestamp time.Time } @@ -36,15 +40,37 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" m.setup() m.lastTimestamp = time.Now() - m.meta = map[string]string{"source": m.name, "group": "Network"} - m.devtags = make(map[string]map[string]string) - nameIndexMap := map[string]int{ - "net_bytes_in": 1, - "net_pkts_in": 2, - "net_bytes_out": 9, - "net_pkts_out": 10, + m.meta 
= map[string]string{ + "source": m.name, + "group": "Network", } - m.matches = make(map[string]map[string]NetstatCollectorMetric) + + const ( + fieldInterface = iota + fieldReceiveBytes = iota + fieldReceivePackets = iota + fieldReceiveErrs = iota + fieldReceiveDrop = iota + fieldReceiveFifo = iota + fieldReceiveFrame = iota + fieldReceiveCompressed = iota + fieldReceiveMulticast = iota + fieldTransmitBytes = iota + fieldTransmitPackets = iota + fieldTransmitErrs = iota + fieldTransmitDrop = iota + fieldTransmitFifo = iota + fieldTransmitColls = iota + fieldTransmitCarrier = iota + fieldTransmitCompressed = iota + ) + + m.matches = make(map[string][]NetstatCollectorMetric) + + // Set default configuration, + m.config.SendAbsoluteValues = true + m.config.SendDerivedValues = false + // Read configuration file, allow overwriting default config if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -52,7 +78,9 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { return err } } - file, err := os.Open(string(NETSTATFILE)) + + // Check access to net statistic file + file, err := os.Open(NETSTATFILE) if err != nil { cclog.ComponentError(m.name, err.Error()) return err @@ -62,23 +90,60 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { scanner := bufio.NewScanner(file) for scanner.Scan() { l := scanner.Text() + + // Skip lines with no net device entry if !strings.Contains(l, ":") { continue } + + // Split line into fields f := strings.Fields(l) + + // Get net device entry dev := strings.Trim(f[0], ": ") + + // Check if device is a included device if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok { - m.matches[dev] = make(map[string]NetstatCollectorMetric) - for name, idx := range nameIndexMap { - m.matches[dev][name] = NetstatCollectorMetric{ - index: idx, - lastValue: 0, - } + tags_unit_byte := map[string]string{"device": dev, "type": "node", "unit": "bytes"} + tags_unit_byte_per_sec := 
map[string]string{"device": dev, "type": "node", "unit": "bytes/sec"} + tags_unit_pkts := map[string]string{"device": dev, "type": "node", "unit": "packets"} + tags_unit_pkts_per_sec := map[string]string{"device": dev, "type": "node", "unit": "packets/sec"} + + m.matches[dev] = []NetstatCollectorMetric{ + { + name: "net_bytes_in", + index: fieldReceiveBytes, + lastValue: -1, + tags: tags_unit_byte, + rate_tags: tags_unit_byte_per_sec, + }, + { + name: "net_pkts_in", + index: fieldReceivePackets, + lastValue: -1, + tags: tags_unit_pkts, + rate_tags: tags_unit_pkts_per_sec, + }, + { + name: "net_bytes_out", + index: fieldTransmitBytes, + lastValue: -1, + tags: tags_unit_byte, + rate_tags: tags_unit_byte_per_sec, + }, + { + name: "net_pkts_out", + index: fieldTransmitPackets, + lastValue: -1, + tags: tags_unit_pkts, + rate_tags: tags_unit_pkts_per_sec, + }, } - m.devtags[dev] = map[string]string{"device": dev, "type": "node"} } + } - if len(m.devtags) == 0 { + + if len(m.matches) == 0 { return errors.New("no devices to collector metrics found") } m.init = true @@ -89,50 +154,62 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if !m.init { return } + // Current time stamp now := time.Now() + // time difference to last time stamp + timeDiff := now.Sub(m.lastTimestamp).Seconds() + // Save current timestamp + m.lastTimestamp = now + file, err := os.Open(string(NETSTATFILE)) if err != nil { cclog.ComponentError(m.name, err.Error()) return } defer file.Close() - tdiff := now.Sub(m.lastTimestamp) scanner := bufio.NewScanner(file) for scanner.Scan() { l := scanner.Text() + + // Skip lines with no net device entry if !strings.Contains(l, ":") { continue } + + // Split line into fields f := strings.Fields(l) + + // Get net device entry dev := strings.Trim(f[0], ":") + // Check if device is a included device if devmetrics, ok := m.matches[dev]; ok { - for name, data := range devmetrics { - v, err := strconv.ParseFloat(f[data.index], 64) - if 
err == nil { - vdiff := v - data.lastValue - value := vdiff / tdiff.Seconds() - if data.lastValue == 0 { - value = 0 - } - data.lastValue = v - y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now) - if err == nil { - switch { - case strings.Contains(name, "byte"): - y.AddMeta("unit", "bytes/sec") - case strings.Contains(name, "pkt"): - y.AddMeta("unit", "packets/sec") - } + for i := range devmetrics { + metric := &devmetrics[i] + + // Read value + v, err := strconv.ParseInt(f[metric.index], 10, 64) + if err != nil { + continue + } + if m.config.SendAbsoluteValues { + if y, err := lp.New(metric.name, metric.tags, m.meta, map[string]interface{}{"value": v}, now); err == nil { output <- y } - devmetrics[name] = data + } + if m.config.SendDerivedValues { + if metric.lastValue >= 0 { + rate := float64(v-metric.lastValue) / timeDiff + if y, err := lp.New(metric.name+"_bw", metric.rate_tags, m.meta, map[string]interface{}{"value": rate}, now); err == nil { + output <- y + } + } + metric.lastValue = v } } } } - m.lastTimestamp = time.Now() } func (m *NetstatCollector) Close() { diff --git a/collectors/netstatMetric.md b/collectors/netstatMetric.md index 90d8600..424cf77 100644 --- a/collectors/netstatMetric.md +++ b/collectors/netstatMetric.md @@ -5,17 +5,23 @@ "netstat": { "include_devices": [ "eth0" - ] + ], + "send_abs_values" : true, + "send_derived_values" : true } ``` The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list. 
Metrics: -* `net_bytes_in` (`unit=bytes/sec`) -* `net_bytes_out` (`unit=bytes/sec`) -* `net_pkts_in` (`unit=packets/sec`) -* `net_pkts_out` (`unit=packets/sec`) +* `net_bytes_in` (`unit=bytes`) +* `net_bytes_out` (`unit=bytes`) +* `net_pkts_in` (`unit=packets`) +* `net_pkts_out` (`unit=packets`) +* `net_bytes_in_bw` (`unit=bytes/sec` if `send_derived_values == true`) +* `net_bytes_out_bw` (`unit=bytes/sec` if `send_derived_values == true`) +* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`) +* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`) The device name is added as tag `device`.