Compare commits

...

22 Commits
v0.7.3 ... main

Author SHA1 Message Date
Thomas Roehl
13fc8a53d3 Memstat: Fix mem_shared and add more metrics 2026-03-17 18:07:30 +01:00
Thomas Röhl
1937ef2587 Update cc-lib to 2.8.2 2026-03-13 18:00:26 +01:00
Holger Obermaier
35510d3d39 Use strict JSON decoding 2026-03-13 17:57:33 +01:00
Holger Obermaier
ef5e4c2604 Corrected json config 2026-03-13 17:57:33 +01:00
Holger Obermaier
44401318e4 Enable same linters as in CI pipeline 2026-03-13 17:57:33 +01:00
Holger Obermaier
2e60d3111c Add config option to exclude metrics 2026-03-13 17:57:33 +01:00
Holger Obermaier
e8734c02db Add config option for manual device configuration 2026-03-13 17:57:33 +01:00
Holger Obermaier
54650d40a6 Store query command for later reuse 2026-03-13 17:57:33 +01:00
Holger Obermaier
e7050834f5 * Honor config option excluded devices
* Use device type in read command
2026-03-13 17:57:33 +01:00
Holger Obermaier
893a0d69de Improve error reporting 2026-03-13 17:57:33 +01:00
Holger Obermaier
345119866a Switch from lp.NewMessage to lp.NewMetric 2026-03-13 17:57:33 +01:00
Holger Obermaier
ec917cf802 Switch from lp.NewMessage to lp.NewMetric 2026-03-13 17:57:33 +01:00
Holger Obermaier
c7cfc0723b Fix all linter warnings 2026-03-13 17:57:33 +01:00
Holger Obermaier
4f2685f4c4 Addapt to new ccMessage syntax 2026-03-13 17:57:33 +01:00
Thomas Roehl
439bfacfd9 Add SmartMonCollector to CollectorManager 2026-03-13 17:57:33 +01:00
Thomas Roehl
cd4ac9c885 Add Collector for S.M.A.R.T disk data 2026-03-13 17:57:33 +01:00
Holger Obermaier
eeb60ba0df Add target to build stripped executable 2026-03-12 11:39:43 +01:00
Holger Obermaier
a481a34dcd Avoid duplicate error printing 2026-03-12 10:08:23 +01:00
Holger Obermaier
b65576431e Stricter json parsing (#204) 2026-03-11 15:59:14 +01:00
Holger Obermaier
a927565868 Fix router config syntax 2026-03-10 13:51:06 +01:00
dependabot[bot]
0b67993eb0 Bump github.com/ClusterCockpit/cc-lib/v2 from 2.7.0 to 2.8.0
Bumps [github.com/ClusterCockpit/cc-lib/v2](https://github.com/ClusterCockpit/cc-lib) from 2.7.0 to 2.8.0.
- [Release notes](https://github.com/ClusterCockpit/cc-lib/releases)
- [Commits](https://github.com/ClusterCockpit/cc-lib/compare/v2.7.0...v2.8.0)

---
updated-dependencies:
- dependency-name: github.com/ClusterCockpit/cc-lib/v2
  dependency-version: 2.8.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-09 07:58:27 +01:00
dependabot[bot]
4164e3d1a3 Bump golang.org/x/sys from 0.41.0 to 0.42.0
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.41.0 to 0.42.0.
- [Commits](https://github.com/golang/sys/compare/v0.41.0...v0.42.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-version: 0.42.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-09 07:58:11 +01:00
49 changed files with 815 additions and 302 deletions

View File

@@ -27,6 +27,17 @@ $(APP): $(GOSRC) go.mod
$(GOBIN) get
$(GOBIN) build -o $(APP) $(GOSRC_APP)
# -ldflags:
# -s : drops the OS symbol table
# -w : drops DWARF
# -> Panic stack traces still show function names and file:line
.PHONY: build-stripped
build-stripped:
make -C collectors
$(GOBIN) get
$(GOBIN) build -ldflags "-s -w" -trimpath -o $(APP) $(GOSRC_APP)
.PHONY: install
install: $(APP)
@WORKSPACE=$(PREFIX)
@if [ -z "$${WORKSPACE}" ]; then exit 1; fi
@@ -89,7 +100,7 @@ staticcheck:
.PHONY: golangci-lint
golangci-lint:
$(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
$$($(GOBIN) env GOPATH)/bin/golangci-lint run
$$($(GOBIN) env GOPATH)/bin/golangci-lint run --enable errorlint,govet,misspell,modernize,prealloc,staticcheck,unconvert,wastedassign
.ONESHELL:
.PHONY: RPM

View File

@@ -8,6 +8,7 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"os"
@@ -48,22 +49,22 @@ type RuntimeConfig struct {
Sync sync.WaitGroup
}
// ReadCli reads the command line arguments
func ReadCli() map[string]string {
var m map[string]string
cfg := flag.String("config", "./config.json", "Path to configuration file")
logfile := flag.String("log", "stderr", "Path for logfile")
once := flag.Bool("once", false, "Run all collectors only once")
loglevel := flag.String("loglevel", "info", "Set log level")
flag.Parse()
m = make(map[string]string)
m["configfile"] = *cfg
m["logfile"] = *logfile
m := map[string]string{
"configfile": *cfg,
"logfile": *logfile,
"once": "false",
"loglevel": *loglevel,
}
if *once {
m["once"] = "true"
} else {
m["once"] = "false"
}
m["loglevel"] = *loglevel
return m
}
@@ -120,9 +121,10 @@ func mainFunc() int {
// Load and check configuration
main := ccconf.GetPackageConfig("main")
err = json.Unmarshal(main, &rcfg.ConfigFile)
if err != nil {
cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error())
d := json.NewDecoder(bytes.NewReader(main))
d.DisallowUnknownFields()
if err := d.Decode(&rcfg.ConfigFile); err != nil {
cclog.Errorf("Error reading configuration file %s: %v", rcfg.CliArgs["configfile"], err)
return 1
}

View File

@@ -59,6 +59,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [ ] Aggregate metrics to a higher topology entity (sum hwthread metrics to socket metric, ...). Needs to be configurable
# Contributing own collectors
A collector reads data from any source, parses it into metrics and submits these metrics to the `metric-collector`. A collector provides three functions:
* `Name() string`: Return the name of the collector
@@ -104,8 +105,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{"source": m.name, "group": "Sample"}

View File

@@ -30,9 +30,9 @@ const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
// Struct for the collector-specific JSON config
type BeegfsMetaCollectorConfig struct {
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"`
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem"`
}
type BeegfsMetaCollector struct {
@@ -74,9 +74,10 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Failed to decode JSON config: %w", m.name, err)
}
}
@@ -99,23 +100,23 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}
// Beegfs file system statistics can only be queried by user root
user, err := user.Current()
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %w", err)
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
}
if user.Uid != "0" {
return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root")
return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
}
// Check if beegfs-ctl is in executable search path
_, err = exec.LookPath(m.config.Beegfs)
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err)
return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
}
m.init = true
return nil

View File

@@ -28,9 +28,9 @@ import (
// Struct for the collector-specific JSON config
type BeegfsStorageCollectorConfig struct {
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"`
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem"`
}
type BeegfsStorageCollector struct {
@@ -67,9 +67,10 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
@@ -92,23 +93,23 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}
// Beegfs file system statistics can only be queried by user root
user, err := user.Current()
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %w", err)
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
}
if user.Uid != "0" {
return fmt.Errorf("BeegfsStorageCollector.Init(): BeeGFS file system statistics can only be queried by user root")
return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
}
// Check if beegfs-ctl is in executable search path
_, err = exec.LookPath(m.config.Beegfs)
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err)
return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
}
m.init = true
return nil

View File

@@ -14,14 +14,14 @@ This Collector is to collect BeeGFS on Demand (BeeOND) storage stats.
```json
"beegfs_storage": {
"beegfs_path": "/usr/bin/beegfs-ctl",
"beegfs_path": "/usr/bin/beegfs-ctl",
"exclude_filesystem": [
"/mnt/ignore_me"
],
"exclude_metrics": [
"ack",
"storInf",
"unlnk"
"ack",
"storInf",
"unlnk"
]
}
```

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"sync"
@@ -48,6 +49,7 @@ var AvailableCollectors = map[string]MetricCollector{
"schedstat": new(SchedstatCollector),
"nfsiostat": new(NfsIOStatCollector),
"slurm_cgroup": new(SlurmCgroupCollector),
"smartmon": new(SmartMonCollector),
}
// Metric collector manager data structure
@@ -88,10 +90,10 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
cm.ticker = ticker
cm.duration = duration
err := json.Unmarshal(collectConfig, &cm.config)
if err != nil {
cclog.Error(err.Error())
return err
d := json.NewDecoder(bytes.NewReader(collectConfig))
d.DisallowUnknownFields()
if err := d.Decode(&cm.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding collector manager config: %w", "CollectorManager", err)
}
// Initialize configured collectors
@@ -102,7 +104,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
}
collector := AvailableCollectors[collectorName]
err = collector.Init(collectorCfg)
err := collector.Init(collectorCfg)
if err != nil {
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
continue

View File

@@ -12,7 +12,9 @@ hugo_path: docs/reference/cc-metric-collector/collectors/cpufreq_cpuinfo.md
## `cpufreq_cpuinfo` collector
```json
"cpufreq_cpuinfo": {}
"cpufreq_cpuinfo": {
"exclude_metrics": []
}
```
The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful of **hwthread** metrics.

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -54,9 +55,10 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
}
m.parallel = true
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -77,7 +79,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
scalingCurFreqFile := filepath.Join("/sys/devices/system/cpu", fmt.Sprintf("cpu%d", c.CpuID), "cpufreq/scaling_cur_freq")
err := unix.Access(scalingCurFreqFile, unix.R_OK)
if err != nil {
return fmt.Errorf("unable to access file '%s': %w", scalingCurFreqFile, err)
return fmt.Errorf("%s Init(): unable to access file '%s': %w", m.name, scalingCurFreqFile, err)
}
m.topology = append(m.topology,

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -53,9 +54,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
"type": "node",
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
matches := map[string]int{
@@ -79,19 +81,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
}
// Check input file
file, err := os.Open(string(CPUSTATFILE))
file, err := os.Open(CPUSTATFILE)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
return fmt.Errorf("%s Init(): Failed to open file '%s': %w", m.name, CPUSTATFILE, err)
}
defer func() {
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
}
}()
// Pre-generate tags for all CPUs
num_cpus := 0
@@ -120,6 +113,12 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
// Close file
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Failed to close file '%s': %w", m.name, CPUSTATFILE, err)
}
m.lastTimestamp = time.Now()
m.init = true
return nil
@@ -166,11 +165,11 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
now := time.Now()
tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(CPUSTATFILE))
file, err := os.Open(CPUSTATFILE)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
fmt.Sprintf("Read(): Failed to open file '%s': %v", CPUSTATFILE, err))
}
defer func() {
if err := file.Close(); err != nil {

View File

@@ -8,8 +8,8 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
@@ -47,8 +47,10 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
// Read configuration
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
@@ -82,7 +84,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
}
if len(m.files) == 0 && len(m.cmdFieldsSlice) == 0 {
return errors.New("no metrics to collect")
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
}
m.init = true
return nil

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -42,8 +43,10 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.allowedMetrics = map[string]bool{

View File

@@ -32,7 +32,7 @@ type GpfsCollectorState map[string]int64
type GpfsCollectorConfig struct {
Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
Sudo bool `json:"use_sudo,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
@@ -322,9 +322,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -336,7 +337,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}
m.lastState = make(map[string]GpfsCollectorState)
@@ -346,18 +347,15 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
if !m.config.Sudo {
user, err := user.Current()
if err != nil {
cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
return err
return fmt.Errorf("%s Init(): failed to get current user: %w", m.name, err)
}
if user.Uid != "0" {
cclog.ComponentError(m.name, "GPFS file system statistics can only be queried by user root")
return err
return fmt.Errorf("%s Init(): GPFS file system statistics can only be queried by user root", m.name)
}
} else {
p, err := exec.LookPath("sudo")
if err != nil {
cclog.ComponentError(m.name, "Cannot find 'sudo'")
return err
return fmt.Errorf("%s Init(): cannot find 'sudo': %w", m.name, err)
}
m.sudoCmd = p
}
@@ -377,7 +375,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// the file was given in the config, use it
p = m.config.Mmpmon
} else {
cclog.ComponentError(m.name, fmt.Sprintf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err))
return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err)
}
}
@@ -434,7 +431,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
}
}
if len(m.definitions) == 0 {
return errors.New("no metrics to collect")
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
}
m.init = true

View File

@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/gpfs.md
```json
"gpfs": {
"mmpmon_path": "/path/to/mmpmon",
"use_sudo": "true",
"use_sudo": true,
"exclude_filesystem": [
"fs1"
],

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -79,9 +80,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}

View File

@@ -9,8 +9,8 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"slices"
@@ -44,7 +44,6 @@ type IOstatCollector struct {
}
func (m *IOstatCollector) Init(config json.RawMessage) error {
var err error
m.name = "IOstatCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"}
@@ -52,9 +51,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
// https://www.kernel.org/doc/html/latest/admin-guide/iostats.html
@@ -85,7 +85,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
}
}
if len(m.matches) == 0 {
return errors.New("no metrics to collect")
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
}
file, err := os.Open(IOSTATFILE)
if err != nil {
@@ -135,7 +135,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
}
m.init = true
return err
return nil
}
func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {

View File

@@ -11,7 +11,6 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
@@ -56,9 +55,10 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
m.config.IpmitoolPath = "ipmitool"
m.config.IpmisensorsPath = "ipmi-sensors"
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
// Check if executables ipmitool or ipmisensors are found
@@ -67,7 +67,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %s", p, err.Error()))
m.ipmitool = ""
} else {
m.ipmitool = p
@@ -78,14 +78,14 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %s", p, err.Error()))
m.ipmisensors = ""
} else {
m.ipmisensors = p
}
}
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no usable IPMI reader found")
return fmt.Errorf("%s Init(): no usable IPMI reader found", m.name)
}
m.init = true

