Stricter json parsing (#204)

This commit is contained in:
Holger Obermaier
2026-03-11 15:59:14 +01:00
committed by GitHub
parent a927565868
commit b65576431e
39 changed files with 246 additions and 191 deletions

View File

@@ -8,6 +8,7 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"os"
@@ -48,22 +49,22 @@ type RuntimeConfig struct {
Sync sync.WaitGroup
}
// ReadCli reads the command line arguments
func ReadCli() map[string]string {
var m map[string]string
cfg := flag.String("config", "./config.json", "Path to configuration file")
logfile := flag.String("log", "stderr", "Path for logfile")
once := flag.Bool("once", false, "Run all collectors only once")
loglevel := flag.String("loglevel", "info", "Set log level")
flag.Parse()
m = make(map[string]string)
m["configfile"] = *cfg
m["logfile"] = *logfile
m := map[string]string{
"configfile": *cfg,
"logfile": *logfile,
"once": "false",
"loglevel": *loglevel,
}
if *once {
m["once"] = "true"
} else {
m["once"] = "false"
}
m["loglevel"] = *loglevel
return m
}
@@ -120,9 +121,10 @@ func mainFunc() int {
// Load and check configuration
main := ccconf.GetPackageConfig("main")
err = json.Unmarshal(main, &rcfg.ConfigFile)
if err != nil {
cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error())
d := json.NewDecoder(bytes.NewReader(main))
d.DisallowUnknownFields()
if err := d.Decode(&rcfg.ConfigFile); err != nil {
cclog.Errorf("Error reading configuration file %s: %v", rcfg.CliArgs["configfile"], err)
return 1
}

View File

@@ -59,6 +59,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [ ] Aggreate metrics to higher topology entity (sum hwthread metrics to socket metric, ...). Needs to be configurable
# Contributing own collectors
A collector reads data from any source, parses it to metrics and submits these metrics to the `metric-collector`. A collector provides three function:
* `Name() string`: Return the name of the collector
@@ -104,8 +105,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{"source": m.name, "group": "Sample"}

View File

@@ -32,7 +32,7 @@ const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
type BeegfsMetaCollectorConfig struct {
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"`
ExcludeFilesystems []string `json:"exclude_filesystem"`
}
type BeegfsMetaCollector struct {
@@ -74,9 +74,10 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Failed to decode JSON config: %w", m.name, err)
}
}
@@ -99,7 +100,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}

View File

@@ -30,7 +30,7 @@ import (
type BeegfsStorageCollectorConfig struct {
Beegfs string `json:"beegfs_path"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem"`
ExcludeFilesystems []string `json:"exclude_filesystem"`
}
type BeegfsStorageCollector struct {
@@ -67,9 +67,10 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
@@ -92,7 +93,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"sync"
@@ -88,10 +89,10 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
cm.ticker = ticker
cm.duration = duration
err := json.Unmarshal(collectConfig, &cm.config)
if err != nil {
cclog.Error(err.Error())
return err
d := json.NewDecoder(bytes.NewReader(collectConfig))
d.DisallowUnknownFields()
if err := d.Decode(&cm.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding collector manager config: %w", "CollectorManager", err)
}
// Initialize configured collectors
@@ -102,7 +103,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
}
collector := AvailableCollectors[collectorName]
err = collector.Init(collectorCfg)
err := collector.Init(collectorCfg)
if err != nil {
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
continue

View File

@@ -12,7 +12,9 @@ hugo_path: docs/reference/cc-metric-collector/collectors/cpufreq_cpuinfo.md
## `cpufreq_cpuinfo` collector
```json
"cpufreq_cpuinfo": {}
"cpufreq_cpuinfo": {
"exclude_metrics": []
}
```
The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **hwthread** metrics.

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -54,9 +55,10 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
}
m.parallel = true
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -53,9 +54,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
"type": "node",
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
matches := map[string]int{
@@ -79,19 +81,10 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
}
// Check input file
file, err := os.Open(string(CPUSTATFILE))
file, err := os.Open(CPUSTATFILE)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
return fmt.Errorf("%s Init(): Failed to open file '%s': %w", m.name, CPUSTATFILE, err)
}
defer func() {
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
}
}()
// Pre-generate tags for all CPUs
num_cpus := 0
@@ -120,6 +113,12 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
// Close file
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Failed to close file '%s': %w", m.name, CPUSTATFILE, err)
}
m.lastTimestamp = time.Now()
m.init = true
return nil
@@ -166,11 +165,11 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
now := time.Now()
tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(CPUSTATFILE))
file, err := os.Open(CPUSTATFILE)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
fmt.Sprintf("Read(): Failed to open file '%s': %v", CPUSTATFILE, err))
}
defer func() {
if err := file.Close(); err != nil {

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -47,8 +48,10 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
// Read configuration
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -42,8 +43,10 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.allowedMetrics = map[string]bool{

View File

@@ -32,7 +32,7 @@ type GpfsCollectorState map[string]int64
type GpfsCollectorConfig struct {
Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
Sudo bool `json:"use_sudo,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values,omitempty"`
@@ -322,9 +322,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -336,7 +337,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
"filesystem": "",
}
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
for _, fs := range m.config.ExcludeFilesystems {
m.skipFS[fs] = struct{}{}
}
m.lastState = make(map[string]GpfsCollectorState)
@@ -377,7 +378,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// the file was given in the config, use it
p = m.config.Mmpmon
} else {
cclog.ComponentError(m.name, fmt.Sprintf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err))
return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err)
}
}

