Format with gofumpt

This commit is contained in:
Holger Obermaier
2026-02-16 14:16:03 +01:00
parent 9bb21b807a
commit 539581f952
28 changed files with 294 additions and 310 deletions

View File

@@ -58,6 +58,17 @@ fmt:
$(GOBIN) fmt $(GOSRC_APP) $(GOBIN) fmt $(GOSRC_APP)
@for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done @for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done
# gofumpt <https://github.com/mvdan/gofumpt>:
# Enforce a stricter format than gofmt
.PHONY: gofumpt
fmt:
$(GOBIN) install mvdan.cc/gofumpt@latest
gofumpt -w $(GOSRC_COLLECTORS)
gofumpt -w $(GOSRC_SINKS)
gofumpt -w $(GOSRC_RECEIVERS)
gofumpt -w $(GOSRC_APP)
@for F in $(GOSRC_INTERNAL); do gofumpt -w $$F; done
# Examine Go source code and reports suspicious constructs # Examine Go source code and reports suspicious constructs
.PHONY: vet .PHONY: vet

View File

@@ -12,16 +12,14 @@ import (
"flag" "flag"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time"
"github.com/ClusterCockpit/cc-lib/v2/receivers" "github.com/ClusterCockpit/cc-lib/v2/receivers"
"github.com/ClusterCockpit/cc-lib/v2/sinks" "github.com/ClusterCockpit/cc-lib/v2/sinks"
"github.com/ClusterCockpit/cc-metric-collector/collectors" "github.com/ClusterCockpit/cc-metric-collector/collectors"
// "strings"
"sync"
"time"
ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"

View File

@@ -50,7 +50,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
return nil return nil
} }
// Metrics // Metrics
var nodeMdstat_array = [39]string{ nodeMdstat_array := [39]string{
"sum", "ack", "close", "entInf", "sum", "ack", "close", "entInf",
"fndOwn", "mkdir", "create", "rddir", "fndOwn", "mkdir", "create", "rddir",
"refrEn", "mdsInf", "rmdir", "rmLnk", "refrEn", "mdsInf", "rmdir", "rmLnk",
@@ -60,7 +60,8 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
"lookLI", "statLI", "revalLI", "openLI", "lookLI", "statLI", "revalLI", "openLI",
"createLI", "hardlnk", "flckAp", "flckEn", "createLI", "hardlnk", "flckAp", "flckEn",
"flckRg", "dirparent", "listXA", "getXA", "flckRg", "dirparent", "listXA", "getXA",
"rmXA", "setXA", "mirror"} "rmXA", "setXA", "mirror",
}
m.name = "BeegfsMetaCollector" m.name = "BeegfsMetaCollector"
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
@@ -154,7 +155,6 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess
// --nodetype=meta: The node type to query (meta, storage). // --nodetype=meta: The node type to query (meta, storage).
// --interval: // --interval:
// --mount=/mnt/beeond/: Which mount point // --mount=/mnt/beeond/: Which mount point
//cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt")
mountoption := "--mount=" + mountpoint mountoption := "--mount=" + mountpoint
cmd := exec.Command(m.config.Beegfs, "--clientstats", cmd := exec.Command(m.config.Beegfs, "--clientstats",
"--nodetype=meta", mountoption, "--allstats") "--nodetype=meta", mountoption, "--allstats")
@@ -180,14 +180,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess
scanner := bufio.NewScanner(cmdStdout) scanner := bufio.NewScanner(cmdStdout)
sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`)
//Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`)
statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`)
singleSpacePattern := regexp.MustCompile(`\s+`) singleSpacePattern := regexp.MustCompile(`\s+`)
removePattern := regexp.MustCompile(`[\[|\]]`) removePattern := regexp.MustCompile(`[\[|\]]`)
for scanner.Scan() { for scanner.Scan() {
readLine := scanner.Text() readLine := scanner.Text()
//fmt.Println(readLine)
// Jump few lines, we only want the I/O stats from nodes // Jump few lines, we only want the I/O stats from nodes
if !sumLine.MatchString(readLine) { if !sumLine.MatchString(readLine) {
continue continue
@@ -196,7 +194,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess
match := statsLine.FindStringSubmatch(readLine) match := statsLine.FindStringSubmatch(readLine)
// nodeName = "Sum:" or would be nodes // nodeName = "Sum:" or would be nodes
// nodeName := match[1] // nodeName := match[1]
//Remove multiple whitespaces // Remove multiple whitespaces
dummy := removePattern.ReplaceAllString(match[2], " ") dummy := removePattern.ReplaceAllString(match[2], " ")
metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " "))
split := strings.Split(metaStats, " ") split := strings.Split(metaStats, " ")
@@ -222,7 +220,6 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess
fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err))
continue continue
} }
//mdStat["other"] = fmt.Sprintf("%f", f1+f2)
m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2) m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2)
} }
} }

View File

@@ -48,12 +48,13 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
return nil return nil
} }
// Metrics // Metrics
var storageStat_array = [18]string{ storageStat_array := [18]string{
"sum", "ack", "sChDrct", "getFSize", "sum", "ack", "sChDrct", "getFSize",
"sAttr", "statfs", "trunc", "close", "sAttr", "statfs", "trunc", "close",
"fsync", "ops-rd", "MiB-rd/s", "ops-wr", "fsync", "ops-rd", "MiB-rd/s", "ops-wr",
"MiB-wr/s", "gendbg", "hrtbeat", "remNode", "MiB-wr/s", "gendbg", "hrtbeat", "remNode",
"storInf", "unlnk"} "storInf", "unlnk",
}
m.name = "BeegfsStorageCollector" m.name = "BeegfsStorageCollector"
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
@@ -72,7 +73,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
} }
} }
//create map with possible variables // Create map with possible variables
m.matches = make(map[string]string) m.matches = make(map[string]string)
for _, value := range storageStat_array { for _, value := range storageStat_array {
if slices.Contains(m.config.ExcludeMetrics, value) { if slices.Contains(m.config.ExcludeMetrics, value) {
@@ -117,11 +118,10 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
if !m.init { if !m.init {
return return
} }
//get mounpoint // Get mounpoint
buffer, _ := os.ReadFile(string("/proc/mounts")) buffer, _ := os.ReadFile("/proc/mounts")
mounts := strings.Split(string(buffer), "\n")
var mountpoints []string var mountpoints []string
for _, line := range mounts { for line := range strings.Lines(string(buffer)) {
if len(line) == 0 { if len(line) == 0 {
continue continue
} }
@@ -146,7 +146,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
// --nodetype=meta: The node type to query (meta, storage). // --nodetype=meta: The node type to query (meta, storage).
// --interval: // --interval:
// --mount=/mnt/beeond/: Which mount point // --mount=/mnt/beeond/: Which mount point
//cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt")
mountoption := "--mount=" + mountpoint mountoption := "--mount=" + mountpoint
cmd := exec.Command(m.config.Beegfs, "--clientstats", cmd := exec.Command(m.config.Beegfs, "--clientstats",
"--nodetype=storage", mountoption, "--allstats") "--nodetype=storage", mountoption, "--allstats")
@@ -172,7 +171,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
scanner := bufio.NewScanner(cmdStdout) scanner := bufio.NewScanner(cmdStdout)
sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`)
//Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`)
statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`)
singleSpacePattern := regexp.MustCompile(`\s+`) singleSpacePattern := regexp.MustCompile(`\s+`)
removePattern := regexp.MustCompile(`[\[|\]]`) removePattern := regexp.MustCompile(`[\[|\]]`)
@@ -187,7 +185,7 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
match := statsLine.FindStringSubmatch(readLine) match := statsLine.FindStringSubmatch(readLine)
// nodeName = "Sum:" or would be nodes // nodeName = "Sum:" or would be nodes
// nodeName := match[1] // nodeName := match[1]
//Remove multiple whitespaces // Remove multiple whitespaces
dummy := removePattern.ReplaceAllString(match[2], " ") dummy := removePattern.ReplaceAllString(match[2], " ")
metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " "))
split := strings.Split(metaStats, " ") split := strings.Split(metaStats, " ")
@@ -198,7 +196,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
for i := 0; i <= len(split)-1; i += 2 { for i := 0; i <= len(split)-1; i += 2 {
if _, ok := m.matches[split[i+1]]; ok { if _, ok := m.matches[split[i+1]]; ok {
m.matches["beegfs_cstorage_"+split[i+1]] = split[i] m.matches["beegfs_cstorage_"+split[i+1]] = split[i]
//m.matches[split[i+1]] = split[i]
} else { } else {
f1, err := strconv.ParseFloat(m.matches["other"], 32) f1, err := strconv.ParseFloat(m.matches["other"], 32)
if err != nil { if err != nil {

View File

@@ -20,7 +20,6 @@ import (
// Map of all available metric collectors // Map of all available metric collectors
var AvailableCollectors = map[string]MetricCollector{ var AvailableCollectors = map[string]MetricCollector{
"likwid": new(LikwidCollector), "likwid": new(LikwidCollector),
"loadavg": new(LoadavgCollector), "loadavg": new(LoadavgCollector),
"memstat": new(MemstatCollector), "memstat": new(MemstatCollector),

View File

@@ -10,7 +10,6 @@ package collectors
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"strconv" "strconv"

View File

@@ -111,7 +111,8 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
cpu, _ := strconv.Atoi(cpustr) cpu, _ := strconv.Atoi(cpustr)
m.cputags[linefields[0]] = map[string]string{ m.cputags[linefields[0]] = map[string]string{
"type": "hwthread", "type": "hwthread",
"type-id": strconv.Itoa(cpu)} "type-id": strconv.Itoa(cpu),
}
m.olddata[linefields[0]] = make(map[string]int64) m.olddata[linefields[0]] = make(map[string]int64)
for k, v := range m.matches { for k, v := range m.matches {
m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64)

View File

@@ -123,28 +123,30 @@ mountLoop:
continue continue
} }
tags := map[string]string{"type": "node", "device": linefields[0]} tags := map[string]string{"type": "node", "device": linefields[0]}
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000_000_000)
if m.allowedMetrics["disk_total"] { if m.allowedMetrics["disk_total"] {
y, err := lp.NewMessage( y, err := lp.NewMessage(
"disk_total", "disk_total",
tags, tags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": total}, "value": total,
},
time.Now()) time.Now())
if err == nil { if err == nil {
y.AddMeta("unit", "GBytes") y.AddMeta("unit", "GBytes")
output <- y output <- y
} }
} }
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000_000_000)
if m.allowedMetrics["disk_free"] { if m.allowedMetrics["disk_free"] {
y, err := lp.NewMessage( y, err := lp.NewMessage(
"disk_free", "disk_free",
tags, tags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": free}, "value": free,
},
time.Now()) time.Now())
if err == nil { if err == nil {
y.AddMeta("unit", "GBytes") y.AddMeta("unit", "GBytes")
@@ -162,10 +164,12 @@ mountLoop:
y, err := lp.NewMessage( y, err := lp.NewMessage(
"part_max_used", "part_max_used",
map[string]string{ map[string]string{
"type": "node"}, "type": "node",
},
m.meta, m.meta,
map[string]any{ map[string]any{
"value": int(part_max_used)}, "value": int(part_max_used),
},
time.Now()) time.Now())
if err == nil { if err == nil {
y.AddMeta("unit", "percent") y.AddMeta("unit", "percent")

View File

@@ -8,19 +8,18 @@
package collectors package collectors
import ( import (
"encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath"
"slices" "slices"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"encoding/json"
"path/filepath"
"strconv"
"strings"
"time"
) )
const IB_BASEPATH = "/sys/class/infiniband/" const IB_BASEPATH = "/sys/class/infiniband/"
@@ -59,7 +58,6 @@ type InfinibandCollector struct {
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH // Init initializes the Infiniband collector by walking through files below IB_BASEPATH
func (m *InfinibandCollector) Init(config json.RawMessage) error { func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Check if already initialized // Check if already initialized
if m.init { if m.init {
return nil return nil
@@ -187,7 +185,6 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Read reads Infiniband counter files below IB_BASEPATH // Read reads Infiniband counter files below IB_BASEPATH
func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized // Check if already initialized
if !m.init { if !m.init {
return return
@@ -233,8 +230,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
// Send absolut values // Send absolut values
if m.config.SendAbsoluteValues { if m.config.SendAbsoluteValues {
if y, err := if y, err := lp.NewMessage(
lp.NewMessage(
counterDef.name, counterDef.name,
info.tagSet, info.tagSet,
m.meta, m.meta,
@@ -251,8 +247,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
if m.config.SendDerivedValues { if m.config.SendDerivedValues {
if counterDef.lastState >= 0 { if counterDef.lastState >= 0 {
rate := float64((counterDef.currentState - counterDef.lastState)) / timeDiff rate := float64((counterDef.currentState - counterDef.lastState)) / timeDiff
if y, err := if y, err := lp.NewMessage(
lp.NewMessage(
counterDef.name+"_bw", counterDef.name+"_bw",
info.tagSet, info.tagSet,
m.meta, m.meta,
@@ -281,8 +276,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
// Send total values // Send total values
if m.config.SendTotalValues { if m.config.SendTotalValues {
if y, err := if y, err := lp.NewMessage(
lp.NewMessage(
"ib_total", "ib_total",
info.tagSet, info.tagSet,
m.meta, m.meta,
@@ -294,8 +288,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess
output <- y output <- y
} }
if y, err := if y, err := lp.NewMessage(
lp.NewMessage(
"ib_total_pkts", "ib_total_pkts",
info.tagSet, info.tagSet,
m.meta, m.meta,

View File

@@ -93,7 +93,6 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
} }
func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
// Setup ipmitool command // Setup ipmitool command
command := exec.Command(cmd, "sensor") command := exec.Command(cmd, "sensor")
stdout, _ := command.StdoutPipe() stdout, _ := command.StdoutPipe()
@@ -152,7 +151,6 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) {
} }
func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
// Setup ipmisensors command // Setup ipmisensors command
command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate")
stdout, _ := command.StdoutPipe() stdout, _ := command.StdoutPipe()
@@ -197,11 +195,9 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) {
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg)))) cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
return return
} }
} }
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized // Check if already initialized
if !m.init { if !m.init {
return return

View File

@@ -609,8 +609,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
evset.metrics[tid][metric.Name] = value evset.metrics[tid][metric.Name] = value
// Now we have the result, send it with the proper tags // Now we have the result, send it with the proper tags
if !math.IsNaN(value) && metric.Publish { if !math.IsNaN(value) && metric.Publish {
y, err := y, err := lp.NewMessage(
lp.NewMessage(
metric.Name, metric.Name,
map[string]string{ map[string]string{
"type": metric.Type, "type": metric.Type,
@@ -648,8 +647,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
} }
for coreID, value := range totalCoreValues { for coreID, value := range totalCoreValues {
y, err := y, err := lp.NewMessage(
lp.NewMessage(
metric.Name, metric.Name,
map[string]string{ map[string]string{
"type": "core", "type": "core",
@@ -685,8 +683,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
} }
for socketID, value := range totalSocketValues { for socketID, value := range totalSocketValues {
y, err := y, err := lp.NewMessage(
lp.NewMessage(
metric.Name, metric.Name,
map[string]string{ map[string]string{
"type": "socket", "type": "socket",
@@ -720,8 +717,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
} }
} }
y, err := y, err := lp.NewMessage(
lp.NewMessage(
metric.Name, metric.Name,
map[string]string{ map[string]string{
"type": "node", "type": "node",
@@ -778,8 +774,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
// Now we have the result, send it with the proper tags // Now we have the result, send it with the proper tags
if !math.IsNaN(value) { if !math.IsNaN(value) {
if metric.Publish { if metric.Publish {
y, err := y, err := lp.NewMessage(
lp.NewMessage(
metric.Name, metric.Name,
map[string]string{ map[string]string{
"type": metric.Type, "type": metric.Type,

View File

@@ -55,16 +55,19 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
} }
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "LOAD"} "group": "LOAD",
}
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
m.load_matches = []string{ m.load_matches = []string{
"load_one", "load_one",
"load_five", "load_five",
"load_fifteen"} "load_fifteen",
}
m.load_skips = make([]bool, len(m.load_matches)) m.load_skips = make([]bool, len(m.load_matches))
m.proc_matches = []string{ m.proc_matches = []string{
"proc_run", "proc_run",
"proc_total"} "proc_total",
}
m.proc_skips = make([]bool, len(m.proc_matches)) m.proc_skips = make([]bool, len(m.proc_matches))
for i, name := range m.load_matches { for i, name := range m.load_matches {

View File

@@ -22,9 +22,11 @@ import (
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
const LUSTRE_SYSFS = `/sys/fs/lustre` const (
const LCTL_CMD = `lctl` LUSTRE_SYSFS = `/sys/fs/lustre`
const LCTL_OPTION = `get_param` LCTL_CMD = `lctl`
LCTL_OPTION = `get_param`
)
type LustreCollectorConfig struct { type LustreCollectorConfig struct {
LCtlCommand string `json:"lctl_command,omitempty"` LCtlCommand string `json:"lctl_command,omitempty"`

View File

@@ -24,8 +24,10 @@ import (
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
const MEMSTATFILE = "/proc/meminfo" const (
const NUMA_MEMSTAT_BASE = "/sys/devices/system/node" MEMSTATFILE = "/proc/meminfo"
NUMA_MEMSTAT_BASE = "/sys/devices/system/node"
)
type MemstatCollectorConfig struct { type MemstatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics"` ExcludeMetrics []string `json:"exclude_metrics"`

View File

@@ -43,8 +43,10 @@ type NfsIOStatCollector struct {
lastTimestamp time.Time lastTimestamp time.Time
} }
var deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`) var (
var bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P<nread>[^ ]+) (?P<nwrite>[^ ]+) (?P<dread>[^ ]+) (?P<dwrite>[^ ]+) (?P<nfsread>[^ ]+) (?P<nfswrite>[^ ]+) (?P<pageread>[^ ]+) (?P<pagewrite>[^ ]+)`) deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`)
bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P<nread>[^ ]+) (?P<nwrite>[^ ]+) (?P<dread>[^ ]+) (?P<dwrite>[^ ]+) (?P<nfsread>[^ ]+) (?P<nfswrite>[^ ]+) (?P<pageread>[^ ]+) (?P<pagewrite>[^ ]+)`)
)
func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string { func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string {
fields := make(map[string]string) fields := make(map[string]string)
@@ -149,7 +151,8 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessa
m.tags, m.tags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": newVal}, "value": newVal,
},
now) now)
if err == nil { if err == nil {
msg.AddTag("stype", "filesystem") msg.AddTag("stype", "filesystem")

View File

@@ -1226,7 +1226,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage)
} }
// Actual read loop over all attached Nvidia GPUs // Actual read loop over all attached Nvidia GPUs
for i := 0; i < m.num_gpus; i++ { for i := range m.num_gpus {
readAll(&m.gpus[i], output) readAll(&m.gpus[i], output)

View File

@@ -49,7 +49,6 @@ type RAPLCollector struct {
// Init initializes the running average power limit (RAPL) collector // Init initializes the running average power limit (RAPL) collector
func (m *RAPLCollector) Init(config json.RawMessage) error { func (m *RAPLCollector) Init(config json.RawMessage) error {
// Check if already initialized // Check if already initialized
if m.init { if m.init {
return nil return nil
@@ -91,18 +90,19 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath // readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes // See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
readZoneInfo := func(zonePath string) (z struct { readZoneInfo := func(zonePath string) (
z struct {
name string // zones name e.g. psys, dram, core, uncore, package-0 name string // zones name e.g. psys, dram, core, uncore, package-0
energyFilepath string // path to a file containing the zones current energy counter in micro joules energyFilepath string // path to a file containing the zones current energy counter in micro joules
energy int64 // current reading of the energy counter in micro joules energy int64 // current reading of the energy counter in micro joules
energyTimestamp time.Time // timestamp when energy counter was read energyTimestamp time.Time // timestamp when energy counter was read
maxEnergyRange int64 // Range of the above energy counter in micro-joules maxEnergyRange int64 // Range of the above energy counter in micro-joules
ok bool // Are all information available? ok bool // Are all information available?
}) { },
) {
// zones name e.g. psys, dram, core, uncore, package-0 // zones name e.g. psys, dram, core, uncore, package-0
foundName := false foundName := false
if v, err := if v, err := os.ReadFile(
os.ReadFile(
filepath.Join(zonePath, "name")); err == nil { filepath.Join(zonePath, "name")); err == nil {
foundName = true foundName = true
z.name = strings.TrimSpace(string(v)) z.name = strings.TrimSpace(string(v))
@@ -124,8 +124,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Range of the above energy counter in micro-joules // Range of the above energy counter in micro-joules
foundMaxEnergyRange := false foundMaxEnergyRange := false
if v, err := if v, err := os.ReadFile(
os.ReadFile(
filepath.Join(zonePath, "max_energy_range_uj")); err == nil { filepath.Join(zonePath, "max_energy_range_uj")); err == nil {
if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil { if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil {
foundMaxEnergyRange = true foundMaxEnergyRange = true
@@ -158,8 +157,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
!isNameExcluded[z.name] { !isNameExcluded[z.name] {
// Add RAPL monitoring attributes for a zone // Add RAPL monitoring attributes for a zone
m.RAPLZoneInfo = m.RAPLZoneInfo = append(
append(
m.RAPLZoneInfo, m.RAPLZoneInfo,
RAPLZoneInfo{ RAPLZoneInfo{
tags: map[string]string{ tags: map[string]string{
@@ -187,8 +185,7 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
sz.ok && sz.ok &&
!isIDExcluded[zoneID+":"+subZoneID] && !isIDExcluded[zoneID+":"+subZoneID] &&
!isNameExcluded[sz.name] { !isNameExcluded[sz.name] {
m.RAPLZoneInfo = m.RAPLZoneInfo = append(
append(
m.RAPLZoneInfo, m.RAPLZoneInfo,
RAPLZoneInfo{ RAPLZoneInfo{
tags: map[string]string{ tags: map[string]string{
@@ -207,7 +204,6 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
if m.RAPLZoneInfo == nil { if m.RAPLZoneInfo == nil {
return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath) return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath)
} }
// Initialized // Initialized
@@ -224,7 +220,6 @@ func (m *RAPLCollector) Init(config json.RawMessage) error {
// Read reads running average power limit (RAPL) monitoring attributes for all initialized zones // Read reads running average power limit (RAPL) monitoring attributes for all initialized zones
// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes // See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes
func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMessage) {
for i := range m.RAPLZoneInfo { for i := range m.RAPLZoneInfo {
p := &m.RAPLZoneInfo[i] p := &m.RAPLZoneInfo[i]

View File

@@ -58,15 +58,6 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
if err := m.setup(); err != nil { if err := m.setup(); err != nil {
return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err)
} }
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
//m.meta = map[string]string{"source": m.name, "group": "AMD"}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granulatity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
//m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
@@ -305,7 +296,6 @@ func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMessage
} }
} }
} }
} }
// Close metric collector: close network connection, close files, close libraries, ... // Close metric collector: close network connection, close files, close libraries, ...

View File

@@ -101,7 +101,6 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage)
// Send it to output channel // Send it to output channel
output <- y output <- y
} }
} }
// Close metric collector: close network connection, close files, close libraries, ... // Close metric collector: close network connection, close files, close libraries, ...

View File

@@ -138,7 +138,7 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa
return return
} }
//timestamps // timestamps
now := time.Now() now := time.Now()
tsdelta := now.Sub(m.lastTimestamp) tsdelta := now.Sub(m.lastTimestamp)
@@ -166,7 +166,6 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa
} }
m.lastTimestamp = now m.lastTimestamp = now
} }
// Close metric collector: close network connection, close files, close libraries, ... // Close metric collector: close network connection, close files, close libraries, ...

View File

@@ -110,9 +110,11 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error {
m.parallel = true m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "SLURM"} "group": "SLURM",
}
m.tags = map[string]string{ m.tags = map[string]string{
"type": "hwthread"} "type": "hwthread",
}
m.cpuUsed = make(map[int]bool) m.cpuUsed = make(map[int]bool)
m.cgroupBase = defaultCgroupBase m.cgroupBase = defaultCgroupBase
@@ -265,7 +267,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": memPerCore}, "value": memPerCore,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
@@ -279,7 +282,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": maxMemPerCore}, "value": maxMemPerCore,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
@@ -293,7 +297,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": limitPerCore}, "value": limitPerCore,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
@@ -307,7 +312,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": cpuUserPerCore}, "value": cpuUserPerCore,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "%") y.AddMeta("unit", "%")
output <- y output <- y
@@ -321,7 +327,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": cpuSysPerCore}, "value": cpuSysPerCore,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "%") y.AddMeta("unit", "%")
output <- y output <- y
@@ -346,7 +353,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": 0}, "value": 0,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
@@ -359,7 +367,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": 0}, "value": 0,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y
@@ -372,7 +381,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes
coreTags, coreTags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": 0}, "value": 0,
},
timestamp); err == nil { timestamp); err == nil {
y.AddMeta("unit", "Bytes") y.AddMeta("unit", "Bytes")
output <- y output <- y

View File

@@ -182,7 +182,6 @@ func (m *TempCollector) Init(config json.RawMessage) error {
} }
func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMessage) { func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMessage) {
for _, sensor := range m.sensors { for _, sensor := range m.sensors {
// Read sensor file // Read sensor file
buffer, err := os.ReadFile(sensor.file) buffer, err := os.ReadFile(sensor.file)
@@ -239,7 +238,6 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMessage) {
} }
} }
} }
} }
func (m *TempCollector) Close() { func (m *TempCollector) Close() {

View File

@@ -18,8 +18,10 @@ import (
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
) )
const MAX_NUM_PROCS = 10 const (
const DEFAULT_NUM_PROCS = 2 MAX_NUM_PROCS = 10
DEFAULT_NUM_PROCS = 2
)
type TopProcsCollectorConfig struct { type TopProcsCollectorConfig struct {
Num_procs int `json:"num_procs"` Num_procs int `json:"num_procs"`
@@ -87,7 +89,8 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag
m.tags, m.tags,
m.meta, m.meta,
map[string]any{ map[string]any{
"value": lines[i]}, "value": lines[i],
},
time.Now()) time.Now())
if err == nil { if err == nil {
output <- y output <- y

View File

@@ -72,10 +72,12 @@ var metricCacheLanguage = gval.NewLanguage(
gval.Function("getCpuList", getCpuListOfNode), gval.Function("getCpuList", getCpuListOfNode),
gval.Function("getCpuListOfType", getCpuListOfType), gval.Function("getCpuListOfType", getCpuListOfType),
) )
var language gval.Language = gval.NewLanguage( var language gval.Language = gval.NewLanguage(
gval.Full(), gval.Full(),
metricCacheLanguage, metricCacheLanguage,
) )
var evaluables = struct { var evaluables = struct {
mapping map[string]gval.Evaluable mapping map[string]gval.Evaluable
mutex sync.Mutex mutex sync.Mutex
@@ -359,8 +361,7 @@ func EvalBoolCondition(condition string, params map[string]any) (bool, error) {
evaluable, ok := evaluables.mapping[condition] evaluable, ok := evaluables.mapping[condition]
evaluables.mutex.Unlock() evaluables.mutex.Unlock()
if !ok { if !ok {
newcond := newcond := strings.ReplaceAll(
strings.ReplaceAll(
strings.ReplaceAll( strings.ReplaceAll(
condition, "'", "\""), "%", "\\") condition, "'", "\""), "%", "\\")
var err error var err error
@@ -381,8 +382,7 @@ func EvalFloat64Condition(condition string, params map[string]float64) (float64,
evaluable, ok := evaluables.mapping[condition] evaluable, ok := evaluables.mapping[condition]
evaluables.mutex.Unlock() evaluables.mutex.Unlock()
if !ok { if !ok {
newcond := newcond := strings.ReplaceAll(
strings.ReplaceAll(
strings.ReplaceAll( strings.ReplaceAll(
condition, "'", "\""), "%", "\\") condition, "'", "\""), "%", "\\")
var err error var err error

View File

@@ -35,7 +35,6 @@ func sumAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error
// Sum up values // Sum up values
func sumfunc(args any) (any, error) { func sumfunc(args any) (any, error) {
var err error var err error
switch values := args.(type) { switch values := args.(type) {
case []float64: case []float64:
@@ -168,7 +167,7 @@ func medianfunc(args any) (any, error) {
*/ */
func lenfunc(args any) (any, error) { func lenfunc(args any) (any, error) {
var err error = nil var err error
length := 0 length := 0
switch values := args.(type) { switch values := args.(type) {
case []float64: case []float64:

View File

@@ -79,7 +79,6 @@ func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker,
// Start starts the metric cache // Start starts the metric cache
func (c *metricCache) Start() { func (c *metricCache) Start() {
c.tickchan = make(chan time.Time) c.tickchan = make(chan time.Time)
c.ticker.AddChannel(c.tickchan) c.ticker.AddChannel(c.tickchan)
// Router cache is done // Router cache is done
@@ -171,7 +170,6 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage
start = c.intervals[pindex].startstamp start = c.intervals[pindex].startstamp
stop = c.intervals[pindex].stopstamp stop = c.intervals[pindex].stopstamp
metrics = c.intervals[pindex].metrics metrics = c.intervals[pindex].metrics
//return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
} else { } else {
metrics = make([]lp.CCMessage, 0) metrics = make([]lp.CCMessage, 0)
} }

View File

@@ -17,7 +17,6 @@ import (
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
mp "github.com/ClusterCockpit/cc-lib/v2/messageProcessor" mp "github.com/ClusterCockpit/cc-lib/v2/messageProcessor"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
@@ -244,7 +243,6 @@ func (r *metricRouter) Start() {
// Forward message received from collector channel // Forward message received from collector channel
coll_forward := func(p lp.CCMessage) { coll_forward := func(p lp.CCMessage) {
// receive from metric collector // receive from metric collector
//p.AddTag(r.config.HostnameTagName, r.hostname)
if r.config.IntervalStamp { if r.config.IntervalStamp {
p.SetTime(r.timestamp) p.SetTime(r.timestamp)
} }
@@ -292,7 +290,6 @@ func (r *metricRouter) Start() {
} }
r.wg.Go(func() { r.wg.Go(func() {
for { for {
select { select {
case <-r.done: case <-r.done:

View File

@@ -111,9 +111,7 @@ func fileToList(path string) []int {
// init initializes the cache structure // init initializes the cache structure
func init() { func init() {
getHWThreads := func() []int {
getHWThreads :=
func() []int {
globPath := filepath.Join(SYSFS_CPUBASE, "cpu[0-9]*") globPath := filepath.Join(SYSFS_CPUBASE, "cpu[0-9]*")
regexPath := filepath.Join(SYSFS_CPUBASE, "cpu([[:digit:]]+)") regexPath := filepath.Join(SYSFS_CPUBASE, "cpu([[:digit:]]+)")
regex := regexp.MustCompile(regexPath) regex := regexp.MustCompile(regexPath)
@@ -149,8 +147,7 @@ func init() {
return hwThreadIDs return hwThreadIDs
} }
getNumaDomain := getNumaDomain := func(basePath string) int {
func(basePath string) int {
globPath := filepath.Join(basePath, "node*") globPath := filepath.Join(basePath, "node*")
regexPath := filepath.Join(basePath, "node([[:digit:]]+)") regexPath := filepath.Join(basePath, "node([[:digit:]]+)")
regex := regexp.MustCompile(regexPath) regex := regexp.MustCompile(regexPath)
@@ -218,8 +215,7 @@ func init() {
// Lookup NUMA domain id // Lookup NUMA domain id
cache.NumaDomainList[i] = getNumaDomain(cpuBase) cache.NumaDomainList[i] = getNumaDomain(cpuBase)
cache.CpuData[i] = cache.CpuData[i] = HwthreadEntry{
HwthreadEntry{
CpuID: cache.HwthreadList[i], CpuID: cache.HwthreadList[i],
SMT: cache.SMTList[i], SMT: cache.SMTList[i],
CoreCPUsList: coreCPUsList, CoreCPUsList: coreCPUsList,