mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 02:35:07 +01:00 
			
		
		
		
	add only_metrics. docs: add units
This commit is contained in:
		@@ -1,18 +1,17 @@
 | 
			
		||||
package collectors
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	"golang.org/x/sys/unix"
 | 
			
		||||
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-lib/ccMessage"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	"golang.org/x/sys/unix"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const IB_BASEPATH = "/sys/class/infiniband/"
 | 
			
		||||
@@ -32,26 +31,45 @@ type InfinibandCollectorInfo struct {
 | 
			
		||||
	LID              string                      // IB local Identifier (LID)
 | 
			
		||||
	device           string                      // IB device
 | 
			
		||||
	port             string                      // IB device port
 | 
			
		||||
	portCounterFiles []InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
 | 
			
		||||
	tagSet           map[string]string           // corresponding tag list
 | 
			
		||||
	portCounterFiles []InfinibandCollectorMetric // list of counters for this port
 | 
			
		||||
	tagSet           map[string]string           // tags for this IB port
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type InfinibandCollector struct {
 | 
			
		||||
	metricCollector
 | 
			
		||||
	config struct {
 | 
			
		||||
		ExcludeDevices     []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
 | 
			
		||||
		SendAbsoluteValues bool     `json:"send_abs_values"`           // Send absolut values as read from sys filesystem
 | 
			
		||||
		SendTotalValues    bool     `json:"send_total_values"`         // Send computed total values
 | 
			
		||||
		SendDerivedValues  bool     `json:"send_derived_values"`       // Send derived values e.g. rates
 | 
			
		||||
		ExcludeDevices     []string `json:"exclude_devices,omitempty"`
 | 
			
		||||
		ExcludeMetrics     []string `json:"exclude_metrics,omitempty"`
 | 
			
		||||
		OnlyMetrics        []string `json:"only_metrics,omitempty"`
 | 
			
		||||
		SendAbsoluteValues bool     `json:"send_abs_values"`
 | 
			
		||||
		SendTotalValues    bool     `json:"send_total_values"`
 | 
			
		||||
		SendDerivedValues  bool     `json:"send_derived_values"`
 | 
			
		||||
	}
 | 
			
		||||
	info          []InfinibandCollectorInfo
 | 
			
		||||
	lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
 | 
			
		||||
	lastTimestamp time.Time // For derived calculations
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
 | 
			
		||||
func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
// shouldOutput returns true if a metric (or its derived variant) should be forwarded.
 | 
			
		||||
func (m *InfinibandCollector) shouldOutput(metricName string) bool {
 | 
			
		||||
	// If only_metrics is set, only metrics with an exact match are allowed.
 | 
			
		||||
	if len(m.config.OnlyMetrics) > 0 {
 | 
			
		||||
		for _, n := range m.config.OnlyMetrics {
 | 
			
		||||
			if n == metricName {
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	// Otherwise, exclude if present in exclude_metrics.
 | 
			
		||||
	for _, n := range m.config.ExcludeMetrics {
 | 
			
		||||
		if n == metricName {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
	// Check if already initialized
 | 
			
		||||
func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
	if m.init {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
@@ -64,7 +82,6 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
		"source": m.name,
 | 
			
		||||
		"group":  "Network",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set default configuration,
 | 
			
		||||
	m.config.SendAbsoluteValues = true
 | 
			
		||||
	m.config.SendDerivedValues = false
 | 
			
		||||
@@ -87,9 +104,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, path := range ibDirs {
 | 
			
		||||
 | 
			
		||||
		// Skip, when no LID is assigned
 | 
			
		||||
		line, err := os.ReadFile(filepath.Join(path, "lid"))
 | 
			
		||||
		// Skip when no LID is assigned.
 | 
			
		||||
		lidFile := filepath.Join(path, "lid")
 | 
			
		||||
		line, err := os.ReadFile(lidFile)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
@@ -98,12 +115,15 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Get device and port component
 | 
			
		||||
		// Get device and port components.
 | 
			
		||||
		pathSplit := strings.Split(path, string(os.PathSeparator))
 | 
			
		||||
		if len(pathSplit) < 7 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		device := pathSplit[4]
 | 
			
		||||
		port := pathSplit[6]
 | 
			
		||||
 | 
			
		||||
		// Skip excluded devices
 | 
			
		||||
		// Skip excluded devices.
 | 
			
		||||
		skip := false
 | 
			
		||||
		for _, excludedDevice := range m.config.ExcludeDevices {
 | 
			
		||||
			if excludedDevice == device {
 | 
			
		||||
@@ -115,7 +135,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Check access to counter files
 | 
			
		||||
		// Define the counters for the port.
 | 
			
		||||
		countersDir := filepath.Join(path, "counters")
 | 
			
		||||
		portCounterFiles := []InfinibandCollectorMetric{
 | 
			
		||||
			{
 | 
			
		||||
@@ -158,18 +178,19 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		tagSet := map[string]string{
 | 
			
		||||
			"type":   "node",
 | 
			
		||||
			"device": device,
 | 
			
		||||
			"port":   port,
 | 
			
		||||
			"lid":    LID,
 | 
			
		||||
		}
 | 
			
		||||
		m.info = append(m.info,
 | 
			
		||||
			InfinibandCollectorInfo{
 | 
			
		||||
				LID:              LID,
 | 
			
		||||
				device:           device,
 | 
			
		||||
				port:             port,
 | 
			
		||||
				portCounterFiles: portCounterFiles,
 | 
			
		||||
				tagSet: map[string]string{
 | 
			
		||||
					"type":   "node",
 | 
			
		||||
					"device": device,
 | 
			
		||||
					"port":   port,
 | 
			
		||||
					"lid":    LID,
 | 
			
		||||
				},
 | 
			
		||||
				tagSet:           tagSet,
 | 
			
		||||
			})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -178,6 +199,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	m.init = true
 | 
			
		||||
	m.lastTimestamp = time.Now()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -198,112 +220,71 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
 | 
			
		||||
 | 
			
		||||
	for i := range m.info {
 | 
			
		||||
		info := &m.info[i]
 | 
			
		||||
 | 
			
		||||
		var ib_total, ib_total_pkts int64
 | 
			
		||||
		for i := range info.portCounterFiles {
 | 
			
		||||
			counterDef := &info.portCounterFiles[i]
 | 
			
		||||
 | 
			
		||||
			// Read counter file
 | 
			
		||||
		var ib_total, ib_total_pkgs int64
 | 
			
		||||
		for j := range info.portCounterFiles {
 | 
			
		||||
			counterDef := &info.portCounterFiles[j]
 | 
			
		||||
			line, err := os.ReadFile(counterDef.path)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				cclog.ComponentError(
 | 
			
		||||
					m.name,
 | 
			
		||||
					fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
 | 
			
		||||
				cclog.ComponentError(m.name, fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			data := strings.TrimSpace(string(line))
 | 
			
		||||
 | 
			
		||||
			// convert counter to int64
 | 
			
		||||
			v, err := strconv.ParseInt(data, 10, 64)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				cclog.ComponentError(
 | 
			
		||||
					m.name,
 | 
			
		||||
					fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterDef.name, data, err))
 | 
			
		||||
				cclog.ComponentError(m.name, fmt.Sprintf("Read(): Failed to convert counter %s='%s': %v", counterDef.name, data, err))
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// Scale raw value
 | 
			
		||||
			// Scale raw value.
 | 
			
		||||
			v *= counterDef.scale
 | 
			
		||||
 | 
			
		||||
			// Save current state
 | 
			
		||||
			counterDef.currentState = v
 | 
			
		||||
 | 
			
		||||
			// Send absolut values
 | 
			
		||||
			if m.config.SendAbsoluteValues {
 | 
			
		||||
				if y, err :=
 | 
			
		||||
					lp.NewMessage(
 | 
			
		||||
						counterDef.name,
 | 
			
		||||
						info.tagSet,
 | 
			
		||||
						m.meta,
 | 
			
		||||
						map[string]interface{}{
 | 
			
		||||
							"value": counterDef.currentState,
 | 
			
		||||
						},
 | 
			
		||||
						now); err == nil {
 | 
			
		||||
			// Send absolute values.
 | 
			
		||||
			if m.config.SendAbsoluteValues && m.shouldOutput(counterDef.name) {
 | 
			
		||||
				if y, err := lp.NewMessage(counterDef.name, info.tagSet, m.meta, map[string]interface{}{"value": counterDef.currentState}, now); err == nil {
 | 
			
		||||
					y.AddMeta("unit", counterDef.unit)
 | 
			
		||||
					output <- y
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Send derived values
 | 
			
		||||
			// Send derived values.
 | 
			
		||||
			if m.config.SendDerivedValues {
 | 
			
		||||
				if counterDef.lastState >= 0 {
 | 
			
		||||
					rate := float64((counterDef.currentState - counterDef.lastState)) / timeDiff
 | 
			
		||||
					if y, err :=
 | 
			
		||||
						lp.NewMessage(
 | 
			
		||||
							counterDef.name+"_bw",
 | 
			
		||||
							info.tagSet,
 | 
			
		||||
							m.meta,
 | 
			
		||||
							map[string]interface{}{
 | 
			
		||||
								"value": rate,
 | 
			
		||||
							},
 | 
			
		||||
							now); err == nil {
 | 
			
		||||
					rate := float64(counterDef.currentState-counterDef.lastState) / timeDiff
 | 
			
		||||
					if m.shouldOutput(counterDef.name + "_bw") {
 | 
			
		||||
						if y, err := lp.NewMessage(counterDef.name+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
 | 
			
		||||
							y.AddMeta("unit", counterDef.unit+"/sec")
 | 
			
		||||
							output <- y
 | 
			
		||||
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				counterDef.lastState = counterDef.currentState
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Sum up total values
 | 
			
		||||
			// Sum up total values if enabled.
 | 
			
		||||
			if m.config.SendTotalValues {
 | 
			
		||||
				switch {
 | 
			
		||||
				case counterDef.addToIBTotal:
 | 
			
		||||
				if counterDef.addToIBTotal {
 | 
			
		||||
					ib_total += counterDef.currentState
 | 
			
		||||
				case counterDef.addToIBTotalPkgs:
 | 
			
		||||
					ib_total_pkts += counterDef.currentState
 | 
			
		||||
				} else if counterDef.addToIBTotalPkgs {
 | 
			
		||||
					ib_total_pkgs += counterDef.currentState
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Send total values
 | 
			
		||||
		// Send total values.
 | 
			
		||||
		if m.config.SendTotalValues {
 | 
			
		||||
			if y, err :=
 | 
			
		||||
				lp.NewMessage(
 | 
			
		||||
					"ib_total",
 | 
			
		||||
					info.tagSet,
 | 
			
		||||
					m.meta,
 | 
			
		||||
					map[string]interface{}{
 | 
			
		||||
						"value": ib_total,
 | 
			
		||||
					},
 | 
			
		||||
					now); err == nil {
 | 
			
		||||
			if m.shouldOutput("ib_total") {
 | 
			
		||||
				if y, err := lp.NewMessage("ib_total", info.tagSet, m.meta, map[string]interface{}{"value": ib_total}, now); err == nil {
 | 
			
		||||
					y.AddMeta("unit", "bytes")
 | 
			
		||||
					output <- y
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
			if y, err :=
 | 
			
		||||
				lp.NewMessage(
 | 
			
		||||
					"ib_total_pkts",
 | 
			
		||||
					info.tagSet,
 | 
			
		||||
					m.meta,
 | 
			
		||||
					map[string]interface{}{
 | 
			
		||||
						"value": ib_total_pkts,
 | 
			
		||||
					},
 | 
			
		||||
					now); err == nil {
 | 
			
		||||
			}
 | 
			
		||||
			if m.shouldOutput("ib_total_pkts") {
 | 
			
		||||
				if y, err := lp.NewMessage("ib_total_pkts", info.tagSet, m.meta, map[string]interface{}{"value": ib_total_pkgs}, now); err == nil {
 | 
			
		||||
					y.AddMeta("unit", "packets")
 | 
			
		||||
					output <- y
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *InfinibandCollector) Close() {
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,3 @@
 | 
			
		||||
 | 
			
		||||
## `ibstat` collector
 | 
			
		||||
 | 
			
		||||
```json
 | 
			
		||||
@@ -6,30 +5,41 @@
 | 
			
		||||
    "exclude_devices": [
 | 
			
		||||
      "mlx4"
 | 
			
		||||
    ],
 | 
			
		||||
    "exclude_metrics": [
 | 
			
		||||
      "ib_total"
 | 
			
		||||
    ],
 | 
			
		||||
    "only_metrics": [
 | 
			
		||||
      "ib_revc_bw"
 | 
			
		||||
    ],
 | 
			
		||||
    "send_abs_values": true,
 | 
			
		||||
    "send_derived_values": true
 | 
			
		||||
    "send_derived_values": true,
 | 
			
		||||
    "send_total_values": true
 | 
			
		||||
  }
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
The `ibstat` collector includes all Infiniband devices that can be
 | 
			
		||||
found below `/sys/class/infiniband/` and where any of the ports provides a
 | 
			
		||||
LID file (`/sys/class/infiniband/<dev>/ports/<port>/lid`)
 | 
			
		||||
 | 
			
		||||
The devices can be filtered with the `exclude_devices` option in the configuration.
 | 
			
		||||
The ibstat collector includes all InfiniBand devices found under `/sys/class/infiniband/` for which a LID file (`/sys/class/infiniband/<dev>/ports/<port>/lid`) is present.
 | 
			
		||||
Devices can be filtered with the `exclude_devices` option.
 | 
			
		||||
 | 
			
		||||
For each found LID the collector reads data through the sysfs files below `/sys/class/infiniband/<device>`. (See: <https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-infiniband>)
 | 
			
		||||
 | 
			
		||||
Metrics:
 | 
			
		||||
Both filtering mechanisms are supported:
 | 
			
		||||
- `exclude_metrics`: Excludes the specified metrics.
 | 
			
		||||
- `only_metrics`: If provided, only the listed metrics are collected. This takes precedence over `exclude_metrics`.
 | 
			
		||||
 | 
			
		||||
* `ib_recv`
 | 
			
		||||
* `ib_xmit`
 | 
			
		||||
* `ib_recv_pkts`
 | 
			
		||||
* `ib_xmit_pkts`
 | 
			
		||||
* `ib_total = ib_recv + ib_xmit` (if `send_total_values == true`)
 | 
			
		||||
* `ib_total_pkts = ib_recv_pkts + ib_xmit_pkts` (if `send_total_values == true`)
 | 
			
		||||
* `ib_recv_bw` (if `send_derived_values == true`)
 | 
			
		||||
* `ib_xmit_bw` (if `send_derived_values == true`)
 | 
			
		||||
* `ib_recv_pkts_bw` (if `send_derived_values == true`)
 | 
			
		||||
* `ib_xmit_pkts_bw` (if `send_derived_values == true`)
 | 
			
		||||
**Absolute Metrics:**
 | 
			
		||||
- `ib_recv` (unit: `bytes`)
 | 
			
		||||
- `ib_xmit` (unit: `bytes`)
 | 
			
		||||
- `ib_recv_pkts` (unit: `packets`)
 | 
			
		||||
- `ib_xmit_pkts` (unit: `packets`)
 | 
			
		||||
 | 
			
		||||
**Derived Metrics:**
 | 
			
		||||
- `ib_recvi_bw` (unit: `bytes/s`)
 | 
			
		||||
- `ib_xmit_bw` (unit: `bytes/s`)
 | 
			
		||||
- `ib_recv_pkts_bw` (unit: `packets/s`)
 | 
			
		||||
- `ib_xmit_pkts_bw` (unit: `packets/s`)
 | 
			
		||||
 | 
			
		||||
**Global metrics** (if `send_total_values` is enabled):
 | 
			
		||||
- `ib_total` = ib_recv + ib_xmit (unit: `bytes`)
 | 
			
		||||
- `ib_total_pkts` = ib_recv_pkts + ib_xmit_pkts (unit: `packets`)
 | 
			
		||||
 | 
			
		||||
The collector adds a `device` tag to all metrics
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user