diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go
index c9a50cf..0eb5fa6 100644
--- a/collectors/customCmdMetric.go
+++ b/collectors/customCmdMetric.go
@@ -9,25 +9,67 @@ import (
"strings"
"time"
- lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
+ lp "github.com/ClusterCockpit/cc-lib/ccMessage"
influx "github.com/influxdata/line-protocol"
)
const CUSTOMCMDPATH = `/home/unrz139/Work/cc-metric-collector/collectors/custom`
type CustomCmdCollectorConfig struct {
- Commands []string `json:"commands"`
- Files []string `json:"files"`
- ExcludeMetrics []string `json:"exclude_metrics"`
+ Commands []string `json:"commands"`
+ Files []string `json:"files"`
+ ExcludeMetrics []string `json:"exclude_metrics"`
+ OnlyMetrics []string `json:"only_metrics,omitempty"`
+ SendAbsoluteValues *bool `json:"send_abs_values,omitempty"`
+ SendDiffValues *bool `json:"send_diff_values,omitempty"`
+ SendDerivedValues *bool `json:"send_derived_values,omitempty"`
+}
+
+// set default values: send_abs_values: true, send_diff_values: false, send_derived_values: false.
+func (cfg *CustomCmdCollectorConfig) AbsValues() bool {
+ if cfg.SendAbsoluteValues == nil {
+ return true
+ }
+ return *cfg.SendAbsoluteValues
+}
+func (cfg *CustomCmdCollectorConfig) DiffValues() bool {
+ if cfg.SendDiffValues == nil {
+ return false
+ }
+ return *cfg.SendDiffValues
+}
+func (cfg *CustomCmdCollectorConfig) DerivedValues() bool {
+ if cfg.SendDerivedValues == nil {
+ return false
+ }
+ return *cfg.SendDerivedValues
}
type CustomCmdCollector struct {
metricCollector
- handler *influx.MetricHandler
- parser *influx.Parser
- config CustomCmdCollectorConfig
- commands []string
- files []string
+ handler *influx.MetricHandler
+ parser *influx.Parser
+ config CustomCmdCollectorConfig
+ commands []string
+ files []string
+ oldValues map[string]float64
+}
+
+func (m *CustomCmdCollector) shouldOutput(metricName string) bool {
+ if len(m.config.OnlyMetrics) > 0 {
+ for _, name := range m.config.OnlyMetrics {
+ if name == metricName {
+ return true
+ }
+ }
+ return false
+ }
+ for _, name := range m.config.ExcludeMetrics {
+ if name == metricName {
+ return false
+ }
+ }
+ return true
}
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
@@ -67,6 +109,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)
m.parser.SetTimeFunc(DefaultTime)
+ m.oldValues = make(map[string]float64)
m.init = true
return nil
}
@@ -75,6 +118,73 @@ var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}
+// copyMeta creates a deep copy of a map[string]string.
+func copyMeta(orig map[string]string) map[string]string {
+ newMeta := make(map[string]string)
+ for k, v := range orig {
+ newMeta[k] = v
+ }
+ return newMeta
+}
+
+// getMetricValueFromMsg extracts the numeric "value" field from a CCMessage.
+func getMetricValueFromMsg(msg lp.CCMessage) (float64, bool) {
+ fields := msg.Fields()
+ if v, ok := fields["value"]; ok {
+ if num, ok := v.(float64); ok {
+ return num, true
+ }
+ }
+ return 0, false
+}
+
+// processMetric processes a single metric:
+// - If send_abs_values is enabled, sends the absolute metric.
+// - If send_diff_values is enabled and a previous value exists, sends the diff value under the name "_diff".
+// - If send_derived_values is enabled and a previous value exists, sends the derived rate as "_rate".
+func (m *CustomCmdCollector) processMetric(msg lp.CCMessage, interval time.Duration, output chan lp.CCMessage) {
+ name := msg.Name()
+ if !m.shouldOutput(name) {
+ return
+ }
+ if m.config.AbsValues() {
+ output <- msg
+ }
+ val, ok := getMetricValueFromMsg(msg)
+ if !ok {
+ return
+ }
+ if prev, exists := m.oldValues[name]; exists {
+ diff := val - prev
+ if m.config.DiffValues() && m.shouldOutput(name+"_diff") {
+ diffMsg, err := lp.NewMessage(name+"_diff", msg.Tags(), msg.Meta(), map[string]interface{}{"value": diff}, time.Now())
+ if err == nil {
+ output <- diffMsg
+ }
+ }
+ if m.config.DerivedValues() && m.shouldOutput(name+"_rate") {
+ derived := diff / interval.Seconds()
+ newMeta := copyMeta(msg.Meta())
+ unit := newMeta["unit"]
+ if unit == "" {
+ if tagUnit, ok := msg.Tags()["unit"]; ok {
+ unit = tagUnit
+ newMeta["unit"] = unit
+ }
+ }
+ if unit != "" && !strings.HasSuffix(unit, "/s") {
+ newMeta["unit"] = unit + "/s"
+ }
+ derivedMsg, err := lp.NewMessage(name+"_rate", msg.Tags(), newMeta, map[string]interface{}{"value": derived}, time.Now())
+ if err == nil {
+ output <- derivedMsg
+ }
+ }
+ }
+ m.oldValues[name] = val
+}
+
+// Read processes commands and files, parses their output, and processes each metric.
func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
@@ -88,37 +198,30 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
log.Print(err)
continue
}
- cmdmetrics, err := m.parser.Parse(stdout)
+ metrics, err := m.parser.Parse(stdout)
if err != nil {
log.Print(err)
continue
}
- for _, c := range cmdmetrics {
- _, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name())
- if skip {
- continue
- }
-
- output <- lp.FromInfluxMetric(c)
+ for _, met := range metrics {
+ msg := lp.FromInfluxMetric(met)
+ m.processMetric(msg, interval, output)
}
}
for _, file := range m.files {
buffer, err := os.ReadFile(file)
if err != nil {
log.Print(err)
- return
+ continue
}
- fmetrics, err := m.parser.Parse(buffer)
+ metrics, err := m.parser.Parse(buffer)
if err != nil {
log.Print(err)
continue
}
- for _, f := range fmetrics {
- _, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name())
- if skip {
- continue
- }
- output <- lp.FromInfluxMetric(f)
+ for _, met := range metrics {
+ msg := lp.FromInfluxMetric(met)
+ m.processMetric(msg, interval, output)
}
}
}
diff --git a/collectors/customCmdMetric.md b/collectors/customCmdMetric.md
index 011135d..7e33947 100644
--- a/collectors/customCmdMetric.md
+++ b/collectors/customCmdMetric.md
@@ -1,4 +1,3 @@
-
## `customcmd` collector
```json
@@ -6,15 +5,45 @@
"exclude_metrics": [
"mymetric"
],
- "files" : [
+ "only_metrics": [
+ "cpu_usage",
+ "cpu_usage_rate",
+ "mem_usage",
+ "mem_usage_rate"
+ ],
+ "send_abs_values": true,
+ "send_diff_values": true,
+ "send_derived_values": true,
+ "files": [
"/var/run/myapp.metrics"
],
- "commands" : [
+ "commands": [
"/usr/local/bin/getmetrics.pl"
]
}
```
-The `customcmd` collector reads data from files and the output of executed commands. The files and commands can output multiple metrics (separated by newline) but the have to be in the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/). If a metric is not parsable, it is skipped. If a metric is not required, it can be excluded from forwarding it to the sink.
+The `customcmd` collector reads data from specified files and executed commands.
+Both the output of commands and the content of files must follow the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/).
+**Expected format example:**
+```
+cpu_usage,host=myhost,type=hwthread,type-id=0,unit=MByte value=42.0 1670000000000000000
+mem_usage,host=myhost,type=node,unit=MByte value=1024 1670000000000000000
+```
+
+The following tags are commonly used:
+- **type:** Indicates the metric scope, e.g. "node", "socket" or "hwthread".
+- **type-id:** The identifier for the type (e.g. the specific hardware thread or socket).
+- **unit:** The unit of the metric (e.g. "MByte").
+
+For each metric parsed from the output:
+- If `send_abs_values` is enabled, the **absolute (raw) metric** is forwarded.
+- If `send_diff_values` is enabled and a previous value exists, the collector computes the **difference** (current value minus previous value) and forwards it as a new metric with the suffix `_diff`.
+- If `send_derived_values` is enabled and a previous value exists, the collector computes the **derived rate** (difference divided by the time interval) and forwards it as a new metric with the suffix `_rate`.
+ Additionally, if the original metric includes a unit (in its meta data or tags), the derived metric's unit is set to that unit with "/s" appended.
+
+Both filtering mechanisms are supported:
+- `exclude_metrics`: Excludes the specified metrics.
+- `only_metrics`: If provided, only the listed metrics are collected. This takes precedence over `exclude_metrics`.