diff --git a/collectors/nfsiostatMetric.go b/collectors/nfsiostatMetric.go index 09686e9..4f19407 100644 --- a/collectors/nfsiostatMetric.go +++ b/collectors/nfsiostatMetric.go @@ -9,31 +9,36 @@ import ( "strings" "time" + lp "github.com/ClusterCockpit/cc-lib/ccMessage" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" ) -// These are the fields we read from the JSON configuration +// NfsIOStatCollectorConfig holds configuration options for the nfsiostat collector. type NfsIOStatCollectorConfig struct { ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + OnlyMetrics []string `json:"only_metrics,omitempty"` ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"` + SendAbsoluteValues bool `json:"send_abs_values"` + SendDerivedValues bool `json:"send_derived_values"` } -// This contains all variables we need during execution and the variables -// defined by metricCollector (name, init, ...) +// NfsIOStatCollector reads NFS I/O statistics from /proc/self/mountstats. type NfsIOStatCollector struct { metricCollector - config NfsIOStatCollectorConfig // the configuration structure - meta map[string]string // default meta information - tags map[string]string // default tags - data map[string]map[string]int64 // data storage for difference calculation - key string // which device info should be used as subtype ID? 'server' or 'mntpoint', see NfsIOStatCollectorConfig.UseServerAddressAsSType + config NfsIOStatCollectorConfig + meta map[string]string + tags map[string]string + data map[string]map[string]int64 // previous values per filesystem + key string // "server" or "mntpoint" + lastTimestamp time.Time } +// Regular expressions to parse mount info and byte statistics. 
var deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`) var bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P<nread>[^ ]+) (?P<nwrite>[^ ]+) (?P<oread>[^ ]+) (?P<owrite>[^ ]+) (?P<nfsread>[^ ]+) (?P<nfswrite>[^ ]+) (?P<pageread>[^ ]+) (?P<pagewrite>[^ ]+)`) +// resolve_regex_fields extracts named regex groups from a string. func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string { fields := make(map[string]string) groups := regex.SubexpNames() @@ -47,6 +52,52 @@ func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string { return fields } +// shouldOutput returns true if a base metric (without prefix) is allowed. +func (m *NfsIOStatCollector) shouldOutput(metricName string) bool { + if len(m.config.OnlyMetrics) > 0 { + for _, n := range m.config.OnlyMetrics { + if n == metricName { + return true + } + } + return false + } + for _, n := range m.config.ExcludeMetrics { + if n == metricName { + return false + } + } + return true +} + +func (m *NfsIOStatCollector) Init(config json.RawMessage) error { + var err error + m.name = "NfsIOStatCollector" + m.setup() + m.parallel = true + m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"} + m.tags = map[string]string{"type": "node"} + // Default: use_server_as_stype is false. + m.config.UseServerAddressAsSType = false + // Defaults for absolute and derived values. 
+ m.config.SendAbsoluteValues = false + m.config.SendDerivedValues = true + if len(config) > 0 { + if err = json.Unmarshal(config, &m.config); err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + m.key = "mntpoint" + if m.config.UseServerAddressAsSType { + m.key = "server" + } + m.data = m.readNfsiostats() + m.lastTimestamp = time.Now() + m.init = true + return nil +} + func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { data := make(map[string]map[string]int64) filename := "/proc/self/mountstats" @@ -58,7 +109,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { lines := strings.Split(string(stats), "\n") var current map[string]string = nil for _, l := range lines { - // Is this a device line with mount point, remote target and NFS version? + // Check for a device line. dev := resolve_regex_fields(l, deviceRegex) if len(dev) > 0 { if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok { @@ -66,22 +117,35 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { if len(current["version"]) == 0 { current["version"] = "3" } + } else { + current = nil } } - - if len(current) > 0 { - // Byte line parsing (if found the device for it) + if current != nil { + // Parse byte statistics line. 
bytes := resolve_regex_fields(l, bytesRegex) if len(bytes) > 0 { data[current[m.key]] = make(map[string]int64) for name, sval := range bytes { - if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok { - val, err := strconv.ParseInt(sval, 10, 64) - if err == nil { - data[current[m.key]][name] = val + if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); ok { + continue + } + if len(m.config.OnlyMetrics) > 0 { + found := false + for _, metric := range m.config.OnlyMetrics { + if metric == name { + found = true + break + } + } + if !found { + continue } } - + val, err := strconv.ParseInt(sval, 10, 64) + if err == nil { + data[current[m.key]][name] = val + } } current = nil } @@ -90,65 +154,56 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { return data } -func (m *NfsIOStatCollector) Init(config json.RawMessage) error { - var err error = nil - m.name = "NfsIOStatCollector" - m.setup() - m.parallel = true - m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"} - m.tags = map[string]string{"type": "node"} - m.config.UseServerAddressAsSType = false - if len(config) > 0 { - err = json.Unmarshal(config, &m.config) - if err != nil { - cclog.ComponentError(m.name, "Error reading config:", err.Error()) - return err - } - } - m.key = "mntpoint" - if m.config.UseServerAddressAsSType { - m.key = "server" - } - m.data = m.readNfsiostats() - m.init = true - return err -} - func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) { - timestamp := time.Now() + now := time.Now() + timeDiff := now.Sub(m.lastTimestamp).Seconds() + m.lastTimestamp = now - // Get the current values for all mountpoints newdata := m.readNfsiostats() for mntpoint, values := range newdata { - // Was the mount point already present in the last iteration - if old, ok := m.data[mntpoint]; ok { - // Calculate the difference of old and new values - for i := range values { - x := values[i] - old[i] - y, err := 
lp.NewMessage(fmt.Sprintf("nfsio_%s", i), m.tags, m.meta, map[string]interface{}{"value": x}, timestamp) + if _, ok := stringArrayContains(m.config.ExcludeFilesystem, mntpoint); ok { + continue + } + for name, newVal := range values { + baseName := name // Base metric name. + if m.config.SendAbsoluteValues && m.shouldOutput(baseName) { + msg, err := lp.NewMessage(fmt.Sprintf("nfsio_%s", baseName), m.tags, m.meta, map[string]interface{}{"value": newVal}, now) if err == nil { - if strings.HasPrefix(i, "page") { - y.AddMeta("unit", "4K_Pages") - } - y.AddTag("stype", "filesystem") - y.AddTag("stype-id", mntpoint) - // Send it to output channel - output <- y + msg.AddTag("stype", "filesystem") + msg.AddTag("stype-id", mntpoint) + output <- msg } - // Update old to the new value for the next iteration - old[i] = values[i] } - } else { - // First time we see this mount point, store all values - m.data[mntpoint] = values + if m.config.SendDerivedValues { + if old, ok := m.data[mntpoint][name]; ok { + rate := float64(newVal-old) / timeDiff + if m.shouldOutput(baseName) { + msg, err := lp.NewMessage(fmt.Sprintf("nfsio_%s_bw", baseName), m.tags, m.meta, map[string]interface{}{"value": rate}, now) + if err == nil { + if strings.HasPrefix(name, "page") { + msg.AddMeta("unit", "4K_pages/s") + } else { + msg.AddMeta("unit", "bytes/sec") + } + msg.AddTag("stype", "filesystem") + msg.AddTag("stype-id", mntpoint) + output <- msg + } + } + } + } + if m.data[mntpoint] == nil { + m.data[mntpoint] = make(map[string]int64) + } + m.data[mntpoint][name] = newVal } } - // Reset entries that do not exist anymore + // Remove mountpoints that no longer exist. 
for mntpoint := range m.data { found := false - for new := range newdata { - if new == mntpoint { + for newMnt := range newdata { + if newMnt == mntpoint { found = true break } @@ -157,10 +212,8 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessa m.data[mntpoint] = nil } } - } func (m *NfsIOStatCollector) Close() { - // Unset flag m.init = false } diff --git a/collectors/nfsiostatMetric.md b/collectors/nfsiostatMetric.md index 7f374e7..3309fe2 100644 --- a/collectors/nfsiostatMetric.md +++ b/collectors/nfsiostatMetric.md @@ -3,25 +3,42 @@ ```json "nfsiostat": { "exclude_metrics": [ - "nfsio_oread" + "oread", "pageread" ], - "exclude_filesystems" : [ - "/mnt", + "only_metrics": [ + "nread", "nwrite", "nfsread", "nfswrite" ], - "use_server_as_stype": false + "exclude_filesystem": [ + "/mnt" + ], + "use_server_as_stype": false, + "send_abs_values": false, + "send_derived_values": true } ``` -The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from forwarding it to the sink. +The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful of **node** metrics for each NFS filesystem. +Metrics are output with the prefix `nfsio_` and the base metric name (e.g. `nread`, `nwrite`, etc.). Filtering applies to the base metric name (without the `nfsio_` prefix). 
-Metrics: -* `nfsio_nread`: Bytes transferred by normal `read()` calls -* `nfsio_nwrite`: Bytes transferred by normal `write()` calls -* `nfsio_oread`: Bytes transferred by `read()` calls with `O_DIRECT` -* `nfsio_owrite`: Bytes transferred by `write()` calls with `O_DIRECT` -* `nfsio_pageread`: Pages transferred by `read()` calls -* `nfsio_pagewrite`: Pages transferred by `write()` calls -* `nfsio_nfsread`: Bytes transferred for reading from the server -* `nfsio_nfswrite`: Pages transferred by writing to the server +Both filtering mechanisms are supported: +- `exclude_metrics`: Excludes the specified metrics. +- `only_metrics`: If provided, only the listed metrics are collected. This takes precedence over `exclude_metrics`. -The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting. \ No newline at end of file +Metrics are categorized as follows: + +**Absolute Metrics:** +- `nfsio_nread`: Bytes transferred by normal read() calls +- `nfsio_nwrite`: Bytes transferred by normal write() calls +- `nfsio_oread`: Bytes transferred by read() calls with O_DIRECT +- `nfsio_owrite`: Bytes transferred by write() calls with O_DIRECT +- `nfsio_pageread`: Pages transferred by read() calls +- `nfsio_pagewrite`: Pages transferred by write() calls +- `nfsio_nfsread`: Bytes transferred for reading from the server +- `nfsio_nfswrite`: Bytes transferred for writing to the server + +**Derived Metrics:** +For each absolute metric, if `send_derived_values` is enabled, an additional metric is sent with the `_bw` suffix, representing the rate: +- For byte metrics: `unit=bytes/sec` +- For page metrics: `unit=4K_pages/s` + +The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting.