From cca0d23efa86eddc2b577a507d400dc887d11e9c Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:51:31 +0100 Subject: [PATCH] Golangci lint fixes (#195) * Add golangci-lin as make target * Fix: could omit type ... from declaration; it will be inferred from the right-hand side (staticcheck) * Fix func intArrayContains is unused (unused) * Fix: could use strings.ReplaceAll instead (staticcheck) * Fix: could expand call to math.Pow (staticcheck) * Fix: could use tagged switch on `...` (staticcheck) * Fix: Error return value of `...` is not checked (errcheck) * Fix: ineffectual assignment to err (ineffassign) * Fix: There is no need to wait for command completion * Add cpustat, diskstat and schedstat config * Use slices to exclude metrics * Replaced stringArrayContains by slices.Contains * Replace m[k]=v loop with maps.Copy * Use module slices from the standard library. Remove use of golang.org/x/exp/slices * Use SplitSeq and max to modernize code --- Makefile | 5 + collectors/README.md | 11 +- collectors/beegfsmetaMetric.go | 8 +- collectors/beegfsstorageMetric.go | 8 +- collectors/collectorManager.go | 3 +- collectors/cpufreqCpuinfoMetric.go | 18 +- collectors/cpufreqMetric.go | 4 +- collectors/cpustatMetric.go | 47 ++- collectors/customCmdMetric.go | 26 +- collectors/diskstatMetric.go | 26 +- collectors/gpfsMetric.go | 378 +++++++++--------- collectors/infinibandMetric.go | 4 +- collectors/iostatMetric.go | 32 +- collectors/ipmiMetric.go | 53 ++- collectors/likwidMetric.go | 52 ++- collectors/loadavgMetric.go | 9 +- collectors/lustreMetric.go | 12 +- collectors/memstatMetric.go | 18 +- collectors/metricCollector.go | 24 -- collectors/netstatMetric.go | 32 +- collectors/nfsMetric.go | 42 +- collectors/nfsiostatMetric.go | 9 +- collectors/numastatsMetric.go | 10 +- collectors/nvidiaMetric.go | 37 +- collectors/raplMetric.go | 7 +- collectors/rocmsmiMetric.go | 4 +- collectors/sampleMetric.go | 5 +- collectors/sampleTimerMetric.go | 5 +- collectors/schedstatMetric.go | 55 ++- collectors/selfMetric.go | 5 +- collectors/slurmCgroupMetric.go | 7 +- collectors/tempMetric.go | 6 +- collectors/topprocsMetric.go | 26 +- example-configs/collectors.json | 17 +- go.mod | 2 +- internal/metricAggregator/metricAggregator.go | 5 +- .../metricAggregatorFunctions.go | 7 +- internal/metricRouter/metricCache.go | 6 +- internal/metricRouter/metricRouter.go | 50 ++- pkg/ccTopology/ccTopology.go | 4 +- 40 files changed, 657 insertions(+), 422 deletions(-) diff --git a/Makefile b/Makefile index 8b62dab..0fbec79 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,11 @@ staticcheck: $(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest $$($(GOBIN) env GOPATH)/bin/staticcheck ./... +.PHONY: golangci-lint +golangci-lint: + $(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest + $$($(GOBIN) env GOPATH)/bin/golangci-lint run + .ONESHELL: .PHONY: RPM RPM: scripts/cc-metric-collector.spec diff --git a/collectors/README.md b/collectors/README.md index 4e0f805..adfb82f 100644 --- a/collectors/README.md +++ b/collectors/README.md @@ -67,7 +67,7 @@ A collector reads data from any source, parses it to metrics and submits these m * `Read(duration time.Duration, output chan ccMessage.CCMessage)`: Read, parse and submit data to the `output` channel as [`CCMessage`](https://github.com/ClusterCockpit/cc-lib/blob/main/ccMessage/README.md). 
If the collector has to measure anything for some duration, use the provided function argument `duration`. * `Close()`: Closes down the collector. -It is recommanded to call `setup()` in the `Init()` function. +It is recommended to call `setup()` in the `Init()` function. Finally, the collector needs to be registered in the `collectorManager.go`. There is a list of collectors called `AvailableCollectors` which is a map (`collector_type_string` -> `pointer to MetricCollector interface`). Add a new entry with a descriptive name and the new collector. @@ -100,11 +100,12 @@ func (m *SampleCollector) Init(config json.RawMessage) error { } m.name = "SampleCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if len(config) > 0 { - err := json.Unmarshal(config, &m.config) - if err != nil { - return err + if err := json.Unmarshal(config, &m.config); err != nil { + return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) } } m.meta = map[string]string{"source": m.name, "group": "Sample"} diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index b6ad47f..3c1c544 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -17,6 +17,7 @@ import ( "os/exec" "os/user" "regexp" + "slices" "strconv" "strings" "time" @@ -61,7 +62,9 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { "rmXA", "setXA", "mirror"} m.name = "BeegfsMetaCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true // Set default beegfs-ctl binary @@ -78,8 +81,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { //create map with possible variables m.matches = make(map[string]string) for _, value := range nodeMdstat_array { - _, skip := stringArrayContains(m.config.ExcludeMetrics, value) - if skip { + if slices.Contains(m.config.ExcludeMetrics, value) { m.matches["other"] = "0" } else { m.matches["beegfs_cmeta_"+value] = "0" diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index 0b65d60..0c80387 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -17,6 +17,7 @@ import ( "os/exec" "os/user" "regexp" + "slices" "strconv" "strings" "time" @@ -54,7 +55,9 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { "storInf", "unlnk"} m.name = "BeegfsStorageCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true // Set default beegfs-ctl binary @@ -71,8 +74,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { //create map with possible variables m.matches = make(map[string]string) for _, value := range storageStat_array { - _, skip := stringArrayContains(m.config.ExcludeMetrics, value) - if skip { + if slices.Contains(m.config.ExcludeMetrics, value) { m.matches["other"] = "0" } else { m.matches["beegfs_cstorage_"+value] = "0" diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index ee116c7..2f1423e 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -9,6 +9,7 @@ package collectors import ( "encoding/json" + "fmt" "sync" "time" @@ -104,7 +105,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat err = collector.Init(collectorCfg) if err != nil { - 
cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error()) + cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err)) continue } cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 34647f9..014bc09 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -41,9 +41,10 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { return nil } - m.setup() - m.name = "CPUFreqCpuInfoCollector" + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true m.meta = map[string]string{ "source": m.name, @@ -56,7 +57,6 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { if err != nil { return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err) } - defer file.Close() // Collect topology information from file cpuinfo foundFreq := false @@ -86,6 +86,10 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { } } + if err := file.Close(); err != nil { + return fmt.Errorf("%s Init(): Call to file.Close() failed: %w", m.name, err) + } + // were all topology information collected? if foundFreq && len(processor) > 0 && @@ -140,7 +144,13 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err)) return } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", cpuInfoFile, err)) + } + }() processorCounter := 0 now := time.Now() diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 8b516f5..80c3a92 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -48,7 +48,9 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { } m.name = "CPUFreqCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true if len(config) > 0 { err := json.Unmarshal(config, &m.config) diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 98365cf..5530e73 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" "os" + "slices" "strconv" "strings" "time" @@ -39,10 +40,17 @@ type CpustatCollector struct { func (m *CpustatCollector) Init(config json.RawMessage) error { m.name = "CpustatCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true - m.meta = map[string]string{"source": m.name, "group": "CPU"} - m.nodetags = map[string]string{"type": "node"} + m.meta = map[string]string{ + "source": m.name, + "group": "CPU", + } + m.nodetags = map[string]string{ + "type": "node", + } if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -64,14 +72,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { m.matches = make(map[string]int) for match, index := range matches { - doExclude := false - for _, exclude := range m.config.ExcludeMetrics { - if match == exclude { - doExclude = true - break - } - } - if !doExclude { + if !slices.Contains(m.config.ExcludeMetrics, match) { m.matches[match] = index } } 
@@ -79,9 +80,17 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { // Check input file file, err := os.Open(string(CPUSTATFILE)) if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err)) } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err)) + } + }() // Pre-generate tags for all CPUs num_cpus := 0 @@ -155,9 +164,17 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage file, err := os.Open(string(CPUSTATFILE)) if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err)) } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", string(CPUSTATFILE), err)) + } + }() scanner := bufio.NewScanner(file) for scanner.Scan() { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 8e250bc..52917c5 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -10,9 +10,11 @@ package collectors import ( "encoding/json" "errors" + "fmt" "log" "os" "os/exec" + "slices" "strings" "time" @@ -49,11 +51,16 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { return err } } - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } for _, c := range m.config.Commands { cmdfields := strings.Fields(c) - command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " ")) - command.Wait() + command := exec.Command(cmdfields[0], cmdfields[1:]...) + if err := command.Wait(); err != nil { + log.Print(err) + continue + } _, err = command.Output() if err == nil { m.commands = append(m.commands, c) @@ -88,8 +95,11 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa } for _, cmd := range m.commands { cmdfields := strings.Fields(cmd) - command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " ")) - command.Wait() + command := exec.Command(cmdfields[0], cmdfields[1:]...) 
+ if err := command.Wait(); err != nil { + log.Print(err) + continue + } stdout, err := command.Output() if err != nil { log.Print(err) @@ -101,8 +111,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa continue } for _, c := range cmdmetrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name()) - if skip { + if slices.Contains(m.config.ExcludeMetrics, c.Name()) { continue } @@ -121,8 +130,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa continue } for _, f := range fmetrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name()) - if skip { + if slices.Contains(m.config.ExcludeMetrics, f.Name()) { continue } output <- lp.FromInfluxMetric(f) diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 551ff31..9ad8512 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -10,6 +10,7 @@ package collectors import ( "bufio" "encoding/json" + "fmt" "os" "strings" "syscall" @@ -36,7 +37,9 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { m.name = "DiskstatCollector" m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if len(config) > 0 { if err := json.Unmarshal(config, &m.config); err != nil { return err @@ -54,10 +57,11 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { } file, err := os.Open(MOUNTFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) - return err + return fmt.Errorf("%s Init(): file open for file \"%s\" failed: %w", m.name, MOUNTFILE, err) + } + if err := file.Close(); err != nil { + return fmt.Errorf("%s Init(): file close for file \"%s\" failed: %w", m.name, MOUNTFILE, err) } - defer file.Close() m.init = true return nil } @@ -69,10 +73,18 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag file, err := os.Open(MOUNTFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to open file '%s': %v", MOUNTFILE, err)) return } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", MOUNTFILE, err)) + } + }() part_max_used := uint64(0) scanner := bufio.NewScanner(file) @@ -93,7 +105,7 @@ mountLoop: continue } - mountPath := strings.Replace(linefields[1], `\040`, " ", -1) + mountPath := strings.ReplaceAll(linefields[1], `\040`, " ") for _, excl := range m.config.ExcludeMounts { if strings.Contains(mountPath, excl) { diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index 7dc7e13..cf75432 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -17,6 +17,7 @@ import ( "log" "os/exec" "os/user" + "slices" "strconv" "strings" "syscall" @@ -43,11 +44,11 @@ type GpfsCollectorConfig struct { } type GpfsMetricDefinition struct { - name string - desc string - prefix string - unit string - calc string + name string + desc string + prefix string + unit string + calc string } type GpfsCollector struct { @@ -56,251 +57,251 @@ type GpfsCollector struct { config GpfsCollectorConfig sudoCmd string skipFS map[string]struct{} - lastTimestamp map[string]time.Time // Store timestamp of lastState per filesystem to derive bandwidths - definitions []GpfsMetricDefinition // all metrics to report + lastTimestamp 
map[string]time.Time // Store timestamp of lastState per filesystem to derive bandwidths + definitions []GpfsMetricDefinition // all metrics to report lastState map[string]GpfsCollectorState // one GpfsCollectorState per filesystem } var GpfsAbsMetrics = []GpfsMetricDefinition{ { - name: "gpfs_num_opens", - desc: "number of opens", - prefix: "_oc_", - unit: "requests", - calc: "none", + name: "gpfs_num_opens", + desc: "number of opens", + prefix: "_oc_", + unit: "requests", + calc: "none", }, { - name: "gpfs_num_closes", - desc: "number of closes", - prefix: "_cc_", - unit: "requests", - calc: "none", + name: "gpfs_num_closes", + desc: "number of closes", + prefix: "_cc_", + unit: "requests", + calc: "none", }, { - name: "gpfs_num_reads", - desc: "number of reads", - prefix: "_rdc_", - unit: "requests", - calc: "none", + name: "gpfs_num_reads", + desc: "number of reads", + prefix: "_rdc_", + unit: "requests", + calc: "none", }, { - name: "gpfs_num_writes", - desc: "number of writes", - prefix: "_wc_", - unit: "requests", - calc: "none", + name: "gpfs_num_writes", + desc: "number of writes", + prefix: "_wc_", + unit: "requests", + calc: "none", }, { - name: "gpfs_num_readdirs", - desc: "number of readdirs", - prefix: "_dir_", - unit: "requests", - calc: "none", + name: "gpfs_num_readdirs", + desc: "number of readdirs", + prefix: "_dir_", + unit: "requests", + calc: "none", }, { - name: "gpfs_num_inode_updates", - desc: "number of Inode Updates", - prefix: "_iu_", - unit: "requests", - calc: "none", + name: "gpfs_num_inode_updates", + desc: "number of Inode Updates", + prefix: "_iu_", + unit: "requests", + calc: "none", }, { - name: "gpfs_bytes_read", - desc: "bytes read", - prefix: "_br_", - unit: "bytes", - calc: "none", + name: "gpfs_bytes_read", + desc: "bytes read", + prefix: "_br_", + unit: "bytes", + calc: "none", }, { - name: "gpfs_bytes_written", - desc: "bytes written", - prefix: "_bw_", - unit: "bytes", - calc: "none", + name: "gpfs_bytes_written", + desc: "bytes written", + prefix: "_bw_", + unit: "bytes", + calc: "none", }, } var GpfsDiffMetrics = []GpfsMetricDefinition{ { - name: "gpfs_num_opens_diff", - desc: "number of opens (diff)", - prefix: "_oc_", - unit: "requests", - calc: "difference", + name: "gpfs_num_opens_diff", + desc: "number of opens (diff)", + prefix: "_oc_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_num_closes_diff", - desc: "number of closes (diff)", - prefix: "_cc_", - unit: "requests", - calc: "difference", + name: "gpfs_num_closes_diff", + desc: "number of closes (diff)", + prefix: "_cc_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_num_reads_diff", - desc: "number of reads (diff)", - prefix: "_rdc_", - unit: "requests", - calc: "difference", + name: "gpfs_num_reads_diff", + desc: "number of reads (diff)", + prefix: "_rdc_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_num_writes_diff", - desc: "number of writes (diff)", - prefix: "_wc_", - unit: "requests", - calc: "difference", + name: "gpfs_num_writes_diff", + desc: "number of writes (diff)", + prefix: "_wc_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_num_readdirs_diff", - desc: "number of readdirs (diff)", - prefix: "_dir_", - unit: "requests", - calc: "difference", + name: "gpfs_num_readdirs_diff", + desc: "number of readdirs (diff)", + prefix: "_dir_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_num_inode_updates_diff", - desc: "number of Inode Updates (diff)", - prefix: "_iu_", - unit: "requests", - calc: 
"difference", + name: "gpfs_num_inode_updates_diff", + desc: "number of Inode Updates (diff)", + prefix: "_iu_", + unit: "requests", + calc: "difference", }, { - name: "gpfs_bytes_read_diff", - desc: "bytes read (diff)", - prefix: "_br_", - unit: "bytes", - calc: "difference", + name: "gpfs_bytes_read_diff", + desc: "bytes read (diff)", + prefix: "_br_", + unit: "bytes", + calc: "difference", }, { - name: "gpfs_bytes_written_diff", - desc: "bytes written (diff)", - prefix: "_bw_", - unit: "bytes", - calc: "difference", + name: "gpfs_bytes_written_diff", + desc: "bytes written (diff)", + prefix: "_bw_", + unit: "bytes", + calc: "difference", }, } var GpfsDeriveMetrics = []GpfsMetricDefinition{ { - name: "gpfs_opens_rate", - desc: "number of opens (rate)", - prefix: "_oc_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_opens_rate", + desc: "number of opens (rate)", + prefix: "_oc_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_closes_rate", - desc: "number of closes (rate)", - prefix: "_oc_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_closes_rate", + desc: "number of closes (rate)", + prefix: "_oc_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_reads_rate", - desc: "number of reads (rate)", - prefix: "_rdc_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_reads_rate", + desc: "number of reads (rate)", + prefix: "_rdc_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_writes_rate", - desc: "number of writes (rate)", - prefix: "_wc_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_writes_rate", + desc: "number of writes (rate)", + prefix: "_wc_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_readdirs_rate", - desc: "number of readdirs (rate)", - prefix: "_dir_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_readdirs_rate", + desc: "number of readdirs (rate)", + prefix: "_dir_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_inode_updates_rate", - desc: "number of Inode Updates (rate)", - prefix: "_iu_", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_inode_updates_rate", + desc: "number of Inode Updates (rate)", + prefix: "_iu_", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_bw_read", - desc: "bytes read (rate)", - prefix: "_br_", - unit: "bytes/sec", - calc: "derivative", + name: "gpfs_bw_read", + desc: "bytes read (rate)", + prefix: "_br_", + unit: "bytes/sec", + calc: "derivative", }, { - name: "gpfs_bw_write", - desc: "bytes written (rate)", - prefix: "_bw_", - unit: "bytes/sec", - calc: "derivative", + name: "gpfs_bw_write", + desc: "bytes written (rate)", + prefix: "_bw_", + unit: "bytes/sec", + calc: "derivative", }, } var GpfsTotalMetrics = []GpfsMetricDefinition{ { - name: "gpfs_bytes_total", - desc: "bytes total", - prefix: "bytesTotal", - unit: "bytes", - calc: "none", + name: "gpfs_bytes_total", + desc: "bytes total", + prefix: "bytesTotal", + unit: "bytes", + calc: "none", }, { - name: "gpfs_bytes_total_diff", - desc: "bytes total (diff)", - prefix: "bytesTotal", - unit: "bytes", - calc: "difference", + name: "gpfs_bytes_total_diff", + desc: "bytes total (diff)", + prefix: "bytesTotal", + unit: "bytes", + calc: "difference", }, { - name: "gpfs_bw_total", - desc: "bytes total (rate)", - prefix: "bytesTotal", - unit: "bytes/sec", - calc: "derivative", + name: "gpfs_bw_total", + desc: "bytes total (rate)", + prefix: "bytesTotal", + unit: "bytes/sec", + calc: "derivative", }, { - name: 
"gpfs_iops", - desc: "iops", - prefix: "iops", - unit: "requests", - calc: "none", + name: "gpfs_iops", + desc: "iops", + prefix: "iops", + unit: "requests", + calc: "none", }, { - name: "gpfs_iops_diff", - desc: "iops (diff)", - prefix: "iops", - unit: "requests", - calc: "difference", + name: "gpfs_iops_diff", + desc: "iops (diff)", + prefix: "iops", + unit: "requests", + calc: "difference", }, { - name: "gpfs_iops_rate", - desc: "iops (rate)", - prefix: "iops", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_iops_rate", + desc: "iops (rate)", + prefix: "iops", + unit: "requests/sec", + calc: "derivative", }, { - name: "gpfs_metaops", - desc: "metaops", - prefix: "metaops", - unit: "requests", - calc: "none", + name: "gpfs_metaops", + desc: "metaops", + prefix: "metaops", + unit: "requests", + calc: "none", }, { - name: "gpfs_metaops_diff", - desc: "metaops (diff)", - prefix: "metaops", - unit: "requests", - calc: "difference", + name: "gpfs_metaops_diff", + desc: "metaops (diff)", + prefix: "metaops", + unit: "requests", + calc: "difference", }, { - name: "gpfs_metaops_rate", - desc: "metaops (rate)", - prefix: "metaops", - unit: "requests/sec", - calc: "derivative", + name: "gpfs_metaops_rate", + desc: "metaops (rate)", + prefix: "metaops", + unit: "requests/sec", + calc: "derivative", }, } @@ -310,9 +311,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { return nil } - var err error m.name = "GpfsCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true // Set default mmpmon binary @@ -320,7 +322,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { // Read JSON configuration if len(config) > 0 { - err = json.Unmarshal(config, &m.config) + err := json.Unmarshal(config, &m.config) if err != nil { log.Print(err.Error()) return err @@ -366,7 +368,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { if m.config.Sudo && !strings.HasPrefix(m.config.Mmpmon, "/") { return fmt.Errorf("when using sudo, mmpmon_path must be provided and an absolute path: %s", m.config.Mmpmon) } - + // Check if mmpmon is in executable search path p, err := exec.LookPath(m.config.Mmpmon) if err != nil { @@ -385,28 +387,28 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { m.definitions = []GpfsMetricDefinition{} if m.config.SendAbsoluteValues { for _, def := range GpfsAbsMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } } if m.config.SendDiffValues { for _, def := range GpfsDiffMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } } if m.config.SendDerivedValues { for _, def := range GpfsDeriveMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } } else if m.config.SendBandwidths { for _, def := range GpfsDeriveMetrics { if def.unit == "bytes/sec" { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } @@ -414,19 +416,19 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { } if 
m.config.SendTotalValues { for _, def := range GpfsTotalMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { // only send total metrics of the types requested - if ( def.calc == "none" && m.config.SendAbsoluteValues ) || - ( def.calc == "difference" && m.config.SendDiffValues ) || - ( def.calc == "derivative" && m.config.SendDerivedValues ) { + if (def.calc == "none" && m.config.SendAbsoluteValues) || + (def.calc == "difference" && m.config.SendDiffValues) || + (def.calc == "derivative" && m.config.SendDerivedValues) { m.definitions = append(m.definitions, def) - } + } } } } else if m.config.SendBandwidths { for _, def := range GpfsTotalMetrics { if def.unit == "bytes/sec" { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } @@ -456,7 +458,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { } else { cmd = exec.Command(m.config.Mmpmon, "-p", "-s") } - + cmd.Stdin = strings.NewReader("once fs_io_s\n") cmdStdout := new(bytes.Buffer) cmdStderr := new(bytes.Buffer) @@ -617,7 +619,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { } case "derivative": if vnew_ok && vold_ok && timeDiff > 0 { - value = float64(vnew - vold) / timeDiff + value = float64(vnew-vold) / timeDiff if value.(float64) < 0 { value = 0 } diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 28baac8..96ad611 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -65,7 +65,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { var err error m.name = "InfinibandCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true m.meta = map[string]string{ "source": m.name, diff --git a/collectors/iostatMetric.go b/collectors/iostatMetric.go index 79c1e9d..0d645b1 100644 --- a/collectors/iostatMetric.go +++ b/collectors/iostatMetric.go @@ -11,7 +11,9 @@ import ( "bufio" "encoding/json" "errors" + "fmt" "os" + "slices" "strconv" "strings" "time" @@ -45,7 +47,9 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { m.name = "IOstatCollector" m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -75,7 +79,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { m.devices = make(map[string]IOstatCollectorEntry) m.matches = make(map[string]int) for k, v := range matches { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, k); !skip { + if !slices.Contains(m.config.ExcludeMetrics, k) { m.matches[k] = v } } @@ -84,10 +88,8 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { } file, err := os.Open(IOSTATFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) - return err + return fmt.Errorf("%s Init(): Failed to open file \"%s\": %w", m.name, IOSTATFILE, err) } - defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -101,7 +103,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { if strings.Contains(device, "loop") { continue } - if _, skip := 
stringArrayContains(m.config.ExcludeDevices, device); skip { + if slices.Contains(m.config.ExcludeDevices, device) { continue } currentValues := make(map[string]int64) @@ -127,6 +129,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { lastValues: lastValues, } } + if err := file.Close(); err != nil { + return fmt.Errorf("%s Init(): Failed to close file \"%s\": %w", m.name, IOSTATFILE, err) + } + m.init = true return err } @@ -138,10 +144,18 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) file, err := os.Open(IOSTATFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to open file '%s': %v", IOSTATFILE, err)) return } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", IOSTATFILE, err)) + } + }() scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -157,7 +171,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage) if strings.Contains(device, "loop") { continue } - if _, skip := stringArrayContains(m.config.ExcludeDevices, device); skip { + if slices.Contains(m.config.ExcludeDevices, device) { continue } if _, ok := m.devices[device]; !ok { diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 4546bd3..9e6f34d 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -14,7 +14,6 @@ import ( "errors" "fmt" "io" - "log" "os/exec" "strconv" "strings" @@ -44,7 +43,9 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { } m.name = "IpmiCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true m.meta = map[string]string{ "source": m.name, @@ -116,15 +117,16 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { } v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64) if err == nil { - name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1)) + name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_")) unit := strings.TrimSpace(lv[2]) - if unit == "Volts" { + switch unit { + case "Volts": unit = "Volts" - } else if unit == "degrees C" { + case "degrees C": unit = "degC" - } else if unit == "degrees F" { + case "degrees F": unit = "degF" - } else if unit == "Watts" { + case "Watts": unit = "Watts" } @@ -150,22 +152,29 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { + // Setup ipmisensors command command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") - command.Wait() - stdout, err := command.Output() - if err != nil { - log.Print(err) + stdout, _ := command.StdoutPipe() + errBuf := new(bytes.Buffer) + command.Stderr = errBuf + + // start command + if err := command.Start(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("readIpmiSensors(): Failed to start command \"%s\": %v", command.String(), err), + ) return } - ll := strings.Split(string(stdout), "\n") - - for _, line := range ll { - lv := strings.Split(line, ",") + // Read command output + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + lv := strings.Split(scanner.Text(), ",") if len(lv) > 3 { v, err := strconv.ParseFloat(lv[3], 64) if err == nil { - name := strings.ToLower(strings.Replace(lv[1], " ", "_", 
-1)) + name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_")) y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { if len(lv) > 4 { @@ -176,6 +185,18 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { } } } + + // Wait for command end + if err := command.Wait(); err != nil { + errMsg, _ := io.ReadAll(errBuf) + cclog.ComponentError( + m.name, + fmt.Sprintf("readIpmiSensors(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err), + ) + cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg)))) + return + } + } func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) { diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 0056ffe..f5e1c81 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "maps" "math" "os" "os/signal" @@ -187,7 +188,7 @@ func getBaseFreq() float64 { for _, f := range files { buffer, err := os.ReadFile(f) if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) + data := strings.ReplaceAll(string(buffer), "\n", "") x, err := strconv.ParseInt(data, 0, 64) if err == nil { freq = float64(x) @@ -230,9 +231,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { if m.config.ForceOverwrite { cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") - os.Setenv("LIKWID_FORCE", "1") + if err := os.Setenv("LIKWID_FORCE", "1"); err != nil { + return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %v", err) + } + } + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } - m.setup() m.meta = map[string]string{"group": "PerfCounter"} cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") @@ -316,7 +321,14 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { case "accessdaemon": if len(m.config.DaemonPath) > 0 { p := os.Getenv("PATH") - os.Setenv("PATH", m.config.DaemonPath+":"+p) + if len(p) > 0 { + p = m.config.DaemonPath + ":" + p + } else { + p = m.config.DaemonPath + } + if err := os.Setenv("PATH", p); err != nil { + return fmt.Errorf("error setting environment variable PATH=%s: %v", p, err) + } } C.HPMmode(1) retCode := C.HPMinit() @@ -375,10 +387,18 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, // Watch changes for the lock file () watcher, err := fsnotify.NewWatcher() if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("takeMeasurement(): Failed to create a new fsnotify.Watcher: %v", err)) return true, err } - defer watcher.Close() + defer func() { + if err := watcher.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("takeMeasurement(): Failed to close fsnotify.Watcher: %v", err)) + } + }() if len(m.config.LockfilePath) > 0 { // Check if the lock file exists info, err := os.Stat(m.config.LockfilePath) @@ -388,7 +408,9 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, if createErr != nil { return true, fmt.Errorf("failed to create lock file: %v", createErr) } - file.Close() + if err := file.Close(); err != nil { + return true, fmt.Errorf("failed to close lock file: %v", err) + } info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation } if err != nil { @@ -748,9 +770,7 @@ 
func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter // Here we generate parameter list params := make(map[string]float64) for _, evset := range groups { - for mname, mres := range evset.metrics[tid] { - params[mname] = mres - } + maps.Copy(params, evset.metrics[tid]) } params["gotime"] = interval.Seconds() // Evaluate the metric @@ -813,13 +833,21 @@ func (m *LikwidCollector) ReadThread(interval time.Duration, output chan lp.CCMe if !skip { // read measurements and derive event set metrics - m.calcEventsetMetrics(e, interval, output) + err = m.calcEventsetMetrics(e, interval, output) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + return + } groups = append(groups, e) } } if len(groups) > 0 { // calculate global metrics - m.calcGlobalMetrics(groups, interval, output) + err = m.calcGlobalMetrics(groups, interval, output) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + return + } } } diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index b0cbd98..7010084 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "os" + "slices" "strconv" "strings" "time" @@ -42,7 +43,9 @@ type LoadavgCollector struct { func (m *LoadavgCollector) Init(config json.RawMessage) error { m.name = "LoadavgCollector" m.parallel = true - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -64,10 +67,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { m.proc_skips = make([]bool, len(m.proc_matches)) for i, name := range m.load_matches { - _, m.load_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) + m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) } for i, name := range m.proc_matches { - _, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) + m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) } m.init = true return nil diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index c4428eb..23bf9ae 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -13,6 +13,7 @@ import ( "fmt" "os/exec" "os/user" + "slices" "strconv" "strings" "time" @@ -61,7 +62,6 @@ func (m *LustreCollector) getDeviceDataCommand(device string) []string { } else { command = exec.Command(m.lctl, LCTL_OPTION, statsfile) } - command.Wait() stdout, _ := command.Output() return strings.Split(string(stdout), "\n") } @@ -302,7 +302,9 @@ func (m *LustreCollector) Init(config json.RawMessage) error { return err } } - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.tags = map[string]string{"type": "node"} m.meta = map[string]string{"source": m.name, "group": "Lustre"} @@ -339,21 +341,21 @@ func (m *LustreCollector) Init(config json.RawMessage) error { m.definitions = []LustreMetricDefinition{} if m.config.SendAbsoluteValues { for _, def := range LustreAbsMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } } if m.config.SendDiffValues { for _, def := range LustreDiffMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = 
append(m.definitions, def) } } } if m.config.SendDerivedValues { for _, def := range LustreDeriveMetrics { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip { + if !slices.Contains(m.config.ExcludeMetrics, def.name) { m.definitions = append(m.definitions, def) } } diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 572fb36..b934b36 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "regexp" + "slices" "strconv" "strings" "time" @@ -58,7 +59,11 @@ func getStats(filename string) map[string]MemstatStats { if err != nil { cclog.Error(err.Error()) } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.Error(err.Error()) + } + }() scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -115,19 +120,20 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { "MemShared": "mem_shared", } for k, v := range matches { - _, skip := stringArrayContains(m.config.ExcludeMetrics, k) - if !skip { + if !slices.Contains(m.config.ExcludeMetrics, k) { m.matches[k] = v } } m.sendMemUsed = false - if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip { + if !slices.Contains(m.config.ExcludeMetrics, "mem_used") { m.sendMemUsed = true } if len(m.matches) == 0 { return errors.New("no metrics to collect") } - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if m.config.NodeStats { if stats := getStats(MEMSTATFILE); len(stats) == 0 { @@ -174,7 +180,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage sendStats := func(stats map[string]MemstatStats, tags map[string]string) { for match, name := range m.matches { var value float64 = 0 - var unit string = "" + unit := "" if v, ok := stats[match]; ok { value = v.value if len(v.unit) > 0 { diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index b6799d4..3edfe0a 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -51,30 +51,6 @@ func (c *metricCollector) Initialized() bool { return c.init } -// intArrayContains scans an array of ints if the value str is present in the array -// If the specified value is found, the corresponding array index is returned. -// The bool value is used to signal success or failure -func intArrayContains(array []int, str int) (int, bool) { - for i, a := range array { - if a == str { - return i, true - } - } - return -1, false -} - -// stringArrayContains scans an array of strings if the value str is present in the array -// If the specified value is found, the corresponding array index is returned. 
-// The bool value is used to signal success or failure -func stringArrayContains(array []string, str string) (int, bool) { - for i, a := range array { - if a == str { - return i, true - } - } - return -1, false -} - // RemoveFromStringList removes the string r from the array of strings s // If r is not contained in the array an error is returned func RemoveFromStringList(s []string, r string) ([]string, error) { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index f2535c1..877e83c 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -10,8 +10,9 @@ package collectors import ( "bufio" "encoding/json" - "errors" + "fmt" "os" + "slices" "strconv" "strings" "time" @@ -65,7 +66,9 @@ func getCanonicalName(raw string, aliasToCanonical map[string]string) string { func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" m.parallel = true - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.lastTimestamp = time.Now() const ( @@ -107,10 +110,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { // Check access to net statistic file file, err := os.Open(NETSTATFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) - return err + return fmt.Errorf("%s Init(): failed to open netstat file \"%s\": %w", m.name, NETSTATFILE, err) } - defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -129,7 +130,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { canonical := getCanonicalName(raw, m.aliasToCanonical) // Check if device is a included device - if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok { + if slices.Contains(m.config.IncludeDevices, canonical) { // Tag will contain original device name (raw). 
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"} meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"} @@ -174,8 +175,13 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { } } + // Close netstat file + if err := file.Close(); err != nil { + return fmt.Errorf("%s Init(): failed to close netstat file \"%s\": %w", m.name, NETSTATFILE, err) + } + if len(m.matches) == 0 { - return errors.New("no devices to collector metrics found") + return fmt.Errorf("%s Init(): no devices to collect metrics found", m.name) } m.init = true return nil @@ -194,10 +200,18 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage file, err := os.Open(NETSTATFILE) if err != nil { - cclog.ComponentError(m.name, err.Error()) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to open file '%s': %v", NETSTATFILE, err)) return } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", NETSTATFILE, err)) + } + }() scanner := bufio.NewScanner(file) for scanner.Scan() { diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index 78ef16f..7e8b57b 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "log" + "slices" // "os" "os/exec" @@ -18,6 +19,7 @@ import ( "strings" "time" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" ) @@ -44,10 +46,15 @@ type nfsCollector struct { func (m *nfsCollector) initStats() error { cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`) - cmd.Wait() + + // Wait for cmd end + if err := cmd.Wait(); err != nil { + return fmt.Errorf("initStats(): %w", err) + } + buffer, err := cmd.Output() if err == nil { - for _, line := range strings.Split(string(buffer), "\n") { + for line := range strings.Lines(string(buffer)) { lf := strings.Fields(line) if len(lf) != 5 { continue @@ -71,10 +78,15 @@ func (m *nfsCollector) initStats() error { func (m *nfsCollector) updateStats() error { cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`) - cmd.Wait() + + // Wait for cmd end + if err := cmd.Wait(); err != nil { + return fmt.Errorf("updateStats(): %w", err) + } + buffer, err := cmd.Output() if err == nil { - for _, line := range strings.Split(string(buffer), "\n") { + for line := range strings.Lines(string(buffer)) { lf := strings.Fields(line) if len(lf) != 5 { continue @@ -119,7 +131,9 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err) } m.data = make(map[string]NfsCollectorData) - m.initStats() + if err := m.initStats(); err != nil { + return fmt.Errorf("NfsCollector.Init(): %w", err) + } m.init = true m.parallel = true return nil @@ -131,7 +145,13 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { } timestamp := time.Now() - m.updateStats() + if err := m.updateStats(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): updateStats() failed: %v", err), + ) + return + } prefix := "" switch m.version { case "v3": @@ -143,7 +163,7 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { } for name, data := range m.data { - if _, skip := stringArrayContains(m.config.ExcludeMetrics, name); skip { + if slices.Contains(m.config.ExcludeMetrics, name) { continue } 
value := data.current - data.last @@ -170,13 +190,17 @@ type Nfs4Collector struct { func (m *Nfs3Collector) Init(config json.RawMessage) error { m.name = "Nfs3Collector" m.version = `v3` - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } return m.MainInit(config) } func (m *Nfs4Collector) Init(config json.RawMessage) error { m.name = "Nfs4Collector" m.version = `v4` - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } return m.MainInit(config) } diff --git a/collectors/nfsiostatMetric.go b/collectors/nfsiostatMetric.go index 1cce7d4..e23b8b4 100644 --- a/collectors/nfsiostatMetric.go +++ b/collectors/nfsiostatMetric.go @@ -12,6 +12,7 @@ import ( "fmt" "os" "regexp" + "slices" "strconv" "strings" "time" @@ -71,7 +72,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { // Is this a device line with mount point, remote target and NFS version? dev := resolve_regex_fields(l, deviceRegex) if len(dev) > 0 { - if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok { + if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) { current = dev if len(current["version"]) == 0 { current["version"] = "3" @@ -85,7 +86,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { if len(bytes) > 0 { data[current[m.key]] = make(map[string]int64) for name, sval := range bytes { - if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok { + if !slices.Contains(m.config.ExcludeMetrics, name) { val, err := strconv.ParseInt(sval, 10, 64) if err == nil { data[current[m.key]][name] = val @@ -102,7 +103,9 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { func (m *NfsIOStatCollector) Init(config json.RawMessage) error { var err error = nil m.name = "NfsIOStatCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"} m.tags = map[string]string{"type": "node"} diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index 0344b24..b528918 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -72,7 +72,9 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { m.name = "NUMAStatsCollector" m.parallel = true - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.meta = map[string]string{ "source": m.name, "group": "NUMA", @@ -186,7 +188,11 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMessa t.previousValues[key] = value } } - file.Close() + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file '%s': %v", t.file, err)) + } } } diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 65d46e7..db6d7e0 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -12,6 +12,8 @@ import ( "errors" "fmt" "log" + "maps" + "slices" "strings" "time" @@ -64,7 +66,9 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { m.config.ProcessMigDevices = false m.config.UseUuidForMigDevices = false m.config.UseSliceForMigDevices = false - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } if 
len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -109,7 +113,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { // Skip excluded devices by ID str_i := fmt.Sprintf("%d", i) - if _, skip := stringArrayContains(m.config.ExcludeDevices, str_i); skip { + if slices.Contains(m.config.ExcludeDevices, str_i) { cclog.ComponentDebug(m.name, "Skipping excluded device", str_i) continue } @@ -137,7 +141,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { pciInfo.Device) // Skip excluded devices specified by PCI ID - if _, skip := stringArrayContains(m.config.ExcludeDevices, pci_id); skip { + if slices.Contains(m.config.ExcludeDevices, pci_id) { cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id) continue } @@ -222,7 +226,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err var total uint64 var used uint64 var reserved uint64 = 0 - var v2 bool = false + v2 := false meminfo, ret := nvml.DeviceGetMemoryInfo(device.device) if ret != nvml.SUCCESS { err := errors.New(nvml.ErrorString(ret)) @@ -405,7 +409,8 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error // Changing ECC modes requires a reboot. // The "pending" ECC mode refers to the target mode following the next reboot. _, ecc_pend, ret := nvml.DeviceGetEccMode(device.device) - if ret == nvml.SUCCESS { + switch ret { + case nvml.SUCCESS: var y lp.CCMessage var err error switch ecc_pend { @@ -419,7 +424,7 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error if err == nil { output <- y } - } else if ret == nvml.ERROR_NOT_SUPPORTED { + case nvml.ERROR_NOT_SUPPORTED: y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now()) if err == nil { output <- y @@ -768,7 +773,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e } } if !device.excludeMetrics["nv_remapped_rows_pending"] { - var p int = 0 + p := 0 if pending { p = 1 } @@ -778,7 +783,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e } } if !device.excludeMetrics["nv_remapped_rows_failure"] { - var f int = 0 + f := 0 if failure { f = 1 } @@ -1275,9 +1280,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage) meta: map[string]string{}, excludeMetrics: excludeMetrics, } - for k, v := range m.gpus[i].tags { - migDevice.tags[k] = v - } + maps.Copy(migDevice.tags, m.gpus[i].tags) migDevice.tags["stype"] = "mig" if m.config.UseUuidForMigDevices { uuid, ret := nvml.DeviceGetUUID(mdev) @@ -1291,8 +1294,8 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage) if ret == nvml.SUCCESS { mname, ret := nvml.DeviceGetName(mdev) if ret == nvml.SUCCESS { - x := strings.Replace(mname, name, "", -1) - x = strings.Replace(x, "MIG", "", -1) + x := strings.ReplaceAll(mname, name, "") + x = strings.ReplaceAll(x, "MIG", "") x = strings.TrimSpace(x) migDevice.tags["stype-id"] = x } @@ -1301,9 +1304,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage) if _, ok := migDevice.tags["stype-id"]; !ok { migDevice.tags["stype-id"] = fmt.Sprintf("%d", j) } - for k, v := range m.gpus[i].meta { - migDevice.meta[k] = v - } + maps.Copy(migDevice.meta, m.gpus[i].meta) if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices { uuid, ret := nvml.DeviceGetUUID(mdev) if ret == nvml.SUCCESS { @@ -1319,7 +1320,9 @@ func (m *NvidiaCollector) 
Read(interval time.Duration, output chan lp.CCMessage) func (m *NvidiaCollector) Close() { if m.init { - nvml.Shutdown() + if ret := nvml.Shutdown(); ret != nvml.SUCCESS { + cclog.ComponentError(m.name, "nvml.Shutdown() not successful") + } m.init = false } } diff --git a/collectors/raplMetric.go b/collectors/raplMetric.go index 7978745..f46aadf 100644 --- a/collectors/raplMetric.go +++ b/collectors/raplMetric.go @@ -54,9 +54,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { return nil } - var err error = nil m.name = "RAPLCollector" - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } m.parallel = true m.meta = map[string]string{ "source": m.name, @@ -66,7 +67,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { // Read in the JSON configuration if len(config) > 0 { - err = json.Unmarshal(config, &m.config) + err := json.Unmarshal(config, &m.config) if err != nil { cclog.ComponentError(m.name, "Error reading config:", err.Error()) return err diff --git a/collectors/rocmsmiMetric.go b/collectors/rocmsmiMetric.go index a44ed3f..80fe502 100644 --- a/collectors/rocmsmiMetric.go +++ b/collectors/rocmsmiMetric.go @@ -52,7 +52,9 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error { // Always set the name early in Init() to use it in cclog.Component* functions m.name = "RocmSmiCollector" // This is for later use, also call it early - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } // Define meta information sent with each metric // (Can also be dynamic or this is the basic set with extension through AddMeta()) //m.meta = map[string]string{"source": m.name, "group": "AMD"} diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index c124f04..4303107 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -9,6 +9,7 @@ package collectors import ( "encoding/json" + "fmt" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -41,7 +42,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error { // Always set the name early in Init() to use it in cclog.Component* functions m.name = "SampleCollector" // This is for later use, also call it early - m.setup() + if err := m.setup(); err != nil { + return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) + } // Tell whether the collector should be run in parallel with others (reading files, ...) 
 // or it should be run serially, mostly for collectors actually doing measurements
 // because they should not measure the execution of the other collectors
diff --git a/collectors/sampleTimerMetric.go b/collectors/sampleTimerMetric.go
index 86026bb..cb597bf 100644
--- a/collectors/sampleTimerMetric.go
+++ b/collectors/sampleTimerMetric.go
@@ -9,6 +9,7 @@ package collectors
 import (
 	"encoding/json"
+	"fmt"
 	"sync"
 	"time"
@@ -40,7 +41,9 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
 	// Always set the name early in Init() to use it in cclog.Component* functions
 	m.name = "SampleTimerCollector"
 	// This is for later use, also call it early
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	// Define meta information sent with each metric
 	// (Can also be dynamic or this is the basic set with extension through AddMeta())
 	m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go
index 3193ee9..d34e5cc 100644
--- a/collectors/schedstatMetric.go
+++ b/collectors/schedstatMetric.go
@@ -11,7 +11,6 @@ import (
 	"bufio"
 	"encoding/json"
 	"fmt"
-	"math"
 	"os"
 	"strconv"
 	"strings"
@@ -47,37 +46,37 @@ type SchedstatCollector struct {
 // Called once by the collector manager
 // All tags, meta data tags and metrics that do not change over the runtime should be set here
 func (m *SchedstatCollector) Init(config json.RawMessage) error {
-	var err error = nil
 	// Always set the name early in Init() to use it in cclog.Component* functions
 	m.name = "SchedstatCollector"
 	// This is for later use, also call it early
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	// Tell whether the collector should be run in parallel with others (reading files, ...)
 // or it should be run serially, mostly for collectors actually doing measurements
 // because they should not measure the execution of the other collectors
 	m.parallel = true
 	// Define meta information sent with each metric
 	// (Can also be dynamic or this is the basic set with extension through AddMeta())
-	m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"}
+	m.meta = map[string]string{
+		"source": m.name,
+		"group": "SCHEDSTAT",
+	}
 	// Read in the JSON configuration
 	if len(config) > 0 {
-		err = json.Unmarshal(config, &m.config)
-		if err != nil {
-			cclog.ComponentError(m.name, "Error reading config:", err.Error())
-			return err
+		if err := json.Unmarshal(config, &m.config); err != nil {
+			return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
 		}
 	}
 	// Check input file
-	file, err := os.Open(string(SCHEDSTATFILE))
+	file, err := os.Open(SCHEDSTATFILE)
 	if err != nil {
-		cclog.ComponentError(m.name, err.Error())
+		return fmt.Errorf("%s Init(): Failed opening scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err)
 	}
-	defer file.Close()
 	// Pre-generate tags for all CPUs
-	num_cpus := 0
 	m.cputags = make(map[string]map[string]string)
 	m.olddata = make(map[string]map[string]int64)
 	scanner := bufio.NewScanner(file)
@@ -89,11 +88,19 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
 			cpu, _ := strconv.Atoi(cpustr)
 			running, _ := strconv.ParseInt(linefields[7], 10, 64)
 			waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
-			m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
-			m.olddata[linefields[0]] = map[string]int64{"running": running, "waiting": waiting}
-			num_cpus++
+			m.cputags[linefields[0]] = map[string]string{
+				"type": "hwthread",
+				"type-id": fmt.Sprintf("%d", cpu),
+			}
+			m.olddata[linefields[0]] = map[string]int64{
+				"running": running,
+				"waiting": waiting,
+			}
 		}
 	}
+	if err := file.Close(); err != nil {
+		return fmt.Errorf("%s Init(): Failed closing scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err)
+	}
 	// Save current timestamp
 	m.lastTimestamp = time.Now()
@@ -109,8 +116,8 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
 	diff_running := running - m.olddata[linefields[0]]["running"]
 	diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
-	var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3))
-	var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3))
+	l_running := float64(diff_running) / tsdelta.Seconds() / 1_000_000_000
+	l_waiting := float64(diff_waiting) / tsdelta.Seconds() / 1_000_000_000
 	m.olddata[linefields[0]]["running"] = running
 	m.olddata[linefields[0]]["waiting"] = waiting
@@ -134,11 +141,19 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa
 	now := time.Now()
 	tsdelta := now.Sub(m.lastTimestamp)
-	file, err := os.Open(string(SCHEDSTATFILE))
+	file, err := os.Open(SCHEDSTATFILE)
 	if err != nil {
-		cclog.ComponentError(m.name, err.Error())
+		cclog.ComponentError(
+			m.name,
+			fmt.Sprintf("Read(): Failed to open file '%s': %v", SCHEDSTATFILE, err))
 	}
-	defer file.Close()
+	defer func() {
+		if err := file.Close(); err != nil {
+			cclog.ComponentError(
+				m.name,
+				fmt.Sprintf("Read(): Failed to close file '%s': %v", SCHEDSTATFILE, err))
+		}
+	}()
 	scanner := bufio.NewScanner(file)
 	for scanner.Scan() {
diff --git a/collectors/selfMetric.go b/collectors/selfMetric.go
index 7010b70..9c77db7 100644
--- a/collectors/selfMetric.go
+++ b/collectors/selfMetric.go
@@ -9,6 +9,7 @@ package collectors
 import (
 	"encoding/json"
+	"fmt"
 	"runtime"
 	"syscall"
 	"time"
@@ -34,7 +35,9 @@ type SelfCollector struct {
 func (m *SelfCollector) Init(config json.RawMessage) error {
 	var err error = nil
 	m.name = "SelfCollector"
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	m.parallel = true
 	m.meta = map[string]string{"source": m.name, "group": "Self"}
 	m.tags = map[string]string{"type": "node"}
diff --git a/collectors/slurmCgroupMetric.go b/collectors/slurmCgroupMetric.go
index 245f0c5..2759709 100644
--- a/collectors/slurmCgroupMetric.go
+++ b/collectors/slurmCgroupMetric.go
@@ -50,8 +50,7 @@ func ParseCPUs(cpuset string) ([]int, error) {
 		return result, nil
 	}
-	ranges := strings.Split(cpuset, ",")
-	for _, r := range ranges {
+	for r := range strings.SplitSeq(cpuset, ",") {
 		if strings.Contains(r, "-") {
 			parts := strings.Split(r, "-")
 			if len(parts) != 2 {
@@ -103,7 +102,9 @@ func (m *SlurmCgroupCollector) readFile(path string) ([]byte, error) {
 func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
 	var err error
 	m.name = "SlurmCgroupCollector"
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	m.parallel = true
 	m.meta = map[string]string{"source": m.name, "group": "SLURM"}
 	m.tags = map[string]string{"type": "hwthread"}
diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go
index d18dadb..3471992 100644
--- a/collectors/tempMetric.go
+++ b/collectors/tempMetric.go
@@ -58,7 +58,9 @@ func (m *TempCollector) Init(config json.RawMessage) error {
 	m.name = "TempCollector"
 	m.parallel = true
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	if len(config) > 0 {
 		err := json.Unmarshal(config, &m.config)
 		if err != nil {
@@ -117,7 +119,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
 			sensor.metricName = sensor.label
 		}
 		sensor.metricName = strings.ToLower(sensor.metricName)
-		sensor.metricName = strings.Replace(sensor.metricName, " ", "_", -1)
+		sensor.metricName = strings.ReplaceAll(sensor.metricName, " ", "_")
 		// Add temperature prefix, if required
 		if !strings.Contains(sensor.metricName, "temp") {
 			sensor.metricName = "temp_" + sensor.metricName
diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go
index 140796e..923daaa 100644
--- a/collectors/topprocsMetric.go
+++ b/collectors/topprocsMetric.go
@@ -9,13 +9,12 @@ package collectors
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
-	"log"
 	"os/exec"
 	"strings"
 	"time"
+	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
 	lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
 )
@@ -36,12 +35,17 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
 	var err error
 	m.name = "TopProcsCollector"
 	m.parallel = true
-	m.tags = map[string]string{"type": "node"}
-	m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
+	m.tags = map[string]string{
+		"type": "node",
+	}
+	m.meta = map[string]string{
+		"source": m.name,
+		"group": "TopProcs",
+	}
 	if len(config) > 0 {
 		err = json.Unmarshal(config, &m.config)
 		if err != nil {
-			return err
+			return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err)
 		}
 	} else {
 		m.config.Num_procs = int(DEFAULT_NUM_PROCS)
@@ -49,12 +53,13 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
 	if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS {
 		return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)
 	}
-	m.setup()
+	if err := m.setup(); err != nil {
+		return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
+	}
 	command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
-	command.Wait()
 	_, err = command.Output()
 	if err != nil {
-		return errors.New("failed to execute command")
+		return fmt.Errorf("%s Init(): failed to get output from command: %w", m.name, err)
 	}
 	m.init = true
 	return nil
@@ -65,10 +70,11 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag
 		return
 	}
 	command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
-	command.Wait()
 	stdout, err := command.Output()
 	if err != nil {
-		log.Print(m.name, err)
+		cclog.ComponentError(
+			m.name,
+			fmt.Sprintf("Read(): Failed to read output from command \"%s\": %v", command.String(), err))
 		return
 	}
diff --git a/example-configs/collectors.json b/example-configs/collectors.json
index 42cb833..a8c1b78 100644
--- a/example-configs/collectors.json
+++ b/example-configs/collectors.json
@@ -1,6 +1,19 @@
 {
   "cpufreq": {},
   "cpufreq_cpuinfo": {},
+  "cpustat": {
+    "exclude_metrics": [
+      "cpu_idle"
+    ]
+  },
+  "diskstat": {
+    "exclude_metrics": [
+      "disk_total"
+    ],
+    "exclude_mounts": [
+      "slurm-tmpfs"
+    ]
+  },
   "gpfs": {
     "exclude_filesystem": [
       "test_fs"
@@ -21,6 +34,8 @@
   },
   "numastats": {},
   "nvidia": {},
+  "schedstat": {
+  },
   "tempstat": {
     "report_max_temperature": true,
     "report_critical_temperature": true,
@@ -38,4 +53,4 @@
   "topprocs": {
     "num_procs": 5
   }
-}
\ No newline at end of file
+}
diff --git a/go.mod b/go.mod
index d3aef7c..d3d6007 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,6 @@ require (
 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
 	github.com/tklauser/go-sysconf v0.3.16
 	golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
-	golang.org/x/exp v0.0.0-20260112195511-716be5621a96
 	golang.org/x/sys v0.40.0
 )
@@ -40,6 +39,7 @@ require (
 	github.com/tklauser/numcpus v0.11.0 // indirect
 	go.yaml.in/yaml/v2 v2.4.3 // indirect
 	golang.org/x/crypto v0.47.0 // indirect
+	golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
 	golang.org/x/net v0.49.0 // indirect
 	google.golang.org/protobuf v1.36.11 // indirect
 )
diff --git a/internal/metricAggregator/metricAggregator.go b/internal/metricAggregator/metricAggregator.go
index ff4c788..ae4d8b5 100644
--- a/internal/metricAggregator/metricAggregator.go
+++ b/internal/metricAggregator/metricAggregator.go
@@ -10,6 +10,7 @@ package metricAggregator
 import (
 	"context"
 	"fmt"
+	"maps"
 	"math"
 	"os"
 	"strings"
@@ -121,9 +122,7 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
 func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
 	vars := make(map[string]interface{})
-	for k, v := range c.constants {
-		vars[k] = v
-	}
+	maps.Copy(vars, c.constants)
 	vars["starttime"] = starttime
 	vars["endtime"] = endtime
 	for _, f := range c.functions {
diff --git a/internal/metricAggregator/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go
index 58134c1..998be5a 100644
--- a/internal/metricAggregator/metricAggregatorFunctions.go
+++ b/internal/metricAggregator/metricAggregatorFunctions.go
@@ -11,10 +11,9 @@ import (
 	"errors"
 	"fmt"
 	"regexp"
+	"slices"
 	"strings"
-	"golang.org/x/exp/slices"
-
 	topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
 )
@@ -169,7 +168,7 @@ func medianfunc(args interface{}) (interface{}, error) {
 func lenfunc(args interface{}) (interface{}, error) {
 	var err error = nil
-	var length int = 0
+	length := 0
 	switch values := args.(type) {
 	case []float64:
 		length = len(values)
@@ -238,7 +237,7 @@ func matchfunc(args ...interface{}) (interface{}, error) {
 	case string:
 		switch total := args[1].(type) {
 		case string:
-			smatch := strings.Replace(match, "%", "\\", -1)
+			smatch := strings.ReplaceAll(match, "%", "\\")
 			regex, err := regexp.Compile(smatch)
 			if err != nil {
 				return false, err
diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go
index 03fcbfc..90c4674 100644
--- a/internal/metricRouter/metricCache.go
+++ b/internal/metricRouter/metricCache.go
@@ -51,7 +51,7 @@ type MetricCache interface {
 }
 func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
-	var err error = nil
+	var err error
 	c.done = make(chan bool)
 	c.wg = wg
 	c.ticker = ticker
@@ -161,8 +161,8 @@ func (c *metricCache) DeleteAggregation(name string) error {
 // is the current one, index=1 the last interval and so on. Returns an empty array if a wrong index
 // is given (negative index, index larger than configured number of total intervals, ...)
 func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
-	var start time.Time = time.Now()
-	var stop time.Time = time.Now()
+	start := time.Now()
+	stop := time.Now()
 	var metrics []lp.CCMessage
 	if index >= 0 && index < c.numPeriods {
 		pindex := c.curPeriod - index
diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go
index 5aadf91..6ecade9 100644
--- a/internal/metricRouter/metricRouter.go
+++ b/internal/metricRouter/metricRouter.go
@@ -107,10 +107,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 		cclog.ComponentError("MetricRouter", err.Error())
 		return err
 	}
-	r.maxForward = 1
-	if r.config.MaxForward > r.maxForward {
-		r.maxForward = r.config.MaxForward
-	}
+	r.maxForward = max(1, r.config.MaxForward)
+
 	if r.config.NumCacheIntervals > 0 {
 		r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
 		if err != nil {
@@ -118,50 +116,74 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 			return err
 		}
 		for _, agg := range r.config.IntervalAgg {
-			r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+			err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+			if err != nil {
+				return fmt.Errorf("MetricCache AddAggregation() failed: %w", err)
+			}
 		}
 	}
 	p, err := mp.NewMessageProcessor()
 	if err != nil {
-		return fmt.Errorf("initialization of message processor failed: %v", err.Error())
+		return fmt.Errorf("MessageProcessor NewMessageProcessor() failed: %w", err)
 	}
 	r.mp = p
 	if len(r.config.MessageProcessor) > 0 {
 		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 		if err != nil {
-			return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
+			return fmt.Errorf("MessageProcessor FromConfigJSON() failed: %w", err)
 		}
 	}
 	for _, mname := range r.config.DropMetrics {
-		r.mp.AddDropMessagesByName(mname)
+		err = r.mp.AddDropMessagesByName(mname)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddDropMessagesByName() failed: %w", err)
+		}
 	}
 	for _, cond := range r.config.DropMetricsIf {
-		r.mp.AddDropMessagesByCondition(cond)
+		err = r.mp.AddDropMessagesByCondition(cond)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddDropMessagesByCondition() failed: %w", err)
+		}
 	}
 	for _, data := range r.config.AddTags {
 		cond := data.Condition
 		if cond == "*" {
 			cond = "true"
 		}
-		r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
+		err = r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
+		}
 	}
 	for _, data := range r.config.DelTags {
 		cond := data.Condition
 		if cond == "*" {
 			cond = "true"
 		}
-		r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
+		err = r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddDeleteTagsByCondition() failed: %w", err)
+		}
 	}
 	for oldname, newname := range r.config.RenameMetrics {
-		r.mp.AddRenameMetricByName(oldname, newname)
+		err = r.mp.AddRenameMetricByName(oldname, newname)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddRenameMetricByName() failed: %w", err)
+		}
 	}
 	for metricName, prefix := range r.config.ChangeUnitPrefix {
-		r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
+		err = r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
+		if err != nil {
+			return fmt.Errorf("MessageProcessor AddChangeUnitPrefix() failed: %w", err)
+		}
 	}
 	r.mp.SetNormalizeUnits(r.config.NormalizeUnits)
-	r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
+	err = r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
+	if err != nil {
+		return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
+	}
 	// r.config.dropMetrics = make(map[string]bool)
 	// for _, mname := range r.config.DropMetrics {
diff --git a/pkg/ccTopology/ccTopology.go b/pkg/ccTopology/ccTopology.go
index b0d3bf3..569ab56 100644
--- a/pkg/ccTopology/ccTopology.go
+++ b/pkg/ccTopology/ccTopology.go
@@ -13,11 +13,11 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"slices"
 	"strconv"
 	"strings"
 	cclogger "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
-	"golang.org/x/exp/slices"
 )
 const SYSFS_CPUBASE = `/sys/devices/system/cpu`
@@ -80,7 +80,7 @@ func fileToList(path string) []int {
 	// Create list
 	list := make([]int, 0)
 	stringBuffer := strings.TrimSpace(string(buffer))
-	for _, valueRangeString := range strings.Split(stringBuffer, ",") {
+	for valueRangeString := range strings.SplitSeq(stringBuffer, ",") {
 		valueRange := strings.Split(valueRangeString, "-")
 		switch len(valueRange) {
 		case 1: