Compare commits

..

19 Commits

Author SHA1 Message Date
Holger Obermaier
e187b971ff Set explicit names for reviewdog upload 2026-01-23 14:33:26 +01:00
Holger Obermaier
80d57b2389 Corrected indentation 2026-01-23 14:25:55 +01:00
Holger Obermaier
43fd38060d Revert "Try github-annotations for reporting"
This reverts commit 75c602e571.
2026-01-23 14:23:54 +01:00
Holger Obermaier
75c602e571 Try github-annotations for reporting 2026-01-23 14:21:31 +01:00
Holger Obermaier
f2b965025a Add GolangCI-Lint 2026-01-23 13:57:38 +01:00
Holger Obermaier
9e0f4d12e0 Add missing secret 2026-01-23 13:43:14 +01:00
Holger Obermaier
32be9d8fa9 Add upload for go vet 2026-01-23 13:41:25 +01:00
Holger Obermaier
ea64c377ac Combine staticcheck and upload step 2026-01-23 13:21:54 +01:00
Holger Obermaier
75dd2354f0 Set REVIEWDOG_GITHUB_API_TOKEN 2026-01-23 12:50:06 +01:00
Holger Obermaier
5a66b9cadd Use stable golang version 2026-01-23 12:33:51 +01:00
Holger Obermaier
c63df0ca85 Use available report 2026-01-23 11:43:22 +01:00
Holger Obermaier
d66c99351e Upload staticcheck report 2026-01-23 11:28:17 +01:00
Holger Obermaier
fab45d4423 Run Static Analysis with staticcheck 2026-01-23 11:16:39 +01:00
Holger Obermaier
97483a2e78 Only run go vet 2026-01-23 11:00:01 +01:00
Holger Obermaier
86f835ac27 Change step order to ensure likwid.h is available 2026-01-23 10:55:47 +01:00
Holger Obermaier
1bf9ba976c Use same version as in go.mod file 2026-01-23 10:44:03 +01:00
Holger Obermaier
e288ed9d80 Force golang Version 1.25 2026-01-23 10:41:41 +01:00
Holger Obermaier
b82647c68d Add vet tool 2026-01-23 10:39:55 +01:00
Holger Obermaier
9510771603 Static Analysis with GolangCI-Lint 2026-01-23 10:14:45 +01:00
40 changed files with 422 additions and 657 deletions

View File

@@ -72,11 +72,6 @@ staticcheck:
$(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest $(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest
$$($(GOBIN) env GOPATH)/bin/staticcheck ./... $$($(GOBIN) env GOPATH)/bin/staticcheck ./...
.PHONY: golangci-lint
golangci-lint:
$(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
$$($(GOBIN) env GOPATH)/bin/golangci-lint run
.ONESHELL: .ONESHELL:
.PHONY: RPM .PHONY: RPM
RPM: scripts/cc-metric-collector.spec RPM: scripts/cc-metric-collector.spec

View File

@@ -67,7 +67,7 @@ A collector reads data from any source, parses it to metrics and submits these m
* `Read(duration time.Duration, output chan ccMessage.CCMessage)`: Read, parse and submit data to the `output` channel as [`CCMessage`](https://github.com/ClusterCockpit/cc-lib/blob/main/ccMessage/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`. * `Read(duration time.Duration, output chan ccMessage.CCMessage)`: Read, parse and submit data to the `output` channel as [`CCMessage`](https://github.com/ClusterCockpit/cc-lib/blob/main/ccMessage/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
* `Close()`: Closes down the collector. * `Close()`: Closes down the collector.
It is recommended to call `setup()` in the `Init()` function. It is recommanded to call `setup()` in the `Init()` function.
Finally, the collector needs to be registered in the `collectorManager.go`. There is a list of collectors called `AvailableCollectors` which is a map (`collector_type_string` -> `pointer to MetricCollector interface`). Add a new entry with a descriptive name and the new collector. Finally, the collector needs to be registered in the `collectorManager.go`. There is a list of collectors called `AvailableCollectors` which is a map (`collector_type_string` -> `pointer to MetricCollector interface`). Add a new entry with a descriptive name and the new collector.
@@ -100,12 +100,11 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
} }
m.name = "SampleCollector" m.name = "SampleCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { err := json.Unmarshal(config, &m.config)
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) if err != nil {
return err
} }
} }
m.meta = map[string]string{"source": m.name, "group": "Sample"} m.meta = map[string]string{"source": m.name, "group": "Sample"}

View File

@@ -17,7 +17,6 @@ import (
"os/exec" "os/exec"
"os/user" "os/user"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -62,9 +61,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
"rmXA", "setXA", "mirror"} "rmXA", "setXA", "mirror"}
m.name = "BeegfsMetaCollector" m.name = "BeegfsMetaCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
@@ -81,7 +78,8 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
//create map with possible variables //create map with possible variables
m.matches = make(map[string]string) m.matches = make(map[string]string)
for _, value := range nodeMdstat_array { for _, value := range nodeMdstat_array {
if slices.Contains(m.config.ExcludeMetrics, value) { _, skip := stringArrayContains(m.config.ExcludeMetrics, value)
if skip {
m.matches["other"] = "0" m.matches["other"] = "0"
} else { } else {
m.matches["beegfs_cmeta_"+value] = "0" m.matches["beegfs_cmeta_"+value] = "0"

View File

@@ -17,7 +17,6 @@ import (
"os/exec" "os/exec"
"os/user" "os/user"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -55,9 +54,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
"storInf", "unlnk"} "storInf", "unlnk"}
m.name = "BeegfsStorageCollector" m.name = "BeegfsStorageCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
@@ -74,7 +71,8 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
//create map with possible variables //create map with possible variables
m.matches = make(map[string]string) m.matches = make(map[string]string)
for _, value := range storageStat_array { for _, value := range storageStat_array {
if slices.Contains(m.config.ExcludeMetrics, value) { _, skip := stringArrayContains(m.config.ExcludeMetrics, value)
if skip {
m.matches["other"] = "0" m.matches["other"] = "0"
} else { } else {
m.matches["beegfs_cstorage_"+value] = "0" m.matches["beegfs_cstorage_"+value] = "0"

View File

@@ -9,7 +9,6 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"fmt"
"sync" "sync"
"time" "time"
@@ -105,7 +104,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
err = collector.Init(collectorCfg) err = collector.Init(collectorCfg)
if err != nil { if err != nil {
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err)) cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error())
continue continue
} }
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())

View File

@@ -41,10 +41,9 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
return nil return nil
} }
m.setup()
m.name = "CPUFreqCpuInfoCollector" m.name = "CPUFreqCpuInfoCollector"
if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
@@ -57,6 +56,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err) return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err)
} }
defer file.Close()
// Collect topology information from file cpuinfo // Collect topology information from file cpuinfo
foundFreq := false foundFreq := false
@@ -86,10 +86,6 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
} }
} }
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Call to file.Close() failed: %w", m.name, err)
}
// were all topology information collected? // were all topology information collected?
if foundFreq && if foundFreq &&
len(processor) > 0 && len(processor) > 0 &&
@@ -144,13 +140,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err)) fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err))
return return
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", cpuInfoFile, err))
}
}()
processorCounter := 0 processorCounter := 0
now := time.Now() now := time.Now()

