mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2026-03-18 14:27:30 +01:00
Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1937ef2587 | ||
|
|
35510d3d39 | ||
|
|
ef5e4c2604 | ||
|
|
44401318e4 | ||
|
|
2e60d3111c | ||
|
|
e8734c02db | ||
|
|
54650d40a6 | ||
|
|
e7050834f5 | ||
|
|
893a0d69de | ||
|
|
345119866a | ||
|
|
ec917cf802 | ||
|
|
c7cfc0723b | ||
|
|
4f2685f4c4 | ||
|
|
439bfacfd9 | ||
|
|
cd4ac9c885 | ||
|
|
eeb60ba0df | ||
|
|
a481a34dcd | ||
|
|
b65576431e | ||
|
|
a927565868 | ||
|
|
0b67993eb0 | ||
|
|
4164e3d1a3 |
13
Makefile
13
Makefile
@@ -27,6 +27,17 @@ $(APP): $(GOSRC) go.mod
|
||||
$(GOBIN) get
|
||||
$(GOBIN) build -o $(APP) $(GOSRC_APP)
|
||||
|
||||
# -ldflags:
|
||||
# -s : drops the OS symbol table
|
||||
# -w : drops DWARF
|
||||
# -> Panic stack traces still show function names and file:line
|
||||
.PHONY: build-stripped
|
||||
build-stripped:
|
||||
make -C collectors
|
||||
$(GOBIN) get
|
||||
$(GOBIN) build -ldflags "-s -w" -trimpath -o $(APP) $(GOSRC_APP)
|
||||
|
||||
.PHONY: install
|
||||
install: $(APP)
|
||||
@WORKSPACE=$(PREFIX)
|
||||
@if [ -z "$${WORKSPACE}" ]; then exit 1; fi
|
||||
@@ -89,7 +100,7 @@ staticcheck:
|
||||
.PHONY: golangci-lint
|
||||
golangci-lint:
|
||||
$(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
|
||||
$$($(GOBIN) env GOPATH)/bin/golangci-lint run
|
||||
$$($(GOBIN) env GOPATH)/bin/golangci-lint run --enable errorlint,govet,misspell,modernize,prealloc,staticcheck,unconvert,wastedassign
|
||||
|
||||
.ONESHELL:
|
||||
.PHONY: RPM
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"os"
|
||||
@@ -48,22 +49,22 @@ type RuntimeConfig struct {
|
||||
Sync sync.WaitGroup
|
||||
}
|
||||
|
||||
// ReadCli reads the command line arguments
|
||||
func ReadCli() map[string]string {
|
||||
var m map[string]string
|
||||
cfg := flag.String("config", "./config.json", "Path to configuration file")
|
||||
logfile := flag.String("log", "stderr", "Path for logfile")
|
||||
once := flag.Bool("once", false, "Run all collectors only once")
|
||||
loglevel := flag.String("loglevel", "info", "Set log level")
|
||||
flag.Parse()
|
||||
m = make(map[string]string)
|
||||
m["configfile"] = *cfg
|
||||
m["logfile"] = *logfile
|
||||
m := map[string]string{
|
||||
"configfile": *cfg,
|
||||
"logfile": *logfile,
|
||||
"once": "false",
|
||||
"loglevel": *loglevel,
|
||||
}
|
||||
if *once {
|
||||
m["once"] = "true"
|
||||
} else {
|
||||
m["once"] = "false"
|
||||
}
|
||||
m["loglevel"] = *loglevel
|
||||
return m
|
||||
}
|
||||
|
||||
@@ -120,9 +121,10 @@ func mainFunc() int {
|
||||
|
||||
// Load and check configuration
|
||||
main := ccconf.GetPackageConfig("main")
|
||||
err = json.Unmarshal(main, &rcfg.ConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error())
|
||||
d := json.NewDecoder(bytes.NewReader(main))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&rcfg.ConfigFile); err != nil {
|
||||
cclog.Errorf("Error reading configuration file %s: %v", rcfg.CliArgs["configfile"], err)
|
||||
return 1
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
|
||||
* [ ] Aggregate metrics to higher topology entity (sum hwthread metrics to socket metric, ...). Needs to be configurable
|
||||
|
||||
# Contributing own collectors
|
||||
|
||||
A collector reads data from any source, parses it into metrics and submits these metrics to the `metric-collector`. A collector provides three functions:
|
||||
|
||||
* `Name() string`: Return the name of the collector
|
||||
@@ -104,8 +105,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Sample"}
|
||||
|
||||
@@ -32,7 +32,7 @@ const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
|
||||
type BeegfsMetaCollectorConfig struct {
|
||||
Beegfs string `json:"beegfs_path"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
ExcludeFilesystem []string `json:"exclude_filesystem"`
|
||||
ExcludeFilesystems []string `json:"exclude_filesystem"`
|
||||
}
|
||||
|
||||
type BeegfsMetaCollector struct {
|
||||
@@ -74,9 +74,10 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read JSON configuration
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,23 +100,23 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
||||
"filesystem": "",
|
||||
}
|
||||
m.skipFS = make(map[string]struct{})
|
||||
for _, fs := range m.config.ExcludeFilesystem {
|
||||
for _, fs := range m.config.ExcludeFilesystems {
|
||||
m.skipFS[fs] = struct{}{}
|
||||
}
|
||||
|
||||
// Beegfs file system statistics can only be queried by user root
|
||||
user, err := user.Current()
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %w", err)
|
||||
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
|
||||
}
|
||||
if user.Uid != "0" {
|
||||
return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root")
|
||||
return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
|
||||
}
|
||||
|
||||
// Check if beegfs-ctl is in executable search path
|
||||
_, err = exec.LookPath(m.config.Beegfs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err)
|
||||
return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
|
||||
}
|
||||
m.init = true
|
||||
return nil
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
type BeegfsStorageCollectorConfig struct {
|
||||
Beegfs string `json:"beegfs_path"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
ExcludeFilesystem []string `json:"exclude_filesystem"`
|
||||
ExcludeFilesystems []string `json:"exclude_filesystem"`
|
||||
}
|
||||
|
||||
type BeegfsStorageCollector struct {
|
||||
@@ -67,9 +67,10 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read JSON configuration
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,23 +93,23 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
||||
"filesystem": "",
|
||||
}
|
||||
m.skipFS = make(map[string]struct{})
|
||||
for _, fs := range m.config.ExcludeFilesystem {
|
||||
for _, fs := range m.config.ExcludeFilesystems {
|
||||
m.skipFS[fs] = struct{}{}
|
||||
}
|
||||
|
||||
// Beegfs file system statistics can only be queried by user root
|
||||
user, err := user.Current()
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %w", err)
|
||||
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
|
||||
}
|
||||
if user.Uid != "0" {
|
||||
return fmt.Errorf("BeegfsStorageCollector.Init(): BeeGFS file system statistics can only be queried by user root")
|
||||
return fmt.Errorf("%s Init(): BeeGFS file system statistics can only be queried by user root", m.name)
|
||||
}
|
||||
|
||||
// Check if beegfs-ctl is in executable search path
|
||||
_, err = exec.LookPath(m.config.Beegfs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err)
|
||||
return fmt.Errorf("%s Init(): Failed to find beegfs-ctl binary '%s': %w", m.name, m.config.Beegfs, err)
|
||||
}
|
||||
m.init = true
|
||||
return nil
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -48,6 +49,7 @@ var AvailableCollectors = map[string]MetricCollector{
|
||||
"schedstat": new(SchedstatCollector),
|
||||
"nfsiostat": new(NfsIOStatCollector),
|
||||
"slurm_cgroup": new(SlurmCgroupCollector),
|
||||
"smartmon": new(SmartMonCollector),
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
@@ -88,10 +90,10 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
|
||||
cm.ticker = ticker
|
||||
cm.duration = duration
|
||||
|
||||
err := json.Unmarshal(collectConfig, &cm.config)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(collectConfig))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&cm.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding collector manager config: %w", "CollectorManager", err)
|
||||
}
|
||||
|
||||
// Initialize configured collectors
|
||||
@@ -102,7 +104,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
|
||||
}
|
||||
collector := AvailableCollectors[collectorName]
|
||||
|
||||
err = collector.Init(collectorCfg)
|
||||
err := collector.Init(collectorCfg)
|
||||
if err != nil {
|
||||
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
|
||||
continue
|
||||
|
||||
@@ -12,7 +12,9 @@ hugo_path: docs/reference/cc-metric-collector/collectors/cpufreq_cpuinfo.md
|
||||
## `cpufreq_cpuinfo` collector
|
||||
|
||||
```json
|
||||
"cpufreq_cpuinfo": {}
|
||||
"cpufreq_cpuinfo": {
|
||||
"exclude_metrics": []
|
||||
}
|
||||
```
|
||||
|
||||
The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful of **hwthread** metrics.
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -54,9 +55,10 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
m.parallel = true
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{
|
||||
@@ -77,7 +79,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
scalingCurFreqFile := filepath.Join("/sys/devices/system/cpu", fmt.Sprintf("cpu%d", c.CpuID), "cpufreq/scaling_cur_freq")
|
||||
err := unix.Access(scalingCurFreqFile, unix.R_OK)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to access file '%s': %w", scalingCurFreqFile, err)
|
||||
return fmt.Errorf("%s Init(): unable to access file '%s': %w", m.name, scalingCurFreqFile, err)
|
||||
}
|
||||
|
||||
m.topology = append(m.topology,
|
||||
|
||||
@@ -9,6 +9,7 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -53,9 +54,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||
"type": "node",
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
matches := map[string]int{
|
||||
@@ -79,19 +81,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
// Check input file
|
||||
file, err := os.Open(string(CPUSTATFILE))
|
||||
file, err := os.Open(CPUSTATFILE)
|
||||
if err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
|
||||
return fmt.Errorf("%s Init(): Failed to open file '%s': %w", m.name, CPUSTATFILE, err)
|
||||
}
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
|
||||
}
|
||||
}()
|
||||
|
||||
// Pre-generate tags for all CPUs
|
||||
num_cpus := 0
|
||||
@@ -120,6 +113,12 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||
num_cpus++
|
||||
}
|
||||
}
|
||||
|
||||
// Close file
|
||||
if err := file.Close(); err != nil {
|
||||
return fmt.Errorf("%s Init(): Failed to close file '%s': %w", m.name, CPUSTATFILE, err)
|
||||
}
|
||||
|
||||
m.lastTimestamp = time.Now()
|
||||
m.init = true
|
||||
return nil
|
||||
@@ -166,11 +165,11 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
||||
now := time.Now()
|
||||
tsdelta := now.Sub(m.lastTimestamp)
|
||||
|
||||
file, err := os.Open(string(CPUSTATFILE))
|
||||
file, err := os.Open(CPUSTATFILE)
|
||||
if err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
|
||||
fmt.Sprintf("Read(): Failed to open file '%s': %v", CPUSTATFILE, err))
|
||||
}
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
@@ -47,8 +47,10 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read configuration
|
||||
if len(config) > 0 {
|
||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +84,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
if len(m.files) == 0 && len(m.cmdFieldsSlice) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
|
||||
}
|
||||
m.init = true
|
||||
return nil
|
||||
|
||||
@@ -9,6 +9,7 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -42,8 +43,10 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.allowedMetrics = map[string]bool{
|
||||
|
||||
@@ -32,7 +32,7 @@ type GpfsCollectorState map[string]int64
|
||||
|
||||
type GpfsCollectorConfig struct {
|
||||
Mmpmon string `json:"mmpmon_path,omitempty"`
|
||||
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
|
||||
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
Sudo bool `json:"use_sudo,omitempty"`
|
||||
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
|
||||
@@ -322,9 +322,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read JSON configuration
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{
|
||||
@@ -336,7 +337,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
"filesystem": "",
|
||||
}
|
||||
m.skipFS = make(map[string]struct{})
|
||||
for _, fs := range m.config.ExcludeFilesystem {
|
||||
for _, fs := range m.config.ExcludeFilesystems {
|
||||
m.skipFS[fs] = struct{}{}
|
||||
}
|
||||
m.lastState = make(map[string]GpfsCollectorState)
|
||||
@@ -346,18 +347,15 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
if !m.config.Sudo {
|
||||
user, err := user.Current()
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to get current user: %w", m.name, err)
|
||||
}
|
||||
if user.Uid != "0" {
|
||||
cclog.ComponentError(m.name, "GPFS file system statistics can only be queried by user root")
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): GPFS file system statistics can only be queried by user root", m.name)
|
||||
}
|
||||
} else {
|
||||
p, err := exec.LookPath("sudo")
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Cannot find 'sudo'")
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): cannot find 'sudo': %w", m.name, err)
|
||||
}
|
||||
m.sudoCmd = p
|
||||
}
|
||||
@@ -377,7 +375,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
// the file was given in the config, use it
|
||||
p = m.config.Mmpmon
|
||||
} else {
|
||||
cclog.ComponentError(m.name, fmt.Sprintf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err))
|
||||
return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err)
|
||||
}
|
||||
}
|
||||
@@ -434,7 +431,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
if len(m.definitions) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
|
||||
}
|
||||
|
||||
m.init = true
|
||||
|
||||
@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/gpfs.md
|
||||
```json
|
||||
"gpfs": {
|
||||
"mmpmon_path": "/path/to/mmpmon",
|
||||
"use_sudo": "true",
|
||||
"use_sudo": true,
|
||||
"exclude_filesystem": [
|
||||
"fs1"
|
||||
],
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -79,9 +80,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
|
||||
m.config.SendDerivedValues = false
|
||||
// Read configuration file, allow overwriting default config
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"slices"
|
||||
@@ -44,7 +44,6 @@ type IOstatCollector struct {
|
||||
}
|
||||
|
||||
func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
var err error
|
||||
m.name = "IOstatCollector"
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||
@@ -52,9 +51,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
// https://www.kernel.org/doc/html/latest/admin-guide/iostats.html
|
||||
@@ -85,7 +85,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
if len(m.matches) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
|
||||
}
|
||||
file, err := os.Open(IOSTATFILE)
|
||||
if err != nil {
|
||||
@@ -135,7 +135,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
m.init = true
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
@@ -56,9 +55,10 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
m.config.IpmitoolPath = "ipmitool"
|
||||
m.config.IpmisensorsPath = "ipmi-sensors"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
// Check if executables ipmitool or ipmisensors are found
|
||||
@@ -67,7 +67,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
command := exec.Command(p)
|
||||
err := command.Run()
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
|
||||
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %s", p, err.Error()))
|
||||
m.ipmitool = ""
|
||||
} else {
|
||||
m.ipmitool = p
|
||||
@@ -78,14 +78,14 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
command := exec.Command(p)
|
||||
err := command.Run()
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
|
||||
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %s", p, err.Error()))
|
||||
m.ipmisensors = ""
|
||||
} else {
|
||||
m.ipmisensors = p
|
||||
}
|
||||
}
|
||||
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
|
||||
return errors.New("no usable IPMI reader found")
|
||||
return fmt.Errorf("%s Init(): no usable IPMI reader found", m.name)
|
||||
}
|
||||
|
||||
m.init = true
|
||||
|
||||
@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/ipmi.md
|
||||
```json
|
||||
"ipmistat": {
|
||||
"ipmitool_path": "/path/to/ipmitool",
|
||||
"ipmisensors_path": "/path/to/ipmi-sensors",
|
||||
"ipmisensors_path": "/path/to/ipmi-sensors"
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ package collectors
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
@@ -207,24 +207,25 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
m.config.LibraryPath = LIKWID_LIB_NAME
|
||||
m.config.LockfilePath = LIKWID_DEF_LOCKFILE
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
|
||||
return fmt.Errorf("%s Init(): error instantiating DynamicLibrary for %s", m.name, m.config.LibraryPath)
|
||||
}
|
||||
err := lib.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %w", m.config.LibraryPath, err)
|
||||
return fmt.Errorf("%s Init(): error opening %s: %w", m.name, m.config.LibraryPath, err)
|
||||
}
|
||||
|
||||
if m.config.ForceOverwrite {
|
||||
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
|
||||
if err := os.Setenv("LIKWID_FORCE", "1"); err != nil {
|
||||
return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %w", err)
|
||||
return fmt.Errorf("%s Init(): error setting environment variable LIKWID_FORCE=1: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
if err := m.setup(); err != nil {
|
||||
@@ -295,16 +296,12 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// If no event set could be added, shut down LikwidCollector
|
||||
if totalMetrics == 0 {
|
||||
err := errors.New("no LIKWID eventset or metric usable")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): no LIKWID eventset or metric usable", m.name)
|
||||
}
|
||||
|
||||
ret := C.topology_init()
|
||||
if ret != 0 {
|
||||
err := errors.New("failed to initialize topology module")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to initialize topology module", m.name)
|
||||
}
|
||||
m.measureThread = thread.New()
|
||||
switch m.config.AccessMode {
|
||||
@@ -319,7 +316,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
p = m.config.DaemonPath
|
||||
}
|
||||
if err := os.Setenv("PATH", p); err != nil {
|
||||
return fmt.Errorf("error setting environment variable PATH=%s: %w", p, err)
|
||||
return fmt.Errorf("%s Init(): error setting environment variable PATH=%s: %w", m.name, p, err)
|
||||
}
|
||||
}
|
||||
C.HPMmode(1)
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -48,9 +49,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{
|
||||
@@ -63,16 +65,17 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||
"load_five",
|
||||
"load_fifteen",
|
||||
}
|
||||
m.load_skips = make([]bool, len(m.load_matches))
|
||||
m.proc_matches = []string{
|
||||
"proc_run",
|
||||
"proc_total",
|
||||
}
|
||||
m.proc_skips = make([]bool, len(m.proc_matches))
|
||||
|
||||
m.load_skips = make([]bool, len(m.load_matches))
|
||||
for i, name := range m.load_matches {
|
||||
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
|
||||
}
|
||||
|
||||
m.proc_skips = make([]bool, len(m.proc_matches))
|
||||
for i, name := range m.proc_matches {
|
||||
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -18,7 +19,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||
)
|
||||
|
||||
@@ -300,9 +300,10 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
m.name = "LustreCollector"
|
||||
m.parallel = true
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
if err := m.setup(); err != nil {
|
||||
@@ -316,18 +317,15 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
if !m.config.Sudo {
|
||||
user, err := user.Current()
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Failed to get current user:", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err)
|
||||
}
|
||||
if user.Uid != "0" {
|
||||
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Lustre file system statistics can only be queried by user root", m.name)
|
||||
}
|
||||
} else {
|
||||
p, err := exec.LookPath("sudo")
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Cannot find 'sudo'")
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Cannot find 'sudo': %w", m.name, err)
|
||||
}
|
||||
m.sudoCmd = p
|
||||
}
|
||||
@@ -336,7 +334,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
if err != nil {
|
||||
p, err = exec.LookPath(LCTL_CMD)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Cannot find %s command: %w", m.name, LCTL_CMD, err)
|
||||
}
|
||||
}
|
||||
m.lctl = p
|
||||
@@ -364,12 +362,12 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
if len(m.definitions) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
|
||||
}
|
||||
|
||||
devices := m.getDevices()
|
||||
if len(devices) == 0 {
|
||||
return errors.New("no Lustre devices found")
|
||||
return fmt.Errorf("%s Init(): no Lustre devices found", m.name)
|
||||
}
|
||||
m.stats = make(map[string]map[string]int64)
|
||||
for _, d := range devices {
|
||||
|
||||
@@ -9,8 +9,8 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -95,15 +95,15 @@ func getStats(filename string) map[string]MemstatStats {
|
||||
}
|
||||
|
||||
func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
var err error
|
||||
m.name = "MemstatCollector"
|
||||
m.parallel = true
|
||||
m.config.NodeStats = true
|
||||
m.config.NumaStats = false
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Memory"}
|
||||
@@ -132,7 +132,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
m.sendMemUsed = true
|
||||
}
|
||||
if len(m.matches) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
return fmt.Errorf("%s Init(): no metrics to collect", m.name)
|
||||
}
|
||||
if err := m.setup(); err != nil {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
@@ -140,7 +140,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
|
||||
if m.config.NodeStats {
|
||||
if stats := getStats(MEMSTATFILE); len(stats) == 0 {
|
||||
return fmt.Errorf("cannot read data from file %s", MEMSTATFILE)
|
||||
return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, MEMSTATFILE)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
m.nodefiles = make(map[int]MemstatCollectorNode)
|
||||
for _, f := range files {
|
||||
if stats := getStats(f); len(stats) == 0 {
|
||||
return fmt.Errorf("cannot read data from file %s", f)
|
||||
return fmt.Errorf("%s Init(): cannot read data from file %s", m.name, f)
|
||||
}
|
||||
rematch := regex.FindStringSubmatch(f)
|
||||
if len(rematch) == 2 {
|
||||
@@ -172,7 +172,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
m.init = true
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
|
||||
@@ -9,6 +9,7 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -99,10 +100,10 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
m.config.SendDerivedValues = false
|
||||
// Read configuration file, allow overwriting default config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,11 +134,31 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
// Check if device is an included device
|
||||
if slices.Contains(m.config.IncludeDevices, canonical) {
|
||||
// Tag will contain original device name (raw).
|
||||
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
|
||||
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
||||
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
|
||||
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
|
||||
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
|
||||
tags := map[string]string{
|
||||
"stype": "network",
|
||||
"stype-id": raw,
|
||||
"type": "node",
|
||||
}
|
||||
meta_unit_byte := map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Network",
|
||||
"unit": "bytes",
|
||||
}
|
||||
meta_unit_byte_per_sec := map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Network",
|
||||
"unit": "bytes/sec",
|
||||
}
|
||||
meta_unit_pkts := map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Network",
|
||||
"unit": "packets",
|
||||
}
|
||||
meta_unit_pkts_per_sec := map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Network",
|
||||
"unit": "packets/sec",
|
||||
}
|
||||
|
||||
m.matches[canonical] = []NetstatCollectorMetric{
|
||||
{
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"slices"
|
||||
@@ -45,12 +46,7 @@ type nfsCollector struct {
|
||||
}
|
||||
|
||||
func (m *nfsCollector) updateStats() error {
|
||||
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
||||
|
||||
// Wait for cmd end
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("%s updateStats(): %w", m.name, err)
|
||||
}
|
||||
cmd := exec.Command(m.config.Nfsstats, "-l", "--all")
|
||||
|
||||
buffer, err := cmd.Output()
|
||||
if err != nil {
|
||||
@@ -95,9 +91,10 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
|
||||
m.config.Nfsstats = string(NFSSTAT_EXEC)
|
||||
// Read JSON configuration
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -17,14 +18,13 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||
)
|
||||
|
||||
// These are the fields we read from the JSON configuration
|
||||
type NfsIOStatCollectorConfig struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
|
||||
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
|
||||
UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
|
||||
SendAbsoluteValues bool `json:"send_abs_values"`
|
||||
SendDerivedValues bool `json:"send_derived_values"`
|
||||
@@ -75,7 +75,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
||||
// Is this a device line with mount point, remote target and NFS version?
|
||||
dev := resolve_regex_fields(l, deviceRegex)
|
||||
if len(dev) > 0 {
|
||||
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) {
|
||||
if !slices.Contains(m.config.ExcludeFilesystems, dev[m.key]) {
|
||||
current = dev
|
||||
if len(current["version"]) == 0 {
|
||||
current["version"] = "3"
|
||||
@@ -104,7 +104,6 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
||||
}
|
||||
|
||||
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
m.name = "NfsIOStatCollector"
|
||||
if err := m.setup(); err != nil {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
@@ -117,10 +116,10 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
||||
m.config.SendAbsoluteValues = true
|
||||
m.config.SendDerivedValues = false
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.key = "mntpoint"
|
||||
@@ -130,7 +129,7 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
||||
m.data = m.readNfsiostats()
|
||||
m.lastTimestamp = time.Now()
|
||||
m.init = true
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
|
||||
@@ -16,7 +16,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/nfsio.md
|
||||
"exclude_metrics": [
|
||||
"oread", "pageread"
|
||||
],
|
||||
"exclude_filesystems": [
|
||||
"exclude_filesystem": [
|
||||
"/mnt"
|
||||
],
|
||||
"use_server_as_stype": false,
|
||||
|
||||
@@ -2,6 +2,7 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -83,9 +84,10 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
||||
|
||||
m.config.SendAbsoluteValues = true
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): unable to unmarshal numastat configuration: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -72,9 +72,10 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
d := json.NewDecoder(strings.NewReader(string(config)))
|
||||
d.DisallowUnknownFields()
|
||||
if err = d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{
|
||||
@@ -90,22 +91,18 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
||||
// Error: NVML library not found
|
||||
// (nvml.ErrorString can not be used in this case)
|
||||
if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
|
||||
err = fmt.Errorf("NVML library not found")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): NVML library not found", m.name)
|
||||
}
|
||||
if ret != nvml.SUCCESS {
|
||||
err = errors.New(nvml.ErrorString(ret))
|
||||
cclog.ComponentError(m.name, "Unable to initialize NVML", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Unable to initialize NVML: %w", m.name, err)
|
||||
}
|
||||
|
||||
// Number of NVIDIA GPUs
|
||||
num_gpus, ret := nvml.DeviceGetCount()
|
||||
if ret != nvml.SUCCESS {
|
||||
err = errors.New(nvml.ErrorString(ret))
|
||||
cclog.ComponentError(m.name, "Unable to get device count", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): Unable to get device count: %w", m.name, err)
|
||||
}
|
||||
|
||||
// For all GPUs
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -67,10 +68,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,8 +8,8 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strconv"
|
||||
@@ -51,7 +51,6 @@ type RocmSmiCollector struct {
|
||||
// Called once by the collector manager
|
||||
// All tags, meta data tags and metrics that do not change over the runtime should be set here
|
||||
func (m *RocmSmiCollector) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||
m.name = "RocmSmiCollector"
|
||||
// This is for later use, also call it early
|
||||
@@ -60,25 +59,21 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
ret := rocm_smi.Init()
|
||||
if ret != rocm_smi.STATUS_SUCCESS {
|
||||
err = errors.New("failed to initialize ROCm SMI library")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to initialize ROCm SMI library", m.name)
|
||||
}
|
||||
|
||||
numDevs, ret := rocm_smi.NumMonitorDevices()
|
||||
if ret != rocm_smi.STATUS_SUCCESS {
|
||||
err = errors.New("failed to get number of GPUs from ROCm SMI library")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to get number of GPUs from ROCm SMI library", m.name)
|
||||
}
|
||||
|
||||
m.devices = make([]RocmSmiCollectorDevice, 0)
|
||||
@@ -90,16 +85,12 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
device, ret := rocm_smi.DeviceGetHandleByIndex(i)
|
||||
if ret != rocm_smi.STATUS_SUCCESS {
|
||||
err = fmt.Errorf("failed to get handle for GPU %d", i)
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to get get handle for GPU %d", m.name, i)
|
||||
}
|
||||
|
||||
pciInfo, ret := rocm_smi.DeviceGetPciInfo(device)
|
||||
if ret != rocm_smi.STATUS_SUCCESS {
|
||||
err = fmt.Errorf("failed to get PCI information for GPU %d", i)
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): failed to get PCI information for GPU %d", m.name, i)
|
||||
}
|
||||
|
||||
pciId := fmt.Sprintf(
|
||||
@@ -149,7 +140,7 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Set this flag only if everything is initialized properly, all required files exist, ...
|
||||
m.init = true
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read collects all metrics belonging to the sample collector
|
||||
|
||||
@@ -8,11 +8,11 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||
)
|
||||
|
||||
@@ -52,7 +52,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
m.parallel = true
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "SAMPLE",
|
||||
}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// node -> whole system
|
||||
@@ -63,13 +66,15 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
|
||||
// hwthread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
|
||||
// accelerator -> An accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
m.tags = map[string]string{
|
||||
"type": "node",
|
||||
}
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,7 +101,7 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
||||
// stop := readState()
|
||||
// value = (stop - start) / interval.Seconds()
|
||||
|
||||
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
|
||||
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
|
||||
if err == nil {
|
||||
// Send it to output channel
|
||||
output <- y
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -47,26 +48,30 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
}
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "SAMPLE",
|
||||
}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granularity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
m.tags = map[string]string{
|
||||
"type": "node",
|
||||
}
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
// Parse the read interval duration
|
||||
m.interval, err = time.ParseDuration(m.config.Interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("%s Init(): error parsing interval: %w", m.name, err)
|
||||
}
|
||||
|
||||
// Storage for output channel
|
||||
@@ -77,13 +82,11 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
m.ticker = time.NewTicker(m.interval)
|
||||
|
||||
// Start the timer loop with return functionality by sending 'true' to the done channel
|
||||
m.wg.Add(1)
|
||||
go func() {
|
||||
m.wg.Go(func() {
|
||||
select {
|
||||
case <-m.done:
|
||||
// Exit the timer loop
|
||||
cclog.ComponentDebug(m.name, "Closing...")
|
||||
m.wg.Done()
|
||||
return
|
||||
case timestamp := <-m.ticker.C:
|
||||
// This is executed every timer tick but we have to wait until the first
|
||||
@@ -92,7 +95,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
m.ReadMetrics(timestamp)
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// Set this flag only if everything is initialized properly, all required files exist, ...
|
||||
m.init = true
|
||||
@@ -111,7 +114,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
|
||||
// stop := readState()
|
||||
// value = (stop - start) / interval.Seconds()
|
||||
|
||||
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
|
||||
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
|
||||
if err == nil && m.output != nil {
|
||||
// Send it to output channel if we have a valid channel
|
||||
m.output <- y
|
||||
|
||||
@@ -9,6 +9,7 @@ package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -66,8 +67,10 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,7 +127,7 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
|
||||
m.olddata[linefields[0]]["waiting"] = waiting
|
||||
value := l_running + l_waiting
|
||||
|
||||
y, err := lp.NewMessage("cpu_load_core", tags, m.meta, map[string]any{"value": value}, now)
|
||||
y, err := lp.NewMetric("cpu_load_core", tags, m.meta, value, now)
|
||||
if err == nil {
|
||||
// Send it to output channel
|
||||
output <- y
|
||||
|
||||
@@ -8,13 +8,13 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||
)
|
||||
|
||||
@@ -40,13 +40,18 @@ func (m *SelfCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{"source": m.name, "group": "Self"}
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Self",
|
||||
}
|
||||
m.tags = map[string]string{
|
||||
"type": "node",
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
m.init = true
|
||||
@@ -60,49 +65,49 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
var memstats runtime.MemStats
|
||||
runtime.ReadMemStats(&memstats)
|
||||
|
||||
y, err := lp.NewMessage("total_alloc", m.tags, m.meta, map[string]any{"value": memstats.TotalAlloc}, timestamp)
|
||||
y, err := lp.NewMetric("total_alloc", m.tags, m.meta, memstats.TotalAlloc, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_alloc", m.tags, m.meta, map[string]any{"value": memstats.HeapAlloc}, timestamp)
|
||||
y, err = lp.NewMetric("heap_alloc", m.tags, m.meta, memstats.HeapAlloc, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_sys", m.tags, m.meta, map[string]any{"value": memstats.HeapSys}, timestamp)
|
||||
y, err = lp.NewMetric("heap_sys", m.tags, m.meta, memstats.HeapSys, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_idle", m.tags, m.meta, map[string]any{"value": memstats.HeapIdle}, timestamp)
|
||||
y, err = lp.NewMetric("heap_idle", m.tags, m.meta, memstats.HeapIdle, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_inuse", m.tags, m.meta, map[string]any{"value": memstats.HeapInuse}, timestamp)
|
||||
y, err = lp.NewMetric("heap_inuse", m.tags, m.meta, memstats.HeapInuse, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_released", m.tags, m.meta, map[string]any{"value": memstats.HeapReleased}, timestamp)
|
||||
y, err = lp.NewMetric("heap_released", m.tags, m.meta, memstats.HeapReleased, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "Bytes")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("heap_objects", m.tags, m.meta, map[string]any{"value": memstats.HeapObjects}, timestamp)
|
||||
y, err = lp.NewMetric("heap_objects", m.tags, m.meta, memstats.HeapObjects, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.GoRoutines {
|
||||
y, err := lp.NewMessage("num_goroutines", m.tags, m.meta, map[string]any{"value": runtime.NumGoroutine()}, timestamp)
|
||||
y, err := lp.NewMetric("num_goroutines", m.tags, m.meta, runtime.NumGoroutine(), timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.CgoCalls {
|
||||
y, err := lp.NewMessage("num_cgo_calls", m.tags, m.meta, map[string]any{"value": runtime.NumCgoCall()}, timestamp)
|
||||
y, err := lp.NewMetric("num_cgo_calls", m.tags, m.meta, runtime.NumCgoCall(), timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
@@ -113,35 +118,35 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
if err == nil {
|
||||
sec, nsec := rusage.Utime.Unix()
|
||||
t := float64(sec) + (float64(nsec) * 1e-9)
|
||||
y, err := lp.NewMessage("rusage_user_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
|
||||
y, err := lp.NewMetric("rusage_user_time", m.tags, m.meta, t, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "seconds")
|
||||
output <- y
|
||||
}
|
||||
sec, nsec = rusage.Stime.Unix()
|
||||
t = float64(sec) + (float64(nsec) * 1e-9)
|
||||
y, err = lp.NewMessage("rusage_system_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_system_time", m.tags, m.meta, t, timestamp)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "seconds")
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("rusage_vol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nvcsw}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_vol_ctx_switch", m.tags, m.meta, rusage.Nvcsw, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("rusage_invol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nivcsw}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_invol_ctx_switch", m.tags, m.meta, rusage.Nivcsw, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("rusage_signals", m.tags, m.meta, map[string]any{"value": rusage.Nsignals}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_signals", m.tags, m.meta, rusage.Nsignals, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("rusage_major_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Majflt}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_major_pgfaults", m.tags, m.meta, rusage.Majflt, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
y, err = lp.NewMessage("rusage_minor_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Minflt}, timestamp)
|
||||
y, err = lp.NewMetric("rusage_minor_pgfaults", m.tags, m.meta, rusage.Minflt, timestamp)
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
|
||||
@@ -119,8 +119,9 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
|
||||
m.cgroupBase = defaultCgroupBase
|
||||
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
d := json.NewDecoder(strings.NewReader(string(config)))
|
||||
d.DisallowUnknownFields()
|
||||
if err = d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err)
|
||||
}
|
||||
m.excludeMetrics = make(map[string]struct{})
|
||||
|
||||
360
collectors/smartmonMetric.go
Normal file
360
collectors/smartmonMetric.go
Normal file
@@ -0,0 +1,360 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||
)
|
||||
|
||||
// SmartMonCollectorConfig holds the settings read from the JSON
// configuration of the SmartMonCollector.
type SmartMonCollectorConfig struct {
	// UseSudo selects whether the smartctl command lines are prefixed with
	// the sudo command.
	UseSudo bool `json:"use_sudo,omitempty"`

	// ExcludeDevices lists device names that are skipped.
	ExcludeDevices []string `json:"exclude_devices,omitempty"`

	// ExcludeMetrics lists metric names that are not collected.
	// NOTE(review): this key is camelCase while the sibling keys are
	// snake_case - confirm this is the intended configuration key.
	ExcludeMetrics []string `json:"excludeMetrics,omitempty"`

	// Devices optionally lists the devices to query explicitly. When it is
	// non-empty it is used instead of the smartctl device scan.
	Devices []struct {
		// Name is the device name handed to smartctl.
		Name string `json:"name"`
		// Type is the device type handed to smartctl via --device=.
		Type string `json:"type"`
	} `json:"devices,omitempty"`
}
|
||||
|
||||
// deviceT describes a single device together with the command line used to
// query it.
type deviceT struct {
	// Name is the device name as configured or as reported by the scan.
	Name string `json:"name"`
	// Type is the device type passed to smartctl via --device=.
	Type string `json:"type"`

	// queryCommand is the complete argument vector (optionally starting with
	// sudo) for querying this device. It is unexported and therefore never
	// filled in by JSON decoding.
	queryCommand []string
}
|
||||
|
||||
type SmartMonCollector struct {
|
||||
metricCollector
|
||||
config SmartMonCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
devices []deviceT // smartmon devices
|
||||
sudoCmd string // Full path to 'sudo' command
|
||||
smartCtlCmd string // Full path to 'smartctl' command
|
||||
excludeMetric struct {
|
||||
temp,
|
||||
percentUsed,
|
||||
availSpare,
|
||||
dataUnitsRead,
|
||||
dataUnitsWrite,
|
||||
hostReads,
|
||||
hostWrites,
|
||||
powerCycles,
|
||||
powerOn,
|
||||
UnsafeShutdowns,
|
||||
mediaErrors,
|
||||
errlogEntries,
|
||||
warnTempTime,
|
||||
critCompTime bool
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SmartMonCollector) getSmartmonDevices() error {
|
||||
// Use configured devices
|
||||
if len(m.config.Devices) > 0 {
|
||||
for _, configDevice := range m.config.Devices {
|
||||
if !slices.Contains(m.config.ExcludeDevices, configDevice.Name) {
|
||||
d := deviceT{
|
||||
Name: configDevice.Name,
|
||||
Type: configDevice.Type,
|
||||
}
|
||||
if m.config.UseSudo {
|
||||
d.queryCommand = append(d.queryCommand, m.sudoCmd)
|
||||
}
|
||||
d.queryCommand = append(d.queryCommand, m.smartCtlCmd, "--json=c", "--device="+d.Type, "--all", d.Name)
|
||||
|
||||
m.devices = append(m.devices, d)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Use scan command
|
||||
var scanCmd []string
|
||||
if m.config.UseSudo {
|
||||
scanCmd = append(scanCmd, m.sudoCmd)
|
||||
}
|
||||
scanCmd = append(scanCmd, m.smartCtlCmd, "--scan", "--json=c")
|
||||
command := exec.Command(scanCmd[0], scanCmd[1:]...)
|
||||
|
||||
stdout, err := command.Output()
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"%s getSmartmonDevices(): Failed to execute device scan command %s: %w",
|
||||
m.name, command.String(), err)
|
||||
}
|
||||
|
||||
var scanOutput struct {
|
||||
Devices []deviceT `json:"devices"`
|
||||
}
|
||||
err = json.Unmarshal(stdout, &scanOutput)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s getSmartmonDevices(): Failed to parse JSON output from device scan command: %w",
|
||||
m.name, err)
|
||||
}
|
||||
|
||||
m.devices = make([]deviceT, 0)
|
||||
for _, d := range scanOutput.Devices {
|
||||
if !slices.Contains(m.config.ExcludeDevices, d.Name) {
|
||||
if m.config.UseSudo {
|
||||
d.queryCommand = append(d.queryCommand, m.sudoCmd)
|
||||
}
|
||||
d.queryCommand = append(d.queryCommand, m.smartCtlCmd, "--json=c", "--device="+d.Type, "--all", d.Name)
|
||||
|
||||
m.devices = append(m.devices, d)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *SmartMonCollector) Init(config json.RawMessage) error {
|
||||
m.name = "SmartMonCollector"
|
||||
if err := m.setup(); err != nil {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
m.parallel = true
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Disk",
|
||||
}
|
||||
m.tags = map[string]string{
|
||||
"type": "node",
|
||||
"stype": "disk",
|
||||
}
|
||||
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
for _, excludeMetric := range m.config.ExcludeMetrics {
|
||||
switch excludeMetric {
|
||||
case "smartmon_temp":
|
||||
m.excludeMetric.temp = true
|
||||
case "smartmon_percent_used":
|
||||
m.excludeMetric.percentUsed = true
|
||||
case "smartmon_avail_spare":
|
||||
m.excludeMetric.availSpare = true
|
||||
case "smartmon_data_units_read":
|
||||
m.excludeMetric.dataUnitsRead = true
|
||||
case "smartmon_data_units_write":
|
||||
m.excludeMetric.dataUnitsWrite = true
|
||||
case "smartmon_host_reads":
|
||||
m.excludeMetric.hostReads = true
|
||||
case "smartmon_host_writes":
|
||||
m.excludeMetric.hostWrites = true
|
||||
case "smartmon_power_cycles":
|
||||
m.excludeMetric.powerCycles = true
|
||||
case "smartmon_power_on":
|
||||
m.excludeMetric.powerOn = true
|
||||
case "smartmon_unsafe_shutdowns":
|
||||
m.excludeMetric.UnsafeShutdowns = true
|
||||
case "smartmon_media_errors":
|
||||
m.excludeMetric.mediaErrors = true
|
||||
case "smartmon_errlog_entries":
|
||||
m.excludeMetric.errlogEntries = true
|
||||
case "smartmon_warn_temp_time":
|
||||
m.excludeMetric.warnTempTime = true
|
||||
case "smartmon_crit_comp_time":
|
||||
m.excludeMetric.critCompTime = true
|
||||
default:
|
||||
return fmt.Errorf("%s Init(): Unknown excluded metric: %s", m.name, excludeMetric)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Check if sudo and smartctl are in search path
|
||||
if m.config.UseSudo {
|
||||
p, err := exec.LookPath("sudo")
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): No sudo command found in search path: %w", m.name, err)
|
||||
}
|
||||
m.sudoCmd = p
|
||||
}
|
||||
p, err := exec.LookPath("smartctl")
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): No smartctl command found in search path: %w", m.name, err)
|
||||
}
|
||||
m.smartCtlCmd = p
|
||||
|
||||
if err = m.getSmartmonDevices(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
// SmartMonData is the subset of the JSON document printed by
// `smartctl --json=c --all <device>` which is decoded by the collector.
// Keys of the JSON document without a matching field are ignored.
type SmartMonData struct {
	// Serial number as reported under the key "serial_number"
	SerialNumber string `json:"serial_number"`
	// Capacity as reported under the key "user_capacity"
	UserCapacity struct {
		Blocks int `json:"blocks"`
		Bytes  int `json:"bytes"`
	} `json:"user_capacity"`
	// Content of the key "nvme_smart_health_information_log".
	// Each field is the source of one smartmon_* metric.
	HealthLog struct {
		// Available SMART health information:
		// sudo smartctl -a --json=c /dev/nvme0 | jq --color-output | less --RAW-CONTROL-CHARS
		Temperature        int `json:"temperature"`         // reported as smartmon_temp with unit degC
		PercentageUsed     int `json:"percentage_used"`     // reported as smartmon_percent_used with unit percent
		AvailableSpare     int `json:"available_spare"`     // reported as smartmon_avail_spare with unit percent
		DataUnitsRead      int `json:"data_units_read"`     // reported as smartmon_data_units_read
		DataUnitsWrite     int `json:"data_units_written"`  // reported as smartmon_data_units_write
		HostReads          int `json:"host_reads"`          // reported as smartmon_host_reads
		HostWrites         int `json:"host_writes"`         // reported as smartmon_host_writes
		PowerCycles        int `json:"power_cycles"`        // reported as smartmon_power_cycles
		PowerOnHours       int `json:"power_on_hours"`      // hours, converted to seconds for smartmon_power_on
		UnsafeShutdowns    int `json:"unsafe_shutdowns"`    // reported as smartmon_unsafe_shutdowns
		MediaErrors        int `json:"media_errors"`        // reported as smartmon_media_errors
		NumErrorLogEntries int `json:"num_err_log_entries"` // reported as smartmon_errlog_entries
		WarnTempTime       int `json:"warning_temp_time"`   // reported as smartmon_warn_temp_time
		CriticalCompTime   int `json:"critical_comp_time"`  // reported as smartmon_crit_comp_time
	} `json:"nvme_smart_health_information_log"`
}
|
||||
|
||||
func (m *SmartMonCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||
timestamp := time.Now()
|
||||
for _, d := range m.devices {
|
||||
var data SmartMonData
|
||||
command := exec.Command(d.queryCommand[0], d.queryCommand[1:]...)
|
||||
|
||||
stdout, err := command.Output()
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "cannot read data for device", d.Name)
|
||||
continue
|
||||
}
|
||||
err = json.Unmarshal(stdout, &data)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "cannot unmarshal data for device", d.Name)
|
||||
continue
|
||||
}
|
||||
if !m.excludeMetric.temp {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_temp", m.tags, m.meta, data.HealthLog.Temperature, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
y.AddMeta("unit", "degC")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.percentUsed {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_percent_used", m.tags, m.meta, data.HealthLog.PercentageUsed, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
y.AddMeta("unit", "percent")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.availSpare {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_avail_spare", m.tags, m.meta, data.HealthLog.AvailableSpare, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
y.AddMeta("unit", "percent")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.dataUnitsRead {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_data_units_read", m.tags, m.meta, data.HealthLog.DataUnitsRead, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.dataUnitsWrite {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_data_units_write", m.tags, m.meta, data.HealthLog.DataUnitsWrite, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.hostReads {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_host_reads", m.tags, m.meta, data.HealthLog.HostReads, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.hostWrites {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_host_writes", m.tags, m.meta, data.HealthLog.HostWrites, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.powerCycles {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_power_cycles", m.tags, m.meta, data.HealthLog.PowerCycles, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.powerOn {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_power_on", m.tags, m.meta, int64(data.HealthLog.PowerOnHours)*3600, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
y.AddMeta("unit", "sec")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.UnsafeShutdowns {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_unsafe_shutdowns", m.tags, m.meta, data.HealthLog.UnsafeShutdowns, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.mediaErrors {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_media_errors", m.tags, m.meta, data.HealthLog.MediaErrors, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.errlogEntries {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_errlog_entries", m.tags, m.meta, data.HealthLog.NumErrorLogEntries, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.warnTempTime {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_warn_temp_time", m.tags, m.meta, data.HealthLog.WarnTempTime, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if !m.excludeMetric.critCompTime {
|
||||
y, err := lp.NewMetric(
|
||||
"smartmon_crit_comp_time", m.tags, m.meta, data.HealthLog.CriticalCompTime, timestamp)
|
||||
if err == nil {
|
||||
y.AddTag("stype-id", d.Name)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SmartMonCollector) Close() {
|
||||
m.init = false
|
||||
}
|
||||
52
collectors/smartmonMetric.md
Normal file
52
collectors/smartmonMetric.md
Normal file
@@ -0,0 +1,52 @@
|
||||
<!--
|
||||
---
|
||||
title: smartmon metric collector
|
||||
description: Collect S.M.A.R.T data from NVMEs
|
||||
categories: [cc-metric-collector]
|
||||
tags: ['Admin']
|
||||
weight: 2
|
||||
hugo_path: docs/reference/cc-metric-collector/collectors/smartmonMetric.md
|
||||
---
|
||||
-->
|
||||
|
||||
## `smartmon` collector
|
||||
|
||||
```json
|
||||
"smartmon": {
|
||||
"use_sudo": true,
|
||||
"exclude_devices": [
|
||||
"/dev/sda"
|
||||
],
|
||||
"excludeMetrics": [
|
||||
"smartmon_warn_temp_time",
|
||||
"smartmon_crit_comp_time"
|
||||
],
|
||||
"devices": [
|
||||
{
|
||||
"name": "/dev/nvme0",
|
||||
"type": "nvme"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The `smartmon` collector retrieves S.M.A.R.T. data from NVMe devices via the `smartctl` command.
|
||||
|
||||
Available NVMEs can be either automatically detected by a device scan or manually added with the "devices" config option.
|
||||
|
||||
Metrics:
|
||||
|
||||
* `smartmon_temp`: Temperature of the device (`unit=degC`)
|
||||
* `smartmon_avail_spare`: Amount of spare left (`unit=percent`)
|
||||
* `smartmon_percent_used`: Estimated percentage of the device's lifetime that has been used (`unit=percent`)
|
||||
* `smartmon_data_units_read`: Read data units
|
||||
* `smartmon_data_units_write`: Written data units
|
||||
* `smartmon_host_reads`: Read operations
|
||||
* `smartmon_host_writes`: Write operations
|
||||
* `smartmon_power_cycles`: Number of power cycles
|
||||
* `smartmon_power_on`: Seconds the device has been powered on (`unit=sec`)
|
||||
* `smartmon_unsafe_shutdowns`: Count of unsafe shutdowns
|
||||
* `smartmon_media_errors`: Media errors of the device
|
||||
* `smartmon_errlog_entries`: Error log entries
|
||||
* `smartmon_warn_temp_time`: Time above the warning temperature threshold
|
||||
* `smartmon_crit_comp_time`: Time above the critical composite temperature threshold
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -63,9 +64,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
@@ -46,9 +47,10 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
||||
"group": "TopProcs",
|
||||
}
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err)
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&m.config); err != nil {
|
||||
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
|
||||
}
|
||||
} else {
|
||||
m.config.Num_procs = int(DEFAULT_NUM_PROCS)
|
||||
|
||||
@@ -34,8 +34,8 @@
|
||||
},
|
||||
"numastats": {},
|
||||
"nvidia": {},
|
||||
"schedstat": {
|
||||
},
|
||||
"schedstat": {},
|
||||
"smartmon": {},
|
||||
"tempstat": {
|
||||
"report_max_temperature": true,
|
||||
"report_critical_temperature": true,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"process_messages" : {
|
||||
"add_tag_if": [
|
||||
"add_tags_if": [
|
||||
{
|
||||
"key" : "cluster",
|
||||
"value" : "testcluster",
|
||||
@@ -12,7 +12,7 @@
|
||||
"if" : "name == 'temp_package_id_0'"
|
||||
}
|
||||
],
|
||||
"delete_tag_if": [
|
||||
"delete_meta_if": [
|
||||
{
|
||||
"key" : "unit",
|
||||
"if" : "true"
|
||||
|
||||
5
go.mod
5
go.mod
@@ -3,14 +3,14 @@ module github.com/ClusterCockpit/cc-metric-collector
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.7.0
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2
|
||||
github.com/ClusterCockpit/go-rocm-smi v0.3.0
|
||||
github.com/NVIDIA/go-nvml v0.13.0-1
|
||||
github.com/PaesslerAG/gval v1.2.4
|
||||
github.com/fsnotify/fsnotify v1.9.0
|
||||
github.com/tklauser/go-sysconf v0.3.16
|
||||
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
|
||||
golang.org/x/sys v0.41.0
|
||||
golang.org/x/sys v0.42.0
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -39,7 +39,6 @@ require (
|
||||
github.com/tklauser/numcpus v0.11.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
||||
golang.org/x/crypto v0.48.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa // indirect
|
||||
golang.org/x/net v0.51.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
)
|
||||
|
||||
12
go.sum
12
go.sum
@@ -1,5 +1,7 @@
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.7.0 h1:EMTShk6rMTR1wlfmQ8SVCawH1OdltUbD3kVQmaW+5pE=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.7.0/go.mod h1:0Etx8WMs0lYZ4tiOQizY18CQop+2i3WROvU9rMUxHA4=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.0 h1:ROduRzRuusi+6kLB991AAu3Pp2AHOasQJFJc7JU/n/E=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
|
||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
|
||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
|
||||
github.com/ClusterCockpit/go-rocm-smi v0.3.0 h1:1qZnSpG7/NyLtc7AjqnUL9Jb8xtqG1nMVgp69rJfaR8=
|
||||
@@ -105,13 +107,11 @@ golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePP
|
||||
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
|
||||
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
||||
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
||||
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0=
|
||||
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
|
||||
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
|
||||
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
|
||||
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
|
||||
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
|
||||
@@ -94,8 +94,7 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
|
||||
// Set hostname
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return err
|
||||
return fmt.Errorf("metricAggregator: failed to get hostname: %w", err)
|
||||
}
|
||||
// Drop domain part of host name
|
||||
c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0]
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package metricRouter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -70,8 +71,7 @@ func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker,
|
||||
// The code is executed by the MetricCache goroutine
|
||||
c.aggEngine, err = agg.NewAggregator(c.output)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricCache", "Cannot create aggregator")
|
||||
return err
|
||||
return fmt.Errorf("MetricCache: failed to create aggregator: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
package metricRouter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
@@ -46,7 +47,6 @@ type metricRouterConfig struct {
|
||||
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
|
||||
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
|
||||
// dropMetrics map[string]bool // Internal map for O(1) lookup
|
||||
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
|
||||
}
|
||||
|
||||
@@ -102,18 +102,17 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
// Drop domain part of host name
|
||||
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
|
||||
|
||||
err = json.Unmarshal(routerConfig, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
return err
|
||||
d := json.NewDecoder(bytes.NewReader(routerConfig))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&r.config); err != nil {
|
||||
return fmt.Errorf("failed to decode metric router config: %w", err)
|
||||
}
|
||||
r.maxForward = max(1, r.config.MaxForward)
|
||||
|
||||
if r.config.NumCacheIntervals > 0 {
|
||||
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
|
||||
return err
|
||||
return fmt.Errorf("MetricRouter: failed to initialize MetricCache: %w", err)
|
||||
}
|
||||
for _, agg := range r.config.IntervalAgg {
|
||||
err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
|
||||
|
||||
Reference in New Issue
Block a user