Compare commits

..

28 Commits

Author SHA1 Message Date
Michael Panzlaff
1251f9ef6b Merge pull request #207 from ClusterCockpit/ipmi-sudo
Add IPMI sudo support
2026-03-24 15:32:37 +01:00
Michael Panzlaff
f816f4991b ipmi: refactor and add sudo support 2026-03-24 15:06:47 +01:00
Michael Panzlaff
e40816eb17 ipmi: refactor and add sudo support 2026-03-24 14:24:35 +01:00
Michael Panzlaff
b947f98459 update cc-lib to v2.11.0 2026-03-24 14:24:25 +01:00
dependabot[bot]
c328fbf05a Bump github.com/ClusterCockpit/go-rocm-smi from 0.3.0 to 0.4.0
Bumps [github.com/ClusterCockpit/go-rocm-smi](https://github.com/ClusterCockpit/go-rocm-smi) from 0.3.0 to 0.4.0.
- [Commits](https://github.com/ClusterCockpit/go-rocm-smi/compare/v0.3...v0.4.0)

---
updated-dependencies:
- dependency-name: github.com/ClusterCockpit/go-rocm-smi
  dependency-version: 0.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-23 13:00:23 +01:00
dependabot[bot]
37ec7c19e6 Bump github.com/ClusterCockpit/cc-lib/v2 from 2.8.2 to 2.10.0
Bumps [github.com/ClusterCockpit/cc-lib/v2](https://github.com/ClusterCockpit/cc-lib) from 2.8.2 to 2.10.0.
- [Release notes](https://github.com/ClusterCockpit/cc-lib/releases)
- [Commits](https://github.com/ClusterCockpit/cc-lib/compare/v2.8.2...v2.10.0)

---
updated-dependencies:
- dependency-name: github.com/ClusterCockpit/cc-lib/v2
  dependency-version: 2.10.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-23 12:48:02 +01:00
Thomas Roehl
13fc8a53d3 Memstat: Fix mem_shared and add more metrics 2026-03-17 18:07:30 +01:00
Thomas Röhl
1937ef2587 Update cc-lib to 2.8.2 2026-03-13 18:00:26 +01:00
Holger Obermaier
35510d3d39 Use strict JSON decoding 2026-03-13 17:57:33 +01:00
Holger Obermaier
ef5e4c2604 Corrected json config 2026-03-13 17:57:33 +01:00
Holger Obermaier
44401318e4 Enable same linters as in CI pipeline 2026-03-13 17:57:33 +01:00
Holger Obermaier
2e60d3111c Add config option to exclude metrics 2026-03-13 17:57:33 +01:00
Holger Obermaier
e8734c02db Add config option for manual device configuration 2026-03-13 17:57:33 +01:00
Holger Obermaier
54650d40a6 Store query command for later reuse 2026-03-13 17:57:33 +01:00
Holger Obermaier
e7050834f5 * Honor config option excluded devices
* Use device type in read command
2026-03-13 17:57:33 +01:00
Holger Obermaier
893a0d69de Improve error reporting 2026-03-13 17:57:33 +01:00
Holger Obermaier
345119866a Switch from lp.NewMessage to lp.NewMetric 2026-03-13 17:57:33 +01:00
Holger Obermaier
ec917cf802 Switch from lp.NewMessage to lp.NewMetric 2026-03-13 17:57:33 +01:00
Holger Obermaier
c7cfc0723b Fix all linter warnings 2026-03-13 17:57:33 +01:00
Holger Obermaier
4f2685f4c4 Adapt to new ccMessage syntax 2026-03-13 17:57:33 +01:00
Thomas Roehl
439bfacfd9 Add SmartMonCollector to CollectorManager 2026-03-13 17:57:33 +01:00
Thomas Roehl
cd4ac9c885 Add Collector for S.M.A.R.T disk data 2026-03-13 17:57:33 +01:00
Holger Obermaier
eeb60ba0df Add target to build stripped executable 2026-03-12 11:39:43 +01:00
Holger Obermaier
a481a34dcd Avoid duplicate error printing 2026-03-12 10:08:23 +01:00
Holger Obermaier
b65576431e Stricter json parsing (#204) 2026-03-11 15:59:14 +01:00
Holger Obermaier
a927565868 Fix router config syntax 2026-03-10 13:51:06 +01:00
dependabot[bot]
0b67993eb0 Bump github.com/ClusterCockpit/cc-lib/v2 from 2.7.0 to 2.8.0
Bumps [github.com/ClusterCockpit/cc-lib/v2](https://github.com/ClusterCockpit/cc-lib) from 2.7.0 to 2.8.0.
- [Release notes](https://github.com/ClusterCockpit/cc-lib/releases)
- [Commits](https://github.com/ClusterCockpit/cc-lib/compare/v2.7.0...v2.8.0)

---
updated-dependencies:
- dependency-name: github.com/ClusterCockpit/cc-lib/v2
  dependency-version: 2.8.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-09 07:58:27 +01:00
dependabot[bot]
4164e3d1a3 Bump golang.org/x/sys from 0.41.0 to 0.42.0
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.41.0 to 0.42.0.
- [Commits](https://github.com/golang/sys/compare/v0.41.0...v0.42.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-version: 0.42.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-09 07:58:11 +01:00
49 changed files with 970 additions and 408 deletions

View File

@@ -27,6 +27,17 @@ $(APP): $(GOSRC) go.mod
$(GOBIN) get $(GOBIN) get
$(GOBIN) build -o $(APP) $(GOSRC_APP) $(GOBIN) build -o $(APP) $(GOSRC_APP)
# -ldflags:
# -s : drops the OS symbol table
# -w : drops DWARF
# -> Panic stack traces still show function names and file:line
.PHONY: build-stripped
build-stripped:
make -C collectors
$(GOBIN) get
$(GOBIN) build -ldflags "-s -w" -trimpath -o $(APP) $(GOSRC_APP)
.PHONY: install
install: $(APP) install: $(APP)
@WORKSPACE=$(PREFIX) @WORKSPACE=$(PREFIX)
@if [ -z "$${WORKSPACE}" ]; then exit 1; fi @if [ -z "$${WORKSPACE}" ]; then exit 1; fi
@@ -89,7 +100,7 @@ staticcheck:
.PHONY: golangci-lint .PHONY: golangci-lint
golangci-lint: golangci-lint:
$(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest $(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
$$($(GOBIN) env GOPATH)/bin/golangci-lint run $$($(GOBIN) env GOPATH)/bin/golangci-lint run --enable errorlint,govet,misspell,modernize,prealloc,staticcheck,unconvert,wastedassign
.ONESHELL: .ONESHELL:
.PHONY: RPM .PHONY: RPM

View File

@@ -8,6 +8,7 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"flag" "flag"
"os" "os"
@@ -48,22 +49,22 @@ type RuntimeConfig struct {
Sync sync.WaitGroup Sync sync.WaitGroup
} }
// ReadCli reads the command line arguments
func ReadCli() map[string]string { func ReadCli() map[string]string {
var m map[string]string
cfg := flag.String("config", "./config.json", "Path to configuration file") cfg := flag.String("config", "./config.json", "Path to configuration file")
logfile := flag.String("log", "stderr", "Path for logfile") logfile := flag.String("log", "stderr", "Path for logfile")
once := flag.Bool("once", false, "Run all collectors only once") once := flag.Bool("once", false, "Run all collectors only once")
loglevel := flag.String("loglevel", "info", "Set log level") loglevel := flag.String("loglevel", "info", "Set log level")
flag.Parse() flag.Parse()
m = make(map[string]string) m := map[string]string{
m["configfile"] = *cfg "configfile": *cfg,
m["logfile"] = *logfile "logfile": *logfile,
"once": "false",
"loglevel": *loglevel,
}
if *once { if *once {
m["once"] = "true" m["once"] = "true"
} else {
m["once"] = "false"
} }
m["loglevel"] = *loglevel
return m return m
} }
@@ -120,9 +121,10 @@ func mainFunc() int {
// Load and check configuration // Load and check configuration
main := ccconf.GetPackageConfig("main") main := ccconf.GetPackageConfig("main")
err = json.Unmarshal(main, &rcfg.ConfigFile) d := json.NewDecoder(bytes.NewReader(main))
if err != nil { d.DisallowUnknownFields()
cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error()) if err := d.Decode(&rcfg.ConfigFile); err != nil {
cclog.Errorf("Error reading configuration file %s: %v", rcfg.CliArgs["configfile"], err)
return 1 return 1
} }

View File

@@ -59,6 +59,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [ ] Aggregate metrics to higher topology entity (sum hwthread metrics to socket metric, ...). Needs to be configurable * [ ] Aggregate metrics to higher topology entity (sum hwthread metrics to socket metric, ...). Needs to be configurable
# Contributing own collectors # Contributing own collectors
A collector reads data from any source, parses it to metrics and submits these metrics to the `metric-collector`. A collector provides three functions: A collector reads data from any source, parses it to metrics and submits these metrics to the `metric-collector`. A collector provides three functions:
* `Name() string`: Return the name of the collector * `Name() string`: Return the name of the collector
@@ -104,8 +105,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { d := json.NewDecoder(bytes.NewReader(config))
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{"source": m.name, "group": "Sample"} m.meta = map[string]string{"source": m.name, "group": "Sample"}

View File

@@ -32,7 +32,7 @@ const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
type BeegfsMetaCollectorConfig struct { type BeegfsMetaCollectorConfig struct {
Beegfs string `json:"beegfs_path"` Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"` ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"` ExcludeFilesystems []string `json:"exclude_filesystem"`
} }
type BeegfsMetaCollector struct { type BeegfsMetaCollector struct {
@@ -74,9 +74,10 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Failed to decode JSON config: %w", m.name, err)
} }
} }
@@ -99,23 +100,23 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
"filesystem": "", "filesystem": "",
} }
m.skipFS = make(map[string]struct{}) m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem { for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{} m.skipFS[fs] = struct{}{}
} }
// Beegfs file system statistics can only be queried by user root // Beegfs file system statistics can only be queried by user root
user, err := user.Current() user, err := user.Current()
if err != nil { if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %w", err) return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
} }
if user.Uid != "0" { if user.Uid != "0" {
return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root") return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
} }
// Check if beegfs-ctl is in executable search path // Check if beegfs-ctl is in executable search path
_, err = exec.LookPath(m.config.Beegfs) _, err = exec.LookPath(m.config.Beegfs)
if err != nil { if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err) return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
} }
m.init = true m.init = true
return nil return nil

View File

@@ -30,7 +30,7 @@ import (
type BeegfsStorageCollectorConfig struct { type BeegfsStorageCollectorConfig struct {
Beegfs string `json:"beegfs_path"` Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"` ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"` ExcludeFilesystems []string `json:"exclude_filesystem"`
} }
type BeegfsStorageCollector struct { type BeegfsStorageCollector struct {
@@ -67,9 +67,10 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
@@ -92,23 +93,23 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
"filesystem": "", "filesystem": "",
} }
m.skipFS = make(map[string]struct{}) m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem { for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{} m.skipFS[fs] = struct{}{}
} }
// Beegfs file system statistics can only be queried by user root // Beegfs file system statistics can only be queried by user root
user, err := user.Current() user, err := user.Current()
if err != nil { if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %w", err) return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
} }
if user.Uid != "0" { if user.Uid != "0" {
return fmt.Errorf("BeegfsStorageCollector.Init(): BeeGFS file system statistics can only be queried by user root") return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
} }
// Check if beegfs-ctl is in executable search path // Check if beegfs-ctl is in executable search path
_, err = exec.LookPath(m.config.Beegfs) _, err = exec.LookPath(m.config.Beegfs)
if err != nil { if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err) return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
} }
m.init = true m.init = true
return nil return nil

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync" "sync"
@@ -48,6 +49,7 @@ var AvailableCollectors = map[string]MetricCollector{
"schedstat": new(SchedstatCollector), "schedstat": new(SchedstatCollector),
"nfsiostat": new(NfsIOStatCollector), "nfsiostat": new(NfsIOStatCollector),
"slurm_cgroup": new(SlurmCgroupCollector), "slurm_cgroup": new(SlurmCgroupCollector),
"smartmon": new(SmartMonCollector),
} }
// Metric collector manager data structure // Metric collector manager data structure
@@ -88,10 +90,10 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
cm.ticker = ticker cm.ticker = ticker
cm.duration = duration cm.duration = duration
err := json.Unmarshal(collectConfig, &cm.config) d := json.NewDecoder(bytes.NewReader(collectConfig))
if err != nil { d.DisallowUnknownFields()
cclog.Error(err.Error()) if err := d.Decode(&cm.config); err != nil {
return err return fmt.Errorf("%s Init(): Error decoding collector manager config: %w", "CollectorManager", err)
} }
// Initialize configured collectors // Initialize configured collectors
@@ -102,7 +104,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
} }
collector := AvailableCollectors[collectorName] collector := AvailableCollectors[collectorName]
err = collector.Init(collectorCfg) err := collector.Init(collectorCfg)
if err != nil { if err != nil {
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err)) cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
continue continue

View File

@@ -12,7 +12,9 @@ hugo_path: docs/reference/cc-metric-collector/collectors/cpufreq_cpuinfo.md
## `cpufreq_cpuinfo` collector ## `cpufreq_cpuinfo` collector
```json ```json
"cpufreq_cpuinfo": {} "cpufreq_cpuinfo": {
"exclude_metrics": []
}
``` ```
The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful of **hwthread** metrics. The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful of **hwthread** metrics.

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -54,9 +55,10 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
} }
m.parallel = true m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{ m.meta = map[string]string{
@@ -77,7 +79,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
scalingCurFreqFile := filepath.Join("/sys/devices/system/cpu", fmt.Sprintf("cpu%d", c.CpuID), "cpufreq/scaling_cur_freq") scalingCurFreqFile := filepath.Join("/sys/devices/system/cpu", fmt.Sprintf("cpu%d", c.CpuID), "cpufreq/scaling_cur_freq")
err := unix.Access(scalingCurFreqFile, unix.R_OK) err := unix.Access(scalingCurFreqFile, unix.R_OK)
if err != nil { if err != nil {
return fmt.Errorf("unable to access file '%s': %w", scalingCurFreqFile, err) return fmt.Errorf("%s Init(): unable to access file '%s': %w", m.name, scalingCurFreqFile, err)
} }
m.topology = append(m.topology, m.topology = append(m.topology,

View File

@@ -9,6 +9,7 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -53,9 +54,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
"type": "node", "type": "node",
} }
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
matches := map[string]int{ matches := map[string]int{
@@ -79,19 +81,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
} }
// Check input file // Check input file
file, err := os.Open(string(CPUSTATFILE)) file, err := os.Open(CPUSTATFILE)
if err != nil { if err != nil {
cclog.ComponentError( return fmt.Errorf("%s Init(): Failed to open file '%s': %w", m.name, CPUSTATFILE, err)
m.name,
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
} }
defer func() {
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
}
}()
// Pre-generate tags for all CPUs // Pre-generate tags for all CPUs
num_cpus := 0 num_cpus := 0
@@ -120,6 +113,12 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++ num_cpus++
} }
} }
// Close file
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Failed to close file '%s': %w", m.name, CPUSTATFILE, err)
}
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()
m.init = true m.init = true
return nil return nil
@@ -166,11 +165,11 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
now := time.Now() now := time.Now()
tsdelta := now.Sub(m.lastTimestamp) tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(CPUSTATFILE)) file, err := os.Open(CPUSTATFILE)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err)) fmt.Sprintf("Read(): Failed to open file '%s': %v", CPUSTATFILE, err))
} }
defer func() { defer func() {
if err := file.Close(); err != nil { if err := file.Close(); err != nil {

View File

@@ -8,8 +8,8 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
@@ -47,8 +47,10 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
// Read configuration // Read configuration
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { d := json.NewDecoder(bytes.NewReader(config))
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
@@ -82,7 +84,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
} }
if len(m.files) == 0 && len(m.cmdFieldsSlice) == 0 { if len(m.files) == 0 && len(m.cmdFieldsSlice) == 0 {
return errors.New("no metrics to collect") return fmt.Errorf("%s Init(): no metrics to collect", m.name)
} }
m.init = true m.init = true
return nil return nil

View File

@@ -9,6 +9,7 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -42,8 +43,10 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { d := json.NewDecoder(bytes.NewReader(config))
return err d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.allowedMetrics = map[string]bool{ m.allowedMetrics = map[string]bool{

View File

@@ -32,7 +32,7 @@ type GpfsCollectorState map[string]int64
type GpfsCollectorConfig struct { type GpfsCollectorConfig struct {
Mmpmon string `json:"mmpmon_path,omitempty"` Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"` ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
Sudo bool `json:"use_sudo,omitempty"` Sudo bool `json:"use_sudo,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values,omitempty"` SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
@@ -322,9 +322,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{ m.meta = map[string]string{
@@ -336,7 +337,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
"filesystem": "", "filesystem": "",
} }
m.skipFS = make(map[string]struct{}) m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem { for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{} m.skipFS[fs] = struct{}{}
} }
m.lastState = make(map[string]GpfsCollectorState) m.lastState = make(map[string]GpfsCollectorState)
@@ -346,18 +347,15 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
if !m.config.Sudo { if !m.config.Sudo {
user, err := user.Current() user, err := user.Current()
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Failed to get current user:", err.Error()) return fmt.Errorf("%s Init(): failed to get current user: %w", m.name, err)
return err
} }
if user.Uid != "0" { if user.Uid != "0" {
cclog.ComponentError(m.name, "GPFS file system statistics can only be queried by user root") return fmt.Errorf("%s Init(): GPFS file system statistics can only be queried by user root", m.name)
return err
} }
} else { } else {
p, err := exec.LookPath("sudo") p, err := exec.LookPath("sudo")
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Cannot find 'sudo'") return fmt.Errorf("%s Init(): cannot find 'sudo': %w", m.name, err)
return err
} }
m.sudoCmd = p m.sudoCmd = p
} }
@@ -377,7 +375,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// the file was given in the config, use it // the file was given in the config, use it
p = m.config.Mmpmon p = m.config.Mmpmon
} else { } else {
cclog.ComponentError(m.name, fmt.Sprintf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err))
return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err) return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err)
} }
} }
@@ -434,7 +431,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
} }
} }
if len(m.definitions) == 0 { if len(m.definitions) == 0 {
return errors.New("no metrics to collect") return fmt.Errorf("%s Init(): no metrics to collect", m.name)
} }
m.init = true m.init = true

View File

@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/gpfs.md
```json ```json
"gpfs": { "gpfs": {
"mmpmon_path": "/path/to/mmpmon", "mmpmon_path": "/path/to/mmpmon",
"use_sudo": "true", "use_sudo": true,
"exclude_filesystem": [ "exclude_filesystem": [
"fs1" "fs1"
], ],

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -79,9 +80,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config // Read configuration file, allow overwriting default config
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }

View File

@@ -9,8 +9,8 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"slices" "slices"
@@ -44,7 +44,6 @@ type IOstatCollector struct {
} }
func (m *IOstatCollector) Init(config json.RawMessage) error { func (m *IOstatCollector) Init(config json.RawMessage) error {
var err error
m.name = "IOstatCollector" m.name = "IOstatCollector"
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
@@ -52,9 +51,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
// https://www.kernel.org/doc/html/latest/admin-guide/iostats.html // https://www.kernel.org/doc/html/latest/admin-guide/iostats.html
@@ -85,7 +85,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
} }
} }
if len(m.matches) == 0 { if len(m.matches) == 0 {
return errors.New("no metrics to collect") return fmt.Errorf("%s Init(): no metrics to collect", m.name)
} }
file, err := os.Open(IOSTATFILE) file, err := os.Open(IOSTATFILE)
if err != nil { if err != nil {
@@ -135,7 +135,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
} }
m.init = true m.init = true
return err return nil
} }
func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {

View File

@@ -11,7 +11,6 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"os/exec" "os/exec"
@@ -32,7 +31,9 @@ type IpmiCollector struct {
ExcludeDevices []string `json:"exclude_devices"` ExcludeDevices []string `json:"exclude_devices"`
IpmitoolPath string `json:"ipmitool_path"` IpmitoolPath string `json:"ipmitool_path"`
IpmisensorsPath string `json:"ipmisensors_path"` IpmisensorsPath string `json:"ipmisensors_path"`
Sudo bool `json:"use_sudo"`
} }
ipmitool string ipmitool string
ipmisensors string ipmisensors string
} }
@@ -55,57 +56,75 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
// default path to IPMI tools // default path to IPMI tools
m.config.IpmitoolPath = "ipmitool" m.config.IpmitoolPath = "ipmitool"
m.config.IpmisensorsPath = "ipmi-sensors" m.config.IpmisensorsPath = "ipmi-sensors"
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
// Check if executables ipmitool or ipmisensors are found
p, err := exec.LookPath(m.config.IpmitoolPath)
if err == nil {
command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
m.ipmitool = ""
} else {
m.ipmitool = p
}
}
p, err = exec.LookPath(m.config.IpmisensorsPath)
if err == nil {
command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
m.ipmisensors = ""
} else {
m.ipmisensors = p
}
}
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no usable IPMI reader found")
}
m.ipmitool = m.config.IpmitoolPath
m.ipmisensors = m.config.IpmisensorsPath
// Test if any of the supported backends work
var dummyChan chan lp.CCMessage
dummyConsumer := func() {
for range dummyChan {
}
}
// Test if ipmi-sensors works (preferred over ipmitool, because it's faster)
var ipmiSensorsErr error
if _, ipmiSensorsErr = exec.LookPath(m.ipmisensors); ipmiSensorsErr == nil {
dummyChan = make(chan lp.CCMessage)
go dummyConsumer()
ipmiSensorsErr = m.readIpmiSensors(dummyChan)
close(dummyChan)
if ipmiSensorsErr == nil {
cclog.ComponentDebugf(m.name, "Using ipmi-sensors for ipmistat collector")
m.init = true m.init = true
return nil return nil
} }
}
cclog.ComponentDebugf(m.name, "Unable to use ipmi-sensors for ipmistat collector: %v", ipmiSensorsErr)
m.ipmisensors = ""
func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { // Test if ipmitool works (may be very slow)
var ipmiToolErr error
if _, ipmiToolErr = exec.LookPath(m.ipmitool); ipmiToolErr == nil {
dummyChan = make(chan lp.CCMessage)
go dummyConsumer()
ipmiToolErr = m.readIpmiTool(dummyChan)
close(dummyChan)
if ipmiToolErr == nil {
cclog.ComponentDebugf(m.name, "Using ipmitool for ipmistat collector")
m.init = true
return nil
}
}
m.ipmitool = ""
cclog.ComponentDebugf(m.name, "Unable to use ipmitool for ipmistat collector: %v", ipmiToolErr)
return fmt.Errorf("unable to init neither ipmitool (%w) nor ipmi-sensors (%w)", ipmiToolErr, ipmiSensorsErr)
}
func (m *IpmiCollector) readIpmiTool(output chan lp.CCMessage) error {
// Setup ipmitool command // Setup ipmitool command
command := exec.Command(cmd, "sensor") argv := make([]string, 0)
if m.config.Sudo {
argv = append(argv, "sudo", "-n")
}
argv = append(argv, m.ipmitool, "sensor")
command := exec.Command(argv[0], argv[1:]...)
stdout, _ := command.StdoutPipe() stdout, _ := command.StdoutPipe()
errBuf := new(bytes.Buffer) errBuf := new(bytes.Buffer)
command.Stderr = errBuf command.Stderr = errBuf
// start command // start command
if err := command.Start(); err != nil { if err := command.Start(); err != nil {
cclog.ComponentError( return fmt.Errorf("failed to start command '%s': %w", command.String(), err)
m.name,
fmt.Sprintf("readIpmiTool(): Failed to start command \"%s\": %v", command.String(), err),
)
return
} }
// Read command output // Read command output
@@ -115,8 +134,17 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
if len(lv) < 3 { if len(lv) < 3 {
continue continue
} }
if strings.TrimSpace(lv[1]) == "0x0" || strings.TrimSpace(lv[1]) == "na" {
// Ignore known non-float values
continue
}
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64) v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
if err == nil { if err != nil {
cclog.ComponentErrorf(m.name, "Failed to parse float '%s': %v", lv[1], err)
continue
}
name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_")) name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_"))
unit := strings.TrimSpace(lv[2]) unit := strings.TrimSpace(lv[2])
switch unit { switch unit {
@@ -131,70 +159,75 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
} }
y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]any{"value": v}, time.Now()) y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]any{"value": v}, time.Now())
if err == nil { if err != nil {
cclog.ComponentErrorf(m.name, "Failed to create message: %v", err)
continue
}
y.AddMeta("unit", unit) y.AddMeta("unit", unit)
output <- y output <- y
} }
}
}
// Wait for command end // Wait for command end
if err := command.Wait(); err != nil { if err := command.Wait(); err != nil {
errMsg, _ := io.ReadAll(errBuf) errMsg, _ := io.ReadAll(errBuf)
cclog.ComponentError( return fmt.Errorf("failed to complete command '%s': %w (stderr: %s)", command.String(), err, strings.TrimSpace(string(errMsg)))
m.name,
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
)
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
return
}
} }
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { return nil
}
func (m *IpmiCollector) readIpmiSensors(output chan lp.CCMessage) error {
// Setup ipmisensors command // Setup ipmisensors command
command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") argv := make([]string, 0)
if m.config.Sudo {
argv = append(argv, "sudo", "-n")
}
argv = append(argv, m.ipmisensors, "--comma-separated-output", "--sdr-cache-recreate")
command := exec.Command(argv[0], argv[1:]...)
stdout, _ := command.StdoutPipe() stdout, _ := command.StdoutPipe()
errBuf := new(bytes.Buffer) errBuf := new(bytes.Buffer)
command.Stderr = errBuf command.Stderr = errBuf
// start command // start command
if err := command.Start(); err != nil { if err := command.Start(); err != nil {
cclog.ComponentError( return fmt.Errorf("failed to start command '%s': %w", command.String(), err)
m.name,
fmt.Sprintf("readIpmiSensors(): Failed to start command \"%s\": %v", command.String(), err),
)
return
} }
// Read command output // Read command output
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
for scanner.Scan() { for scanner.Scan() {
lv := strings.Split(scanner.Text(), ",") lv := strings.Split(scanner.Text(), ",")
if len(lv) > 3 { if len(lv) <= 3 {
v, err := strconv.ParseFloat(lv[3], 64) continue
if err == nil { }
if lv[3] == "N/A" || lv[3] == "Reading" {
// Ignore known non-float values
continue
}
v, err := strconv.ParseFloat(strings.TrimSpace(lv[3]), 64)
if err != nil {
cclog.ComponentErrorf(m.name, "Failed to parse float '%s': %v", lv[3], err)
continue
}
name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_")) name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_"))
y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]any{"value": v}, time.Now()) y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]any{"value": v}, time.Now())
if err == nil { if err != nil {
cclog.ComponentErrorf(m.name, "Failed to create message: %v", err)
continue
}
if len(lv) > 4 { if len(lv) > 4 {
y.AddMeta("unit", lv[4]) y.AddMeta("unit", lv[4])
} }
output <- y output <- y
} }
}
}
}
// Wait for command end // Wait for command end
if err := command.Wait(); err != nil { if err := command.Wait(); err != nil {
errMsg, _ := io.ReadAll(errBuf) errMsg, _ := io.ReadAll(errBuf)
cclog.ComponentError( return fmt.Errorf("failed to complete command '%s': %w (stderr: %s)", command.String(), err, strings.TrimSpace(string(errMsg)))
m.name,
fmt.Sprintf("readIpmiSensors(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
)
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
return
} }
return nil
} }
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {
@@ -203,10 +236,16 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {
return return
} }
if len(m.config.IpmitoolPath) > 0 { if len(m.ipmisensors) > 0 {
m.readIpmiTool(m.config.IpmitoolPath, output) err := m.readIpmiSensors(output)
} else if len(m.config.IpmisensorsPath) > 0 { if err != nil {
m.readIpmiSensors(m.config.IpmisensorsPath, output) cclog.ComponentErrorf(m.name, "readIpmiSensors() failed: %v", err)
}
} else if len(m.ipmitool) > 0 {
err := m.readIpmiTool(output)
if err != nil {
cclog.ComponentErrorf(m.name, "readIpmiTool() failed: %v", err)
}
} }
} }

View File

@@ -15,9 +15,24 @@ hugo_path: docs/reference/cc-metric-collector/collectors/ipmi.md
"ipmistat": { "ipmistat": {
"ipmitool_path": "/path/to/ipmitool", "ipmitool_path": "/path/to/ipmitool",
"ipmisensors_path": "/path/to/ipmi-sensors", "ipmisensors_path": "/path/to/ipmi-sensors",
"use_sudo": true
} }
``` ```
The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`). The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`).
The metrics depend on the output of the underlying tools but contain temperature, power and energy metrics. The metrics depend on the output of the underlying tools but contain temperature, power and energy metrics.
ipmitool and ipmi-sensors typically require root to run.
In order to run cc-metric-collector without root privileges, you can enable `use_sudo`.
Add a file like this in /etc/sudoers.d/ to allow cc-metric-collector to run these commands:
```
# Do not log the following sudo commands from monitoring, since this causes a lot of log spam.
# However keep log_denied enabled, to detect failures
Defaults: monitoring !log_allowed, !pam_session
# Allow to use ipmitool and ipmi-sensors
monitoring ALL = (root) NOPASSWD:/usr/bin/ipmitool sensor
monitoring ALL = (root) NOPASSWD:/usr/sbin/ipmi-sensors --comma-separated-output --sdr-cache-recreate
```

View File

@@ -16,8 +16,8 @@ package collectors
import "C" import "C"
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"maps" "maps"
"math" "math"
@@ -207,24 +207,25 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
m.config.LibraryPath = LIKWID_LIB_NAME m.config.LibraryPath = LIKWID_LIB_NAME
m.config.LockfilePath = LIKWID_DEF_LOCKFILE m.config.LockfilePath = LIKWID_DEF_LOCKFILE
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS) lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
if lib == nil { if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath) return fmt.Errorf("%s Init(): error instantiating DynamicLibrary for %s", m.name, m.config.LibraryPath)
} }
err := lib.Open() err := lib.Open()
if err != nil { if err != nil {
return fmt.Errorf("error opening %s: %w", m.config.LibraryPath, err) return fmt.Errorf("%s Init(): error opening %s: %w", m.name, m.config.LibraryPath, err)
} }
if m.config.ForceOverwrite { if m.config.ForceOverwrite {
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
if err := os.Setenv("LIKWID_FORCE", "1"); err != nil { if err := os.Setenv("LIKWID_FORCE", "1"); err != nil {
return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %w", err) return fmt.Errorf("%s Init(): error setting environment variable LIKWID_FORCE=1: %w", m.name, err)
} }
} }
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
@@ -295,16 +296,12 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
// If no event set could be added, shut down LikwidCollector // If no event set could be added, shut down LikwidCollector
if totalMetrics == 0 { if totalMetrics == 0 {
err := errors.New("no LIKWID eventset or metric usable") return fmt.Errorf("%s Init(): no LIKWID eventset or metric usable", m.name)
cclog.ComponentError(m.name, err.Error())
return err
} }
ret := C.topology_init() ret := C.topology_init()
if ret != 0 { if ret != 0 {
err := errors.New("failed to initialize topology module") return fmt.Errorf("%s Init(): failed to initialize topology module", m.name)
cclog.ComponentError(m.name, err.Error())
return err
} }
m.measureThread = thread.New() m.measureThread = thread.New()
switch m.config.AccessMode { switch m.config.AccessMode {
@@ -319,7 +316,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
p = m.config.DaemonPath p = m.config.DaemonPath
} }
if err := os.Setenv("PATH", p); err != nil { if err := os.Setenv("PATH", p); err != nil {
return fmt.Errorf("error setting environment variable PATH=%s: %w", p, err) return fmt.Errorf("%s Init(): error setting environment variable PATH=%s: %w", m.name, p, err)
} }
} }
C.HPMmode(1) C.HPMmode(1)

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -48,9 +49,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{ m.meta = map[string]string{
@@ -63,16 +65,17 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
"load_five", "load_five",
"load_fifteen", "load_fifteen",
} }
m.load_skips = make([]bool, len(m.load_matches))
m.proc_matches = []string{ m.proc_matches = []string{
"proc_run", "proc_run",
"proc_total", "proc_total",
} }
m.proc_skips = make([]bool, len(m.proc_matches))
m.load_skips = make([]bool, len(m.load_matches))
for i, name := range m.load_matches { for i, name := range m.load_matches {
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
} }
m.proc_skips = make([]bool, len(m.proc_matches))
for i, name := range m.proc_matches { for i, name := range m.proc_matches {
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
} }

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -18,7 +19,6 @@ import (
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
@@ -300,9 +300,10 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
m.name = "LustreCollector" m.name = "LustreCollector"
m.parallel = true m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
@@ -316,18 +317,15 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
if !m.config.Sudo { if !m.config.Sudo {
user, err := user.Current() user, err := user.Current()
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Failed to get current user:", err.Error()) return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
return err
} }
if user.Uid != "0" { if user.Uid != "0" {
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root") return fmt.Errorf("%s Init(): Lustre file system statistics can only be queried by user root", m.name)
return err
} }
} else { } else {
p, err := exec.LookPath("sudo") p, err := exec.LookPath("sudo")
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Cannot find 'sudo'") return fmt.Errorf("%s Init(): Cannot find 'sudo': %w", m.name, err)
return err
} }
m.sudoCmd = p m.sudoCmd = p
} }
@@ -336,7 +334,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
if err != nil { if err != nil {
p, err = exec.LookPath(LCTL_CMD) p, err = exec.LookPath(LCTL_CMD)
if err != nil { if err != nil {
return err return fmt.Errorf("%s Init(): Cannot find %s command: %w", m.name, LCTL_CMD, err)
} }
} }
m.lctl = p m.lctl = p
@@ -364,12 +362,12 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
} }
} }
if len(m.definitions) == 0 { if len(m.definitions) == 0 {
return errors.New("no metrics to collect") return fmt.Errorf("%s Init(): no metrics to collect", m.name)
} }
devices := m.getDevices() devices := m.getDevices()
if len(devices) == 0 { if len(devices) == 0 {
return errors.New("no Lustre devices found") return fmt.Errorf("%s Init(): no Lustre devices found", m.name)
} }
m.stats = make(map[string]map[string]int64) m.stats = make(map[string]map[string]int64)
for _, d := range devices { for _, d := range devices {

View File

@@ -9,8 +9,8 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@@ -95,15 +95,15 @@ func getStats(filename string) map[string]MemstatStats {
} }
func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Init(config json.RawMessage) error {
var err error
m.name = "MemstatCollector" m.name = "MemstatCollector"
m.parallel = true m.parallel = true
m.config.NodeStats = true m.config.NodeStats = true
m.config.NumaStats = false m.config.NumaStats = false
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return err if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{"source": m.name, "group": "Memory"} m.meta = map[string]string{"source": m.name, "group": "Memory"}
@@ -120,7 +120,29 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
"Cached": "mem_cached", "Cached": "mem_cached",
"MemAvailable": "mem_available", "MemAvailable": "mem_available",
"SwapFree": "swap_free", "SwapFree": "swap_free",
"MemShared": "mem_shared", "Shmem": "mem_shared",
"Active": "mem_active",
"Inactive": "mem_inactive",
"Dirty": "mem_dirty",
"Writeback": "mem_writeback",
"AnonPages": "mem_anon_pages",
"Mapped": "mem_mapped",
"VmallocTotal": "mem_vmalloc_total",
"AnonHugePages": "mem_anon_hugepages",
"ShmemHugePages": "mem_shared_hugepages",
"ShmemPmdMapped": "mem_shared_pmd_mapped",
"HugePages_Total": "mem_hugepages_total",
"HugePages_Free": "mem_hugepages_free",
"HugePages_Rsvd": "mem_hugepages_reserved",
"HugePages_Surp": "mem_hugepages_surplus",
"Hugepagesize": "mem_hugepages_size",
"DirectMap4k": "mem_direct_mapped_4k",
"DirectMap4M": "mem_direct_mapped_4m",
"DirectMap2M": "mem_direct_mapped_2m",
"DirectMap1G": "mem_direct_mapped_1g",
"Mlocked": "mem_locked",
"PageTables": "mem_pagetables",
"KernelStack": "mem_kernelstack",
} }
for k, v := range matches { for k, v := range matches {
if !slices.Contains(m.config.ExcludeMetrics, k) { if !slices.Contains(m.config.ExcludeMetrics, k) {
@@ -132,7 +154,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.sendMemUsed = true m.sendMemUsed = true
} }
if len(m.matches) == 0 { if len(m.matches) == 0 {
return errors.New("no metrics to collect") return fmt.Errorf("%s Init(): no metrics to collect", m.name)
} }
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
@@ -140,7 +162,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
if m.config.NodeStats { if m.config.NodeStats {
if stats := getStats(MEMSTATFILE); len(stats) == 0 { if stats := getStats(MEMSTATFILE); len(stats) == 0 {
return fmt.Errorf("cannot read data from file %s", MEMSTATFILE) return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, MEMSTATFILE)
} }
} }
@@ -152,7 +174,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.nodefiles = make(map[int]MemstatCollectorNode) m.nodefiles = make(map[int]MemstatCollectorNode)
for _, f := range files { for _, f := range files {
if stats := getStats(f); len(stats) == 0 { if stats := getStats(f); len(stats) == 0 {
return fmt.Errorf("cannot read data from file %s", f) return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, f)
} }
rematch := regex.FindStringSubmatch(f) rematch := regex.FindStringSubmatch(f)
if len(rematch) == 2 { if len(rematch) == 2 {
@@ -172,7 +194,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
} }
} }
m.init = true m.init = true
return err return nil
} }
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
@@ -221,6 +243,12 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage
unit = cacheVal.unit unit = cacheVal.unit
} }
} }
if shmemVal, shmem := stats["Shmem"]; shmem {
memUsed -= shmemVal.value
if len(shmemVal.unit) > 0 && len(unit) == 0 {
unit = shmemVal.unit
}
}
} }
} }
} }

View File

@@ -32,7 +32,29 @@ Metrics:
* `mem_cached` * `mem_cached`
* `mem_available` * `mem_available`
* `mem_shared` * `mem_shared`
* `mem_active`
* `mem_inactive`
* `mem_dirty`
* `mem_writeback`
* `mem_anon_pages`
* `mem_mapped`
* `mem_vmalloc_total`
* `mem_anon_hugepages`
* `mem_shared_hugepages`
* `mem_shared_pmd_mapped`
* `mem_hugepages_total`
* `mem_hugepages_free`
* `mem_hugepages_reserved`
* `mem_hugepages_surplus`
* `mem_hugepages_size`
* `mem_direct_mapped_4k`
* `mem_direct_mapped_2m`
* `mem_direct_mapped_4m`
* `mem_direct_mapped_1g`
* `mem_locked`
* `mem_pagetables`
* `mem_kernelstack`
* `swap_total` * `swap_total`
* `swap_free` * `swap_free`
* `mem_used` = `mem_total` - (`mem_free` + `mem_buffers` + `mem_cached`) * `mem_used` = `mem_total` - (`mem_free` + `mem_buffers` + `mem_cached` + `mem_shared`)

View File

@@ -9,6 +9,7 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -99,10 +100,10 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config // Read configuration file, allow overwriting default config
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
@@ -133,11 +134,31 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
// Check if device is a included device // Check if device is a included device
if slices.Contains(m.config.IncludeDevices, canonical) { if slices.Contains(m.config.IncludeDevices, canonical) {
// Tag will contain original device name (raw). // Tag will contain original device name (raw).
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"} tags := map[string]string{
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"} "stype": "network",
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"} "stype-id": raw,
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"} "type": "node",
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"} }
meta_unit_byte := map[string]string{
"source": m.name,
"group": "Network",
"unit": "bytes",
}
meta_unit_byte_per_sec := map[string]string{
"source": m.name,
"group": "Network",
"unit": "bytes/sec",
}
meta_unit_pkts := map[string]string{
"source": m.name,
"group": "Network",
"unit": "packets",
}
meta_unit_pkts_per_sec := map[string]string{
"source": m.name,
"group": "Network",
"unit": "packets/sec",
}
m.matches[canonical] = []NetstatCollectorMetric{ m.matches[canonical] = []NetstatCollectorMetric{
{ {

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"slices" "slices"
@@ -45,12 +46,7 @@ type nfsCollector struct {
} }
func (m *nfsCollector) updateStats() error { func (m *nfsCollector) updateStats() error {
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`) cmd := exec.Command(m.config.Nfsstats, "-l", "--all")
// Wait for cmd end
if err := cmd.Wait(); err != nil {
return fmt.Errorf("%s updateStats(): %w", m.name, err)
}
buffer, err := cmd.Output() buffer, err := cmd.Output()
if err != nil { if err != nil {
@@ -95,9 +91,10 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
m.config.Nfsstats = string(NFSSTAT_EXEC) m.config.Nfsstats = string(NFSSTAT_EXEC)
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{ m.meta = map[string]string{

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -17,14 +18,13 @@ import (
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
// These are the fields we read from the JSON configuration // These are the fields we read from the JSON configuration
type NfsIOStatCollectorConfig struct { type NfsIOStatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"` ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"` UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values"` SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"` SendDerivedValues bool `json:"send_derived_values"`
@@ -75,7 +75,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
// Is this a device line with mount point, remote target and NFS version? // Is this a device line with mount point, remote target and NFS version?
dev := resolve_regex_fields(l, deviceRegex) dev := resolve_regex_fields(l, deviceRegex)
if len(dev) > 0 { if len(dev) > 0 {
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) { if !slices.Contains(m.config.ExcludeFilesystems, dev[m.key]) {
current = dev current = dev
if len(current["version"]) == 0 { if len(current["version"]) == 0 {
current["version"] = "3" current["version"] = "3"
@@ -104,7 +104,6 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
} }
func (m *NfsIOStatCollector) Init(config json.RawMessage) error { func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
var err error = nil
m.name = "NfsIOStatCollector" m.name = "NfsIOStatCollector"
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
@@ -117,10 +116,10 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false m.config.SendDerivedValues = false
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
m.key = "mntpoint" m.key = "mntpoint"
@@ -130,7 +129,7 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
m.data = m.readNfsiostats() m.data = m.readNfsiostats()
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()
m.init = true m.init = true
return err return nil
} }
func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {

View File

@@ -16,7 +16,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/nfsio.md
"exclude_metrics": [ "exclude_metrics": [
"oread", "pageread" "oread", "pageread"
], ],
"exclude_filesystems": [ "exclude_filesystem": [
"/mnt" "/mnt"
], ],
"use_server_as_stype": false, "use_server_as_stype": false,

View File

@@ -2,6 +2,7 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -83,9 +84,10 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true m.config.SendAbsoluteValues = true
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): unable to unmarshal numastat configuration: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }

View File

@@ -72,9 +72,10 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(strings.NewReader(string(config)))
if err != nil { d.DisallowUnknownFields()
return err if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.meta = map[string]string{ m.meta = map[string]string{
@@ -90,22 +91,18 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
// Error: NVML library not found // Error: NVML library not found
// (nvml.ErrorString can not be used in this case) // (nvml.ErrorString can not be used in this case)
if ret == nvml.ERROR_LIBRARY_NOT_FOUND { if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
err = fmt.Errorf("NVML library not found") return fmt.Errorf("%s Init(): NVML library not found", m.name)
cclog.ComponentError(m.name, err.Error())
return err
} }
if ret != nvml.SUCCESS { if ret != nvml.SUCCESS {
err = errors.New(nvml.ErrorString(ret)) err = errors.New(nvml.ErrorString(ret))
cclog.ComponentError(m.name, "Unable to initialize NVML", err.Error()) return fmt.Errorf("%s Init(): Unable to initialize NVML: %w", m.name, err)
return err
} }
// Number of NVIDIA GPUs // Number of NVIDIA GPUs
num_gpus, ret := nvml.DeviceGetCount() num_gpus, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS { if ret != nvml.SUCCESS {
err = errors.New(nvml.ErrorString(ret)) err = errors.New(nvml.ErrorString(ret))
cclog.ComponentError(m.name, "Unable to get device count", err.Error()) return fmt.Errorf("%s Init(): Unable to get device count: %w", m.name, err)
return err
} }
// For all GPUs // For all GPUs

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -67,10 +68,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }

View File

@@ -8,8 +8,8 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"slices" "slices"
"strconv" "strconv"
@@ -51,7 +51,6 @@ type RocmSmiCollector struct {
// Called once by the collector manager // Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here // All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *RocmSmiCollector) Init(config json.RawMessage) error { func (m *RocmSmiCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions // Always set the name early in Init() to use it in cclog.Component* functions
m.name = "RocmSmiCollector" m.name = "RocmSmiCollector"
// This is for later use, also call it early // This is for later use, also call it early
@@ -60,25 +59,21 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
} }
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
ret := rocm_smi.Init() ret := rocm_smi.Init()
if ret != rocm_smi.STATUS_SUCCESS { if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("failed to initialize ROCm SMI library") return fmt.Errorf("%s Init(): failed to initialize ROCm SMI library", m.name)
cclog.ComponentError(m.name, err.Error())
return err
} }
numDevs, ret := rocm_smi.NumMonitorDevices() numDevs, ret := rocm_smi.NumMonitorDevices()
if ret != rocm_smi.STATUS_SUCCESS { if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("failed to get number of GPUs from ROCm SMI library") return fmt.Errorf("%s Init(): failed to get number of GPUs from ROCm SMI library", m.name)
cclog.ComponentError(m.name, err.Error())
return err
} }
m.devices = make([]RocmSmiCollectorDevice, 0) m.devices = make([]RocmSmiCollectorDevice, 0)
@@ -90,16 +85,12 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
} }
device, ret := rocm_smi.DeviceGetHandleByIndex(i) device, ret := rocm_smi.DeviceGetHandleByIndex(i)
if ret != rocm_smi.STATUS_SUCCESS { if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("failed to get handle for GPU %d", i) return fmt.Errorf("%s Init(): failed to get get handle for GPU %d", m.name, i)
cclog.ComponentError(m.name, err.Error())
return err
} }
pciInfo, ret := rocm_smi.DeviceGetPciInfo(device) pciInfo, ret := rocm_smi.DeviceGetPciInfo(device)
if ret != rocm_smi.STATUS_SUCCESS { if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("failed to get PCI information for GPU %d", i) return fmt.Errorf("%s Init(): failed to get PCI information for GPU %d", m.name, i)
cclog.ComponentError(m.name, err.Error())
return err
} }
pciId := fmt.Sprintf( pciId := fmt.Sprintf(
@@ -149,7 +140,7 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
// Set this flag only if everything is initialized properly, all required files exist, ... // Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true m.init = true
return err return nil
} }
// Read collects all metrics belonging to the sample collector // Read collects all metrics belonging to the sample collector

View File

@@ -8,11 +8,11 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
@@ -52,7 +52,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
m.parallel = true m.parallel = true
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric // Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric // The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system // node -> whole system
@@ -63,13 +66,15 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag) // core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag) // hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag) // accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
@@ -96,7 +101,7 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage)
// stop := readState() // stop := readState()
// value = (stop - start) / interval.Seconds() // value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp) y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil { if err == nil {
// Send it to output channel // Send it to output channel
output <- y output <- y

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync" "sync"
@@ -47,26 +48,30 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
} }
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric // Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric // The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system // node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag) // socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): error decoding JSON config: %w", m.name, err)
} }
} }
// Parse the read interval duration // Parse the read interval duration
m.interval, err = time.ParseDuration(m.config.Interval) m.interval, err = time.ParseDuration(m.config.Interval)
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Error parsing interval:", err.Error()) return fmt.Errorf("%s Init(): error parsing interval: %w", m.name, err)
return err
} }
// Storage for output channel // Storage for output channel
@@ -77,13 +82,11 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ticker = time.NewTicker(m.interval) m.ticker = time.NewTicker(m.interval)
// Start the timer loop with return functionality by sending 'true' to the done channel // Start the timer loop with return functionality by sending 'true' to the done channel
m.wg.Add(1) m.wg.Go(func() {
go func() {
select { select {
case <-m.done: case <-m.done:
// Exit the timer loop // Exit the timer loop
cclog.ComponentDebug(m.name, "Closing...") cclog.ComponentDebug(m.name, "Closing...")
m.wg.Done()
return return
case timestamp := <-m.ticker.C: case timestamp := <-m.ticker.C:
// This is executed every timer tick but we have to wait until the first // This is executed every timer tick but we have to wait until the first
@@ -92,7 +95,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ReadMetrics(timestamp) m.ReadMetrics(timestamp)
} }
} }
}() })
// Set this flag only if everything is initialized properly, all required files exist, ... // Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true m.init = true
@@ -111,7 +114,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
// stop := readState() // stop := readState()
// value = (stop - start) / interval.Seconds() // value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp) y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil && m.output != nil { if err == nil && m.output != nil {
// Send it to output channel if we have a valid channel // Send it to output channel if we have a valid channel
m.output <- y m.output <- y

View File

@@ -9,6 +9,7 @@ package collectors
import ( import (
"bufio" "bufio"
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -66,8 +67,10 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { d := json.NewDecoder(bytes.NewReader(config))
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err) d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
} }
} }
@@ -124,7 +127,7 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
m.olddata[linefields[0]]["waiting"] = waiting m.olddata[linefields[0]]["waiting"] = waiting
value := l_running + l_waiting value := l_running + l_waiting
y, err := lp.NewMessage("cpu_load_core", tags, m.meta, map[string]any{"value": value}, now) y, err := lp.NewMetric("cpu_load_core", tags, m.meta, value, now)
if err == nil { if err == nil {
// Send it to output channel // Send it to output channel
output <- y output <- y

View File

@@ -8,13 +8,13 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"runtime" "runtime"
"syscall" "syscall"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
@@ -40,13 +40,18 @@ func (m *SelfCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Self"} m.meta = map[string]string{
m.tags = map[string]string{"type": "node"} "source": m.name,
"group": "Self",
}
m.tags = map[string]string{
"type": "node",
}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError(m.name, "Error reading config:", err.Error()) if err := d.Decode(&m.config); err != nil {
return err return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }
m.init = true m.init = true
@@ -60,49 +65,49 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
var memstats runtime.MemStats var memstats runtime.MemStats
runtime.ReadMemStats(&memstats) runtime.ReadMemStats(&memstats)
y, err := lp.NewMessage("total_alloc", m.tags, m.meta, map[string]any{"value": memstats.TotalAlloc}, timestamp) y, err := lp.NewMetric("total_alloc", m.tags, m.meta, memstats.TotalAlloc, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_alloc", m.tags, m.meta, map[string]any{"value": memstats.HeapAlloc}, timestamp) y, err = lp.NewMetric("heap_alloc", m.tags, m.meta, memstats.HeapAlloc, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_sys", m.tags, m.meta, map[string]any{"value": memstats.HeapSys}, timestamp) y, err = lp.NewMetric("heap_sys", m.tags, m.meta, memstats.HeapSys, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_idle", m.tags, m.meta, map[string]any{"value": memstats.HeapIdle}, timestamp) y, err = lp.NewMetric("heap_idle", m.tags, m.meta, memstats.HeapIdle, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_inuse", m.tags, m.meta, map[string]any{"value": memstats.HeapInuse}, timestamp) y, err = lp.NewMetric("heap_inuse", m.tags, m.meta, memstats.HeapInuse, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_released", m.tags, m.meta, map[string]any{"value": memstats.HeapReleased}, timestamp) y, err = lp.NewMetric("heap_released", m.tags, m.meta, memstats.HeapReleased, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
} }
y, err = lp.NewMessage("heap_objects", m.tags, m.meta, map[string]any{"value": memstats.HeapObjects}, timestamp) y, err = lp.NewMetric("heap_objects", m.tags, m.meta, memstats.HeapObjects, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
} }
if m.config.GoRoutines { if m.config.GoRoutines {
y, err := lp.NewMessage("num_goroutines", m.tags, m.meta, map[string]any{"value": runtime.NumGoroutine()}, timestamp) y, err := lp.NewMetric("num_goroutines", m.tags, m.meta, runtime.NumGoroutine(), timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
} }
if m.config.CgoCalls { if m.config.CgoCalls {
y, err := lp.NewMessage("num_cgo_calls", m.tags, m.meta, map[string]any{"value": runtime.NumCgoCall()}, timestamp) y, err := lp.NewMetric("num_cgo_calls", m.tags, m.meta, runtime.NumCgoCall(), timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
@@ -113,35 +118,35 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if err == nil { if err == nil {
sec, nsec := rusage.Utime.Unix() sec, nsec := rusage.Utime.Unix()
t := float64(sec) + (float64(nsec) * 1e-9) t := float64(sec) + (float64(nsec) * 1e-9)
y, err := lp.NewMessage("rusage_user_time", m.tags, m.meta, map[string]any{"value": t}, timestamp) y, err := lp.NewMetric("rusage_user_time", m.tags, m.meta, t, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "seconds") y.AddMeta("unit", "seconds")
output <- y output <- y
} }
sec, nsec = rusage.Stime.Unix() sec, nsec = rusage.Stime.Unix()
t = float64(sec) + (float64(nsec) * 1e-9) t = float64(sec) + (float64(nsec) * 1e-9)
y, err = lp.NewMessage("rusage_system_time", m.tags, m.meta, map[string]any{"value": t}, timestamp) y, err = lp.NewMetric("rusage_system_time", m.tags, m.meta, t, timestamp)
if err == nil { if err == nil {
y.AddMeta("unit", "seconds") y.AddMeta("unit", "seconds")
output <- y output <- y
} }
y, err = lp.NewMessage("rusage_vol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nvcsw}, timestamp) y, err = lp.NewMetric("rusage_vol_ctx_switch", m.tags, m.meta, rusage.Nvcsw, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
y, err = lp.NewMessage("rusage_invol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nivcsw}, timestamp) y, err = lp.NewMetric("rusage_invol_ctx_switch", m.tags, m.meta, rusage.Nivcsw, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
y, err = lp.NewMessage("rusage_signals", m.tags, m.meta, map[string]any{"value": rusage.Nsignals}, timestamp) y, err = lp.NewMetric("rusage_signals", m.tags, m.meta, rusage.Nsignals, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
y, err = lp.NewMessage("rusage_major_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Majflt}, timestamp) y, err = lp.NewMetric("rusage_major_pgfaults", m.tags, m.meta, rusage.Majflt, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }
y, err = lp.NewMessage("rusage_minor_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Minflt}, timestamp) y, err = lp.NewMetric("rusage_minor_pgfaults", m.tags, m.meta, rusage.Minflt, timestamp)
if err == nil { if err == nil {
output <- y output <- y
} }

View File

@@ -119,8 +119,9 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
m.cgroupBase = defaultCgroupBase m.cgroupBase = defaultCgroupBase
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(strings.NewReader(string(config)))
if err != nil { d.DisallowUnknownFields()
if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err) return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err)
} }
m.excludeMetrics = make(map[string]struct{}) m.excludeMetrics = make(map[string]struct{})

View File

@@ -0,0 +1,360 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"slices"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
// SmartMonCollectorConfig is the JSON configuration of the smartmon collector.
type SmartMonCollectorConfig struct {
	// UseSudo makes the collector prefix every smartctl invocation with sudo.
	UseSudo bool `json:"use_sudo,omitempty"`
	// ExcludeDevices lists device names that are skipped, no matter whether
	// they come from Devices or from the device scan.
	ExcludeDevices []string `json:"exclude_devices,omitempty"`
	// ExcludeMetrics lists metric names that are not reported. Init() rejects
	// names it does not know.
	ExcludeMetrics []string `json:"excludeMetrics,omitempty"`
	// Devices is an explicit device list. When it is non-empty it is used
	// instead of running a device scan.
	Devices []struct {
		Name string `json:"name"`
		Type string `json:"type"`
	} `json:"devices,omitempty"`
}
// deviceT describes one monitored device. Name and Type carry JSON tags
// because entries of the smartctl scan output are decoded directly into
// this type.
type deviceT struct {
	Name string `json:"name"`
	Type string `json:"type"`
	// queryCommand is the complete argument vector (program first) that Read()
	// executes to fetch the data of this device. It is unexported and therefore
	// never filled by JSON decoding.
	queryCommand []string
}
// SmartMonCollector reports S.M.A.R.T. health values obtained by running
// smartctl for each monitored device.
type SmartMonCollector struct {
	metricCollector
	config SmartMonCollectorConfig // the configuration structure
	meta map[string]string // default meta information
	tags map[string]string // default tags
	devices []deviceT // smartmon devices
	sudoCmd string // Full path to 'sudo' command
	smartCtlCmd string // Full path to 'smartctl' command
	// excludeMetric holds one flag per metric. Init() sets a flag for every
	// name found in config.ExcludeMetrics; Read() skips metrics whose flag is set.
	excludeMetric struct {
		temp,
		percentUsed,
		availSpare,
		dataUnitsRead,
		dataUnitsWrite,
		hostReads,
		hostWrites,
		powerCycles,
		powerOn,
		UnsafeShutdowns,
		mediaErrors,
		errlogEntries,
		warnTempTime,
		critCompTime bool
	}
}
// getSmartmonDevices fills m.devices with the devices to monitor and
// precomputes the smartctl query command of each device.
//
// If the configuration lists devices explicitly, exactly those are used and no
// scan is run. Otherwise the devices reported by `smartctl --scan --json=c`
// are used. In both cases devices named in ExcludeDevices are skipped, and any
// previously stored device list is replaced.
//
// m.smartCtlCmd (and m.sudoCmd when UseSudo is set) must already be filled in.
// An error is returned if the scan command fails or if its output cannot be
// parsed as JSON.
func (m *SmartMonCollector) getSmartmonDevices() error {
	// Common start of every command line: optional sudo followed by smartctl.
	var prefix []string
	if m.config.UseSudo {
		prefix = append(prefix, m.sudoCmd)
	}
	prefix = append(prefix, m.smartCtlCmd)

	// addDevice stores the device together with its query command unless the
	// device is excluded by configuration.
	addDevice := func(d deviceT) {
		if slices.Contains(m.config.ExcludeDevices, d.Name) {
			return
		}
		// Clone the prefix so that devices never share a backing array.
		d.queryCommand = append(slices.Clone(prefix), "--json=c")
		// An empty type would produce the invalid argument "--device=";
		// omit the option and leave type detection to smartctl instead.
		if d.Type != "" {
			d.queryCommand = append(d.queryCommand, "--device="+d.Type)
		}
		d.queryCommand = append(d.queryCommand, "--all", d.Name)
		m.devices = append(m.devices, d)
	}

	// Use configured devices
	if len(m.config.Devices) > 0 {
		m.devices = make([]deviceT, 0, len(m.config.Devices))
		for _, configDevice := range m.config.Devices {
			addDevice(deviceT{
				Name: configDevice.Name,
				Type: configDevice.Type,
			})
		}
		return nil
	}

	// Use scan command
	scanCmd := append(slices.Clone(prefix), "--scan", "--json=c")
	command := exec.Command(scanCmd[0], scanCmd[1:]...)
	stdout, err := command.Output()
	if err != nil {
		return fmt.Errorf(
			"%s getSmartmonDevices(): Failed to execute device scan command %s: %w",
			m.name, command.String(), err)
	}

	var scanOutput struct {
		Devices []deviceT `json:"devices"`
	}
	if err := json.Unmarshal(stdout, &scanOutput); err != nil {
		return fmt.Errorf("%s getSmartmonDevices(): Failed to parse JSON output from device scan command: %w",
			m.name, err)
	}

	m.devices = make([]deviceT, 0, len(scanOutput.Devices))
	for _, d := range scanOutput.Devices {
		addDevice(d)
	}
	return nil
}
// Init prepares the collector: it sets the name, default meta data and tags,
// decodes the (optional) JSON configuration strictly, translates the list of
// excluded metrics into flags, resolves the paths of sudo (only when
// requested) and smartctl, and finally determines the devices to monitor.
// The init flag is set only if all of these steps succeed.
func (m *SmartMonCollector) Init(config json.RawMessage) error {
	m.name = "SmartMonCollector"
	if err := m.setup(); err != nil {
		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
	}
	m.parallel = true
	m.meta = map[string]string{"source": m.name, "group": "Disk"}
	m.tags = map[string]string{"type": "node", "stype": "disk"}

	// Read in the JSON configuration, rejecting unknown keys
	if len(config) > 0 {
		decoder := json.NewDecoder(bytes.NewReader(config))
		decoder.DisallowUnknownFields()
		if err := decoder.Decode(&m.config); err != nil {
			return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
		}
	}

	// Map every known metric name to the flag that disables it
	flags := &m.excludeMetric
	excludeFlagByName := map[string]*bool{
		"smartmon_temp":             &flags.temp,
		"smartmon_percent_used":     &flags.percentUsed,
		"smartmon_avail_spare":      &flags.availSpare,
		"smartmon_data_units_read":  &flags.dataUnitsRead,
		"smartmon_data_units_write": &flags.dataUnitsWrite,
		"smartmon_host_reads":       &flags.hostReads,
		"smartmon_host_writes":      &flags.hostWrites,
		"smartmon_power_cycles":     &flags.powerCycles,
		"smartmon_power_on":         &flags.powerOn,
		"smartmon_unsafe_shutdowns": &flags.UnsafeShutdowns,
		"smartmon_media_errors":     &flags.mediaErrors,
		"smartmon_errlog_entries":   &flags.errlogEntries,
		"smartmon_warn_temp_time":   &flags.warnTempTime,
		"smartmon_crit_comp_time":   &flags.critCompTime,
	}
	for _, metricName := range m.config.ExcludeMetrics {
		flag, known := excludeFlagByName[metricName]
		if !known {
			return fmt.Errorf("%s Init(): Unknown excluded metric: %s", m.name, metricName)
		}
		*flag = true
	}

	// Check if sudo and smartctl are in search path
	if m.config.UseSudo {
		sudoPath, err := exec.LookPath("sudo")
		if err != nil {
			return fmt.Errorf("%s Init(): No sudo command found in search path: %w", m.name, err)
		}
		m.sudoCmd = sudoPath
	}
	smartCtlPath, err := exec.LookPath("smartctl")
	if err != nil {
		return fmt.Errorf("%s Init(): No smartctl command found in search path: %w", m.name, err)
	}
	m.smartCtlCmd = smartCtlPath

	if err := m.getSmartmonDevices(); err != nil {
		return err
	}

	m.init = true
	return nil
}
// SmartMonData is the subset of the smartctl JSON output that Read() decodes
// for each device.
// NOTE(review): SerialNumber and UserCapacity are decoded but no metric in
// this file is derived from them.
type SmartMonData struct {
	SerialNumber string `json:"serial_number"`
	UserCapacity struct {
		Blocks int `json:"blocks"`
		Bytes int `json:"bytes"`
	} `json:"user_capacity"`
	// HealthLog maps the "nvme_smart_health_information_log" object; every
	// metric sent by Read() is taken from one of its fields.
	HealthLog struct {
		// Available SMART health information:
		// sudo smartctl -a --json=c /dev/nvme0 | jq --color-output | less --RAW-CONTROL-CHARS
		Temperature int `json:"temperature"`
		PercentageUsed int `json:"percentage_used"`
		AvailableSpare int `json:"available_spare"`
		DataUnitsRead int `json:"data_units_read"`
		DataUnitsWrite int `json:"data_units_written"`
		HostReads int `json:"host_reads"`
		HostWrites int `json:"host_writes"`
		PowerCycles int `json:"power_cycles"`
		// PowerOnHours is converted to seconds by Read() before it is sent.
		PowerOnHours int `json:"power_on_hours"`
		UnsafeShutdowns int `json:"unsafe_shutdowns"`
		MediaErrors int `json:"media_errors"`
		NumErrorLogEntries int `json:"num_err_log_entries"`
		WarnTempTime int `json:"warning_temp_time"`
		CriticalCompTime int `json:"critical_comp_time"`
	} `json:"nvme_smart_health_information_log"`
}
// Read runs the precomputed smartctl query command of every monitored device,
// decodes its JSON output and sends one metric per non-excluded health value
// to output. Every metric carries the device name in the "stype-id" tag and,
// where a unit is defined, the unit in the "unit" meta field.
//
// The interval argument is not used. Nothing is done unless Init() completed
// successfully. A device whose data cannot be read or decoded is logged and
// skipped; the remaining devices are still processed.
func (m *SmartMonCollector) Read(interval time.Duration, output chan lp.CCMessage) {
	// Do not run any command before a successful Init() or after Close()
	if !m.init {
		return
	}

	// smartctl's exit status is a bit mask. Bits 0-2 report that the command
	// line, the device access or a SMART command failed, so the output cannot
	// be trusted. Bits 3-7 only describe the health of the disk (failing
	// status, exceeded thresholds, error log entries, ...) and are set
	// although valid JSON was written. Such devices must not be dropped.
	const smartctlFatalExitMask = 0x07

	timestamp := time.Now()

	for _, d := range m.devices {
		command := exec.Command(d.queryCommand[0], d.queryCommand[1:]...)
		stdout, err := command.Output()
		if err != nil {
			// ProcessState is nil if the command could not be started, and
			// ExitCode() is -1 if the process was terminated by a signal.
			exitCode := -1
			if command.ProcessState != nil {
				exitCode = command.ProcessState.ExitCode()
			}
			if exitCode <= 0 || exitCode&smartctlFatalExitMask != 0 {
				cclog.ComponentError(m.name, "cannot read data for device", d.Name, ":", err.Error())
				continue
			}
		}

		var data SmartMonData
		if err := json.Unmarshal(stdout, &data); err != nil {
			cclog.ComponentError(m.name, "cannot unmarshal data for device", d.Name, ":", err.Error())
			continue
		}

		health := &data.HealthLog
		skip := &m.excludeMetric
		// One entry per metric, in the order in which they are sent.
		// An empty unit means that no "unit" meta field is attached.
		metrics := []struct {
			excluded bool
			name     string
			value    any
			unit     string
		}{
			{skip.temp, "smartmon_temp", health.Temperature, "degC"},
			{skip.percentUsed, "smartmon_percent_used", health.PercentageUsed, "percent"},
			{skip.availSpare, "smartmon_avail_spare", health.AvailableSpare, "percent"},
			{skip.dataUnitsRead, "smartmon_data_units_read", health.DataUnitsRead, ""},
			{skip.dataUnitsWrite, "smartmon_data_units_write", health.DataUnitsWrite, ""},
			{skip.hostReads, "smartmon_host_reads", health.HostReads, ""},
			{skip.hostWrites, "smartmon_host_writes", health.HostWrites, ""},
			{skip.powerCycles, "smartmon_power_cycles", health.PowerCycles, ""},
			// Reported in seconds, smartctl delivers hours
			{skip.powerOn, "smartmon_power_on", int64(health.PowerOnHours) * 3600, "sec"},
			{skip.UnsafeShutdowns, "smartmon_unsafe_shutdowns", health.UnsafeShutdowns, ""},
			{skip.mediaErrors, "smartmon_media_errors", health.MediaErrors, ""},
			{skip.errlogEntries, "smartmon_errlog_entries", health.NumErrorLogEntries, ""},
			{skip.warnTempTime, "smartmon_warn_temp_time", health.WarnTempTime, ""},
			{skip.critCompTime, "smartmon_crit_comp_time", health.CriticalCompTime, ""},
		}

		for _, metric := range metrics {
			if metric.excluded {
				continue
			}
			y, err := lp.NewMetric(metric.name, m.tags, m.meta, metric.value, timestamp)
			if err != nil {
				cclog.ComponentError(m.name, "cannot create metric", metric.name, "for device", d.Name, ":", err.Error())
				continue
			}
			y.AddTag("stype-id", d.Name)
			if metric.unit != "" {
				y.AddMeta("unit", metric.unit)
			}
			output <- y
		}
	}
}
// Close marks the collector as no longer initialized by clearing the init flag.
func (m *SmartMonCollector) Close() {
	m.init = false
}

View File

@@ -0,0 +1,52 @@
<!--
---
title: smartmon metric collector
description: Collect S.M.A.R.T data from NVMEs
categories: [cc-metric-collector]
tags: ['Admin']
weight: 2
hugo_path: docs/reference/cc-metric-collector/collectors/smartmonMetric.md
---
-->
## `smartmon` collector
```json
"smartmon": {
"use_sudo": true,
"exclude_devices": [
"/dev/sda"
],
"excludeMetrics": [
"smartmon_warn_temp_time",
"smartmon_crit_comp_time"
],
"devices": [
{
"name": "/dev/nvme0",
"type": "nvme"
}
]
}
```
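The `smartmon` collector retrieves S.M.A.R.T. data from NVMe devices via the `smartctl` command.
Devices are either detected automatically by a device scan (`smartctl --scan`) or, if the `devices` config option is non-empty, taken from that list without scanning. Devices listed in `exclude_devices` are skipped in both cases. With `use_sudo` enabled, `smartctl` is executed through `sudo`. Metrics listed in `excludeMetrics` are not reported; an unknown metric name causes the collector initialization to fail.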
Metrics:
* `smartmon_temp`: Temperature of the device (`unit=degC`)
* `smartmon_avail_spare`: Amount of spare left (`unit=percent`)
* `smartmon_percent_used`: Percentage of the device's rated life that has been used (`unit=percent`)
* `smartmon_data_units_read`: Read data units
* `smartmon_data_units_write`: Written data units
* `smartmon_host_reads`: Read operations
* `smartmon_host_writes`: Write operations
* `smartmon_power_cycles`: Number of power cycles
* `smartmon_power_on`: Time the device has been powered on, in seconds (`unit=sec`)
* `smartmon_unsafe_shutdowns`: Count of unsafe shutdowns
* `smartmon_media_errors`: Media errors of the device
* `smartmon_errlog_entries`: Error log entries
* `smartmon_warn_temp_time`: Time above the warning temperature threshold
* `smartmon_crit_comp_time`: Time above the critical composite temperature threshold

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
@@ -63,9 +64,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} }

View File

@@ -8,6 +8,7 @@
package collectors package collectors
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os/exec" "os/exec"
@@ -46,9 +47,10 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
"group": "TopProcs", "group": "TopProcs",
} }
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) d := json.NewDecoder(bytes.NewReader(config))
if err != nil { d.DisallowUnknownFields()
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err) if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
} }
} else { } else {
m.config.Num_procs = int(DEFAULT_NUM_PROCS) m.config.Num_procs = int(DEFAULT_NUM_PROCS)

View File

@@ -34,8 +34,8 @@
}, },
"numastats": {}, "numastats": {},
"nvidia": {}, "nvidia": {},
"schedstat": { "schedstat": {},
}, "smartmon": {},
"tempstat": { "tempstat": {
"report_max_temperature": true, "report_max_temperature": true,
"report_critical_temperature": true, "report_critical_temperature": true,

View File

@@ -1,6 +1,6 @@
{ {
"process_messages" : { "process_messages" : {
"add_tag_if": [ "add_tags_if": [
{ {
"key" : "cluster", "key" : "cluster",
"value" : "testcluster", "value" : "testcluster",
@@ -12,7 +12,7 @@
"if" : "name == 'temp_package_id_0'" "if" : "name == 'temp_package_id_0'"
} }
], ],
"delete_tag_if": [ "delete_meta_if": [
{ {
"key" : "unit", "key" : "unit",
"if" : "true" "if" : "true"

17
go.mod
View File

@@ -3,14 +3,14 @@ module github.com/ClusterCockpit/cc-metric-collector
go 1.25.0 go 1.25.0
require ( require (
github.com/ClusterCockpit/cc-lib/v2 v2.7.0 github.com/ClusterCockpit/cc-lib/v2 v2.11.0
github.com/ClusterCockpit/go-rocm-smi v0.3.0 github.com/ClusterCockpit/go-rocm-smi v0.3.0
github.com/NVIDIA/go-nvml v0.13.0-1 github.com/NVIDIA/go-nvml v0.13.0-1
github.com/PaesslerAG/gval v1.2.4 github.com/PaesslerAG/gval v1.2.4
github.com/fsnotify/fsnotify v1.9.0 github.com/fsnotify/fsnotify v1.9.0
github.com/tklauser/go-sysconf v0.3.16 github.com/tklauser/go-sysconf v0.3.16
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.org/x/sys v0.41.0 golang.org/x/sys v0.42.0
) )
require ( require (
@@ -28,18 +28,17 @@ require (
github.com/nats-io/nats.go v1.49.0 // indirect github.com/nats-io/nats.go v1.49.0 // indirect
github.com/nats-io/nkeys v0.4.15 // indirect github.com/nats-io/nkeys v0.4.15 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/oapi-codegen/runtime v1.2.0 // indirect github.com/oapi-codegen/runtime v1.3.0 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.0 // indirect github.com/prometheus/procfs v0.20.1 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect
github.com/stmcginnis/gofish v0.21.3 // indirect github.com/stmcginnis/gofish v0.21.4 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect github.com/tklauser/numcpus v0.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v2 v2.4.4 // indirect
golang.org/x/crypto v0.48.0 // indirect golang.org/x/crypto v0.49.0 // indirect
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa // indirect golang.org/x/net v0.52.0 // indirect
golang.org/x/net v0.51.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect google.golang.org/protobuf v1.36.11 // indirect
) )

37
go.sum
View File

@@ -1,5 +1,5 @@
github.com/ClusterCockpit/cc-lib/v2 v2.7.0 h1:EMTShk6rMTR1wlfmQ8SVCawH1OdltUbD3kVQmaW+5pE= github.com/ClusterCockpit/cc-lib/v2 v2.11.0 h1:LaLs4J0b7FArIXT8byMUcIcUr55R5obATjVi7qI02r4=
github.com/ClusterCockpit/cc-lib/v2 v2.7.0/go.mod h1:0Etx8WMs0lYZ4tiOQizY18CQop+2i3WROvU9rMUxHA4= github.com/ClusterCockpit/cc-lib/v2 v2.11.0/go.mod h1:Oj+N2lpFqiBOBzjfrLIGJ2YSWT400TX4M0ii4lNl81A=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
github.com/ClusterCockpit/go-rocm-smi v0.3.0 h1:1qZnSpG7/NyLtc7AjqnUL9Jb8xtqG1nMVgp69rJfaR8= github.com/ClusterCockpit/go-rocm-smi v0.3.0 h1:1qZnSpG7/NyLtc7AjqnUL9Jb8xtqG1nMVgp69rJfaR8=
@@ -67,8 +67,8 @@ github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oapi-codegen/runtime v1.2.0 h1:RvKc1CVS1QeKSNzO97FBQbSMZyQ8s6rZd+LpmzwHMP4= github.com/oapi-codegen/runtime v1.3.0 h1:vyK1zc0gDWWXgk2xoQa4+X4RNNc5SL2RbTpJS/4vMYA=
github.com/oapi-codegen/runtime v1.2.0/go.mod h1:Y7ZhmmlE8ikZOmuHRRndiIm7nf3xcVv+YMweKgG1DT0= github.com/oapi-codegen/runtime v1.3.0/go.mod h1:kOdeacKy7t40Rclb1je37ZLFboFxh+YLy0zaPCMibPY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
@@ -77,8 +77,8 @@ github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNw
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4=
github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw=
github.com/prometheus/procfs v0.20.0 h1:AA7aCvjxwAquZAlonN7888f2u4IN8WVeFgBi4k82M4Q= github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc=
github.com/prometheus/procfs v0.20.0/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
@@ -87,8 +87,8 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stmcginnis/gofish v0.21.3 h1:EBLCHfORnbx7MPw7lplOOVe9QAD1T3XRVz6+a1Z4z5Q= github.com/stmcginnis/gofish v0.21.4 h1:daexK8sh31CgeSMkPUNs21HWHHA9ecCPJPyLCTxukCg=
github.com/stmcginnis/gofish v0.21.3/go.mod h1:PzF5i8ecRG9A2ol8XT64npKUunyraJ+7t0kYMpQAtqU= github.com/stmcginnis/gofish v0.21.4/go.mod h1:PzF5i8ecRG9A2ol8XT64npKUunyraJ+7t0kYMpQAtqU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
@@ -99,25 +99,20 @@ github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9R
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.4 h1:tuyd0P+2Ont/d6e2rl3be67goVK4R6deVxCUX5vyPaQ=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v2 v2.4.4/go.mod h1:gMZqIpDtDqOfM0uNfy0SkpRhvUryYH0Z6wdMYcacYXQ=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePPs2kHXoazz8q2KsyxHyQVGCJg= golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePPs2kHXoazz8q2KsyxHyQVGCJg=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE= golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -94,8 +94,7 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
// Set hostname // Set hostname
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
cclog.Error(err.Error()) return fmt.Errorf("metricAggregator: failed to get hostname: %w", err)
return err
} }
// Drop domain part of host name // Drop domain part of host name
c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0] c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0]

View File

@@ -8,6 +8,7 @@
package metricRouter package metricRouter
import ( import (
"fmt"
"sync" "sync"
"time" "time"
@@ -70,8 +71,7 @@ func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker,
// The code is executed by the MetricCache goroutine // The code is executed by the MetricCache goroutine
c.aggEngine, err = agg.NewAggregator(c.output) c.aggEngine, err = agg.NewAggregator(c.output)
if err != nil { if err != nil {
cclog.ComponentError("MetricCache", "Cannot create aggregator") return fmt.Errorf("MetricCache: failed to create aggregator: %w", err)
return err
} }
return nil return nil

View File

@@ -8,6 +8,7 @@
package metricRouter package metricRouter
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"maps" "maps"
@@ -46,7 +47,6 @@ type metricRouterConfig struct {
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
// dropMetrics map[string]bool // Internal map for O(1) lookup
MessageProcessor json.RawMessage `json:"process_messages,omitempty"` MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
} }
@@ -102,18 +102,17 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
// Drop domain part of host name // Drop domain part of host name
r.hostname = strings.SplitN(hostname, `.`, 2)[0] r.hostname = strings.SplitN(hostname, `.`, 2)[0]
err = json.Unmarshal(routerConfig, &r.config) d := json.NewDecoder(bytes.NewReader(routerConfig))
if err != nil { d.DisallowUnknownFields()
cclog.ComponentError("MetricRouter", err.Error()) if err := d.Decode(&r.config); err != nil {
return err return fmt.Errorf("failed to decode metric router config: %w", err)
} }
r.maxForward = max(1, r.config.MaxForward) r.maxForward = max(1, r.config.MaxForward)
if r.config.NumCacheIntervals > 0 { if r.config.NumCacheIntervals > 0 {
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals) r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error()) return fmt.Errorf("MetricRouter: failed to initialize MetricCache: %w", err)
return err
} }
for _, agg := range r.config.IntervalAgg { for _, agg := range r.config.IntervalAgg {
err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)