mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2026-02-13 22:51:45 +01:00
Compare commits
32 Commits
cleanup
...
golangci-l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec8177e73b | ||
|
|
8d75aba882 | ||
|
|
094fa52b20 | ||
|
|
80d5adfb97 | ||
|
|
4b831f09a5 | ||
|
|
f6cd593862 | ||
|
|
6000a1a45b | ||
|
|
3633db0d46 | ||
|
|
77a9b5a977 | ||
|
|
baf7a4f2c5 | ||
|
|
2b5bf4d6a5 | ||
|
|
1f7b13349c | ||
|
|
f98800e039 | ||
|
|
396a9f8ce5 | ||
|
|
de5429201b | ||
|
|
c8f4769a82 | ||
|
|
090f6c69a9 | ||
|
|
b2af81a038 | ||
|
|
8f49c7aa67 | ||
|
|
a77cc19ddb | ||
|
|
ff0cd5803d | ||
|
|
5df7b0eb48 | ||
|
|
0f7792b4cb | ||
|
|
e6dc9eba27 | ||
|
|
c9ebef3bad | ||
|
|
faf5088385 | ||
|
|
12ab80ccad | ||
|
|
c24b3a7e4b | ||
|
|
3c03f4ac96 | ||
|
|
dddae13c7a | ||
|
|
159bee1e9f | ||
|
|
86710d9b4b |
5
Makefile
5
Makefile
@@ -72,6 +72,11 @@ staticcheck:
|
|||||||
$(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest
|
$(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest
|
||||||
$$($(GOBIN) env GOPATH)/bin/staticcheck ./...
|
$$($(GOBIN) env GOPATH)/bin/staticcheck ./...
|
||||||
|
|
||||||
|
.PHONY: golangci-lint
|
||||||
|
golangci-lint:
|
||||||
|
$(GOBIN) install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
|
||||||
|
$$($(GOBIN) env GOPATH)/bin/golangci-lint run
|
||||||
|
|
||||||
.ONESHELL:
|
.ONESHELL:
|
||||||
.PHONY: RPM
|
.PHONY: RPM
|
||||||
RPM: scripts/cc-metric-collector.spec
|
RPM: scripts/cc-metric-collector.spec
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ A collector reads data from any source, parses it to metrics and submits these m
|
|||||||
* `Read(duration time.Duration, output chan ccMessage.CCMessage)`: Read, parse and submit data to the `output` channel as [`CCMessage`](https://github.com/ClusterCockpit/cc-lib/blob/main/ccMessage/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
* `Read(duration time.Duration, output chan ccMessage.CCMessage)`: Read, parse and submit data to the `output` channel as [`CCMessage`](https://github.com/ClusterCockpit/cc-lib/blob/main/ccMessage/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`.
|
||||||
* `Close()`: Closes down the collector.
|
* `Close()`: Closes down the collector.
|
||||||
|
|
||||||
It is recommanded to call `setup()` in the `Init()` function.
|
It is recommended to call `setup()` in the `Init()` function.
|
||||||
|
|
||||||
Finally, the collector needs to be registered in the `collectorManager.go`. There is a list of collectors called `AvailableCollectors` which is a map (`collector_type_string` -> `pointer to MetricCollector interface`). Add a new entry with a descriptive name and the new collector.
|
Finally, the collector needs to be registered in the `collectorManager.go`. There is a list of collectors called `AvailableCollectors` which is a map (`collector_type_string` -> `pointer to MetricCollector interface`). Add a new entry with a descriptive name and the new collector.
|
||||||
|
|
||||||
@@ -100,11 +100,12 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.name = "SampleCollector"
|
m.name = "SampleCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Sample"}
|
m.meta = map[string]string{"source": m.name, "group": "Sample"}
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -61,7 +62,9 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
|||||||
"rmXA", "setXA", "mirror"}
|
"rmXA", "setXA", "mirror"}
|
||||||
|
|
||||||
m.name = "BeegfsMetaCollector"
|
m.name = "BeegfsMetaCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
// Set default beegfs-ctl binary
|
// Set default beegfs-ctl binary
|
||||||
|
|
||||||
@@ -78,8 +81,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
|||||||
//create map with possible variables
|
//create map with possible variables
|
||||||
m.matches = make(map[string]string)
|
m.matches = make(map[string]string)
|
||||||
for _, value := range nodeMdstat_array {
|
for _, value := range nodeMdstat_array {
|
||||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, value)
|
if slices.Contains(m.config.ExcludeMetrics, value) {
|
||||||
if skip {
|
|
||||||
m.matches["other"] = "0"
|
m.matches["other"] = "0"
|
||||||
} else {
|
} else {
|
||||||
m.matches["beegfs_cmeta_"+value] = "0"
|
m.matches["beegfs_cmeta_"+value] = "0"
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -54,7 +55,9 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
|||||||
"storInf", "unlnk"}
|
"storInf", "unlnk"}
|
||||||
|
|
||||||
m.name = "BeegfsStorageCollector"
|
m.name = "BeegfsStorageCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
// Set default beegfs-ctl binary
|
// Set default beegfs-ctl binary
|
||||||
|
|
||||||
@@ -71,8 +74,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
|||||||
//create map with possible variables
|
//create map with possible variables
|
||||||
m.matches = make(map[string]string)
|
m.matches = make(map[string]string)
|
||||||
for _, value := range storageStat_array {
|
for _, value := range storageStat_array {
|
||||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, value)
|
if slices.Contains(m.config.ExcludeMetrics, value) {
|
||||||
if skip {
|
|
||||||
m.matches["other"] = "0"
|
m.matches["other"] = "0"
|
||||||
} else {
|
} else {
|
||||||
m.matches["beegfs_cstorage_"+value] = "0"
|
m.matches["beegfs_cstorage_"+value] = "0"
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ package collectors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -104,7 +105,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
|
|||||||
|
|
||||||
err = collector.Init(collectorCfg)
|
err = collector.Init(collectorCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error())
|
cclog.ComponentError("CollectorManager", fmt.Sprintf("Collector %s initialization failed: %v", collectorName, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
|
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
|
||||||
|
|||||||
@@ -41,9 +41,10 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
m.setup()
|
|
||||||
|
|
||||||
m.name = "CPUFreqCpuInfoCollector"
|
m.name = "CPUFreqCpuInfoCollector"
|
||||||
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
@@ -56,7 +57,6 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err)
|
return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
// Collect topology information from file cpuinfo
|
// Collect topology information from file cpuinfo
|
||||||
foundFreq := false
|
foundFreq := false
|
||||||
@@ -86,6 +86,10 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): Call to file.Close() failed: %w", m.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
// were all topology information collected?
|
// were all topology information collected?
|
||||||
if foundFreq &&
|
if foundFreq &&
|
||||||
len(processor) > 0 &&
|
len(processor) > 0 &&
|
||||||
@@ -140,7 +144,13 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
|
|||||||
fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err))
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", cpuInfoFile, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
processorCounter := 0
|
processorCounter := 0
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|||||||
@@ -48,7 +48,9 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.name = "CPUFreqCollector"
|
m.name = "CPUFreqCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -39,10 +40,17 @@ type CpustatCollector struct {
|
|||||||
|
|
||||||
func (m *CpustatCollector) Init(config json.RawMessage) error {
|
func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "CpustatCollector"
|
m.name = "CpustatCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "CPU"}
|
m.meta = map[string]string{
|
||||||
m.nodetags = map[string]string{"type": "node"}
|
"source": m.name,
|
||||||
|
"group": "CPU",
|
||||||
|
}
|
||||||
|
m.nodetags = map[string]string{
|
||||||
|
"type": "node",
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,14 +72,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.matches = make(map[string]int)
|
m.matches = make(map[string]int)
|
||||||
for match, index := range matches {
|
for match, index := range matches {
|
||||||
doExclude := false
|
if !slices.Contains(m.config.ExcludeMetrics, match) {
|
||||||
for _, exclude := range m.config.ExcludeMetrics {
|
|
||||||
if match == exclude {
|
|
||||||
doExclude = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !doExclude {
|
|
||||||
m.matches[match] = index
|
m.matches[match] = index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -79,9 +80,17 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
|||||||
// Check input file
|
// Check input file
|
||||||
file, err := os.Open(string(CPUSTATFILE))
|
file, err := os.Open(string(CPUSTATFILE))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Init(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Init(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Pre-generate tags for all CPUs
|
// Pre-generate tags for all CPUs
|
||||||
num_cpus := 0
|
num_cpus := 0
|
||||||
@@ -155,9 +164,17 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
|||||||
|
|
||||||
file, err := os.Open(string(CPUSTATFILE))
|
file, err := os.Open(string(CPUSTATFILE))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", string(CPUSTATFILE), err))
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", string(CPUSTATFILE), err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
|||||||
@@ -10,9 +10,11 @@ package collectors
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -49,11 +51,16 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
for _, c := range m.config.Commands {
|
for _, c := range m.config.Commands {
|
||||||
cmdfields := strings.Fields(c)
|
cmdfields := strings.Fields(c)
|
||||||
command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " "))
|
command := exec.Command(cmdfields[0], cmdfields[1:]...)
|
||||||
command.Wait()
|
if err := command.Wait(); err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
_, err = command.Output()
|
_, err = command.Output()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.commands = append(m.commands, c)
|
m.commands = append(m.commands, c)
|
||||||
@@ -88,8 +95,11 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
|
|||||||
}
|
}
|
||||||
for _, cmd := range m.commands {
|
for _, cmd := range m.commands {
|
||||||
cmdfields := strings.Fields(cmd)
|
cmdfields := strings.Fields(cmd)
|
||||||
command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " "))
|
command := exec.Command(cmdfields[0], cmdfields[1:]...)
|
||||||
command.Wait()
|
if err := command.Wait(); err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
stdout, err := command.Output()
|
stdout, err := command.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
@@ -101,8 +111,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, c := range cmdmetrics {
|
for _, c := range cmdmetrics {
|
||||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name())
|
if slices.Contains(m.config.ExcludeMetrics, c.Name()) {
|
||||||
if skip {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,8 +130,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, f := range fmetrics {
|
for _, f := range fmetrics {
|
||||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name())
|
if slices.Contains(m.config.ExcludeMetrics, f.Name()) {
|
||||||
if skip {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
output <- lp.FromInfluxMetric(f)
|
output <- lp.FromInfluxMetric(f)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ package collectors
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -36,7 +37,9 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
|||||||
m.name = "DiskstatCollector"
|
m.name = "DiskstatCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -54,10 +57,11 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
file, err := os.Open(MOUNTFILE)
|
file, err := os.Open(MOUNTFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
return fmt.Errorf("%s Init(): file open for file \"%s\" failed: %w", m.name, MOUNTFILE, err)
|
||||||
return err
|
}
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): file close for file \"%s\" failed: %w", m.name, MOUNTFILE, err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
|
||||||
m.init = true
|
m.init = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -69,10 +73,18 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
|||||||
|
|
||||||
file, err := os.Open(MOUNTFILE)
|
file, err := os.Open(MOUNTFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", MOUNTFILE, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", MOUNTFILE, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
part_max_used := uint64(0)
|
part_max_used := uint64(0)
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
@@ -93,7 +105,7 @@ mountLoop:
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
mountPath := strings.Replace(linefields[1], `\040`, " ", -1)
|
mountPath := strings.ReplaceAll(linefields[1], `\040`, " ")
|
||||||
|
|
||||||
for _, excl := range m.config.ExcludeMounts {
|
for _, excl := range m.config.ExcludeMounts {
|
||||||
if strings.Contains(mountPath, excl) {
|
if strings.Contains(mountPath, excl) {
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -43,11 +44,11 @@ type GpfsCollectorConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GpfsMetricDefinition struct {
|
type GpfsMetricDefinition struct {
|
||||||
name string
|
name string
|
||||||
desc string
|
desc string
|
||||||
prefix string
|
prefix string
|
||||||
unit string
|
unit string
|
||||||
calc string
|
calc string
|
||||||
}
|
}
|
||||||
|
|
||||||
type GpfsCollector struct {
|
type GpfsCollector struct {
|
||||||
@@ -56,251 +57,251 @@ type GpfsCollector struct {
|
|||||||
config GpfsCollectorConfig
|
config GpfsCollectorConfig
|
||||||
sudoCmd string
|
sudoCmd string
|
||||||
skipFS map[string]struct{}
|
skipFS map[string]struct{}
|
||||||
lastTimestamp map[string]time.Time // Store timestamp of lastState per filesystem to derive bandwidths
|
lastTimestamp map[string]time.Time // Store timestamp of lastState per filesystem to derive bandwidths
|
||||||
definitions []GpfsMetricDefinition // all metrics to report
|
definitions []GpfsMetricDefinition // all metrics to report
|
||||||
lastState map[string]GpfsCollectorState // one GpfsCollectorState per filesystem
|
lastState map[string]GpfsCollectorState // one GpfsCollectorState per filesystem
|
||||||
}
|
}
|
||||||
|
|
||||||
var GpfsAbsMetrics = []GpfsMetricDefinition{
|
var GpfsAbsMetrics = []GpfsMetricDefinition{
|
||||||
{
|
{
|
||||||
name: "gpfs_num_opens",
|
name: "gpfs_num_opens",
|
||||||
desc: "number of opens",
|
desc: "number of opens",
|
||||||
prefix: "_oc_",
|
prefix: "_oc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_closes",
|
name: "gpfs_num_closes",
|
||||||
desc: "number of closes",
|
desc: "number of closes",
|
||||||
prefix: "_cc_",
|
prefix: "_cc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_reads",
|
name: "gpfs_num_reads",
|
||||||
desc: "number of reads",
|
desc: "number of reads",
|
||||||
prefix: "_rdc_",
|
prefix: "_rdc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_writes",
|
name: "gpfs_num_writes",
|
||||||
desc: "number of writes",
|
desc: "number of writes",
|
||||||
prefix: "_wc_",
|
prefix: "_wc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_readdirs",
|
name: "gpfs_num_readdirs",
|
||||||
desc: "number of readdirs",
|
desc: "number of readdirs",
|
||||||
prefix: "_dir_",
|
prefix: "_dir_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_inode_updates",
|
name: "gpfs_num_inode_updates",
|
||||||
desc: "number of Inode Updates",
|
desc: "number of Inode Updates",
|
||||||
prefix: "_iu_",
|
prefix: "_iu_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_read",
|
name: "gpfs_bytes_read",
|
||||||
desc: "bytes read",
|
desc: "bytes read",
|
||||||
prefix: "_br_",
|
prefix: "_br_",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_written",
|
name: "gpfs_bytes_written",
|
||||||
desc: "bytes written",
|
desc: "bytes written",
|
||||||
prefix: "_bw_",
|
prefix: "_bw_",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var GpfsDiffMetrics = []GpfsMetricDefinition{
|
var GpfsDiffMetrics = []GpfsMetricDefinition{
|
||||||
{
|
{
|
||||||
name: "gpfs_num_opens_diff",
|
name: "gpfs_num_opens_diff",
|
||||||
desc: "number of opens (diff)",
|
desc: "number of opens (diff)",
|
||||||
prefix: "_oc_",
|
prefix: "_oc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_closes_diff",
|
name: "gpfs_num_closes_diff",
|
||||||
desc: "number of closes (diff)",
|
desc: "number of closes (diff)",
|
||||||
prefix: "_cc_",
|
prefix: "_cc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_reads_diff",
|
name: "gpfs_num_reads_diff",
|
||||||
desc: "number of reads (diff)",
|
desc: "number of reads (diff)",
|
||||||
prefix: "_rdc_",
|
prefix: "_rdc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_writes_diff",
|
name: "gpfs_num_writes_diff",
|
||||||
desc: "number of writes (diff)",
|
desc: "number of writes (diff)",
|
||||||
prefix: "_wc_",
|
prefix: "_wc_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_readdirs_diff",
|
name: "gpfs_num_readdirs_diff",
|
||||||
desc: "number of readdirs (diff)",
|
desc: "number of readdirs (diff)",
|
||||||
prefix: "_dir_",
|
prefix: "_dir_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_num_inode_updates_diff",
|
name: "gpfs_num_inode_updates_diff",
|
||||||
desc: "number of Inode Updates (diff)",
|
desc: "number of Inode Updates (diff)",
|
||||||
prefix: "_iu_",
|
prefix: "_iu_",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_read_diff",
|
name: "gpfs_bytes_read_diff",
|
||||||
desc: "bytes read (diff)",
|
desc: "bytes read (diff)",
|
||||||
prefix: "_br_",
|
prefix: "_br_",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_written_diff",
|
name: "gpfs_bytes_written_diff",
|
||||||
desc: "bytes written (diff)",
|
desc: "bytes written (diff)",
|
||||||
prefix: "_bw_",
|
prefix: "_bw_",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var GpfsDeriveMetrics = []GpfsMetricDefinition{
|
var GpfsDeriveMetrics = []GpfsMetricDefinition{
|
||||||
{
|
{
|
||||||
name: "gpfs_opens_rate",
|
name: "gpfs_opens_rate",
|
||||||
desc: "number of opens (rate)",
|
desc: "number of opens (rate)",
|
||||||
prefix: "_oc_",
|
prefix: "_oc_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_closes_rate",
|
name: "gpfs_closes_rate",
|
||||||
desc: "number of closes (rate)",
|
desc: "number of closes (rate)",
|
||||||
prefix: "_oc_",
|
prefix: "_oc_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_reads_rate",
|
name: "gpfs_reads_rate",
|
||||||
desc: "number of reads (rate)",
|
desc: "number of reads (rate)",
|
||||||
prefix: "_rdc_",
|
prefix: "_rdc_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_writes_rate",
|
name: "gpfs_writes_rate",
|
||||||
desc: "number of writes (rate)",
|
desc: "number of writes (rate)",
|
||||||
prefix: "_wc_",
|
prefix: "_wc_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_readdirs_rate",
|
name: "gpfs_readdirs_rate",
|
||||||
desc: "number of readdirs (rate)",
|
desc: "number of readdirs (rate)",
|
||||||
prefix: "_dir_",
|
prefix: "_dir_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_inode_updates_rate",
|
name: "gpfs_inode_updates_rate",
|
||||||
desc: "number of Inode Updates (rate)",
|
desc: "number of Inode Updates (rate)",
|
||||||
prefix: "_iu_",
|
prefix: "_iu_",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bw_read",
|
name: "gpfs_bw_read",
|
||||||
desc: "bytes read (rate)",
|
desc: "bytes read (rate)",
|
||||||
prefix: "_br_",
|
prefix: "_br_",
|
||||||
unit: "bytes/sec",
|
unit: "bytes/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bw_write",
|
name: "gpfs_bw_write",
|
||||||
desc: "bytes written (rate)",
|
desc: "bytes written (rate)",
|
||||||
prefix: "_bw_",
|
prefix: "_bw_",
|
||||||
unit: "bytes/sec",
|
unit: "bytes/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var GpfsTotalMetrics = []GpfsMetricDefinition{
|
var GpfsTotalMetrics = []GpfsMetricDefinition{
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_total",
|
name: "gpfs_bytes_total",
|
||||||
desc: "bytes total",
|
desc: "bytes total",
|
||||||
prefix: "bytesTotal",
|
prefix: "bytesTotal",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bytes_total_diff",
|
name: "gpfs_bytes_total_diff",
|
||||||
desc: "bytes total (diff)",
|
desc: "bytes total (diff)",
|
||||||
prefix: "bytesTotal",
|
prefix: "bytesTotal",
|
||||||
unit: "bytes",
|
unit: "bytes",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_bw_total",
|
name: "gpfs_bw_total",
|
||||||
desc: "bytes total (rate)",
|
desc: "bytes total (rate)",
|
||||||
prefix: "bytesTotal",
|
prefix: "bytesTotal",
|
||||||
unit: "bytes/sec",
|
unit: "bytes/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_iops",
|
name: "gpfs_iops",
|
||||||
desc: "iops",
|
desc: "iops",
|
||||||
prefix: "iops",
|
prefix: "iops",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_iops_diff",
|
name: "gpfs_iops_diff",
|
||||||
desc: "iops (diff)",
|
desc: "iops (diff)",
|
||||||
prefix: "iops",
|
prefix: "iops",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_iops_rate",
|
name: "gpfs_iops_rate",
|
||||||
desc: "iops (rate)",
|
desc: "iops (rate)",
|
||||||
prefix: "iops",
|
prefix: "iops",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_metaops",
|
name: "gpfs_metaops",
|
||||||
desc: "metaops",
|
desc: "metaops",
|
||||||
prefix: "metaops",
|
prefix: "metaops",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "none",
|
calc: "none",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_metaops_diff",
|
name: "gpfs_metaops_diff",
|
||||||
desc: "metaops (diff)",
|
desc: "metaops (diff)",
|
||||||
prefix: "metaops",
|
prefix: "metaops",
|
||||||
unit: "requests",
|
unit: "requests",
|
||||||
calc: "difference",
|
calc: "difference",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "gpfs_metaops_rate",
|
name: "gpfs_metaops_rate",
|
||||||
desc: "metaops (rate)",
|
desc: "metaops (rate)",
|
||||||
prefix: "metaops",
|
prefix: "metaops",
|
||||||
unit: "requests/sec",
|
unit: "requests/sec",
|
||||||
calc: "derivative",
|
calc: "derivative",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,9 +311,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
m.name = "GpfsCollector"
|
m.name = "GpfsCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
|
|
||||||
// Set default mmpmon binary
|
// Set default mmpmon binary
|
||||||
@@ -320,7 +322,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
// Read JSON configuration
|
// Read JSON configuration
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
log.Print(err.Error())
|
||||||
return err
|
return err
|
||||||
@@ -366,7 +368,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
if m.config.Sudo && !strings.HasPrefix(m.config.Mmpmon, "/") {
|
if m.config.Sudo && !strings.HasPrefix(m.config.Mmpmon, "/") {
|
||||||
return fmt.Errorf("when using sudo, mmpmon_path must be provided and an absolute path: %s", m.config.Mmpmon)
|
return fmt.Errorf("when using sudo, mmpmon_path must be provided and an absolute path: %s", m.config.Mmpmon)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if mmpmon is in executable search path
|
// Check if mmpmon is in executable search path
|
||||||
p, err := exec.LookPath(m.config.Mmpmon)
|
p, err := exec.LookPath(m.config.Mmpmon)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -385,28 +387,28 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
m.definitions = []GpfsMetricDefinition{}
|
m.definitions = []GpfsMetricDefinition{}
|
||||||
if m.config.SendAbsoluteValues {
|
if m.config.SendAbsoluteValues {
|
||||||
for _, def := range GpfsAbsMetrics {
|
for _, def := range GpfsAbsMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if m.config.SendDiffValues {
|
if m.config.SendDiffValues {
|
||||||
for _, def := range GpfsDiffMetrics {
|
for _, def := range GpfsDiffMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if m.config.SendDerivedValues {
|
if m.config.SendDerivedValues {
|
||||||
for _, def := range GpfsDeriveMetrics {
|
for _, def := range GpfsDeriveMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if m.config.SendBandwidths {
|
} else if m.config.SendBandwidths {
|
||||||
for _, def := range GpfsDeriveMetrics {
|
for _, def := range GpfsDeriveMetrics {
|
||||||
if def.unit == "bytes/sec" {
|
if def.unit == "bytes/sec" {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -414,19 +416,19 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
if m.config.SendTotalValues {
|
if m.config.SendTotalValues {
|
||||||
for _, def := range GpfsTotalMetrics {
|
for _, def := range GpfsTotalMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
// only send total metrics of the types requested
|
// only send total metrics of the types requested
|
||||||
if ( def.calc == "none" && m.config.SendAbsoluteValues ) ||
|
if (def.calc == "none" && m.config.SendAbsoluteValues) ||
|
||||||
( def.calc == "difference" && m.config.SendDiffValues ) ||
|
(def.calc == "difference" && m.config.SendDiffValues) ||
|
||||||
( def.calc == "derivative" && m.config.SendDerivedValues ) {
|
(def.calc == "derivative" && m.config.SendDerivedValues) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if m.config.SendBandwidths {
|
} else if m.config.SendBandwidths {
|
||||||
for _, def := range GpfsTotalMetrics {
|
for _, def := range GpfsTotalMetrics {
|
||||||
if def.unit == "bytes/sec" {
|
if def.unit == "bytes/sec" {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -456,7 +458,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
|||||||
} else {
|
} else {
|
||||||
cmd = exec.Command(m.config.Mmpmon, "-p", "-s")
|
cmd = exec.Command(m.config.Mmpmon, "-p", "-s")
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd.Stdin = strings.NewReader("once fs_io_s\n")
|
cmd.Stdin = strings.NewReader("once fs_io_s\n")
|
||||||
cmdStdout := new(bytes.Buffer)
|
cmdStdout := new(bytes.Buffer)
|
||||||
cmdStderr := new(bytes.Buffer)
|
cmdStderr := new(bytes.Buffer)
|
||||||
@@ -617,7 +619,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
|||||||
}
|
}
|
||||||
case "derivative":
|
case "derivative":
|
||||||
if vnew_ok && vold_ok && timeDiff > 0 {
|
if vnew_ok && vold_ok && timeDiff > 0 {
|
||||||
value = float64(vnew - vold) / timeDiff
|
value = float64(vnew-vold) / timeDiff
|
||||||
if value.(float64) < 0 {
|
if value.(float64) < 0 {
|
||||||
value = 0
|
value = 0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,7 +65,9 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
m.name = "InfinibandCollector"
|
m.name = "InfinibandCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
|
|||||||
@@ -11,7 +11,9 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -45,7 +47,9 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
|||||||
m.name = "IOstatCollector"
|
m.name = "IOstatCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err = json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -75,7 +79,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
|||||||
m.devices = make(map[string]IOstatCollectorEntry)
|
m.devices = make(map[string]IOstatCollectorEntry)
|
||||||
m.matches = make(map[string]int)
|
m.matches = make(map[string]int)
|
||||||
for k, v := range matches {
|
for k, v := range matches {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, k); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, k) {
|
||||||
m.matches[k] = v
|
m.matches[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -84,10 +88,8 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
file, err := os.Open(IOSTATFILE)
|
file, err := os.Open(IOSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
return fmt.Errorf("%s Init(): Failed to open file \"%s\": %w", m.name, IOSTATFILE, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@@ -101,7 +103,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
|||||||
if strings.Contains(device, "loop") {
|
if strings.Contains(device, "loop") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeDevices, device); skip {
|
if slices.Contains(m.config.ExcludeDevices, device) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
currentValues := make(map[string]int64)
|
currentValues := make(map[string]int64)
|
||||||
@@ -127,6 +129,10 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
|||||||
lastValues: lastValues,
|
lastValues: lastValues,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): Failed to close file \"%s\": %w", m.name, IOSTATFILE, err)
|
||||||
|
}
|
||||||
|
|
||||||
m.init = true
|
m.init = true
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -138,10 +144,18 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
|
|
||||||
file, err := os.Open(IOSTATFILE)
|
file, err := os.Open(IOSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", IOSTATFILE, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", IOSTATFILE, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@@ -157,7 +171,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
if strings.Contains(device, "loop") {
|
if strings.Contains(device, "loop") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeDevices, device); skip {
|
if slices.Contains(m.config.ExcludeDevices, device) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, ok := m.devices[device]; !ok {
|
if _, ok := m.devices[device]; !ok {
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -44,7 +43,9 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.name = "IpmiCollector"
|
m.name = "IpmiCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
@@ -116,15 +117,16 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
|
|||||||
}
|
}
|
||||||
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
|
v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1))
|
name := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(lv[0]), " ", "_"))
|
||||||
unit := strings.TrimSpace(lv[2])
|
unit := strings.TrimSpace(lv[2])
|
||||||
if unit == "Volts" {
|
switch unit {
|
||||||
|
case "Volts":
|
||||||
unit = "Volts"
|
unit = "Volts"
|
||||||
} else if unit == "degrees C" {
|
case "degrees C":
|
||||||
unit = "degC"
|
unit = "degC"
|
||||||
} else if unit == "degrees F" {
|
case "degrees F":
|
||||||
unit = "degF"
|
unit = "degF"
|
||||||
} else if unit == "Watts" {
|
case "Watts":
|
||||||
unit = "Watts"
|
unit = "Watts"
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,22 +152,29 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
|
|||||||
|
|
||||||
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
|
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
|
||||||
|
|
||||||
|
// Setup ipmisensors command
|
||||||
command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate")
|
command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate")
|
||||||
command.Wait()
|
stdout, _ := command.StdoutPipe()
|
||||||
stdout, err := command.Output()
|
errBuf := new(bytes.Buffer)
|
||||||
if err != nil {
|
command.Stderr = errBuf
|
||||||
log.Print(err)
|
|
||||||
|
// start command
|
||||||
|
if err := command.Start(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("readIpmiSensors(): Failed to start command \"%s\": %v", command.String(), err),
|
||||||
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ll := strings.Split(string(stdout), "\n")
|
// Read command output
|
||||||
|
scanner := bufio.NewScanner(stdout)
|
||||||
for _, line := range ll {
|
for scanner.Scan() {
|
||||||
lv := strings.Split(line, ",")
|
lv := strings.Split(scanner.Text(), ",")
|
||||||
if len(lv) > 3 {
|
if len(lv) > 3 {
|
||||||
v, err := strconv.ParseFloat(lv[3], 64)
|
v, err := strconv.ParseFloat(lv[3], 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
name := strings.ToLower(strings.Replace(lv[1], " ", "_", -1))
|
name := strings.ToLower(strings.ReplaceAll(lv[1], " ", "_"))
|
||||||
y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now())
|
y, err := lp.NewMessage(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if len(lv) > 4 {
|
if len(lv) > 4 {
|
||||||
@@ -176,6 +185,18 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for command end
|
||||||
|
if err := command.Wait(); err != nil {
|
||||||
|
errMsg, _ := io.ReadAll(errBuf)
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("readIpmiSensors(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
|
||||||
|
)
|
||||||
|
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"maps"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@@ -187,7 +188,7 @@ func getBaseFreq() float64 {
|
|||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
buffer, err := os.ReadFile(f)
|
buffer, err := os.ReadFile(f)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data := strings.Replace(string(buffer), "\n", "", -1)
|
data := strings.ReplaceAll(string(buffer), "\n", "")
|
||||||
x, err := strconv.ParseInt(data, 0, 64)
|
x, err := strconv.ParseInt(data, 0, 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
freq = float64(x)
|
freq = float64(x)
|
||||||
@@ -230,9 +231,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
if m.config.ForceOverwrite {
|
if m.config.ForceOverwrite {
|
||||||
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
|
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
|
||||||
os.Setenv("LIKWID_FORCE", "1")
|
if err := os.Setenv("LIKWID_FORCE", "1"); err != nil {
|
||||||
|
return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
}
|
}
|
||||||
m.setup()
|
|
||||||
|
|
||||||
m.meta = map[string]string{"group": "PerfCounter"}
|
m.meta = map[string]string{"group": "PerfCounter"}
|
||||||
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
|
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
|
||||||
@@ -316,7 +321,14 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
|||||||
case "accessdaemon":
|
case "accessdaemon":
|
||||||
if len(m.config.DaemonPath) > 0 {
|
if len(m.config.DaemonPath) > 0 {
|
||||||
p := os.Getenv("PATH")
|
p := os.Getenv("PATH")
|
||||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
if len(p) > 0 {
|
||||||
|
p = m.config.DaemonPath + ":" + p
|
||||||
|
} else {
|
||||||
|
p = m.config.DaemonPath
|
||||||
|
}
|
||||||
|
if err := os.Setenv("PATH", p); err != nil {
|
||||||
|
return fmt.Errorf("error setting environment variable PATH=%s: %v", p, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
C.HPMmode(1)
|
C.HPMmode(1)
|
||||||
retCode := C.HPMinit()
|
retCode := C.HPMinit()
|
||||||
@@ -375,10 +387,18 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
|
|||||||
// Watch changes for the lock file ()
|
// Watch changes for the lock file ()
|
||||||
watcher, err := fsnotify.NewWatcher()
|
watcher, err := fsnotify.NewWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("takeMeasurement(): Failed to create a new fsnotify.Watcher: %v", err))
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
defer watcher.Close()
|
defer func() {
|
||||||
|
if err := watcher.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("takeMeasurement(): Failed to close fsnotify.Watcher: %v", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
if len(m.config.LockfilePath) > 0 {
|
if len(m.config.LockfilePath) > 0 {
|
||||||
// Check if the lock file exists
|
// Check if the lock file exists
|
||||||
info, err := os.Stat(m.config.LockfilePath)
|
info, err := os.Stat(m.config.LockfilePath)
|
||||||
@@ -388,7 +408,9 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
|
|||||||
if createErr != nil {
|
if createErr != nil {
|
||||||
return true, fmt.Errorf("failed to create lock file: %v", createErr)
|
return true, fmt.Errorf("failed to create lock file: %v", createErr)
|
||||||
}
|
}
|
||||||
file.Close()
|
if err := file.Close(); err != nil {
|
||||||
|
return true, fmt.Errorf("failed to close lock file: %v", err)
|
||||||
|
}
|
||||||
info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation
|
info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -748,9 +770,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
|
|||||||
// Here we generate parameter list
|
// Here we generate parameter list
|
||||||
params := make(map[string]float64)
|
params := make(map[string]float64)
|
||||||
for _, evset := range groups {
|
for _, evset := range groups {
|
||||||
for mname, mres := range evset.metrics[tid] {
|
maps.Copy(params, evset.metrics[tid])
|
||||||
params[mname] = mres
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
params["gotime"] = interval.Seconds()
|
params["gotime"] = interval.Seconds()
|
||||||
// Evaluate the metric
|
// Evaluate the metric
|
||||||
@@ -813,13 +833,21 @@ func (m *LikwidCollector) ReadThread(interval time.Duration, output chan lp.CCMe
|
|||||||
|
|
||||||
if !skip {
|
if !skip {
|
||||||
// read measurements and derive event set metrics
|
// read measurements and derive event set metrics
|
||||||
m.calcEventsetMetrics(e, interval, output)
|
err = m.calcEventsetMetrics(e, interval, output)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(m.name, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
groups = append(groups, e)
|
groups = append(groups, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(groups) > 0 {
|
if len(groups) > 0 {
|
||||||
// calculate global metrics
|
// calculate global metrics
|
||||||
m.calcGlobalMetrics(groups, interval, output)
|
err = m.calcGlobalMetrics(groups, interval, output)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(m.name, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -42,7 +43,9 @@ type LoadavgCollector struct {
|
|||||||
func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "LoadavgCollector"
|
m.name = "LoadavgCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,10 +67,10 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
|||||||
m.proc_skips = make([]bool, len(m.proc_matches))
|
m.proc_skips = make([]bool, len(m.proc_matches))
|
||||||
|
|
||||||
for i, name := range m.load_matches {
|
for i, name := range m.load_matches {
|
||||||
_, m.load_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
|
m.load_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
|
||||||
}
|
}
|
||||||
for i, name := range m.proc_matches {
|
for i, name := range m.proc_matches {
|
||||||
_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
|
m.proc_skips[i] = slices.Contains(m.config.ExcludeMetrics, name)
|
||||||
}
|
}
|
||||||
m.init = true
|
m.init = true
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/user"
|
"os/user"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -61,7 +62,6 @@ func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
|||||||
} else {
|
} else {
|
||||||
command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
||||||
}
|
}
|
||||||
command.Wait()
|
|
||||||
stdout, _ := command.Output()
|
stdout, _ := command.Output()
|
||||||
return strings.Split(string(stdout), "\n")
|
return strings.Split(string(stdout), "\n")
|
||||||
}
|
}
|
||||||
@@ -302,7 +302,9 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{"type": "node"}
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
|
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
|
||||||
|
|
||||||
@@ -339,21 +341,21 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
|||||||
m.definitions = []LustreMetricDefinition{}
|
m.definitions = []LustreMetricDefinition{}
|
||||||
if m.config.SendAbsoluteValues {
|
if m.config.SendAbsoluteValues {
|
||||||
for _, def := range LustreAbsMetrics {
|
for _, def := range LustreAbsMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if m.config.SendDiffValues {
|
if m.config.SendDiffValues {
|
||||||
for _, def := range LustreDiffMetrics {
|
for _, def := range LustreDiffMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if m.config.SendDerivedValues {
|
if m.config.SendDerivedValues {
|
||||||
for _, def := range LustreDeriveMetrics {
|
for _, def := range LustreDeriveMetrics {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, def.name); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, def.name) {
|
||||||
m.definitions = append(m.definitions, def)
|
m.definitions = append(m.definitions, def)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -58,7 +59,11 @@ func getStats(filename string) map[string]MemstatStats {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Error(err.Error())
|
cclog.Error(err.Error())
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.Error(err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@@ -115,19 +120,20 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
|||||||
"MemShared": "mem_shared",
|
"MemShared": "mem_shared",
|
||||||
}
|
}
|
||||||
for k, v := range matches {
|
for k, v := range matches {
|
||||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, k)
|
if !slices.Contains(m.config.ExcludeMetrics, k) {
|
||||||
if !skip {
|
|
||||||
m.matches[k] = v
|
m.matches[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.sendMemUsed = false
|
m.sendMemUsed = false
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
|
if !slices.Contains(m.config.ExcludeMetrics, "mem_used") {
|
||||||
m.sendMemUsed = true
|
m.sendMemUsed = true
|
||||||
}
|
}
|
||||||
if len(m.matches) == 0 {
|
if len(m.matches) == 0 {
|
||||||
return errors.New("no metrics to collect")
|
return errors.New("no metrics to collect")
|
||||||
}
|
}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
if m.config.NodeStats {
|
if m.config.NodeStats {
|
||||||
if stats := getStats(MEMSTATFILE); len(stats) == 0 {
|
if stats := getStats(MEMSTATFILE); len(stats) == 0 {
|
||||||
@@ -174,7 +180,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
|||||||
sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
|
sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
|
||||||
for match, name := range m.matches {
|
for match, name := range m.matches {
|
||||||
var value float64 = 0
|
var value float64 = 0
|
||||||
var unit string = ""
|
unit := ""
|
||||||
if v, ok := stats[match]; ok {
|
if v, ok := stats[match]; ok {
|
||||||
value = v.value
|
value = v.value
|
||||||
if len(v.unit) > 0 {
|
if len(v.unit) > 0 {
|
||||||
|
|||||||
@@ -51,30 +51,6 @@ func (c *metricCollector) Initialized() bool {
|
|||||||
return c.init
|
return c.init
|
||||||
}
|
}
|
||||||
|
|
||||||
// intArrayContains scans an array of ints if the value str is present in the array
|
|
||||||
// If the specified value is found, the corresponding array index is returned.
|
|
||||||
// The bool value is used to signal success or failure
|
|
||||||
func intArrayContains(array []int, str int) (int, bool) {
|
|
||||||
for i, a := range array {
|
|
||||||
if a == str {
|
|
||||||
return i, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// stringArrayContains scans an array of strings if the value str is present in the array
|
|
||||||
// If the specified value is found, the corresponding array index is returned.
|
|
||||||
// The bool value is used to signal success or failure
|
|
||||||
func stringArrayContains(array []string, str string) (int, bool) {
|
|
||||||
for i, a := range array {
|
|
||||||
if a == str {
|
|
||||||
return i, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveFromStringList removes the string r from the array of strings s
|
// RemoveFromStringList removes the string r from the array of strings s
|
||||||
// If r is not contained in the array an error is returned
|
// If r is not contained in the array an error is returned
|
||||||
func RemoveFromStringList(s []string, r string) ([]string, error) {
|
func RemoveFromStringList(s []string, r string) ([]string, error) {
|
||||||
|
|||||||
@@ -10,8 +10,9 @@ package collectors
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -65,7 +66,9 @@ func getCanonicalName(raw string, aliasToCanonical map[string]string) string {
|
|||||||
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||||
m.name = "NetstatCollector"
|
m.name = "NetstatCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.lastTimestamp = time.Now()
|
m.lastTimestamp = time.Now()
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -107,10 +110,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
// Check access to net statistic file
|
// Check access to net statistic file
|
||||||
file, err := os.Open(NETSTATFILE)
|
file, err := os.Open(NETSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
return fmt.Errorf("%s Init(): failed to open netstat file \"%s\": %w", m.name, NETSTATFILE, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@@ -129,7 +130,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
||||||
|
|
||||||
// Check if device is a included device
|
// Check if device is a included device
|
||||||
if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok {
|
if slices.Contains(m.config.IncludeDevices, canonical) {
|
||||||
// Tag will contain original device name (raw).
|
// Tag will contain original device name (raw).
|
||||||
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
|
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
|
||||||
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
||||||
@@ -174,8 +175,13 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close netstat file
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): failed to close netstat file \"%s\": %w", m.name, NETSTATFILE, err)
|
||||||
|
}
|
||||||
|
|
||||||
if len(m.matches) == 0 {
|
if len(m.matches) == 0 {
|
||||||
return errors.New("no devices to collector metrics found")
|
return fmt.Errorf("%s Init(): no devices to collect metrics found", m.name)
|
||||||
}
|
}
|
||||||
m.init = true
|
m.init = true
|
||||||
return nil
|
return nil
|
||||||
@@ -194,10 +200,18 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
|||||||
|
|
||||||
file, err := os.Open(NETSTATFILE)
|
file, err := os.Open(NETSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", NETSTATFILE, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", NETSTATFILE, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"slices"
|
||||||
|
|
||||||
// "os"
|
// "os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@@ -18,6 +19,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -44,10 +46,15 @@ type nfsCollector struct {
|
|||||||
|
|
||||||
func (m *nfsCollector) initStats() error {
|
func (m *nfsCollector) initStats() error {
|
||||||
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
||||||
cmd.Wait()
|
|
||||||
|
// Wait for cmd end
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("initStats(): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
buffer, err := cmd.Output()
|
buffer, err := cmd.Output()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for _, line := range strings.Split(string(buffer), "\n") {
|
for line := range strings.Lines(string(buffer)) {
|
||||||
lf := strings.Fields(line)
|
lf := strings.Fields(line)
|
||||||
if len(lf) != 5 {
|
if len(lf) != 5 {
|
||||||
continue
|
continue
|
||||||
@@ -71,10 +78,15 @@ func (m *nfsCollector) initStats() error {
|
|||||||
|
|
||||||
func (m *nfsCollector) updateStats() error {
|
func (m *nfsCollector) updateStats() error {
|
||||||
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
||||||
cmd.Wait()
|
|
||||||
|
// Wait for cmd end
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("updateStats(): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
buffer, err := cmd.Output()
|
buffer, err := cmd.Output()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for _, line := range strings.Split(string(buffer), "\n") {
|
for line := range strings.Lines(string(buffer)) {
|
||||||
lf := strings.Fields(line)
|
lf := strings.Fields(line)
|
||||||
if len(lf) != 5 {
|
if len(lf) != 5 {
|
||||||
continue
|
continue
|
||||||
@@ -119,7 +131,9 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
|
|||||||
return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err)
|
return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err)
|
||||||
}
|
}
|
||||||
m.data = make(map[string]NfsCollectorData)
|
m.data = make(map[string]NfsCollectorData)
|
||||||
m.initStats()
|
if err := m.initStats(); err != nil {
|
||||||
|
return fmt.Errorf("NfsCollector.Init(): %w", err)
|
||||||
|
}
|
||||||
m.init = true
|
m.init = true
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
return nil
|
return nil
|
||||||
@@ -131,7 +145,13 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
|||||||
}
|
}
|
||||||
timestamp := time.Now()
|
timestamp := time.Now()
|
||||||
|
|
||||||
m.updateStats()
|
if err := m.updateStats(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): updateStats() failed: %v", err),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
prefix := ""
|
prefix := ""
|
||||||
switch m.version {
|
switch m.version {
|
||||||
case "v3":
|
case "v3":
|
||||||
@@ -143,7 +163,7 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for name, data := range m.data {
|
for name, data := range m.data {
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, name); skip {
|
if slices.Contains(m.config.ExcludeMetrics, name) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
value := data.current - data.last
|
value := data.current - data.last
|
||||||
@@ -170,13 +190,17 @@ type Nfs4Collector struct {
|
|||||||
func (m *Nfs3Collector) Init(config json.RawMessage) error {
|
func (m *Nfs3Collector) Init(config json.RawMessage) error {
|
||||||
m.name = "Nfs3Collector"
|
m.name = "Nfs3Collector"
|
||||||
m.version = `v3`
|
m.version = `v3`
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
return m.MainInit(config)
|
return m.MainInit(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Nfs4Collector) Init(config json.RawMessage) error {
|
func (m *Nfs4Collector) Init(config json.RawMessage) error {
|
||||||
m.name = "Nfs4Collector"
|
m.name = "Nfs4Collector"
|
||||||
m.version = `v4`
|
m.version = `v4`
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
return m.MainInit(config)
|
return m.MainInit(config)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -71,7 +72,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
|||||||
// Is this a device line with mount point, remote target and NFS version?
|
// Is this a device line with mount point, remote target and NFS version?
|
||||||
dev := resolve_regex_fields(l, deviceRegex)
|
dev := resolve_regex_fields(l, deviceRegex)
|
||||||
if len(dev) > 0 {
|
if len(dev) > 0 {
|
||||||
if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok {
|
if !slices.Contains(m.config.ExcludeFilesystem, dev[m.key]) {
|
||||||
current = dev
|
current = dev
|
||||||
if len(current["version"]) == 0 {
|
if len(current["version"]) == 0 {
|
||||||
current["version"] = "3"
|
current["version"] = "3"
|
||||||
@@ -85,7 +86,7 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
|||||||
if len(bytes) > 0 {
|
if len(bytes) > 0 {
|
||||||
data[current[m.key]] = make(map[string]int64)
|
data[current[m.key]] = make(map[string]int64)
|
||||||
for name, sval := range bytes {
|
for name, sval := range bytes {
|
||||||
if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok {
|
if !slices.Contains(m.config.ExcludeMetrics, name) {
|
||||||
val, err := strconv.ParseInt(sval, 10, 64)
|
val, err := strconv.ParseInt(sval, 10, 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data[current[m.key]][name] = val
|
data[current[m.key]][name] = val
|
||||||
@@ -102,7 +103,9 @@ func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
|
|||||||
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
|
||||||
var err error = nil
|
var err error = nil
|
||||||
m.name = "NfsIOStatCollector"
|
m.name = "NfsIOStatCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
|
m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{"type": "node"}
|
||||||
|
|||||||
@@ -72,7 +72,9 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.name = "NUMAStatsCollector"
|
m.name = "NUMAStatsCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
"group": "NUMA",
|
"group": "NUMA",
|
||||||
@@ -186,7 +188,11 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMessa
|
|||||||
t.previousValues[key] = value
|
t.previousValues[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
file.Close()
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", t.file, err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"maps"
|
||||||
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -64,7 +66,9 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
|||||||
m.config.ProcessMigDevices = false
|
m.config.ProcessMigDevices = false
|
||||||
m.config.UseUuidForMigDevices = false
|
m.config.UseUuidForMigDevices = false
|
||||||
m.config.UseSliceForMigDevices = false
|
m.config.UseSliceForMigDevices = false
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err = json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -109,7 +113,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
// Skip excluded devices by ID
|
// Skip excluded devices by ID
|
||||||
str_i := fmt.Sprintf("%d", i)
|
str_i := fmt.Sprintf("%d", i)
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeDevices, str_i); skip {
|
if slices.Contains(m.config.ExcludeDevices, str_i) {
|
||||||
cclog.ComponentDebug(m.name, "Skipping excluded device", str_i)
|
cclog.ComponentDebug(m.name, "Skipping excluded device", str_i)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -137,7 +141,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
|||||||
pciInfo.Device)
|
pciInfo.Device)
|
||||||
|
|
||||||
// Skip excluded devices specified by PCI ID
|
// Skip excluded devices specified by PCI ID
|
||||||
if _, skip := stringArrayContains(m.config.ExcludeDevices, pci_id); skip {
|
if slices.Contains(m.config.ExcludeDevices, pci_id) {
|
||||||
cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id)
|
cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -222,7 +226,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err
|
|||||||
var total uint64
|
var total uint64
|
||||||
var used uint64
|
var used uint64
|
||||||
var reserved uint64 = 0
|
var reserved uint64 = 0
|
||||||
var v2 bool = false
|
v2 := false
|
||||||
meminfo, ret := nvml.DeviceGetMemoryInfo(device.device)
|
meminfo, ret := nvml.DeviceGetMemoryInfo(device.device)
|
||||||
if ret != nvml.SUCCESS {
|
if ret != nvml.SUCCESS {
|
||||||
err := errors.New(nvml.ErrorString(ret))
|
err := errors.New(nvml.ErrorString(ret))
|
||||||
@@ -405,7 +409,8 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error
|
|||||||
// Changing ECC modes requires a reboot.
|
// Changing ECC modes requires a reboot.
|
||||||
// The "pending" ECC mode refers to the target mode following the next reboot.
|
// The "pending" ECC mode refers to the target mode following the next reboot.
|
||||||
_, ecc_pend, ret := nvml.DeviceGetEccMode(device.device)
|
_, ecc_pend, ret := nvml.DeviceGetEccMode(device.device)
|
||||||
if ret == nvml.SUCCESS {
|
switch ret {
|
||||||
|
case nvml.SUCCESS:
|
||||||
var y lp.CCMessage
|
var y lp.CCMessage
|
||||||
var err error
|
var err error
|
||||||
switch ecc_pend {
|
switch ecc_pend {
|
||||||
@@ -419,7 +424,7 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
output <- y
|
output <- y
|
||||||
}
|
}
|
||||||
} else if ret == nvml.ERROR_NOT_SUPPORTED {
|
case nvml.ERROR_NOT_SUPPORTED:
|
||||||
y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now())
|
y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
output <- y
|
output <- y
|
||||||
@@ -768,7 +773,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !device.excludeMetrics["nv_remapped_rows_pending"] {
|
if !device.excludeMetrics["nv_remapped_rows_pending"] {
|
||||||
var p int = 0
|
p := 0
|
||||||
if pending {
|
if pending {
|
||||||
p = 1
|
p = 1
|
||||||
}
|
}
|
||||||
@@ -778,7 +783,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !device.excludeMetrics["nv_remapped_rows_failure"] {
|
if !device.excludeMetrics["nv_remapped_rows_failure"] {
|
||||||
var f int = 0
|
f := 0
|
||||||
if failure {
|
if failure {
|
||||||
f = 1
|
f = 1
|
||||||
}
|
}
|
||||||
@@ -1275,9 +1280,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
meta: map[string]string{},
|
meta: map[string]string{},
|
||||||
excludeMetrics: excludeMetrics,
|
excludeMetrics: excludeMetrics,
|
||||||
}
|
}
|
||||||
for k, v := range m.gpus[i].tags {
|
maps.Copy(migDevice.tags, m.gpus[i].tags)
|
||||||
migDevice.tags[k] = v
|
|
||||||
}
|
|
||||||
migDevice.tags["stype"] = "mig"
|
migDevice.tags["stype"] = "mig"
|
||||||
if m.config.UseUuidForMigDevices {
|
if m.config.UseUuidForMigDevices {
|
||||||
uuid, ret := nvml.DeviceGetUUID(mdev)
|
uuid, ret := nvml.DeviceGetUUID(mdev)
|
||||||
@@ -1291,8 +1294,8 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
if ret == nvml.SUCCESS {
|
if ret == nvml.SUCCESS {
|
||||||
mname, ret := nvml.DeviceGetName(mdev)
|
mname, ret := nvml.DeviceGetName(mdev)
|
||||||
if ret == nvml.SUCCESS {
|
if ret == nvml.SUCCESS {
|
||||||
x := strings.Replace(mname, name, "", -1)
|
x := strings.ReplaceAll(mname, name, "")
|
||||||
x = strings.Replace(x, "MIG", "", -1)
|
x = strings.ReplaceAll(x, "MIG", "")
|
||||||
x = strings.TrimSpace(x)
|
x = strings.TrimSpace(x)
|
||||||
migDevice.tags["stype-id"] = x
|
migDevice.tags["stype-id"] = x
|
||||||
}
|
}
|
||||||
@@ -1301,9 +1304,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
if _, ok := migDevice.tags["stype-id"]; !ok {
|
if _, ok := migDevice.tags["stype-id"]; !ok {
|
||||||
migDevice.tags["stype-id"] = fmt.Sprintf("%d", j)
|
migDevice.tags["stype-id"] = fmt.Sprintf("%d", j)
|
||||||
}
|
}
|
||||||
for k, v := range m.gpus[i].meta {
|
maps.Copy(migDevice.meta, m.gpus[i].meta)
|
||||||
migDevice.meta[k] = v
|
|
||||||
}
|
|
||||||
if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices {
|
if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices {
|
||||||
uuid, ret := nvml.DeviceGetUUID(mdev)
|
uuid, ret := nvml.DeviceGetUUID(mdev)
|
||||||
if ret == nvml.SUCCESS {
|
if ret == nvml.SUCCESS {
|
||||||
@@ -1319,7 +1320,9 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
|
|||||||
|
|
||||||
func (m *NvidiaCollector) Close() {
|
func (m *NvidiaCollector) Close() {
|
||||||
if m.init {
|
if m.init {
|
||||||
nvml.Shutdown()
|
if ret := nvml.Shutdown(); ret != nvml.SUCCESS {
|
||||||
|
cclog.ComponentError(m.name, "nvml.Shutdown() not successful")
|
||||||
|
}
|
||||||
m.init = false
|
m.init = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,9 +54,10 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error = nil
|
|
||||||
m.name = "RAPLCollector"
|
m.name = "RAPLCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{
|
m.meta = map[string]string{
|
||||||
"source": m.name,
|
"source": m.name,
|
||||||
@@ -66,7 +67,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
// Read in the JSON configuration
|
// Read in the JSON configuration
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -52,7 +52,9 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
|
|||||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||||
m.name = "RocmSmiCollector"
|
m.name = "RocmSmiCollector"
|
||||||
// This is for later use, also call it early
|
// This is for later use, also call it early
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
// Define meta information sent with each metric
|
// Define meta information sent with each metric
|
||||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||||
//m.meta = map[string]string{"source": m.name, "group": "AMD"}
|
//m.meta = map[string]string{"source": m.name, "group": "AMD"}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ package collectors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
@@ -41,7 +42,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
|||||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||||
m.name = "SampleCollector"
|
m.name = "SampleCollector"
|
||||||
// This is for later use, also call it early
|
// This is for later use, also call it early
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
||||||
// or it should be run serially, mostly for collectors actually doing measurements
|
// or it should be run serially, mostly for collectors actually doing measurements
|
||||||
// because they should not measure the execution of the other collectors
|
// because they should not measure the execution of the other collectors
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ package collectors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -40,7 +41,9 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
|||||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||||
m.name = "SampleTimerCollector"
|
m.name = "SampleTimerCollector"
|
||||||
// This is for later use, also call it early
|
// This is for later use, also call it early
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
// Define meta information sent with each metric
|
// Define meta information sent with each metric
|
||||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -47,37 +46,37 @@ type SchedstatCollector struct {
|
|||||||
// Called once by the collector manager
|
// Called once by the collector manager
|
||||||
// All tags, meta data tags and metrics that do not change over the runtime should be set here
|
// All tags, meta data tags and metrics that do not change over the runtime should be set here
|
||||||
func (m *SchedstatCollector) Init(config json.RawMessage) error {
|
func (m *SchedstatCollector) Init(config json.RawMessage) error {
|
||||||
var err error = nil
|
|
||||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||||
m.name = "SchedstatCollector"
|
m.name = "SchedstatCollector"
|
||||||
// This is for later use, also call it early
|
// This is for later use, also call it early
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
// Tell whether the collector should be run in parallel with others (reading files, ...)
|
||||||
// or it should be run serially, mostly for collectors acutally doing measurements
|
// or it should be run serially, mostly for collectors acutally doing measurements
|
||||||
// because they should not measure the execution of the other collectors
|
// because they should not measure the execution of the other collectors
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
// Define meta information sent with each metric
|
// Define meta information sent with each metric
|
||||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||||
m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"}
|
m.meta = map[string]string{
|
||||||
|
"source": m.name,
|
||||||
|
"group": "SCHEDSTAT",
|
||||||
|
}
|
||||||
|
|
||||||
// Read in the JSON configuration
|
// Read in the JSON configuration
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("%s Init(): Error reading config: %w", m.name, err)
|
||||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check input file
|
// Check input file
|
||||||
file, err := os.Open(string(SCHEDSTATFILE))
|
file, err := os.Open(SCHEDSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
return fmt.Errorf("%s Init(): Failed opening scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
// Pre-generate tags for all CPUs
|
// Pre-generate tags for all CPUs
|
||||||
num_cpus := 0
|
|
||||||
m.cputags = make(map[string]map[string]string)
|
m.cputags = make(map[string]map[string]string)
|
||||||
m.olddata = make(map[string]map[string]int64)
|
m.olddata = make(map[string]map[string]int64)
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
@@ -89,11 +88,19 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error {
|
|||||||
cpu, _ := strconv.Atoi(cpustr)
|
cpu, _ := strconv.Atoi(cpustr)
|
||||||
running, _ := strconv.ParseInt(linefields[7], 10, 64)
|
running, _ := strconv.ParseInt(linefields[7], 10, 64)
|
||||||
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
|
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
|
||||||
m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
|
m.cputags[linefields[0]] = map[string]string{
|
||||||
m.olddata[linefields[0]] = map[string]int64{"running": running, "waiting": waiting}
|
"type": "hwthread",
|
||||||
num_cpus++
|
"type-id": fmt.Sprintf("%d", cpu),
|
||||||
|
}
|
||||||
|
m.olddata[linefields[0]] = map[string]int64{
|
||||||
|
"running": running,
|
||||||
|
"waiting": waiting,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): Failed closing scheduler statistics file \"%s\": %w", m.name, SCHEDSTATFILE, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Save current timestamp
|
// Save current timestamp
|
||||||
m.lastTimestamp = time.Now()
|
m.lastTimestamp = time.Now()
|
||||||
@@ -109,8 +116,8 @@ func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]
|
|||||||
diff_running := running - m.olddata[linefields[0]]["running"]
|
diff_running := running - m.olddata[linefields[0]]["running"]
|
||||||
diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
|
diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
|
||||||
|
|
||||||
var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3))
|
l_running := float64(diff_running) / tsdelta.Seconds() / 1000_000_000
|
||||||
var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3))
|
l_waiting := float64(diff_waiting) / tsdelta.Seconds() / 1000_000_000
|
||||||
|
|
||||||
m.olddata[linefields[0]]["running"] = running
|
m.olddata[linefields[0]]["running"] = running
|
||||||
m.olddata[linefields[0]]["waiting"] = waiting
|
m.olddata[linefields[0]]["waiting"] = waiting
|
||||||
@@ -134,11 +141,19 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
tsdelta := now.Sub(m.lastTimestamp)
|
tsdelta := now.Sub(m.lastTimestamp)
|
||||||
|
|
||||||
file, err := os.Open(string(SCHEDSTATFILE))
|
file, err := os.Open(SCHEDSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to open file '%s': %v", SCHEDSTATFILE, err))
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to close file '%s': %v", SCHEDSTATFILE, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ package collectors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
@@ -34,7 +35,9 @@ type SelfCollector struct {
|
|||||||
func (m *SelfCollector) Init(config json.RawMessage) error {
|
func (m *SelfCollector) Init(config json.RawMessage) error {
|
||||||
var err error = nil
|
var err error = nil
|
||||||
m.name = "SelfCollector"
|
m.name = "SelfCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "Self"}
|
m.meta = map[string]string{"source": m.name, "group": "Self"}
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{"type": "node"}
|
||||||
|
|||||||
@@ -50,8 +50,7 @@ func ParseCPUs(cpuset string) ([]int, error) {
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ranges := strings.Split(cpuset, ",")
|
for r := range strings.SplitSeq(cpuset, ",") {
|
||||||
for _, r := range ranges {
|
|
||||||
if strings.Contains(r, "-") {
|
if strings.Contains(r, "-") {
|
||||||
parts := strings.Split(r, "-")
|
parts := strings.Split(r, "-")
|
||||||
if len(parts) != 2 {
|
if len(parts) != 2 {
|
||||||
@@ -103,7 +102,9 @@ func (m *SlurmCgroupCollector) readFile(path string) ([]byte, error) {
|
|||||||
func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
|
func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
|
||||||
var err error
|
var err error
|
||||||
m.name = "SlurmCgroupCollector"
|
m.name = "SlurmCgroupCollector"
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.meta = map[string]string{"source": m.name, "group": "SLURM"}
|
m.meta = map[string]string{"source": m.name, "group": "SLURM"}
|
||||||
m.tags = map[string]string{"type": "hwthread"}
|
m.tags = map[string]string{"type": "hwthread"}
|
||||||
|
|||||||
@@ -58,7 +58,9 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
|||||||
|
|
||||||
m.name = "TempCollector"
|
m.name = "TempCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &m.config)
|
err := json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -117,7 +119,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
|||||||
sensor.metricName = sensor.label
|
sensor.metricName = sensor.label
|
||||||
}
|
}
|
||||||
sensor.metricName = strings.ToLower(sensor.metricName)
|
sensor.metricName = strings.ToLower(sensor.metricName)
|
||||||
sensor.metricName = strings.Replace(sensor.metricName, " ", "_", -1)
|
sensor.metricName = strings.ReplaceAll(sensor.metricName, " ", "_")
|
||||||
// Add temperature prefix, if required
|
// Add temperature prefix, if required
|
||||||
if !strings.Contains(sensor.metricName, "temp") {
|
if !strings.Contains(sensor.metricName, "temp") {
|
||||||
sensor.metricName = "temp_" + sensor.metricName
|
sensor.metricName = "temp_" + sensor.metricName
|
||||||
|
|||||||
@@ -9,13 +9,12 @@ package collectors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -36,12 +35,17 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
|||||||
var err error
|
var err error
|
||||||
m.name = "TopProcsCollector"
|
m.name = "TopProcsCollector"
|
||||||
m.parallel = true
|
m.parallel = true
|
||||||
m.tags = map[string]string{"type": "node"}
|
m.tags = map[string]string{
|
||||||
m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
|
"type": "node",
|
||||||
|
}
|
||||||
|
m.meta = map[string]string{
|
||||||
|
"source": m.name,
|
||||||
|
"group": "TopProcs",
|
||||||
|
}
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err = json.Unmarshal(config, &m.config)
|
err = json.Unmarshal(config, &m.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("%s Init(): json.Unmarshal() failed: %w", m.name, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
m.config.Num_procs = int(DEFAULT_NUM_PROCS)
|
m.config.Num_procs = int(DEFAULT_NUM_PROCS)
|
||||||
@@ -49,12 +53,13 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
|||||||
if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS {
|
if m.config.Num_procs <= 0 || m.config.Num_procs > MAX_NUM_PROCS {
|
||||||
return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)
|
return fmt.Errorf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)
|
||||||
}
|
}
|
||||||
m.setup()
|
if err := m.setup(); err != nil {
|
||||||
|
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
|
||||||
|
}
|
||||||
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
|
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
|
||||||
command.Wait()
|
|
||||||
_, err = command.Output()
|
_, err = command.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.New("failed to execute command")
|
return fmt.Errorf("%s Init(): failed to get output from command: %w", m.name, err)
|
||||||
}
|
}
|
||||||
m.init = true
|
m.init = true
|
||||||
return nil
|
return nil
|
||||||
@@ -65,10 +70,11 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
|
command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu")
|
||||||
command.Wait()
|
|
||||||
stdout, err := command.Output()
|
stdout, err := command.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(m.name, err)
|
cclog.ComponentError(
|
||||||
|
m.name,
|
||||||
|
fmt.Sprintf("Read(): Failed to read output from command \"%s\": %v", command.String(), err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,19 @@
|
|||||||
{
|
{
|
||||||
"cpufreq": {},
|
"cpufreq": {},
|
||||||
"cpufreq_cpuinfo": {},
|
"cpufreq_cpuinfo": {},
|
||||||
|
"cpustat": {
|
||||||
|
"exclude_metrics": [
|
||||||
|
"cpu_idle"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"diskstat": {
|
||||||
|
"exclude_metrics": [
|
||||||
|
"disk_total"
|
||||||
|
],
|
||||||
|
"exclude_mounts": [
|
||||||
|
"slurm-tmpfs"
|
||||||
|
]
|
||||||
|
},
|
||||||
"gpfs": {
|
"gpfs": {
|
||||||
"exclude_filesystem": [
|
"exclude_filesystem": [
|
||||||
"test_fs"
|
"test_fs"
|
||||||
@@ -21,6 +34,8 @@
|
|||||||
},
|
},
|
||||||
"numastats": {},
|
"numastats": {},
|
||||||
"nvidia": {},
|
"nvidia": {},
|
||||||
|
"schedstat": {
|
||||||
|
},
|
||||||
"tempstat": {
|
"tempstat": {
|
||||||
"report_max_temperature": true,
|
"report_max_temperature": true,
|
||||||
"report_critical_temperature": true,
|
"report_critical_temperature": true,
|
||||||
@@ -38,4 +53,4 @@
|
|||||||
"topprocs": {
|
"topprocs": {
|
||||||
"num_procs": 5
|
"num_procs": 5
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -11,7 +11,6 @@ require (
|
|||||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
|
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
|
||||||
github.com/tklauser/go-sysconf v0.3.16
|
github.com/tklauser/go-sysconf v0.3.16
|
||||||
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
|
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
|
||||||
golang.org/x/exp v0.0.0-20260112195511-716be5621a96
|
|
||||||
golang.org/x/sys v0.40.0
|
golang.org/x/sys v0.40.0
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -40,6 +39,7 @@ require (
|
|||||||
github.com/tklauser/numcpus v0.11.0 // indirect
|
github.com/tklauser/numcpus v0.11.0 // indirect
|
||||||
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
||||||
golang.org/x/crypto v0.47.0 // indirect
|
golang.org/x/crypto v0.47.0 // indirect
|
||||||
|
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
|
||||||
golang.org/x/net v0.49.0 // indirect
|
golang.org/x/net v0.49.0 // indirect
|
||||||
google.golang.org/protobuf v1.36.11 // indirect
|
google.golang.org/protobuf v1.36.11 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ package metricAggregator
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"maps"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -121,9 +122,7 @@ func (c *metricAggregator) Init(output chan lp.CCMessage) error {
|
|||||||
|
|
||||||
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
|
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
|
||||||
vars := make(map[string]interface{})
|
vars := make(map[string]interface{})
|
||||||
for k, v := range c.constants {
|
maps.Copy(vars, c.constants)
|
||||||
vars[k] = v
|
|
||||||
}
|
|
||||||
vars["starttime"] = starttime
|
vars["starttime"] = starttime
|
||||||
vars["endtime"] = endtime
|
vars["endtime"] = endtime
|
||||||
for _, f := range c.functions {
|
for _, f := range c.functions {
|
||||||
|
|||||||
@@ -11,10 +11,9 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
|
|
||||||
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
|
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -169,7 +168,7 @@ func medianfunc(args interface{}) (interface{}, error) {
|
|||||||
|
|
||||||
func lenfunc(args interface{}) (interface{}, error) {
|
func lenfunc(args interface{}) (interface{}, error) {
|
||||||
var err error = nil
|
var err error = nil
|
||||||
var length int = 0
|
length := 0
|
||||||
switch values := args.(type) {
|
switch values := args.(type) {
|
||||||
case []float64:
|
case []float64:
|
||||||
length = len(values)
|
length = len(values)
|
||||||
@@ -238,7 +237,7 @@ func matchfunc(args ...interface{}) (interface{}, error) {
|
|||||||
case string:
|
case string:
|
||||||
switch total := args[1].(type) {
|
switch total := args[1].(type) {
|
||||||
case string:
|
case string:
|
||||||
smatch := strings.Replace(match, "%", "\\", -1)
|
smatch := strings.ReplaceAll(match, "%", "\\")
|
||||||
regex, err := regexp.Compile(smatch)
|
regex, err := regexp.Compile(smatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ type MetricCache interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
|
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
|
||||||
var err error = nil
|
var err error
|
||||||
c.done = make(chan bool)
|
c.done = make(chan bool)
|
||||||
c.wg = wg
|
c.wg = wg
|
||||||
c.ticker = ticker
|
c.ticker = ticker
|
||||||
@@ -161,8 +161,8 @@ func (c *metricCache) DeleteAggregation(name string) error {
|
|||||||
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
|
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
|
||||||
// is given (negative index, index larger than configured number of total intervals, ...)
|
// is given (negative index, index larger than configured number of total intervals, ...)
|
||||||
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
|
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
|
||||||
var start time.Time = time.Now()
|
start := time.Now()
|
||||||
var stop time.Time = time.Now()
|
stop := time.Now()
|
||||||
var metrics []lp.CCMessage
|
var metrics []lp.CCMessage
|
||||||
if index >= 0 && index < c.numPeriods {
|
if index >= 0 && index < c.numPeriods {
|
||||||
pindex := c.curPeriod - index
|
pindex := c.curPeriod - index
|
||||||
|
|||||||
@@ -107,10 +107,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
|||||||
cclog.ComponentError("MetricRouter", err.Error())
|
cclog.ComponentError("MetricRouter", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.maxForward = 1
|
r.maxForward = max(1, r.config.MaxForward)
|
||||||
if r.config.MaxForward > r.maxForward {
|
|
||||||
r.maxForward = r.config.MaxForward
|
|
||||||
}
|
|
||||||
if r.config.NumCacheIntervals > 0 {
|
if r.config.NumCacheIntervals > 0 {
|
||||||
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
|
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -118,50 +116,74 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, agg := range r.config.IntervalAgg {
|
for _, agg := range r.config.IntervalAgg {
|
||||||
r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
|
err = r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MetricCache AddAggregation() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p, err := mp.NewMessageProcessor()
|
p, err := mp.NewMessageProcessor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("initialization of message processor failed: %v", err.Error())
|
return fmt.Errorf("MessageProcessor NewMessageProcessor() failed: %w", err)
|
||||||
}
|
}
|
||||||
r.mp = p
|
r.mp = p
|
||||||
|
|
||||||
if len(r.config.MessageProcessor) > 0 {
|
if len(r.config.MessageProcessor) > 0 {
|
||||||
err = r.mp.FromConfigJSON(r.config.MessageProcessor)
|
err = r.mp.FromConfigJSON(r.config.MessageProcessor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
|
return fmt.Errorf("MessageProcessor FromConfigJSON() failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, mname := range r.config.DropMetrics {
|
for _, mname := range r.config.DropMetrics {
|
||||||
r.mp.AddDropMessagesByName(mname)
|
err = r.mp.AddDropMessagesByName(mname)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddDropMessagesByName() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, cond := range r.config.DropMetricsIf {
|
for _, cond := range r.config.DropMetricsIf {
|
||||||
r.mp.AddDropMessagesByCondition(cond)
|
err = r.mp.AddDropMessagesByCondition(cond)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddDropMessagesByCondition() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, data := range r.config.AddTags {
|
for _, data := range r.config.AddTags {
|
||||||
cond := data.Condition
|
cond := data.Condition
|
||||||
if cond == "*" {
|
if cond == "*" {
|
||||||
cond = "true"
|
cond = "true"
|
||||||
}
|
}
|
||||||
r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
|
err = r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, data := range r.config.DelTags {
|
for _, data := range r.config.DelTags {
|
||||||
cond := data.Condition
|
cond := data.Condition
|
||||||
if cond == "*" {
|
if cond == "*" {
|
||||||
cond = "true"
|
cond = "true"
|
||||||
}
|
}
|
||||||
r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
|
err = r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddDeleteTagsByCondition() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for oldname, newname := range r.config.RenameMetrics {
|
for oldname, newname := range r.config.RenameMetrics {
|
||||||
r.mp.AddRenameMetricByName(oldname, newname)
|
err = r.mp.AddRenameMetricByName(oldname, newname)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddRenameMetricByName() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for metricName, prefix := range r.config.ChangeUnitPrefix {
|
for metricName, prefix := range r.config.ChangeUnitPrefix {
|
||||||
r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
|
err = r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddChangeUnitPrefix() failed: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
r.mp.SetNormalizeUnits(r.config.NormalizeUnits)
|
r.mp.SetNormalizeUnits(r.config.NormalizeUnits)
|
||||||
|
|
||||||
r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
|
err = r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, r.hostname)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// r.config.dropMetrics = make(map[string]bool)
|
// r.config.dropMetrics = make(map[string]bool)
|
||||||
// for _, mname := range r.config.DropMetrics {
|
// for _, mname := range r.config.DropMetrics {
|
||||||
|
|||||||
@@ -13,11 +13,11 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
cclogger "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclogger "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const SYSFS_CPUBASE = `/sys/devices/system/cpu`
|
const SYSFS_CPUBASE = `/sys/devices/system/cpu`
|
||||||
@@ -80,7 +80,7 @@ func fileToList(path string) []int {
|
|||||||
// Create list
|
// Create list
|
||||||
list := make([]int, 0)
|
list := make([]int, 0)
|
||||||
stringBuffer := strings.TrimSpace(string(buffer))
|
stringBuffer := strings.TrimSpace(string(buffer))
|
||||||
for _, valueRangeString := range strings.Split(stringBuffer, ",") {
|
for valueRangeString := range strings.SplitSeq(stringBuffer, ",") {
|
||||||
valueRange := strings.Split(valueRangeString, "-")
|
valueRange := strings.Split(valueRangeString, "-")
|
||||||
switch len(valueRange) {
|
switch len(valueRange) {
|
||||||
case 1:
|
case 1:
|
||||||
|
|||||||
Reference in New Issue
Block a user