View File

@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/gpfs.md
```json
"gpfs": {
"mmpmon_path": "/path/to/mmpmon",
"use_sudo": "true",
"use_sudo": true,
"exclude_filesystem": [
"fs1"
],

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -79,9 +80,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -52,9 +53,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
// https://www.kernel.org/doc/html/latest/admin-guide/iostats.html

View File

@@ -56,9 +56,10 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
m.config.IpmitoolPath = "ipmitool"
m.config.IpmisensorsPath = "ipmi-sensors"
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
// Check if executables ipmitool or ipmisensors are found

View File

@@ -14,7 +14,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/ipmi.md
```json
"ipmistat": {
"ipmitool_path": "/path/to/ipmitool",
"ipmisensors_path": "/path/to/ipmi-sensors",
"ipmisensors_path": "/path/to/ipmi-sensors"
}
```

View File

@@ -16,6 +16,7 @@ package collectors
import "C"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -207,9 +208,10 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
m.config.LibraryPath = LIKWID_LIB_NAME
m.config.LockfilePath = LIKWID_DEF_LOCKFILE
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -48,9 +49,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{
@@ -63,16 +65,17 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
"load_five",
"load_fifteen",
}
m.load_skips = make([]bool, len(m.load_matches))
m.proc_matches = []string{
"proc_run",
"proc_total",
}
m.proc_skips = make([]bool, len(m.proc_matches))
m.load_skips = make([]bool, len(m.load_matches))
for i, name := range m.load_matches {
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
}
m.proc_skips = make([]bool, len(m.proc_matches))
for i, name := range m.proc_matches {
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -300,9 +301,10 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
m.name = "LustreCollector"
m.parallel = true
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
if err := m.setup(); err != nil {

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -101,9 +102,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.config.NodeStats = true
m.config.NumaStats = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{"source": m.name, "group": "Memory"}

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -99,10 +100,10 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"slices"
@@ -45,12 +46,7 @@ type nfsCollector struct {
}
func (m *nfsCollector) updateStats() error {
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
// Wait for cmd end
if err := cmd.Wait(); err != nil {
return fmt.Errorf("%s updateStats(): %w", m.name, err)
}
cmd := exec.Command(m.config.Nfsstats, "-l", "--all")
buffer, err := cmd.Output()
if err != nil {
@@ -95,9 +91,10 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
m.config.Nfsstats = string(NFSSTAT_EXEC)
// Read JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -17,14 +18,13 @@ import (
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
// These are the fields we read from the JSON configuration
type NfsIOStatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
ExcludeFilesystems []string `json:"exclude_filesystem,omitempty"`
UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
@@ -75,7 +75,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
// Is this a device line with mount point, remote target and NFS version?
dev := resolve_regex_fields(l, deviceRegex)
if len(dev) > 0 {
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) {
if !slices.Contains(m.config.ExcludeFilesystems, dev[m.key]) {
current = dev
if len(current["version"]) == 0 {
current["version"] = "3"
@@ -117,10 +117,10 @@ func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
m.key = "mntpoint"

View File

@@ -16,7 +16,7 @@ hugo_path: docs/reference/cc-metric-collector/collectors/nfsio.md
"exclude_metrics": [
"oread", "pageread"
],
"exclude_filesystems": [
"exclude_filesystem": [
"/mnt"
],
"use_server_as_stype": false,

View File

@@ -2,6 +2,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -83,9 +84,10 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
m.config.SendAbsoluteValues = true
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): unable to unmarshal numastat configuration: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -72,9 +72,10 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return err
d := json.NewDecoder(strings.NewReader(string(config)))
d.DisallowUnknownFields()
if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.meta = map[string]string{

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -67,10 +68,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -60,10 +61,10 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -8,11 +8,11 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
@@ -52,7 +52,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
@@ -63,13 +66,15 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
@@ -96,7 +101,7 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage)
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil {
// Send it to output channel
output <- y

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"sync"
@@ -47,26 +48,30 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
}
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
m.meta = map[string]string{
"source": m.name,
"group": "SAMPLE",
}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
m.tags = map[string]string{
"type": "node",
}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): error decoding JSON config: %w", m.name, err)
}
}
// Parse the read interval duration
m.interval, err = time.ParseDuration(m.config.Interval)
if err != nil {
cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
return err
return fmt.Errorf("%s Init(): error parsing interval: %w", m.name, err)
}
// Storage for output channel
@@ -77,13 +82,11 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ticker = time.NewTicker(m.interval)
// Start the timer loop with return functionality by sending 'true' to the done channel
m.wg.Add(1)
go func() {
m.wg.Go(func() {
select {
case <-m.done:
// Exit the timer loop
cclog.ComponentDebug(m.name, "Closing...")
m.wg.Done()
return
case timestamp := <-m.ticker.C:
// This is executed every timer tick but we have to wait until the first
@@ -92,7 +95,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
m.ReadMetrics(timestamp)
}
}
}()
})
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
@@ -111,7 +114,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.NewMessage("sample_metric", m.tags, m.meta, map[string]any{"value": value}, timestamp)
y, err := lp.NewMetric("sample_metric", m.tags, m.meta, value, timestamp)
if err == nil && m.output != nil {
// Send it to output channel if we have a valid channel
m.output <- y

View File

@@ -9,6 +9,7 @@ package collectors
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os"
@@ -66,8 +67,10 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration
if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil {
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): failed to decode JSON config: %w", m.name, err)
}
}
@@ -124,7 +127,7 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
m.olddata[linefields[0]]["waiting"] = waiting
value := l_running + l_waiting
y, err := lp.NewMessage("cpu_load_core", tags, m.meta, map[string]any{"value": value}, now)
y, err := lp.NewMetric("cpu_load_core", tags, m.meta, value, now)
if err == nil {
// Send it to output channel
output <- y

View File

@@ -8,13 +8,13 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"runtime"
"syscall"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
)
@@ -40,13 +40,18 @@ func (m *SelfCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Self"}
m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{
"source": m.name,
"group": "Self",
}
m.tags = map[string]string{
"type": "node",
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}
m.init = true
@@ -60,49 +65,49 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
var memstats runtime.MemStats
runtime.ReadMemStats(&memstats)
y, err := lp.NewMessage("total_alloc", m.tags, m.meta, map[string]any{"value": memstats.TotalAlloc}, timestamp)
y, err := lp.NewMetric("total_alloc", m.tags, m.meta, memstats.TotalAlloc, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_alloc", m.tags, m.meta, map[string]any{"value": memstats.HeapAlloc}, timestamp)
y, err = lp.NewMetric("heap_alloc", m.tags, m.meta, memstats.HeapAlloc, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_sys", m.tags, m.meta, map[string]any{"value": memstats.HeapSys}, timestamp)
y, err = lp.NewMetric("heap_sys", m.tags, m.meta, memstats.HeapSys, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_idle", m.tags, m.meta, map[string]any{"value": memstats.HeapIdle}, timestamp)
y, err = lp.NewMetric("heap_idle", m.tags, m.meta, memstats.HeapIdle, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_inuse", m.tags, m.meta, map[string]any{"value": memstats.HeapInuse}, timestamp)
y, err = lp.NewMetric("heap_inuse", m.tags, m.meta, memstats.HeapInuse, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_released", m.tags, m.meta, map[string]any{"value": memstats.HeapReleased}, timestamp)
y, err = lp.NewMetric("heap_released", m.tags, m.meta, memstats.HeapReleased, timestamp)
if err == nil {
y.AddMeta("unit", "Bytes")
output <- y
}
y, err = lp.NewMessage("heap_objects", m.tags, m.meta, map[string]any{"value": memstats.HeapObjects}, timestamp)
y, err = lp.NewMetric("heap_objects", m.tags, m.meta, memstats.HeapObjects, timestamp)
if err == nil {
output <- y
}
}
if m.config.GoRoutines {
y, err := lp.NewMessage("num_goroutines", m.tags, m.meta, map[string]any{"value": runtime.NumGoroutine()}, timestamp)
y, err := lp.NewMetric("num_goroutines", m.tags, m.meta, runtime.NumGoroutine(), timestamp)
if err == nil {
output <- y
}
}
if m.config.CgoCalls {
y, err := lp.NewMessage("num_cgo_calls", m.tags, m.meta, map[string]any{"value": runtime.NumCgoCall()}, timestamp)
y, err := lp.NewMetric("num_cgo_calls", m.tags, m.meta, runtime.NumCgoCall(), timestamp)
if err == nil {
output <- y
}
@@ -113,35 +118,35 @@ func (m *SelfCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if err == nil {
sec, nsec := rusage.Utime.Unix()
t := float64(sec) + (float64(nsec) * 1e-9)
y, err := lp.NewMessage("rusage_user_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
y, err := lp.NewMetric("rusage_user_time", m.tags, m.meta, t, timestamp)
if err == nil {
y.AddMeta("unit", "seconds")
output <- y
}
sec, nsec = rusage.Stime.Unix()
t = float64(sec) + (float64(nsec) * 1e-9)
y, err = lp.NewMessage("rusage_system_time", m.tags, m.meta, map[string]any{"value": t}, timestamp)
y, err = lp.NewMetric("rusage_system_time", m.tags, m.meta, t, timestamp)
if err == nil {
y.AddMeta("unit", "seconds")
output <- y
}
y, err = lp.NewMessage("rusage_vol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nvcsw}, timestamp)
y, err = lp.NewMetric("rusage_vol_ctx_switch", m.tags, m.meta, rusage.Nvcsw, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_invol_ctx_switch", m.tags, m.meta, map[string]any{"value": rusage.Nivcsw}, timestamp)
y, err = lp.NewMetric("rusage_invol_ctx_switch", m.tags, m.meta, rusage.Nivcsw, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_signals", m.tags, m.meta, map[string]any{"value": rusage.Nsignals}, timestamp)
y, err = lp.NewMetric("rusage_signals", m.tags, m.meta, rusage.Nsignals, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_major_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Majflt}, timestamp)
y, err = lp.NewMetric("rusage_major_pgfaults", m.tags, m.meta, rusage.Majflt, timestamp)
if err == nil {
output <- y
}
y, err = lp.NewMessage("rusage_minor_pgfaults", m.tags, m.meta, map[string]any{"value": rusage.Minflt}, timestamp)
y, err = lp.NewMetric("rusage_minor_pgfaults", m.tags, m.meta, rusage.Minflt, timestamp)
if err == nil {
output <- y
}

View File

@@ -119,8 +119,9 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
m.cgroupBase = defaultCgroupBase
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
d := json.NewDecoder(strings.NewReader(string(config)))
d.DisallowUnknownFields()
if err = d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err)
}
m.excludeMetrics = make(map[string]struct{})

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -63,9 +64,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
}

View File

@@ -8,6 +8,7 @@
package collectors
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
@@ -46,9 +47,10 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
"group": "TopProcs",
}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err)
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&m.config); err != nil {
return fmt.Errorf("%s Init(): Error decoding JSON config: %w", m.name, err)
}
} else {
m.config.Num_procs = int(DEFAULT_NUM_PROCS)

View File

@@ -8,6 +8,7 @@
package metricRouter
import (
"bytes"
"encoding/json"
"fmt"
"maps"
@@ -46,7 +47,6 @@ type metricRouterConfig struct {
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
// dropMetrics map[string]bool // Internal map for O(1) lookup
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
}
@@ -102,10 +102,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
// Drop domain part of host name
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
err = json.Unmarshal(routerConfig, &r.config)
if err != nil {
cclog.ComponentError("MetricRouter", err.Error())
return err
d := json.NewDecoder(bytes.NewReader(routerConfig))
d.DisallowUnknownFields()
if err := d.Decode(&r.config); err != nil {
return fmt.Errorf("failed to decode metric router config: %w", err)
}
r.maxForward = max(1, r.config.MaxForward)