From f64182630b747efc71c58e7215fa48505f97145e Mon Sep 17 00:00:00 2001 From: brinkcoder Date: Wed, 5 Mar 2025 00:58:06 +0100 Subject: [PATCH] add only_metrics, diff_values and derived_values. docs: clarify usage --- collectors/customCmdMetric.go | 153 ++++++++++++++++++++++++++++------ collectors/customCmdMetric.md | 37 +++++++- 2 files changed, 161 insertions(+), 29 deletions(-) diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index c9a50cf..0eb5fa6 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -9,25 +9,67 @@ import ( "strings" "time" - lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + lp "github.com/ClusterCockpit/cc-lib/ccMessage" influx "github.com/influxdata/line-protocol" ) const CUSTOMCMDPATH = `/home/unrz139/Work/cc-metric-collector/collectors/custom` type CustomCmdCollectorConfig struct { - Commands []string `json:"commands"` - Files []string `json:"files"` - ExcludeMetrics []string `json:"exclude_metrics"` + Commands []string `json:"commands"` + Files []string `json:"files"` + ExcludeMetrics []string `json:"exclude_metrics"` + OnlyMetrics []string `json:"only_metrics,omitempty"` + SendAbsoluteValues *bool `json:"send_abs_values,omitempty"` + SendDiffValues *bool `json:"send_diff_values,omitempty"` + SendDerivedValues *bool `json:"send_derived_values,omitempty"` +} + +// set default values: send_abs_values: true, send_diff_values: false, send_derived_values: false. +func (cfg *CustomCmdCollectorConfig) AbsValues() bool { + if cfg.SendAbsoluteValues == nil { + return true + } + return *cfg.SendAbsoluteValues +} +func (cfg *CustomCmdCollectorConfig) DiffValues() bool { + if cfg.SendDiffValues == nil { + return false + } + return *cfg.SendDiffValues +} +func (cfg *CustomCmdCollectorConfig) DerivedValues() bool { + if cfg.SendDerivedValues == nil { + return false + } + return *cfg.SendDerivedValues } type CustomCmdCollector struct { metricCollector - handler *influx.MetricHandler - parser *influx.Parser - config CustomCmdCollectorConfig - commands []string - files []string + handler *influx.MetricHandler + parser *influx.Parser + config CustomCmdCollectorConfig + commands []string + files []string + oldValues map[string]float64 +} + +func (m *CustomCmdCollector) shouldOutput(metricName string) bool { + if len(m.config.OnlyMetrics) > 0 { + for _, name := range m.config.OnlyMetrics { + if name == metricName { + return true + } + } + return false + } + for _, name := range m.config.ExcludeMetrics { + if name == metricName { + return false + } + } + return true } func (m *CustomCmdCollector) Init(config json.RawMessage) error { @@ -67,6 +109,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { m.handler = influx.NewMetricHandler() m.parser = influx.NewParser(m.handler) m.parser.SetTimeFunc(DefaultTime) + m.oldValues = make(map[string]float64) m.init = true return nil } @@ -75,6 +118,73 @@ var DefaultTime = func() time.Time { return time.Unix(42, 0) } +// copyMeta creates a deep copy of a map[string]string. +func copyMeta(orig map[string]string) map[string]string { + newMeta := make(map[string]string) + for k, v := range orig { + newMeta[k] = v + } + return newMeta +} + +// getMetricValueFromMsg extracts the numeric "value" field from a CCMessage. +func getMetricValueFromMsg(msg lp.CCMessage) (float64, bool) { + fields := msg.Fields() + if v, ok := fields["value"]; ok { + if num, ok := v.(float64); ok { + return num, true + } + } + return 0, false +} + +// processMetric processes a single metric: +// - If send_abs_values is enabled, sends the absolute metric. +// - If send_diff_values is enabled and a previous value exists, sends the diff value under the name "_diff". +// - If send_derived_values is enabled and a previous value exists, sends the derived rate as "_rate". +func (m *CustomCmdCollector) processMetric(msg lp.CCMessage, interval time.Duration, output chan lp.CCMessage) { + name := msg.Name() + if !m.shouldOutput(name) { + return + } + if m.config.AbsValues() { + output <- msg + } + val, ok := getMetricValueFromMsg(msg) + if !ok { + return + } + if prev, exists := m.oldValues[name]; exists { + diff := val - prev + if m.config.DiffValues() && m.shouldOutput(name+"_diff") { + diffMsg, err := lp.NewMessage(name+"_diff", msg.Tags(), msg.Meta(), map[string]interface{}{"value": diff}, time.Now()) + if err == nil { + output <- diffMsg + } + } + if m.config.DerivedValues() && m.shouldOutput(name+"_rate") { + derived := diff / interval.Seconds() + newMeta := copyMeta(msg.Meta()) + unit := newMeta["unit"] + if unit == "" { + if tagUnit, ok := msg.Tags()["unit"]; ok { + unit = tagUnit + newMeta["unit"] = unit + } + } + if unit != "" && !strings.HasSuffix(unit, "/s") { + newMeta["unit"] = unit + "/s" + } + derivedMsg, err := lp.NewMessage(name+"_rate", msg.Tags(), newMeta, map[string]interface{}{"value": derived}, time.Now()) + if err == nil { + output <- derivedMsg + } + } + } + m.oldValues[name] = val +} + +// Read processes commands and files, parses their output, and processes each metric. func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) { if !m.init { return @@ -88,37 +198,30 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa log.Print(err) continue } - cmdmetrics, err := m.parser.Parse(stdout) + metrics, err := m.parser.Parse(stdout) if err != nil { log.Print(err) continue } - for _, c := range cmdmetrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name()) - if skip { - continue - } - - output <- lp.FromInfluxMetric(c) + for _, met := range metrics { + msg := lp.FromInfluxMetric(met) + m.processMetric(msg, interval, output) } } for _, file := range m.files { buffer, err := os.ReadFile(file) if err != nil { log.Print(err) - return + continue } - fmetrics, err := m.parser.Parse(buffer) + metrics, err := m.parser.Parse(buffer) if err != nil { log.Print(err) continue } - for _, f := range fmetrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name()) - if skip { - continue - } - output <- lp.FromInfluxMetric(f) + for _, met := range metrics { + msg := lp.FromInfluxMetric(met) + m.processMetric(msg, interval, output) } } } diff --git a/collectors/customCmdMetric.md b/collectors/customCmdMetric.md index 011135d..7e33947 100644 --- a/collectors/customCmdMetric.md +++ b/collectors/customCmdMetric.md @@ -1,4 +1,3 @@ - ## `customcmd` collector ```json @@ -6,15 +5,45 @@ "exclude_metrics": [ "mymetric" ], - "files" : [ + "only_metrics": [ + "cpu_usage", + "cpu_usage_rate", + "mem_usage", + "mem_usage_rate" + ], + "send_abs_values": true, + "send_diff_values": true, + "send_derived_values": true, + "files": [ "/var/run/myapp.metrics" ], - "commands" : [ + "commands": [ "/usr/local/bin/getmetrics.pl" ] } ``` -The `customcmd` collector reads data from files and the output of executed commands. The files and commands can output multiple metrics (separated by newline) but the have to be in the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/). If a metric is not parsable, it is skipped. If a metric is not required, it can be excluded from forwarding it to the sink. +The `customcmd` collector reads data from specified files and executed commands. +Both the output of commands and the content of files must follow the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/). +**Expected format example:** +``` +cpu_usage,host=myhost,type=hwthread,type-id=0,unit=MByte value=42.0 1670000000000000000 +mem_usage,host=myhost,type=node,unit=MByte value=1024 1670000000000000000 +``` + +The following tags are commonly used: +- **type:** Indicates the metric scope, e.g. "node", "socket" or "hwthread". +- **type-id:** The identifier for the type (e.g. the specific hardware thread or socket). +- **unit:** The unit of the metric (e.g. "MByte"). + +For each metric parsed from the output: +- If `send_abs_values` is enabled, the **absolute (raw) metric** is forwarded. +- If `send_diff_values` is enabled and a previous value exists, the collector computes the **difference** (current value minus previous value) and forwards it as a new metric with the suffix `_diff`. +- If `send_derived_values` is enabled and a previous value exists, the collector computes the **derived rate** (difference divided by the time interval) and forwards it as a new metric with the suffix `_rate`. + Additionally, if the original metric includes a unit (in its meta data or tags), the derived metric's unit is set to that unit with "/s" appended. + +Both filtering mechanisms are supported: +- `exclude_metrics`: Excludes the specified metrics. +- `only_metrics`: If provided, only the listed metrics are collected. This takes precedence over `exclude_metrics`.