View File

@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/ipmi.md
```json
"ipmistat": {
"ipmitool_path": "/path/to/ipmitool",
"ipmisensors_path": "/path/to/ipmi-sensors",
"ipmisensors_path": "/path/to/ipmi-sensors"
}
```

View File

@@ -16,8 +16,8 @@ package collectors
import "C"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"maps"
"math"
@@ -207,24 +207,25 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
m.config.LibraryPath = LIKWID_LIB_NAME
m.config.LockfilePath = LIKWID_DEF_LOCKFILE
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
return fmt.Errorf("%s Init(): error instantiating DynamicLibrary for %s", m.name, m.config.LibraryPath)
}
err := lib.Open()
if err != nil {
return fmt.Errorf("error opening %s: %w", m.config.LibraryPath, err)
return fmt.Errorf("%s Init(): error opening %s: %w", m.name, m.config.LibraryPath, err)
}
if m.config.ForceOverwrite {
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
if err := os.Setenv("LIKWID_FORCE", "1"); err != nil {
return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %w", err)
return fmt.Errorf("%s Init(): error setting environment variable LIKWID_FORCE=1: %w", m.name, err)
}
}
if err := m.setup(); err != nil {
@@ -295,16 +296,12 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
// If no event set could be added, shut down LikwidCollector
if totalMetrics == 0 {
err := errors.New("no LIKWID eventset or metric usable")
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): no LIKWID eventset or metric usable", m.name)
}
ret := C.topology_init()
if ret != 0 {
err := errors.New("failed to initialize topology module")
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): failed to initialize topology module", m.name)
}
m.measureThread = thread.New()
switch m.config.AccessMode {
@@ -319,7 +316,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
p = m.config.DaemonPath
}
if err := os.Setenv("PATH", p); err != nil {
return fmt.Errorf("error setting environment variable PATH=%s: %w", p, err)
return fmt.Errorf("%s Init(): error setting environment variable PATH=%s: %w", m.name, p, err)
}
}
C.HPMmode(1)

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -48,9 +49,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -63,16 +65,17 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
"load_five",
"load_fifteen",
}
m.load_skips = make([]bool, len(m.load_matches))
m.proc_matches = []string{
"proc_run",
"proc_total",
}
m.proc_skips = make([]bool, len(m.proc_matches))
m.load_skips = make([]bool, len(m.load_matches))
for i, name := range m.load_matches {
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
}
m.proc_skips = make([]bool, len(m.proc_matches))
for i, name := range m.proc_matches {
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -18,7 +19,6 @@ import (
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
@@ -300,9 +300,10 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
m.name = "LustreCollector"
m.parallel = true
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
if err := m.setup(); err != nil {
@@ -316,18 +317,15 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
if !m.config.Sudo {
user, err := user.Current()
if err != nil {
cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
return err
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
}
if user.Uid != "0" {
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
return err
return fmt.Errorf("%s Init(): Lustre file system statistics can only be queried by user root", m.name)
}
} else {
p, err := exec.LookPath("sudo")
if err != nil {
cclog.ComponentError(m.name, "Cannot find 'sudo'")
return err
return fmt.Errorf("%s Init(): Cannot find 'sudo': %w", m.name, err)
}
m.sudoCmd = p
}
@@ -336,7 +334,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
if err != nil {
p, err = exec.LookPath(LCTL_CMD)
if err != nil {
return err
return fmt.Errorf("%s Init(): Cannot find %s command: %w", m.name, LCTL_CMD, err)
}
}
m.lctl = p
@@ -364,12 +362,12 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
if len(m.definitions) == 0 {
return errors.New("no metrics to collect")
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
}
devices := m.getDevices()
if len(devices) == 0 {
return errors.New("no Lustre devices found")
return fmt.Errorf("%s Init(): no Lustre devices found", m.name)
}
m.stats = make(map[string]map[string]int64)
for _, d := range devices {

View File

@@ -9,8 +9,8 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
@@ -95,15 +95,15 @@ func getStats(filename string) map[string]MemstatStats {
}
func (m *MemstatCollector) Init(config json.RawMessage) error {
var err error
m.name = "MemstatCollector"
m.parallel = true
m.config.NodeStats = true
m.config.NumaStats = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{"source": m.name, "group": "Memory"}
@@ -111,16 +111,38 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.matches = make(map[string]string)
m.tags = map[string]string{"type": "node"}
matches := map[string]string{
"MemTotal": "mem_total",
"SwapTotal": "swap_total",
"SReclaimable": "mem_sreclaimable",
"Slab": "mem_slab",
"MemFree": "mem_free",
"Buffers": "mem_buffers",
"Cached": "mem_cached",
"MemAvailable": "mem_available",
"SwapFree": "swap_free",
"MemShared": "mem_shared",
"MemTotal": "mem_total",
"SwapTotal": "swap_total",
"SReclaimable": "mem_sreclaimable",
"Slab": "mem_slab",
"MemFree": "mem_free",
"Buffers": "mem_buffers",
"Cached": "mem_cached",
"MemAvailable": "mem_available",
"SwapFree": "swap_free",
"Shmem": "mem_shared",
"Active": "mem_active",
"Inactive": "mem_inactive",
"Dirty": "mem_dirty",
"Writeback": "mem_writeback",
"AnonPages": "mem_anon_pages",
"Mapped": "mem_mapped",
"VmallocTotal": "mem_vmalloc_total",
"AnonHugePages": "mem_anon_hugepages",
"ShmemHugePages": "mem_shared_hugepages",
"ShmemPmdMapped": "mem_shared_pmd_mapped",
"HugePages_Total": "mem_hugepages_total",
"HugePages_Free": "mem_hugepages_free",
"HugePages_Rsvd": "mem_hugepages_reserved",
"HugePages_Surp": "mem_hugepages_surplus",
"Hugepagesize": "mem_hugepages_size",
"DirectMap4k": "mem_direct_mapped_4k",
"DirectMap4M": "mem_direct_mapped_4m",
"DirectMap2M": "mem_direct_mapped_2m",
"DirectMap1G": "mem_direct_mapped_1g",
"Mlocked": "mem_locked",
"PageTables": "mem_pagetables",
"KernelStack": "mem_kernelstack",
}
for k, v := range matches {
if !slices.Contains(m.config.ExcludeMetrics, k) {
@@ -132,7 +154,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.sendMemUsed = true
}
if len(m.matches) == 0 {
return errors.New("no metrics to collect")
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
}
if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
@@ -140,7 +162,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
if m.config.NodeStats {
if stats := getStats(MEMSTATFILE); len(stats) == 0 {
return fmt.Errorf("cannot read data from file %s", MEMSTATFILE)
return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, MEMSTATFILE)
}
}
@@ -152,7 +174,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.nodefiles = make(map[int]MemstatCollectorNode)
for _, f := range files {
if stats := getStats(f); len(stats) == 0 {
return fmt.Errorf("cannot read data from file %s", f)
return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, f)
}
rematch := regex.FindStringSubmatch(f)
if len(rematch) == 2 {
@@ -172,7 +194,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
}
}
m.init = true
return err
return nil
}
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
@@ -221,6 +243,12 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage
unit = cacheVal.unit
}
}
if shmemVal, shmem := stats["Shmem"]; shmem {
memUsed -= shmemVal.value
if len(shmemVal.unit) > 0 && len(unit) == 0 {
unit = shmemVal.unit
}
}
}
}
}

View File

@@ -32,7 +32,29 @@ Metrics:
* `mem_cached`
* `mem_available`
* `mem_shared`
* `mem_active`
* `mem_inactive`
* `mem_dirty`
* `mem_writeback`
* `mem_anon_pages`
* `mem_mapped`
* `mem_vmalloc_total`
* `mem_anon_hugepages`
* `mem_shared_hugepages`
* `mem_shared_pmd_mapped`
* `mem_hugepages_total`
* `mem_hugepages_free`
* `mem_hugepages_reserved`
* `mem_hugepages_surplus`
* `mem_hugepages_size`
* `mem_direct_mapped_4k`
* `mem_direct_mapped_2m`
* `mem_direct_mapped_4m`
* `mem_direct_mapped_1g`
* `mem_locked`
* `mem_pagetables`
* `mem_kernelstack`
* `swap_total`
* `swap_free`
* `mem_used` = `mem_total` - (`mem_free` + `mem_buffers` + `mem_cached`)
* `mem_used` = `mem_total` - (`mem_free` + `mem_buffers` + `mem_cached` + `mem_shared`)

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -99,10 +100,10 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
@@ -133,11 +134,31 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
// Check if device is a included device
if slices.Contains(m.config.IncludeDevices, canonical) {
// Tag will contain original device name (raw).
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
tags := map[string]string{
"stype": "network",
"stype-id": raw,
"type": "node",
}
meta_unit_byte := map[string]string{
"source": m.name,
"group": "Network",
"unit": "bytes",
}
meta_unit_byte_per_sec := map[string]string{
"source": m.name,
"group": "Network",
"unit": "bytes/sec",
}
meta_unit_pkts := map[string]string{
"source": m.name,
"group": "Network",
"unit": "packets",
}
meta_unit_pkts_per_sec := map[string]string{
"source": m.name,
"group": "Network",
"unit": "packets/sec",
}
m.matches[canonical] = []NetstatCollectorMetric{
{

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"slices"
@@ -45,12 +46,7 @@ type nfsCollector struct {
}
func (m *nfsCollector) updateStats() error {
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
// Wait for cmd end
if err := cmd.Wait(); err != nil {
return fmt.Errorf("%s updateStats(): %w", m.name, err)
}
cmd := exec.Command(m.config.Nfsstats, "-l", "--all")
buffer, err := cmd.Output()
if err != nil {
@@ -95,9 +91,10 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
m.config.Nfsstats = string(NFSSTAT_EXEC)
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -17,14 +18,13 @@ import (
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
// These are the fields we read from the JSON configuration
type NfsIOStatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
@@ -75,7 +75,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
// Is this a device line with mount point, remote target and NFS version?
dev := resolve_regex_fields(l, deviceRegex)
if len(dev) > 0 {
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) {
if !slices.Contains(m.config.ExcludeFilesystems, dev[m.key]) {
current = dev
if len(current["version"]) == 0 {
current["version"] = "3"
@@ -104,7 +104,6 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
}
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
var err error = nil
m.name = "NfsIOStatCollector"
if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
@@ -117,10 +116,10 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.key = "mntpoint"
@@ -130,7 +129,7 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
m.data = m.readNfsiostats()
m.lastTimestamp = time.Now()
m.init = true
return err
return nil
}
func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {

View File

@@ -16,7 +16,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/nfsio.md
"exclude_metrics": [
"oread", "pageread"
],
"exclude_filesystems": [
"exclude_filesystem": [
"/mnt"
],
"use_server_as_stype": false,

View File

@@ -2,6 +2,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -83,9 +84,10 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): unable to unmarshal numastat configuration: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -15,7 +15,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/numastat.md
"numastats": {
"send_abs_values" : true,
"send_derived_values" : true
}
}
```
The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful of **memoryDomain** metrics. See: <https://www.kernel.org/doc/html/latest/admin-guide/numastat.html>

View File

@@ -72,9 +72,10 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(strings.NewReader(string(config)))
d.DisallowUnknownFields()
if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -90,22 +91,18 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
// Error: NVML library not found
// (nvml.ErrorString can not be used in this case)
if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
err = fmt.Errorf("NVML library not found")
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): NVML library not found", m.name)
}
if ret != nvml.SUCCESS {
err = errors.New(nvml.ErrorString(ret))
cclog.ComponentError(m.name, "Unable to initialize NVML", err.Error())
return err
return fmt.Errorf("%s Init(): Unable to initialize NVML: %w", m.name, err)
}
// Number of NVIDIA GPUs
num_gpus, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
err = errors.New(nvml.ErrorString(ret))
cclog.ComponentError(m.name, "Unable to get device count", err.Error())
return err
return fmt.Errorf("%s Init(): Unable to get device count: %w", m.name, err)
}
// For all GPUs

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -67,10 +68,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -8,8 +8,8 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"slices"
"strconv"
@@ -51,7 +51,6 @@ type RocmSmiCollector struct {
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *RocmSmiCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "RocmSmiCollector"
// This is for later use, also call it early
@@ -60,25 +59,21 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
ret := rocm_smi.Init()
if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("failed to initialize ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): failed to initialize ROCm SMI library", m.name)
}
numDevs, ret := rocm_smi.NumMonitorDevices()
if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("failed to get number of GPUs from ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): failed to get number of GPUs from ROCm SMI library", m.name)
}
m.devices = make([]RocmSmiCollectorDevice, 0)
@@ -90,16 +85,12 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
}
device, ret := rocm_smi.DeviceGetHandleByIndex(i)
if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("failed to get handle for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): failed to get get handle for GPU %d", m.name, i)
}
pciInfo, ret := rocm_smi.DeviceGetPciInfo(device)
if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("failed to get PCI information for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
return fmt.Errorf("%s Init(): failed to get PCI information for GPU %d", m.name, i)
}
pciId := fmt.Sprintf(
@@ -149,7 +140,7 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
return nil
}
// Read collects all metrics belonging to the sample collector

View File

@@ -8,11 +8,11 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
@@ -52,7 +52,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
@@ -63,13 +66,15 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> An accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
@@ -96,7 +101,7 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage)
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil {
// Send it to output channel
output <- y

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"sync"
@@ -47,26 +48,30 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
}
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): error decoding JSON config: %w", m.name, err)
}
}
// Parse the read interval duration
m.interval, err = time.ParseDuration(m.config.Interval)
if err != nil {
cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
return err
return fmt.Errorf("%s Init(): error parsing interval: %w", m.name, err)
}
// Storage for output channel
@@ -77,13 +82,11 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ticker = time.NewTicker(m.interval)
// Start the timer loop with return functionality by sending 'true' to the done channel
m.wg.Add(1)
go func() {
m.wg.Go(func() {
select {
case <-m.done:
// Exit the timer loop
cclog.ComponentDebug(m.name, "Closing...")
m.wg.Done()
return
case timestamp := <-m.ticker.C:
// This is executed every timer tick but we have to wait until the first
@@ -92,7 +95,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ReadMetrics(timestamp)
}
}
}()
})
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
@@ -111,7 +114,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil && m.output != nil {
// Send it to output channel if we have a valid channel
m.output <- y

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -66,8 +67,10 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
@@ -124,7 +127,7 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
m.olddata[linefields[0]]["waiting"] = waiting
value := l_running + l_waiting
y, err := lp.NewMessage("cpu_load_core", tags, m.meta, map[string]any{"value": value}, now)
y, err := lp.NewMetric("cpu_load_core", tags, m.meta, value, now)
if err == nil {
// Send it to output channel
output <- y

View File

@@ -8,13 +8,13 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"runtime"
"syscall"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
@@ -40,13 +40,18 @@ func (m *SelfCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Self"}
m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{
"source": m.name,
"group": "Self",
}
m.tags = map[string]string{
"type": "node",
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.init = true
@@ -60,49 +65,49 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
var memstats runtime.MemStats
runtime.ReadMemStats(&memstats)
y, err := lp.NewMessage("total_alloc", m.tags, m.meta, map[string]any{"value": memstats.TotalAlloc}, timestamp)
y, err := lp.NewMetric("total_alloc", m.tags, m.meta, memstats.TotalAlloc, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_alloc", m.tags, m.meta, map[string]any{"value": memstats.HeapAlloc}, timestamp)
y, err = lp.NewMetric("heap_alloc", m.tags, m.meta, memstats.HeapAlloc, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_sys", m.tags, m.meta, map[string]any{"value": memstats.HeapSys}, timestamp)
y, err = lp.NewMetric("heap_sys", m.tags, m.meta, memstats.HeapSys, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_idle", m.tags, m.meta, map[string]any{"value": memstats.HeapIdle}, timestamp)
y, err = lp.NewMetric("heap_idle", m.tags, m.meta, memstats.HeapIdle, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_inuse", m.tags, m.meta, map[string]any{"value": memstats.HeapInuse}, timestamp)
y, err = lp.NewMetric("heap_inuse", m.tags, m.meta, memstats.HeapInuse, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_released", m.tags, m.meta, map[string]any{"value": memstats.HeapReleased}, timestamp)
y, err = lp.NewMetric("heap_released", m.tags, m.meta, memstats.HeapReleased, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_objects", m.tags, m.meta, map[string]any{"value": memstats.HeapObjects}, timestamp)
y, err = lp.NewMetric("heap_objects", m.tags, m.meta, memstats.HeapObjects, timestamp)
if err == nil {
output <- y
}
}
if m.config.GoRoutines {
y, err := lp.NewMessage("num_goroutines", m.tags, m.meta, map[string]any{"value": runtime.NumGoroutine()}, timestamp)
y, err := lp.NewMetric("num_goroutines", m.tags, m.meta, runtime.NumGoroutine(), timestamp)
if err == nil {
output <- y
}
}
if m.config.CgoCalls {
y, err := lp.NewMessage("num_cgo_calls", m.tags, m.meta, map[string]any{"value": runtime.NumCgoCall()}, timestamp)
y, err := lp.NewMetric("num_cgo_calls", m.tags, m.meta, runtime.NumCgoCall(), timestamp)
if err == nil {
output <- y
}
@@ -113,35 +118,35 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if err == nil {
sec, nsec := rusage.Utime.Unix()
t := float64(sec) + (float64(nsec) * 1e-9)
y, err := lp.NewMessage("rusage_user_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
y, err := lp.NewMetric("rusage_user_time", m.tags, m.meta, t, timestamp)
if err == nil {
y.AddMeta("unit", "seconds")
output <- y
}
sec, nsec = rusage.Stime.Unix()
t = float64(sec) + (float64(nsec) * 1e-9)
y, err = lp.NewMessage("rusage_system_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
y, err = lp.NewMetric("rusage_system_time", m.tags, m.meta, t, timestamp)
if err == nil {
y.AddMeta("unit", "seconds")
output <- y
}
y, err = lp.NewMessage("rusage_vol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nvcsw}, timestamp)
y, err = lp.NewMetric("rusage_vol_ctx_switch", m.tags, m.meta, rusage.Nvcsw, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_invol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nivcsw}, timestamp)
y, err = lp.NewMetric("rusage_invol_ctx_switch", m.tags, m.meta, rusage.Nivcsw, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_signals", m.tags, m.meta, map[string]any{"value": rusage.Nsignals}, timestamp)
y, err = lp.NewMetric("rusage_signals", m.tags, m.meta, rusage.Nsignals, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_major_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Majflt}, timestamp)
y, err = lp.NewMetric("rusage_major_pgfaults", m.tags, m.meta, rusage.Majflt, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_minor_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Minflt}, timestamp)
y, err = lp.NewMetric("rusage_minor_pgfaults", m.tags, m.meta, rusage.Minflt, timestamp)
if err == nil {
output <- y
}

View File

@@ -119,8 +119,9 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
m.cgroupBase = defaultCgroupBase
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
d := json.NewDecoder(strings.NewReader(string(config)))
d.DisallowUnknownFields()
if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err)
}
m.excludeMetrics = make(map[string]struct{})

View File

@@ -0,0 +1,360 @@
package collectors
import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"os/exec"
	"slices"
	"time"

	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
	lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
// SmartMonCollectorConfig holds the options decoded from the collector's JSON
// configuration. Decoding is strict: unknown keys are rejected in Init().
type SmartMonCollectorConfig struct {
// UseSudo prefixes every smartctl invocation (scan and query) with sudo.
UseSudo bool `json:"use_sudo,omitempty"`
// ExcludeDevices lists device names to skip; it is applied to manually
// configured devices as well as to devices found by the scan.
ExcludeDevices []string `json:"exclude_devices,omitempty"`
// ExcludeMetrics lists metric names that must not be sent.
// NOTE(review): the JSON key is camelCase ("excludeMetrics"), unlike the
// snake_case keys of the other options.
ExcludeMetrics []string `json:"excludeMetrics,omitempty"`
// Devices is an optional manual device list. When it is non-empty, no
// device scan is performed.
Devices []struct {
Name string `json:"name"`
Type string `json:"type"`
} `json:"devices,omitempty"`
}
// deviceT describes one monitored device. Name and Type are filled either from
// the configuration or from the JSON output of the smartctl scan command.
type deviceT struct {
Name string `json:"name"`
Type string `json:"type"`
// queryCommand is the complete argv (optionally starting with sudo) that is
// executed on every Read() to fetch the device data. It is unexported and
// therefore never populated by JSON decoding.
queryCommand []string
}
// SmartMonCollector collects S.M.A.R.T. health data by running the external
// smartctl command once per device on every Read() call.
type SmartMonCollector struct {
metricCollector
config SmartMonCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
devices []deviceT // smartmon devices queried in Read()
sudoCmd string // Full path to 'sudo' command (only resolved if use_sudo is set)
smartCtlCmd string // Full path to 'smartctl' command
// excludeMetric holds one flag per metric; a flag is set to true in Init()
// when the corresponding metric name is listed in the ExcludeMetrics option.
excludeMetric struct {
temp,
percentUsed,
availSpare,
dataUnitsRead,
dataUnitsWrite,
hostReads,
hostWrites,
powerCycles,
powerOn,
UnsafeShutdowns,
mediaErrors,
errlogEntries,
warnTempTime,
critCompTime bool
}
}
// queryCommandFor assembles the argv that reads all S.M.A.R.T. data of one
// device as compact JSON: [sudo] smartctl --json=c [--device=TYPE] --all NAME.
// The --device option is omitted when devType is empty, because a bare
// "--device=" is not a valid smartctl argument.
func (m *SmartMonCollector) queryCommandFor(name, devType string) []string {
	cmd := make([]string, 0, 6)
	if m.config.UseSudo {
		cmd = append(cmd, m.sudoCmd)
	}
	cmd = append(cmd, m.smartCtlCmd, "--json=c")
	if devType != "" {
		cmd = append(cmd, "--device="+devType)
	}
	return append(cmd, "--all", name)
}

// getSmartmonDevices (re)builds m.devices.
//
// If devices are listed in the configuration, exactly those are used and no
// scan is executed. Otherwise "smartctl --scan --json=c" is run (through sudo
// if configured) and its "devices" array is used. In both cases devices named
// in the ExcludeDevices option are dropped and the per-device query command is
// precomputed for later reuse in Read().
//
// An error is returned if the scan command cannot be executed or if its output
// is not valid JSON. m.devices is left untouched in that case.
func (m *SmartMonCollector) getSmartmonDevices() error {
	// Use configured devices
	if len(m.config.Devices) > 0 {
		// Start from an empty list (like the scan path below) so that calling
		// this function again does not accumulate duplicate entries.
		m.devices = make([]deviceT, 0, len(m.config.Devices))
		for _, configDevice := range m.config.Devices {
			if slices.Contains(m.config.ExcludeDevices, configDevice.Name) {
				continue
			}
			m.devices = append(m.devices, deviceT{
				Name:         configDevice.Name,
				Type:         configDevice.Type,
				queryCommand: m.queryCommandFor(configDevice.Name, configDevice.Type),
			})
		}
		return nil
	}

	// Use scan command
	var scanCmd []string
	if m.config.UseSudo {
		scanCmd = append(scanCmd, m.sudoCmd)
	}
	scanCmd = append(scanCmd, m.smartCtlCmd, "--scan", "--json=c")
	command := exec.Command(scanCmd[0], scanCmd[1:]...)

	stdout, err := command.Output()
	if err != nil {
		return fmt.Errorf(
			"%s getSmartmonDevices(): Failed to execute device scan command %s: %w",
			m.name, command.String(), err)
	}

	var scanOutput struct {
		Devices []deviceT `json:"devices"`
	}
	if err := json.Unmarshal(stdout, &scanOutput); err != nil {
		return fmt.Errorf("%s getSmartmonDevices(): Failed to parse JSON output from device scan command: %w",
			m.name, err)
	}

	m.devices = make([]deviceT, 0, len(scanOutput.Devices))
	for _, d := range scanOutput.Devices {
		if slices.Contains(m.config.ExcludeDevices, d.Name) {
			continue
		}
		d.queryCommand = m.queryCommandFor(d.Name, d.Type)
		m.devices = append(m.devices, d)
	}
	return nil
}
// Init prepares the collector: it sets the default tags and meta information,
// strictly decodes the JSON configuration (unknown keys are an error),
// translates the list of excluded metric names into flags, resolves the full
// paths of sudo (only if use_sudo is set) and smartctl, and finally builds the
// device list. The collector is marked as initialized only if all steps
// succeed.
func (m *SmartMonCollector) Init(config json.RawMessage) error {
	m.name = "SmartMonCollector"
	if setupErr := m.setup(); setupErr != nil {
		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, setupErr)
	}
	m.parallel = true
	m.meta = map[string]string{
		"source": m.name,
		"group":  "Disk",
	}
	m.tags = map[string]string{
		"type":  "node",
		"stype": "disk",
	}

	// Read in the JSON configuration
	if len(config) > 0 {
		decoder := json.NewDecoder(bytes.NewReader(config))
		decoder.DisallowUnknownFields()
		if decodeErr := decoder.Decode(&m.config); decodeErr != nil {
			return fmt.Errorf("%s Init(): Error reading config: %w", m.name, decodeErr)
		}
	}

	// Map every known metric name to the flag that disables it
	excludeFlags := map[string]*bool{
		"smartmon_temp":             &m.excludeMetric.temp,
		"smartmon_percent_used":     &m.excludeMetric.percentUsed,
		"smartmon_avail_spare":      &m.excludeMetric.availSpare,
		"smartmon_data_units_read":  &m.excludeMetric.dataUnitsRead,
		"smartmon_data_units_write": &m.excludeMetric.dataUnitsWrite,
		"smartmon_host_reads":       &m.excludeMetric.hostReads,
		"smartmon_host_writes":      &m.excludeMetric.hostWrites,
		"smartmon_power_cycles":     &m.excludeMetric.powerCycles,
		"smartmon_power_on":         &m.excludeMetric.powerOn,
		"smartmon_unsafe_shutdowns": &m.excludeMetric.UnsafeShutdowns,
		"smartmon_media_errors":     &m.excludeMetric.mediaErrors,
		"smartmon_errlog_entries":   &m.excludeMetric.errlogEntries,
		"smartmon_warn_temp_time":   &m.excludeMetric.warnTempTime,
		"smartmon_crit_comp_time":   &m.excludeMetric.critCompTime,
	}
	for _, metricName := range m.config.ExcludeMetrics {
		flag, known := excludeFlags[metricName]
		if !known {
			return fmt.Errorf("%s Init(): Unknown excluded metric: %s", m.name, metricName)
		}
		*flag = true
	}

	// Check if sudo and smartctl are in search path
	if m.config.UseSudo {
		sudoPath, lookErr := exec.LookPath("sudo")
		if lookErr != nil {
			return fmt.Errorf("%s Init(): No sudo command found in search path: %w", m.name, lookErr)
		}
		m.sudoCmd = sudoPath
	}
	smartctlPath, lookErr := exec.LookPath("smartctl")
	if lookErr != nil {
		return fmt.Errorf("%s Init(): No smartctl command found in search path: %w", m.name, lookErr)
	}
	m.smartCtlCmd = smartctlPath

	if scanErr := m.getSmartmonDevices(); scanErr != nil {
		return scanErr
	}

	m.init = true
	return nil
}
// SmartMonData is the subset of the smartctl JSON output that is decoded in
// Read(). Keys that are absent from the output leave the corresponding field at
// its zero value.
type SmartMonData struct {
SerialNumber string `json:"serial_number"`
UserCapacity struct {
Blocks int `json:"blocks"`
Bytes int `json:"bytes"`
} `json:"user_capacity"`
// HealthLog is decoded from the "nvme_smart_health_information_log" object.
HealthLog struct {
// Available SMART health information:
// sudo smartctl -a --json=c /dev/nvme0 | jq --color-output | less --RAW-CONTROL-CHARS
Temperature int `json:"temperature"`
PercentageUsed int `json:"percentage_used"`
AvailableSpare int `json:"available_spare"`
DataUnitsRead int `json:"data_units_read"`
DataUnitsWrite int `json:"data_units_written"`
HostReads int `json:"host_reads"`
HostWrites int `json:"host_writes"`
PowerCycles int `json:"power_cycles"`
// PowerOnHours is converted to seconds before it is sent as smartmon_power_on.
PowerOnHours int `json:"power_on_hours"`
UnsafeShutdowns int `json:"unsafe_shutdowns"`
MediaErrors int `json:"media_errors"`
NumErrorLogEntries int `json:"num_err_log_entries"`
WarnTempTime int `json:"warning_temp_time"`
CriticalCompTime int `json:"critical_comp_time"`
} `json:"nvme_smart_health_information_log"`
}
// smartctlFatalExitMask selects the smartctl exit status bits that mean no
// usable output was produced: bit 0 (command line did not parse) and bit 1
// (device open failed). The remaining bits report conditions of the disk itself
// (failing status, attributes below threshold, entries in the error log, ...)
// while smartctl still prints its regular output.
const smartctlFatalExitMask = 0x03

// Read runs the stored smartctl query command for every device, decodes the
// JSON output and sends one metric per non-excluded health value to output.
// Every metric carries the device name in the "stype-id" tag.
//
// A device is skipped (and the reason logged) if the command cannot be run,
// exits with a fatal status (see smartctlFatalExitMask), or prints output that
// cannot be decoded. A non-zero exit status that only reports the condition of
// the disk is tolerated, so that devices with, for example, entries in their
// error log are still monitored.
//
// The interval parameter is not used.
func (m *SmartMonCollector) Read(interval time.Duration, output chan lp.CCMessage) {
	timestamp := time.Now()

	for _, d := range m.devices {
		command := exec.Command(d.queryCommand[0], d.queryCommand[1:]...)
		stdout, err := command.Output()
		if err != nil {
			var exitErr *exec.ExitError
			// ExitCode() is -1 if the process was killed by a signal, which
			// also matches the fatal mask.
			if !errors.As(err, &exitErr) || exitErr.ExitCode()&smartctlFatalExitMask != 0 {
				cclog.ComponentError(m.name, "cannot read data for device", d.Name+":", err.Error())
				continue
			}
			cclog.ComponentDebug(m.name, "smartctl reported exit status", exitErr.ExitCode(), "for device", d.Name)
		}

		var data SmartMonData
		if err := json.Unmarshal(stdout, &data); err != nil {
			cclog.ComponentError(m.name, "cannot unmarshal data for device", d.Name+":", err.Error())
			continue
		}

		// send emits one metric for the current device unless it is excluded.
		// An empty unit means that no "unit" meta entry is added.
		send := func(excluded bool, name string, value any, unit string) {
			if excluded {
				return
			}
			y, err := lp.NewMetric(name, m.tags, m.meta, value, timestamp)
			if err != nil {
				return
			}
			y.AddTag("stype-id", d.Name)
			if unit != "" {
				y.AddMeta("unit", unit)
			}
			output <- y
		}

		health := &data.HealthLog
		skip := &m.excludeMetric
		send(skip.temp, "smartmon_temp", health.Temperature, "degC")
		send(skip.percentUsed, "smartmon_percent_used", health.PercentageUsed, "percent")
		send(skip.availSpare, "smartmon_avail_spare", health.AvailableSpare, "percent")
		send(skip.dataUnitsRead, "smartmon_data_units_read", health.DataUnitsRead, "")
		send(skip.dataUnitsWrite, "smartmon_data_units_write", health.DataUnitsWrite, "")
		send(skip.hostReads, "smartmon_host_reads", health.HostReads, "")
		send(skip.hostWrites, "smartmon_host_writes", health.HostWrites, "")
		send(skip.powerCycles, "smartmon_power_cycles", health.PowerCycles, "")
		// smartctl reports hours; the metric is sent in seconds
		send(skip.powerOn, "smartmon_power_on", int64(health.PowerOnHours)*3600, "sec")
		send(skip.UnsafeShutdowns, "smartmon_unsafe_shutdowns", health.UnsafeShutdowns, "")
		send(skip.mediaErrors, "smartmon_media_errors", health.MediaErrors, "")
		send(skip.errlogEntries, "smartmon_errlog_entries", health.NumErrorLogEntries, "")
		send(skip.warnTempTime, "smartmon_warn_temp_time", health.WarnTempTime, "")
		send(skip.critCompTime, "smartmon_crit_comp_time", health.CriticalCompTime, "")
	}
}
// Close marks the collector as no longer initialized by clearing its init flag.
func (m *SmartMonCollector) Close() {
m.init = false
}

View File

@@ -0,0 +1,52 @@
<!--
---
title: smartmon metric collector
description: Collect S.M.A.R.T data from NVMEs
categories: [cc-metric-collector]
tags: ['Admin']
weight: 2
hugo_path: docs/reference/cc-metric-collector/collectors/smartmonMetric.md
---
-->
## `smartmon` collector
```json
"smartmon": {
"use_sudo": true,
"exclude_devices": [
"/dev/sda"
],
"excludeMetrics": [
"smartmon_warn_temp_time",
"smartmon_crit_comp_time"
],
"devices": [
{
"name": "/dev/nvme0",
"type": "nvme"
}
]
}
```
The `smartmon` collector retrieves S.M.A.R.T. data from NVMe devices via the `smartctl` command.
Available NVMe devices can either be detected automatically by a device scan or added manually with the `devices` config option.
Metrics:
* `smartmon_temp`: Temperature of the device (`unit=degC`)
* `smartmon_avail_spare`: Amount of spare left (`unit=percent`)
* `smartmon_percent_used`: Percentage of the device that is used (`unit=percent`)
* `smartmon_data_units_read`: Read data units
* `smartmon_data_units_write`: Written data units
* `smartmon_host_reads`: Read operations
* `smartmon_host_writes`: Write operations
* `smartmon_power_cycles`: Number of power cycles
* `smartmon_power_on`: Seconds the device has been powered on (`unit=sec`)
* `smartmon_unsafe_shutdowns`: Count of unsafe shutdowns
* `smartmon_media_errors`: Media errors of the device
* `smartmon_errlog_entries`: Error log entries
* `smartmon_warn_temp_time`: Time above the warning temperature threshold
* `smartmon_crit_comp_time`: Time above the critical composite temperature threshold

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -63,9 +64,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -14,10 +14,10 @@ hugo_path: docs/reference/cc-metric-collector/collectors/temp.md
```json
"tempstat": {
"tag_override" : {
"<device like hwmon1>" : {
"type" : "socket",
"type-id" : "0"
"tag_override": {
"<device like hwmon1>": {
"type": "socket",
"type-id": "0"
}
},
"exclude_metrics": [

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
@@ -46,9 +47,10 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
"group": "TopProcs",
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
} else {
m.config.Num_procs = int(DEFAULT_NUM_PROCS)

View File

@@ -34,8 +34,8 @@
},
"numastats": {},
"nvidia": {},
"schedstat": {
},
"schedstat": {},
"smartmon": {},
"tempstat": {
"report_max_temperature": true,
"report_critical_temperature": true,

View File

@@ -1,6 +1,6 @@
{
"process_messages" : {
"add_tag_if": [
"add_tags_if": [
{
"key" : "cluster",
"value" : "testcluster",
@@ -12,7 +12,7 @@
"if" : "name == 'temp_package_id_0'"
}
],
"delete_tag_if": [
"delete_meta_if": [
{
"key" : "unit",
"if" : "true"

5
go.mod
View File

@@ -3,14 +3,14 @@ module github.com/ClusterCockpit/cc-metric-collector
go 1.25.0
require (
github.com/ClusterCockpit/cc-lib/v2 v2.7.0
github.com/ClusterCockpit/cc-lib/v2 v2.8.2
github.com/ClusterCockpit/go-rocm-smi v0.3.0
github.com/NVIDIA/go-nvml v0.13.0-1
github.com/PaesslerAG/gval v1.2.4
github.com/fsnotify/fsnotify v1.9.0
github.com/tklauser/go-sysconf v0.3.16
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.org/x/sys v0.41.0
golang.org/x/sys v0.42.0
)
require (
@@ -39,7 +39,6 @@ require (
github.com/tklauser/numcpus v0.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa // indirect
golang.org/x/net v0.51.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

12
go.sum
View File

@@ -1,5 +1,7 @@
github.com/ClusterCockpit/cc-lib/v2 v2.7.0 h1:EMTShk6rMTR1wlfmQ8SVCawH1OdltUbD3kVQmaW+5pE=
github.com/ClusterCockpit/cc-lib/v2 v2.7.0/go.mod h1:0Etx8WMs0lYZ4tiOQizY18CQop+2i3WROvU9rMUxHA4=
github.com/ClusterCockpit/cc-lib/v2 v2.8.0 h1:ROduRzRuusi+6kLB991AAu3Pp2AHOasQJFJc7JU/n/E=
github.com/ClusterCockpit/cc-lib/v2 v2.8.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
github.com/ClusterCockpit/go-rocm-smi v0.3.0 h1:1qZnSpG7/NyLtc7AjqnUL9Jb8xtqG1nMVgp69rJfaR8=
@@ -105,13 +107,11 @@ golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePP
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0=
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@@ -94,8 +94,7 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
// Set hostname
hostname, err := os.Hostname()
if err != nil {
cclog.Error(err.Error())
return err
return fmt.Errorf("metricAggregator: failed to get hostname: %w", err)
}
// Drop domain part of host name
c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0]

View File

@@ -8,6 +8,7 @@
package metricRouter
import (
"fmt"
"sync"
"time"
@@ -70,8 +71,7 @@ func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker,
// The code is executed by the MetricCache goroutine
c.aggEngine, err = agg.NewAggregator(c.output)
if err != nil {
cclog.ComponentError("MetricCache", "Cannot create aggregator")
return err
return fmt.Errorf("MetricCache: failed to create aggregator: %w", err)
}
return nil

View File

@@ -8,6 +8,7 @@
package metricRouter
import (
"bytes"
"encoding/json"
"fmt"
"maps"
@@ -46,8 +47,7 @@ type metricRouterConfig struct {
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
// dropMetrics map[string]bool // Internal map for O(1) lookup
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
}
// Metric router data structure
@@ -102,18 +102,17 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
// Drop domain part of host name
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
err = json.Unmarshal(routerConfig, &r.config)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(routerConfig))
d.DisallowUnknownFields()
if err := d.Decode(&r.config); err != nil {
return fmt.Errorf("failed to decode metric router config: %w", err)
}
r.maxForward = max(1, r.config.MaxForward)
if r.config.NumCacheIntervals > 0 {
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
if err != nil {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
return err
return fmt.Errorf("MetricRouter: failed to initialize MetricCache: %w", err)
}
for _, agg := range r.config.IntervalAgg {
err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)