View File

@@ -48,9 +48,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
} }
m.name = "CPUFreqCollector" m.name = "CPUFreqCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)

View File

@@ -12,7 +12,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -40,17 +39,10 @@ type CpustatCollector struct {
func (m *CpustatCollector) Init(config json.RawMessage) error { func (m *CpustatCollector) Init(config json.RawMessage) error {
m.name = "CpustatCollector" m.name = "CpustatCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{"source": m.name, "group": "CPU"}
"source": m.name, m.nodetags = map[string]string{"type": "node"}
"group": "CPU",
}
m.nodetags = map[string]string{
"type": "node",
}
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@@ -72,7 +64,14 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
m.matches = make(map[string]int) m.matches = make(map[string]int)
for match, index := range matches { for match, index := range matches {
if !slices.Contains(m.config.ExcludeMetrics, match) { doExclude := false
for _, exclude := range m.config.ExcludeMetrics {
if match == exclude {
doExclude = true
break
}
}
if !doExclude {
m.matches[match] = index m.matches[match] = index
} }
} }
@@ -80,17 +79,9 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
// Check input file // Check input file
file, err := os.Open(string(CPUSTATFILE)) file, err := os.Open(string(CPUSTATFILE))
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
}
}()
// Pre-generate tags for all CPUs // Pre-generate tags for all CPUs
num_cpus := 0 num_cpus := 0
@@ -164,17 +155,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
file, err := os.Open(string(CPUSTATFILE)) file, err := os.Open(string(CPUSTATFILE))
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
}
}()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {

View File

@@ -10,11 +10,9 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"log" "log"
"os" "os"
"os/exec" "os/exec"
"slices"
"strings" "strings"
"time" "time"
@@ -51,16 +49,11 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
return err return err
} }
} }
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
for _, c := range m.config.Commands { for _, c := range m.config.Commands {
cmdfields := strings.Fields(c) cmdfields := strings.Fields(c)
command := exec.Command(cmdfields[0], cmdfields[1:]...) command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " "))
if err := command.Wait(); err != nil { command.Wait()
log.Print(err)
continue
}
_, err = command.Output() _, err = command.Output()
if err == nil { if err == nil {
m.commands = append(m.commands, c) m.commands = append(m.commands, c)
@@ -95,11 +88,8 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
} }
for _, cmd := range m.commands { for _, cmd := range m.commands {
cmdfields := strings.Fields(cmd) cmdfields := strings.Fields(cmd)
command := exec.Command(cmdfields[0], cmdfields[1:]...) command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " "))
if err := command.Wait(); err != nil { command.Wait()
log.Print(err)
continue
}
stdout, err := command.Output() stdout, err := command.Output()
if err != nil { if err != nil {
log.Print(err) log.Print(err)
@@ -111,7 +101,8 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
continue continue
} }
for _, c := range cmdmetrics { for _, c := range cmdmetrics {
if slices.Contains(m.config.ExcludeMetrics, c.Name()) { _, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name())
if skip {
continue continue
} }
@@ -130,7 +121,8 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
continue continue
} }
for _, f := range fmetrics { for _, f := range fmetrics {
if slices.Contains(m.config.ExcludeMetrics, f.Name()) { _, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name())
if skip {
continue continue
} }
output <- lp.FromInfluxMetric(f) output <- lp.FromInfluxMetric(f)

View File

@@ -10,7 +10,6 @@ package collectors
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"strings" "strings"
"syscall" "syscall"
@@ -37,9 +36,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
m.name = "DiskstatCollector" m.name = "DiskstatCollector"
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { if err := json.Unmarshal(config, &m.config); err != nil {
return err return err
@@ -57,11 +54,10 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
} }
file, err := os.Open(MOUNTFILE) file, err := os.Open(MOUNTFILE)
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): file open for file \"%s\" failed: %w", m.name, MOUNTFILE, err) cclog.ComponentError(m.name, err.Error())
} return err
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): file close for file \"%s\" failed: %w", m.name, MOUNTFILE, err)
} }
defer file.Close()
m.init = true m.init = true
return nil return nil
} }
@@ -73,18 +69,10 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
file, err := os.Open(MOUNTFILE) file, err := os.Open(MOUNTFILE)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", MOUNTFILE, err))
return return
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", MOUNTFILE, err))
}
}()
part_max_used := uint64(0) part_max_used := uint64(0)
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
@@ -105,7 +93,7 @@ mountLoop:
continue continue
} }
mountPath := strings.ReplaceAll(linefields[1], `\040`, " ") mountPath := strings.Replace(linefields[1], `\040`, " ", -1)
for _, excl := range m.config.ExcludeMounts { for _, excl := range m.config.ExcludeMounts {
if strings.Contains(mountPath, excl) { if strings.Contains(mountPath, excl) {

View File

@@ -17,7 +17,6 @@ import (
"log" "log"
"os/exec" "os/exec"
"os/user" "os/user"
"slices"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -311,10 +310,9 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
return nil return nil
} }
var err error
m.name = "GpfsCollector" m.name = "GpfsCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
// Set default mmpmon binary // Set default mmpmon binary
@@ -322,7 +320,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
log.Print(err.Error()) log.Print(err.Error())
return err return err
@@ -387,28 +385,28 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
m.definitions = []GpfsMetricDefinition{} m.definitions = []GpfsMetricDefinition{}
if m.config.SendAbsoluteValues { if m.config.SendAbsoluteValues {
for _, def := range GpfsAbsMetrics { for _, def := range GpfsAbsMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
} }
if m.config.SendDiffValues { if m.config.SendDiffValues {
for _, def := range GpfsDiffMetrics { for _, def := range GpfsDiffMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
} }
if m.config.SendDerivedValues { if m.config.SendDerivedValues {
for _, def := range GpfsDeriveMetrics { for _, def := range GpfsDeriveMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
} else if m.config.SendBandwidths { } else if m.config.SendBandwidths {
for _, def := range GpfsDeriveMetrics { for _, def := range GpfsDeriveMetrics {
if def.unit == "bytes/sec" { if def.unit == "bytes/sec" {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
@@ -416,11 +414,11 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
} }
if m.config.SendTotalValues { if m.config.SendTotalValues {
for _, def := range GpfsTotalMetrics { for _, def := range GpfsTotalMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
// only send total metrics of the types requested // only send total metrics of the types requested
if (def.calc == "none" && m.config.SendAbsoluteValues) || if ( def.calc == "none" && m.config.SendAbsoluteValues ) ||
(def.calc == "difference" && m.config.SendDiffValues) || ( def.calc == "difference" && m.config.SendDiffValues ) ||
(def.calc == "derivative" && m.config.SendDerivedValues) { ( def.calc == "derivative" && m.config.SendDerivedValues ) {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
@@ -428,7 +426,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
} else if m.config.SendBandwidths { } else if m.config.SendBandwidths {
for _, def := range GpfsTotalMetrics { for _, def := range GpfsTotalMetrics {
if def.unit == "bytes/sec" { if def.unit == "bytes/sec" {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
@@ -619,7 +617,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
} }
case "derivative": case "derivative":
if vnew_ok && vold_ok && timeDiff > 0 { if vnew_ok && vold_ok && timeDiff > 0 {
value = float64(vnew-vold) / timeDiff value = float64(vnew - vold) / timeDiff
if value.(float64) < 0 { if value.(float64) < 0 {
value = 0 value = 0
} }

View File

@@ -65,9 +65,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "InfinibandCollector" m.name = "InfinibandCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,

View File

@@ -11,9 +11,7 @@ import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -47,9 +45,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
m.name = "IOstatCollector" m.name = "IOstatCollector"
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@@ -79,7 +75,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
m.devices = make(map[string]IOstatCollectorEntry) m.devices = make(map[string]IOstatCollectorEntry)
m.matches = make(map[string]int) m.matches = make(map[string]int)
for k, v := range matches { for k, v := range matches {
if !slices.Contains(m.config.ExcludeMetrics, k) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, k); !skip {
m.matches[k] = v m.matches[k] = v
} }
} }
@@ -88,8 +84,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
} }
file, err := os.Open(IOSTATFILE) file, err := os.Open(IOSTATFILE)
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): Failed to open file \"%s\": %w", m.name, IOSTATFILE, err) cclog.ComponentError(m.name, err.Error())
return err
} }
defer file.Close()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
@@ -103,7 +101,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
if strings.Contains(device, "loop") { if strings.Contains(device, "loop") {
continue continue
} }
if slices.Contains(m.config.ExcludeDevices, device) { if _, skip := stringArrayContains(m.config.ExcludeDevices, device); skip {
continue continue
} }
currentValues := make(map[string]int64) currentValues := make(map[string]int64)
@@ -129,10 +127,6 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
lastValues: lastValues, lastValues: lastValues,
} }
} }
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Failed to close file \"%s\": %w", m.name, IOSTATFILE, err)
}
m.init = true m.init = true
return err return err
} }
@@ -144,18 +138,10 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage)
file, err := os.Open(IOSTATFILE) file, err := os.Open(IOSTATFILE)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", IOSTATFILE, err))
return return
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", IOSTATFILE, err))
}
}()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
@@ -171,7 +157,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage)
if strings.Contains(device, "loop") { if strings.Contains(device, "loop") {
continue continue
} }
if slices.Contains(m.config.ExcludeDevices, device) { if _, skip := stringArrayContains(m.config.ExcludeDevices, device); skip {
continue continue
} }
if _, ok := m.devices[device]; !ok { if _, ok := m.devices[device]; !ok {

View File

@@ -14,6 +14,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
@@ -43,9 +44,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
} }
m.name = "IpmiCollector" m.name = "IpmiCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
@@ -117,16 +116,15 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
} }
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64) v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
if err == nil { if err == nil {
name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_")) name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1))
unit := strings.TrimSpace(lv[2]) unit := strings.TrimSpace(lv[2])
switch unit { if unit == "Volts" {
case "Volts":
unit = "Volts" unit = "Volts"
case "degrees C": } else if unit == "degrees C" {
unit = "degC" unit = "degC"
case "degrees F": } else if unit == "degrees F" {
unit = "degF" unit = "degF"
case "Watts": } else if unit == "Watts" {
unit = "Watts" unit = "Watts"
} }
@@ -152,29 +150,22 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
// Setup ipmisensors command
command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate")
stdout, _ := command.StdoutPipe() command.Wait()
errBuf := new(bytes.Buffer) stdout, err := command.Output()
command.Stderr = errBuf if err != nil {
log.Print(err)
// start command
if err := command.Start(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("readIpmiSensors(): Failed to start command \"%s\": %v", command.String(), err),
)
return return
} }
// Read command output ll := strings.Split(string(stdout), "\n")
scanner := bufio.NewScanner(stdout)
for scanner.Scan() { for _, line := range ll {
lv := strings.Split(scanner.Text(), ",") lv := strings.Split(line, ",")
if len(lv) > 3 { if len(lv) > 3 {
v, err := strconv.ParseFloat(lv[3], 64) v, err := strconv.ParseFloat(lv[3], 64)
if err == nil { if err == nil {
name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_")) name := strings.ToLower(strings.Replace(lv[1], " ", "_", -1))
y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now())
if err == nil { if err == nil {
if len(lv) > 4 { if len(lv) > 4 {
@@ -185,18 +176,6 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
} }
} }
} }
// Wait for command end
if err := command.Wait(); err != nil {
errMsg, _ := io.ReadAll(errBuf)
cclog.ComponentError(
m.name,
fmt.Sprintf("readIpmiSensors(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
)
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
return
}
} }
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {

View File

@@ -19,7 +19,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"maps"
"math" "math"
"os" "os"
"os/signal" "os/signal"
@@ -188,7 +187,7 @@ func getBaseFreq() float64 {
for _, f := range files { for _, f := range files {
buffer, err := os.ReadFile(f) buffer, err := os.ReadFile(f)
if err == nil { if err == nil {
data := strings.ReplaceAll(string(buffer), "\n", "") data := strings.Replace(string(buffer), "\n", "", -1)
x, err := strconv.ParseInt(data, 0, 64) x, err := strconv.ParseInt(data, 0, 64)
if err == nil { if err == nil {
freq = float64(x) freq = float64(x)
@@ -231,13 +230,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
if m.config.ForceOverwrite { if m.config.ForceOverwrite {
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
if err := os.Setenv("LIKWID_FORCE", "1"); err != nil { os.Setenv("LIKWID_FORCE", "1")
return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %v", err)
}
}
if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
m.setup()
m.meta = map[string]string{"group": "PerfCounter"} m.meta = map[string]string{"group": "PerfCounter"}
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
@@ -321,14 +316,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
case "accessdaemon": case "accessdaemon":
if len(m.config.DaemonPath) > 0 { if len(m.config.DaemonPath) > 0 {
p := os.Getenv("PATH") p := os.Getenv("PATH")
if len(p) > 0 { os.Setenv("PATH", m.config.DaemonPath+":"+p)
p = m.config.DaemonPath + ":" + p
} else {
p = m.config.DaemonPath
}
if err := os.Setenv("PATH", p); err != nil {
return fmt.Errorf("error setting environment variable PATH=%s: %v", p, err)
}
} }
C.HPMmode(1) C.HPMmode(1)
retCode := C.HPMinit() retCode := C.HPMinit()
@@ -387,18 +375,10 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
// Watch changes for the lock file () // Watch changes for the lock file ()
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("takeMeasurement(): Failed to create a new fsnotify.Watcher: %v", err))
return true, err return true, err
} }
defer func() { defer watcher.Close()
if err := watcher.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("takeMeasurement(): Failed to close fsnotify.Watcher: %v", err))
}
}()
if len(m.config.LockfilePath) > 0 { if len(m.config.LockfilePath) > 0 {
// Check if the lock file exists // Check if the lock file exists
info, err := os.Stat(m.config.LockfilePath) info, err := os.Stat(m.config.LockfilePath)
@@ -408,9 +388,7 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
if createErr != nil { if createErr != nil {
return true, fmt.Errorf("failed to create lock file: %v", createErr) return true, fmt.Errorf("failed to create lock file: %v", createErr)
} }
if err := file.Close(); err != nil { file.Close()
return true, fmt.Errorf("failed to close lock file: %v", err)
}
info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation
} }
if err != nil { if err != nil {
@@ -770,7 +748,9 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
// Here we generate parameter list // Here we generate parameter list
params := make(map[string]float64) params := make(map[string]float64)
for _, evset := range groups { for _, evset := range groups {
maps.Copy(params, evset.metrics[tid]) for mname, mres := range evset.metrics[tid] {
params[mname] = mres
}
} }
params["gotime"] = interval.Seconds() params["gotime"] = interval.Seconds()
// Evaluate the metric // Evaluate the metric
@@ -833,21 +813,13 @@ func (m *LikwidCollector) ReadThread(interval time.Duration, output chan lp.CCMe
if !skip { if !skip {
// read measurements and derive event set metrics // read measurements and derive event set metrics
err = m.calcEventsetMetrics(e, interval, output) m.calcEventsetMetrics(e, interval, output)
if err != nil {
cclog.ComponentError(m.name, err.Error())
return
}
groups = append(groups, e) groups = append(groups, e)
} }
} }
if len(groups) > 0 { if len(groups) > 0 {
// calculate global metrics // calculate global metrics
err = m.calcGlobalMetrics(groups, interval, output) m.calcGlobalMetrics(groups, interval, output)
if err != nil {
cclog.ComponentError(m.name, err.Error())
return
}
} }
} }

View File

@@ -11,7 +11,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -43,9 +42,7 @@ type LoadavgCollector struct {
func (m *LoadavgCollector) Init(config json.RawMessage) error { func (m *LoadavgCollector) Init(config json.RawMessage) error {
m.name = "LoadavgCollector" m.name = "LoadavgCollector"
m.parallel = true m.parallel = true
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@@ -67,10 +64,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
m.proc_skips = make([]bool, len(m.proc_matches)) m.proc_skips = make([]bool, len(m.proc_matches))
for i, name := range m.load_matches { for i, name := range m.load_matches {
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) _, m.load_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
} }
for i, name := range m.proc_matches { for i, name := range m.proc_matches {
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name) _, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
} }
m.init = true m.init = true
return nil return nil

View File

@@ -13,7 +13,6 @@ import (
"fmt" "fmt"
"os/exec" "os/exec"
"os/user" "os/user"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -62,6 +61,7 @@ func (m *LustreCollector) getDeviceDataCommand(device string) []string {
} else { } else {
command = exec.Command(m.lctl, LCTL_OPTION, statsfile) command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
} }
command.Wait()
stdout, _ := command.Output() stdout, _ := command.Output()
return strings.Split(string(stdout), "\n") return strings.Split(string(stdout), "\n")
} }
@@ -302,9 +302,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
return err return err
} }
} }
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source": m.name, "group": "Lustre"} m.meta = map[string]string{"source": m.name, "group": "Lustre"}
@@ -341,21 +339,21 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
m.definitions = []LustreMetricDefinition{} m.definitions = []LustreMetricDefinition{}
if m.config.SendAbsoluteValues { if m.config.SendAbsoluteValues {
for _, def := range LustreAbsMetrics { for _, def := range LustreAbsMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
} }
if m.config.SendDiffValues { if m.config.SendDiffValues {
for _, def := range LustreDiffMetrics { for _, def := range LustreDiffMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }
} }
if m.config.SendDerivedValues { if m.config.SendDerivedValues {
for _, def := range LustreDeriveMetrics { for _, def := range LustreDeriveMetrics {
if !slices.Contains(m.config.ExcludeMetrics, def.name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
m.definitions = append(m.definitions, def) m.definitions = append(m.definitions, def)
} }
} }

View File

@@ -15,7 +15,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -59,11 +58,7 @@ func getStats(filename string) map[string]MemstatStats {
if err != nil { if err != nil {
cclog.Error(err.Error()) cclog.Error(err.Error())
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.Error(err.Error())
}
}()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
@@ -120,20 +115,19 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
"MemShared": "mem_shared", "MemShared": "mem_shared",
} }
for k, v := range matches { for k, v := range matches {
if !slices.Contains(m.config.ExcludeMetrics, k) { _, skip := stringArrayContains(m.config.ExcludeMetrics, k)
if !skip {
m.matches[k] = v m.matches[k] = v
} }
} }
m.sendMemUsed = false m.sendMemUsed = false
if !slices.Contains(m.config.ExcludeMetrics, "mem_used") { if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
m.sendMemUsed = true m.sendMemUsed = true
} }
if len(m.matches) == 0 { if len(m.matches) == 0 {
return errors.New("no metrics to collect") return errors.New("no metrics to collect")
} }
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if m.config.NodeStats { if m.config.NodeStats {
if stats := getStats(MEMSTATFILE); len(stats) == 0 { if stats := getStats(MEMSTATFILE); len(stats) == 0 {
@@ -180,7 +174,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage
sendStats := func(stats map[string]MemstatStats, tags map[string]string) { sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
for match, name := range m.matches { for match, name := range m.matches {
var value float64 = 0 var value float64 = 0
unit := "" var unit string = ""
if v, ok := stats[match]; ok { if v, ok := stats[match]; ok {
value = v.value value = v.value
if len(v.unit) > 0 { if len(v.unit) > 0 {

View File

@@ -51,6 +51,30 @@ func (c *metricCollector) Initialized() bool {
return c.init return c.init
} }
// intArrayContains searches array for the value str.
// It returns the index of the first element equal to str together with true.
// When no element matches, it returns -1 and false.
func intArrayContains(array []int, str int) (int, bool) {
	for idx := 0; idx < len(array); idx++ {
		if array[idx] == str {
			return idx, true
		}
	}
	return -1, false
}
// stringArrayContains searches array for the string str.
// On a match it returns the index of the first matching element and true;
// otherwise it returns -1 and false.
func stringArrayContains(array []string, str string) (int, bool) {
	pos, found := -1, false
	for idx := range array {
		if array[idx] == str {
			pos, found = idx, true
			break
		}
	}
	return pos, found
}
// RemoveFromStringList removes the string r from the array of strings s // RemoveFromStringList removes the string r from the array of strings s
// If r is not contained in the array an error is returned // If r is not contained in the array an error is returned
func RemoveFromStringList(s []string, r string) ([]string, error) { func RemoveFromStringList(s []string, r string) ([]string, error) {

View File

@@ -10,9 +10,8 @@ package collectors
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt" "errors"
"os" "os"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -66,9 +65,7 @@ func getCanonicalName(raw string, aliasToCanonical map[string]string) string {
func (m *NetstatCollector) Init(config json.RawMessage) error { func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector" m.name = "NetstatCollector"
m.parallel = true m.parallel = true
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()
const ( const (
@@ -110,8 +107,10 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
// Check access to net statistic file // Check access to net statistic file
file, err := os.Open(NETSTATFILE) file, err := os.Open(NETSTATFILE)
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): failed to open netstat file \"%s\": %w", m.name, NETSTATFILE, err) cclog.ComponentError(m.name, err.Error())
return err
} }
defer file.Close()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
@@ -130,7 +129,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
canonical := getCanonicalName(raw, m.aliasToCanonical) canonical := getCanonicalName(raw, m.aliasToCanonical)
// Check if device is a included device // Check if device is a included device
if slices.Contains(m.config.IncludeDevices, canonical) { if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok {
// Tag will contain original device name (raw). // Tag will contain original device name (raw).
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"} tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"} meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
@@ -175,13 +174,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
} }
} }
// Close netstat file
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): failed to close netstat file \"%s\": %w", m.name, NETSTATFILE, err)
}
if len(m.matches) == 0 { if len(m.matches) == 0 {
return fmt.Errorf("%s Init(): no devices to collect metrics found", m.name) return errors.New("no devices to collector metrics found")
} }
m.init = true m.init = true
return nil return nil
@@ -200,18 +194,10 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
file, err := os.Open(NETSTATFILE) file, err := os.Open(NETSTATFILE)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", NETSTATFILE, err))
return return
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", NETSTATFILE, err))
}
}()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {

View File

@@ -11,7 +11,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"slices"
// "os" // "os"
"os/exec" "os/exec"
@@ -19,7 +18,6 @@ import (
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
@@ -46,15 +44,10 @@ type nfsCollector struct {
func (m *nfsCollector) initStats() error { func (m *nfsCollector) initStats() error {
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`) cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
cmd.Wait()
// Wait for cmd end
if err := cmd.Wait(); err != nil {
return fmt.Errorf("initStats(): %w", err)
}
buffer, err := cmd.Output() buffer, err := cmd.Output()
if err == nil { if err == nil {
for line := range strings.Lines(string(buffer)) { for _, line := range strings.Split(string(buffer), "\n") {
lf := strings.Fields(line) lf := strings.Fields(line)
if len(lf) != 5 { if len(lf) != 5 {
continue continue
@@ -78,15 +71,10 @@ func (m *nfsCollector) initStats() error {
func (m *nfsCollector) updateStats() error { func (m *nfsCollector) updateStats() error {
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`) cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
cmd.Wait()
// Wait for cmd end
if err := cmd.Wait(); err != nil {
return fmt.Errorf("updateStats(): %w", err)
}
buffer, err := cmd.Output() buffer, err := cmd.Output()
if err == nil { if err == nil {
for line := range strings.Lines(string(buffer)) { for _, line := range strings.Split(string(buffer), "\n") {
lf := strings.Fields(line) lf := strings.Fields(line)
if len(lf) != 5 { if len(lf) != 5 {
continue continue
@@ -131,9 +119,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err) return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err)
} }
m.data = make(map[string]NfsCollectorData) m.data = make(map[string]NfsCollectorData)
if err := m.initStats(); err != nil { m.initStats()
return fmt.Errorf("NfsCollector.Init(): %w", err)
}
m.init = true m.init = true
m.parallel = true m.parallel = true
return nil return nil
@@ -145,13 +131,7 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
} }
timestamp := time.Now() timestamp := time.Now()
if err := m.updateStats(); err != nil { m.updateStats()
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): updateStats() failed: %v", err),
)
return
}
prefix := "" prefix := ""
switch m.version { switch m.version {
case "v3": case "v3":
@@ -163,7 +143,7 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
} }
for name, data := range m.data { for name, data := range m.data {
if slices.Contains(m.config.ExcludeMetrics, name) { if _, skip := stringArrayContains(m.config.ExcludeMetrics, name); skip {
continue continue
} }
value := data.current - data.last value := data.current - data.last
@@ -190,17 +170,13 @@ type Nfs4Collector struct {
func (m *Nfs3Collector) Init(config json.RawMessage) error { func (m *Nfs3Collector) Init(config json.RawMessage) error {
m.name = "Nfs3Collector" m.name = "Nfs3Collector"
m.version = `v3` m.version = `v3`
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
return m.MainInit(config) return m.MainInit(config)
} }
func (m *Nfs4Collector) Init(config json.RawMessage) error { func (m *Nfs4Collector) Init(config json.RawMessage) error {
m.name = "Nfs4Collector" m.name = "Nfs4Collector"
m.version = `v4` m.version = `v4`
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
return m.MainInit(config) return m.MainInit(config)
} }

View File

@@ -12,7 +12,6 @@ import (
"fmt" "fmt"
"os" "os"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -72,7 +71,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
// Is this a device line with mount point, remote target and NFS version? // Is this a device line with mount point, remote target and NFS version?
dev := resolve_regex_fields(l, deviceRegex) dev := resolve_regex_fields(l, deviceRegex)
if len(dev) > 0 { if len(dev) > 0 {
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) { if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok {
current = dev current = dev
if len(current["version"]) == 0 { if len(current["version"]) == 0 {
current["version"] = "3" current["version"] = "3"
@@ -86,7 +85,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
if len(bytes) > 0 { if len(bytes) > 0 {
data[current[m.key]] = make(map[string]int64) data[current[m.key]] = make(map[string]int64)
for name, sval := range bytes { for name, sval := range bytes {
if !slices.Contains(m.config.ExcludeMetrics, name) { if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok {
val, err := strconv.ParseInt(sval, 10, 64) val, err := strconv.ParseInt(sval, 10, 64)
if err == nil { if err == nil {
data[current[m.key]][name] = val data[current[m.key]][name] = val
@@ -103,9 +102,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
func (m *NfsIOStatCollector) Init(config json.RawMessage) error { func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
var err error = nil var err error = nil
m.name = "NfsIOStatCollector" m.name = "NfsIOStatCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"} m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}

View File

@@ -72,9 +72,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
m.name = "NUMAStatsCollector" m.name = "NUMAStatsCollector"
m.parallel = true m.parallel = true
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "NUMA", "group": "NUMA",
@@ -188,11 +186,7 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMessa
t.previousValues[key] = value t.previousValues[key] = value
} }
} }
if err := file.Close(); err != nil { file.Close()
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", t.file, err))
}
} }
} }

View File

@@ -12,8 +12,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"maps"
"slices"
"strings" "strings"
"time" "time"
@@ -66,9 +64,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
m.config.ProcessMigDevices = false m.config.ProcessMigDevices = false
m.config.UseUuidForMigDevices = false m.config.UseUuidForMigDevices = false
m.config.UseSliceForMigDevices = false m.config.UseSliceForMigDevices = false
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@@ -113,7 +109,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
// Skip excluded devices by ID // Skip excluded devices by ID
str_i := fmt.Sprintf("%d", i) str_i := fmt.Sprintf("%d", i)
if slices.Contains(m.config.ExcludeDevices, str_i) { if _, skip := stringArrayContains(m.config.ExcludeDevices, str_i); skip {
cclog.ComponentDebug(m.name, "Skipping excluded device", str_i) cclog.ComponentDebug(m.name, "Skipping excluded device", str_i)
continue continue
} }
@@ -141,7 +137,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
pciInfo.Device) pciInfo.Device)
// Skip excluded devices specified by PCI ID // Skip excluded devices specified by PCI ID
if slices.Contains(m.config.ExcludeDevices, pci_id) { if _, skip := stringArrayContains(m.config.ExcludeDevices, pci_id); skip {
cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id) cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id)
continue continue
} }
@@ -226,7 +222,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err
var total uint64 var total uint64
var used uint64 var used uint64
var reserved uint64 = 0 var reserved uint64 = 0
v2 := false var v2 bool = false
meminfo, ret := nvml.DeviceGetMemoryInfo(device.device) meminfo, ret := nvml.DeviceGetMemoryInfo(device.device)
if ret != nvml.SUCCESS { if ret != nvml.SUCCESS {
err := errors.New(nvml.ErrorString(ret)) err := errors.New(nvml.ErrorString(ret))
@@ -409,8 +405,7 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error
// Changing ECC modes requires a reboot. // Changing ECC modes requires a reboot.
// The "pending" ECC mode refers to the target mode following the next reboot. // The "pending" ECC mode refers to the target mode following the next reboot.
_, ecc_pend, ret := nvml.DeviceGetEccMode(device.device) _, ecc_pend, ret := nvml.DeviceGetEccMode(device.device)
switch ret { if ret == nvml.SUCCESS {
case nvml.SUCCESS:
var y lp.CCMessage var y lp.CCMessage
var err error var err error
switch ecc_pend { switch ecc_pend {
@@ -424,7 +419,7 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error
if err == nil { if err == nil {
output <- y output <- y
} }
case nvml.ERROR_NOT_SUPPORTED: } else if ret == nvml.ERROR_NOT_SUPPORTED {
y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now()) y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now())
if err == nil { if err == nil {
output <- y output <- y
@@ -773,7 +768,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e
} }
} }
if !device.excludeMetrics["nv_remapped_rows_pending"] { if !device.excludeMetrics["nv_remapped_rows_pending"] {
p := 0 var p int = 0
if pending { if pending {
p = 1 p = 1
} }
@@ -783,7 +778,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e
} }
} }
if !device.excludeMetrics["nv_remapped_rows_failure"] { if !device.excludeMetrics["nv_remapped_rows_failure"] {
f := 0 var f int = 0
if failure { if failure {
f = 1 f = 1
} }
@@ -1280,7 +1275,9 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
meta: map[string]string{}, meta: map[string]string{},
excludeMetrics: excludeMetrics, excludeMetrics: excludeMetrics,
} }
maps.Copy(migDevice.tags, m.gpus[i].tags) for k, v := range m.gpus[i].tags {
migDevice.tags[k] = v
}
migDevice.tags["stype"] = "mig" migDevice.tags["stype"] = "mig"
if m.config.UseUuidForMigDevices { if m.config.UseUuidForMigDevices {
uuid, ret := nvml.DeviceGetUUID(mdev) uuid, ret := nvml.DeviceGetUUID(mdev)
@@ -1294,8 +1291,8 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
if ret == nvml.SUCCESS { if ret == nvml.SUCCESS {
mname, ret := nvml.DeviceGetName(mdev) mname, ret := nvml.DeviceGetName(mdev)
if ret == nvml.SUCCESS { if ret == nvml.SUCCESS {
x := strings.ReplaceAll(mname, name, "") x := strings.Replace(mname, name, "", -1)
x = strings.ReplaceAll(x, "MIG", "") x = strings.Replace(x, "MIG", "", -1)
x = strings.TrimSpace(x) x = strings.TrimSpace(x)
migDevice.tags["stype-id"] = x migDevice.tags["stype-id"] = x
} }
@@ -1304,7 +1301,9 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
if _, ok := migDevice.tags["stype-id"]; !ok { if _, ok := migDevice.tags["stype-id"]; !ok {
migDevice.tags["stype-id"] = fmt.Sprintf("%d", j) migDevice.tags["stype-id"] = fmt.Sprintf("%d", j)
} }
maps.Copy(migDevice.meta, m.gpus[i].meta) for k, v := range m.gpus[i].meta {
migDevice.meta[k] = v
}
if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices { if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices {
uuid, ret := nvml.DeviceGetUUID(mdev) uuid, ret := nvml.DeviceGetUUID(mdev)
if ret == nvml.SUCCESS { if ret == nvml.SUCCESS {
@@ -1320,9 +1319,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
func (m *NvidiaCollector) Close() { func (m *NvidiaCollector) Close() {
if m.init { if m.init {
if ret := nvml.Shutdown(); ret != nvml.SUCCESS { nvml.Shutdown()
cclog.ComponentError(m.name, "nvml.Shutdown() not successful")
}
m.init = false m.init = false
} }
} }

View File

@@ -54,10 +54,9 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
return nil return nil
} }
var err error = nil
m.name = "RAPLCollector" m.name = "RAPLCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
@@ -67,7 +66,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error()) cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err return err

View File

@@ -52,9 +52,7 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
// Always set the name early in Init() to use it in cclog.Component* functions // Always set the name early in Init() to use it in cclog.Component* functions
m.name = "RocmSmiCollector" m.name = "RocmSmiCollector"
// This is for later use, also call it early // This is for later use, also call it early
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
//m.meta = map[string]string{"source": m.name, "group": "AMD"} //m.meta = map[string]string{"source": m.name, "group": "AMD"}

View File

@@ -9,7 +9,6 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"fmt"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -42,9 +41,7 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// Always set the name early in Init() to use it in cclog.Component* functions // Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SampleCollector" m.name = "SampleCollector"
// This is for later use, also call it early // This is for later use, also call it early
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
// Tell whether the collector should be run in parallel with others (reading files, ...) // Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements // or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors // because they should not measure the execution of the other collectors

View File

@@ -9,7 +9,6 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"fmt"
"sync" "sync"
"time" "time"
@@ -41,9 +40,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
// Always set the name early in Init() to use it in cclog.Component* functions // Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SampleTimerCollector" m.name = "SampleTimerCollector"
// This is for later use, also call it early // This is for later use, also call it early
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}

View File

@@ -11,6 +11,7 @@ import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@@ -46,37 +47,37 @@ type SchedstatCollector struct {
// Called once by the collector manager // Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here // All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *SchedstatCollector) Init(config json.RawMessage) error { func (m *SchedstatCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions // Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SchedstatCollector" m.name = "SchedstatCollector"
// This is for later use, also call it early // This is for later use, also call it early
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
// Tell whether the collector should be run in parallel with others (reading files, ...) // Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements // or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors // because they should not measure the execution of the other collectors
m.parallel = true m.parallel = true
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{ m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"}
"source": m.name,
"group": "SCHEDSTAT",
}
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
if err := json.Unmarshal(config, &m.config); err != nil { err = json.Unmarshal(config, &m.config)
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err) if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
} }
} }
// Check input file // Check input file
file, err := os.Open(SCHEDSTATFILE) file, err := os.Open(string(SCHEDSTATFILE))
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): Failed opening scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err) cclog.ComponentError(m.name, err.Error())
} }
defer file.Close()
// Pre-generate tags for all CPUs // Pre-generate tags for all CPUs
num_cpus := 0
m.cputags = make(map[string]map[string]string) m.cputags = make(map[string]map[string]string)
m.olddata = make(map[string]map[string]int64) m.olddata = make(map[string]map[string]int64)
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
@@ -88,18 +89,10 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
cpu, _ := strconv.Atoi(cpustr) cpu, _ := strconv.Atoi(cpustr)
running, _ := strconv.ParseInt(linefields[7], 10, 64) running, _ := strconv.ParseInt(linefields[7], 10, 64)
waiting, _ := strconv.ParseInt(linefields[8], 10, 64) waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
m.cputags[linefields[0]] = map[string]string{ m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
"type": "hwthread", m.olddata[linefields[0]] = map[string]int64{"running": running, "waiting": waiting}
"type-id": fmt.Sprintf("%d", cpu), num_cpus++
} }
m.olddata[linefields[0]] = map[string]int64{
"running": running,
"waiting": waiting,
}
}
}
if err := file.Close(); err != nil {
return fmt.Errorf("%s Init(): Failed closing scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err)
} }
// Save current timestamp // Save current timestamp
@@ -116,8 +109,8 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
diff_running := running - m.olddata[linefields[0]]["running"] diff_running := running - m.olddata[linefields[0]]["running"]
diff_waiting := waiting - m.olddata[linefields[0]]["waiting"] diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
l_running := float64(diff_running) / tsdelta.Seconds() / 1000_000_000 var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3))
l_waiting := float64(diff_waiting) / tsdelta.Seconds() / 1000_000_000 var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3))
m.olddata[linefields[0]]["running"] = running m.olddata[linefields[0]]["running"] = running
m.olddata[linefields[0]]["waiting"] = waiting m.olddata[linefields[0]]["waiting"] = waiting
@@ -141,19 +134,11 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa
now := time.Now() now := time.Now()
tsdelta := now.Sub(m.lastTimestamp) tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(SCHEDSTATFILE) file, err := os.Open(string(SCHEDSTATFILE))
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(m.name, err.Error())
m.name,
fmt.Sprintf("Read(): Failed to open file '%s': %v", SCHEDSTATFILE, err))
} }
defer func() { defer file.Close()
if err := file.Close(); err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to close file '%s': %v", SCHEDSTATFILE, err))
}
}()
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {

View File

@@ -9,7 +9,6 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"fmt"
"runtime" "runtime"
"syscall" "syscall"
"time" "time"
@@ -35,9 +34,7 @@ type SelfCollector struct {
func (m *SelfCollector) Init(config json.RawMessage) error { func (m *SelfCollector) Init(config json.RawMessage) error {
var err error = nil var err error = nil
m.name = "SelfCollector" m.name = "SelfCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Self"} m.meta = map[string]string{"source": m.name, "group": "Self"}
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}

View File

@@ -50,7 +50,8 @@ func ParseCPUs(cpuset string) ([]int, error) {
return result, nil return result, nil
} }
for r := range strings.SplitSeq(cpuset, ",") { ranges := strings.Split(cpuset, ",")
for _, r := range ranges {
if strings.Contains(r, "-") { if strings.Contains(r, "-") {
parts := strings.Split(r, "-") parts := strings.Split(r, "-")
if len(parts) != 2 { if len(parts) != 2 {
@@ -102,9 +103,7 @@ func (m *SlurmCgroupCollector) readFile(path string) ([]byte, error) {
func (m *SlurmCgroupCollector) Init(config json.RawMessage) error { func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "SlurmCgroupCollector" m.name = "SlurmCgroupCollector"
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
m.parallel = true m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "SLURM"} m.meta = map[string]string{"source": m.name, "group": "SLURM"}
m.tags = map[string]string{"type": "hwthread"} m.tags = map[string]string{"type": "hwthread"}

View File

@@ -58,9 +58,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
m.name = "TempCollector" m.name = "TempCollector"
m.parallel = true m.parallel = true
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@@ -119,7 +117,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
sensor.metricName = sensor.label sensor.metricName = sensor.label
} }
sensor.metricName = strings.ToLower(sensor.metricName) sensor.metricName = strings.ToLower(sensor.metricName)
sensor.metricName = strings.ReplaceAll(sensor.metricName, " ", "_") sensor.metricName = strings.Replace(sensor.metricName, " ", "_", -1)
// Add temperature prefix, if required // Add temperature prefix, if required
if !strings.Contains(sensor.metricName, "temp") { if !strings.Contains(sensor.metricName, "temp") {
sensor.metricName = "temp_" + sensor.metricName sensor.metricName = "temp_" + sensor.metricName

View File

@@ -9,12 +9,13 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"log"
"os/exec" "os/exec"
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
@@ -35,17 +36,12 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "TopProcsCollector" m.name = "TopProcsCollector"
m.parallel = true m.parallel = true
m.tags = map[string]string{ m.tags = map[string]string{"type": "node"}
"type": "node", m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
}
m.meta = map[string]string{
"source": m.name,
"group": "TopProcs",
}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err) return err
} }
} else { } else {
m.config.Num_procs = int(DEFAULT_NUM_PROCS) m.config.Num_procs = int(DEFAULT_NUM_PROCS)
@@ -53,13 +49,12 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS { if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS {
return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS) return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)
} }
if err := m.setup(); err != nil { m.setup()
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
}
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu") command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
command.Wait()
_, err = command.Output() _, err = command.Output()
if err != nil { if err != nil {
return fmt.Errorf("%s Init(): failed to get output from command: %w", m.name, err) return errors.New("failed to execute command")
} }
m.init = true m.init = true
return nil return nil
@@ -70,11 +65,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag
return return
} }
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu") command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
command.Wait()
stdout, err := command.Output() stdout, err := command.Output()
if err != nil { if err != nil {
cclog.ComponentError( log.Print(m.name, err)
m.name,
fmt.Sprintf("Read(): Failed to read output from command \"%s\": %v", command.String(), err))
return return
} }

View File

@@ -1,19 +1,6 @@
{ {
"cpufreq": {}, "cpufreq": {},
"cpufreq_cpuinfo": {}, "cpufreq_cpuinfo": {},
"cpustat": {
"exclude_metrics": [
"cpu_idle"
]
},
"diskstat": {
"exclude_metrics": [
"disk_total"
],
"exclude_mounts": [
"slurm-tmpfs"
]
},
"gpfs": { "gpfs": {
"exclude_filesystem": [ "exclude_filesystem": [
"test_fs" "test_fs"
@@ -34,8 +21,6 @@
}, },
"numastats": {}, "numastats": {},
"nvidia": {}, "nvidia": {},
"schedstat": {
},
"tempstat": { "tempstat": {
"report_max_temperature": true, "report_max_temperature": true,
"report_critical_temperature": true, "report_critical_temperature": true,

2
go.mod
View File

@@ -11,6 +11,7 @@ require (
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/tklauser/go-sysconf v0.3.16 github.com/tklauser/go-sysconf v0.3.16
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.org/x/exp v0.0.0-20260112195511-716be5621a96
golang.org/x/sys v0.40.0 golang.org/x/sys v0.40.0
) )
@@ -39,7 +40,6 @@ require (
github.com/tklauser/numcpus v0.11.0 // indirect github.com/tklauser/numcpus v0.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.47.0 // indirect golang.org/x/crypto v0.47.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/net v0.49.0 // indirect golang.org/x/net v0.49.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect google.golang.org/protobuf v1.36.11 // indirect
) )

View File

@@ -10,7 +10,6 @@ package metricAggregator
import ( import (
"context" "context"
"fmt" "fmt"
"maps"
"math" "math"
"os" "os"
"strings" "strings"
@@ -122,7 +121,9 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) { func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
vars := make(map[string]interface{}) vars := make(map[string]interface{})
maps.Copy(vars, c.constants) for k, v := range c.constants {
vars[k] = v
}
vars["starttime"] = starttime vars["starttime"] = starttime
vars["endtime"] = endtime vars["endtime"] = endtime
for _, f := range c.functions { for _, f := range c.functions {

View File

@@ -11,9 +11,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"regexp" "regexp"
"slices"
"strings" "strings"
"golang.org/x/exp/slices"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
) )
@@ -168,7 +169,7 @@ func medianfunc(args interface{}) (interface{}, error) {
func lenfunc(args interface{}) (interface{}, error) { func lenfunc(args interface{}) (interface{}, error) {
var err error = nil var err error = nil
length := 0 var length int = 0
switch values := args.(type) { switch values := args.(type) {
case []float64: case []float64:
length = len(values) length = len(values)
@@ -237,7 +238,7 @@ func matchfunc(args ...interface{}) (interface{}, error) {
case string: case string:
switch total := args[1].(type) { switch total := args[1].(type) {
case string: case string:
smatch := strings.ReplaceAll(match, "%", "\\") smatch := strings.Replace(match, "%", "\\", -1)
regex, err := regexp.Compile(smatch) regex, err := regexp.Compile(smatch)
if err != nil { if err != nil {
return false, err return false, err

View File

@@ -51,7 +51,7 @@ type MetricCache interface {
} }
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error { func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
var err error var err error = nil
c.done = make(chan bool) c.done = make(chan bool)
c.wg = wg c.wg = wg
c.ticker = ticker c.ticker = ticker
@@ -161,8 +161,8 @@ func (c *metricCache) DeleteAggregation(name string) error {
// is the current one, index=1 the last interval and so on. Returns an empty array if a wrong index // is the current one, index=1 the last interval and so on. Returns an empty array if a wrong index
// is given (negative index, index larger than configured number of total intervals, ...) // is given (negative index, index larger than configured number of total intervals, ...)
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) { func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
start := time.Now() var start time.Time = time.Now()
stop := time.Now() var stop time.Time = time.Now()
var metrics []lp.CCMessage var metrics []lp.CCMessage
if index >= 0 && index < c.numPeriods { if index >= 0 && index < c.numPeriods {
pindex := c.curPeriod - index pindex := c.curPeriod - index

View File

@@ -107,8 +107,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
return err return err
} }
r.maxForward = max(1, r.config.MaxForward) r.maxForward = 1
if r.config.MaxForward > r.maxForward {
r.maxForward = r.config.MaxForward
}
if r.config.NumCacheIntervals > 0 { if r.config.NumCacheIntervals > 0 {
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals) r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
if err != nil { if err != nil {
@@ -116,74 +118,50 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
return err return err
} }
for _, agg := range r.config.IntervalAgg { for _, agg := range r.config.IntervalAgg {
err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
if err != nil {
return fmt.Errorf("MetricCache AddAggregation() failed: %w", err)
}
} }
} }
p, err := mp.NewMessageProcessor() p, err := mp.NewMessageProcessor()
if err != nil { if err != nil {
return fmt.Errorf("MessageProcessor NewMessageProcessor() failed: %w", err) return fmt.Errorf("initialization of message processor failed: %v", err.Error())
} }
r.mp = p r.mp = p
if len(r.config.MessageProcessor) > 0 { if len(r.config.MessageProcessor) > 0 {
err = r.mp.FromConfigJSON(r.config.MessageProcessor) err = r.mp.FromConfigJSON(r.config.MessageProcessor)
if err != nil { if err != nil {
return fmt.Errorf("MessageProcessor FromConfigJSON() failed: %w", err) return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
} }
} }
for _, mname := range r.config.DropMetrics { for _, mname := range r.config.DropMetrics {
err = r.mp.AddDropMessagesByName(mname) r.mp.AddDropMessagesByName(mname)
if err != nil {
return fmt.Errorf("MessageProcessor AddDropMessagesByName() failed: %w", err)
}
} }
for _, cond := range r.config.DropMetricsIf { for _, cond := range r.config.DropMetricsIf {
err = r.mp.AddDropMessagesByCondition(cond) r.mp.AddDropMessagesByCondition(cond)
if err != nil {
return fmt.Errorf("MessageProcessor AddDropMessagesByCondition() failed: %w", err)
}
} }
for _, data := range r.config.AddTags { for _, data := range r.config.AddTags {
cond := data.Condition cond := data.Condition
if cond == "*" { if cond == "*" {
cond = "true" cond = "true"
} }
err = r.mp.AddAddTagsByCondition(cond, data.Key, data.Value) r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
if err != nil {
return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
}
} }
for _, data := range r.config.DelTags { for _, data := range r.config.DelTags {
cond := data.Condition cond := data.Condition
if cond == "*" { if cond == "*" {
cond = "true" cond = "true"
} }
err = r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value) r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
if err != nil {
return fmt.Errorf("MessageProcessor AddDeleteTagsByCondition() failed: %w", err)
}
} }
for oldname, newname := range r.config.RenameMetrics { for oldname, newname := range r.config.RenameMetrics {
err = r.mp.AddRenameMetricByName(oldname, newname) r.mp.AddRenameMetricByName(oldname, newname)
if err != nil {
return fmt.Errorf("MessageProcessor AddRenameMetricByName() failed: %w", err)
}
} }
for metricName, prefix := range r.config.ChangeUnitPrefix { for metricName, prefix := range r.config.ChangeUnitPrefix {
err = r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix) r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
if err != nil {
return fmt.Errorf("MessageProcessor AddChangeUnitPrefix() failed: %w", err)
}
} }
r.mp.SetNormalizeUnits(r.config.NormalizeUnits) r.mp.SetNormalizeUnits(r.config.NormalizeUnits)
err = r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname) r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
if err != nil {
return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
}
// r.config.dropMetrics = make(map[string]bool) // r.config.dropMetrics = make(map[string]bool)
// for _, mname := range r.config.DropMetrics { // for _, mname := range r.config.DropMetrics {

View File

@@ -13,11 +13,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
cclogger "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclogger "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"golang.org/x/exp/slices"
) )
const SYSFS_CPUBASE = `/sys/devices/system/cpu` const SYSFS_CPUBASE = `/sys/devices/system/cpu`
@@ -80,7 +80,7 @@ func fileToList(path string) []int {
// Create list // Create list
list := make([]int, 0) list := make([]int, 0)
stringBuffer := strings.TrimSpace(string(buffer)) stringBuffer := strings.TrimSpace(string(buffer))
for valueRangeString := range strings.SplitSeq(stringBuffer, ",") { for _, valueRangeString := range strings.Split(stringBuffer, ",") {
valueRange := strings.Split(valueRangeString, "-") valueRange := strings.Split(valueRangeString, "-")
switch len(valueRange) { switch len(valueRange) {
case 1: case 1: