From 539581f95254deddc8ddc8308581ee26bd68944f Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 16 Feb 2026 14:16:03 +0100 Subject: [PATCH] Format with gofumpt --- Makefile | 11 ++ cc-metric-collector.go | 6 +- collectors/beegfsmetaMetric.go | 11 +- collectors/beegfsstorageMetric.go | 19 +- collectors/collectorManager.go | 1 - collectors/cpufreqCpuinfoMetric.go | 1 - collectors/cpustatMetric.go | 3 +- collectors/diskstatMetric.go | 16 +- collectors/infinibandMetric.go | 81 +++++----- collectors/ipmiMetric.go | 4 - collectors/likwidMetric.go | 119 +++++++-------- collectors/loadavgMetric.go | 9 +- collectors/lustreMetric.go | 8 +- collectors/memstatMetric.go | 6 +- collectors/nfsiostatMetric.go | 9 +- collectors/nvidiaMetric.go | 2 +- collectors/raplMetric.go | 83 +++++------ collectors/rocmsmiMetric.go | 10 -- collectors/sampleMetric.go | 1 - collectors/schedstatMetric.go | 3 +- collectors/slurmCgroupMetric.go | 30 ++-- collectors/tempMetric.go | 2 - collectors/topprocsMetric.go | 9 +- internal/metricAggregator/metricAggregator.go | 12 +- .../metricAggregatorFunctions.go | 3 +- internal/metricRouter/metricCache.go | 2 - internal/metricRouter/metricRouter.go | 3 - pkg/ccTopology/ccTopology.go | 140 +++++++++--------- 28 files changed, 294 insertions(+), 310 deletions(-) diff --git a/Makefile b/Makefile index 0fbec79..dc57050 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,17 @@ fmt: $(GOBIN) fmt $(GOSRC_APP) @for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done +# gofumpt : +# Enforce a stricter format than gofmt +.PHONY: gofumpt +gofumpt: + $(GOBIN) install mvdan.cc/gofumpt@latest + gofumpt -w $(GOSRC_COLLECTORS) + gofumpt -w $(GOSRC_SINKS) + gofumpt -w $(GOSRC_RECEIVERS) + gofumpt -w $(GOSRC_APP) + @for F in $(GOSRC_INTERNAL); do gofumpt -w $$F; done + # Examine Go source code and reports suspicious constructs .PHONY: vet diff --git a/cc-metric-collector.go b/cc-metric-collector.go index fc07c99..ad008fb 100644
--- a/cc-metric-collector.go +++ b/cc-metric-collector.go @@ -12,16 +12,14 @@ import ( "flag" "os" "os/signal" + "sync" "syscall" + "time" "github.com/ClusterCockpit/cc-lib/v2/receivers" "github.com/ClusterCockpit/cc-lib/v2/sinks" "github.com/ClusterCockpit/cc-metric-collector/collectors" - // "strings" - "sync" - "time" - ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index a47a306..3037838 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -50,7 +50,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { return nil } // Metrics - var nodeMdstat_array = [39]string{ + nodeMdstat_array := [39]string{ "sum", "ack", "close", "entInf", "fndOwn", "mkdir", "create", "rddir", "refrEn", "mdsInf", "rmdir", "rmLnk", @@ -60,7 +60,8 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { "lookLI", "statLI", "revalLI", "openLI", "createLI", "hardlnk", "flckAp", "flckEn", "flckRg", "dirparent", "listXA", "getXA", - "rmXA", "setXA", "mirror"} + "rmXA", "setXA", "mirror", + } m.name = "BeegfsMetaCollector" if err := m.setup(); err != nil { @@ -154,7 +155,6 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess // --nodetype=meta: The node type to query (meta, storage). 
// --interval: // --mount=/mnt/beeond/: Which mount point - //cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt") mountoption := "--mount=" + mountpoint cmd := exec.Command(m.config.Beegfs, "--clientstats", "--nodetype=meta", mountoption, "--allstats") @@ -180,14 +180,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess scanner := bufio.NewScanner(cmdStdout) sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) - //Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`) statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) singleSpacePattern := regexp.MustCompile(`\s+`) removePattern := regexp.MustCompile(`[\[|\]]`) for scanner.Scan() { readLine := scanner.Text() - //fmt.Println(readLine) // Jump few lines, we only want the I/O stats from nodes if !sumLine.MatchString(readLine) { continue @@ -196,7 +194,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess match := statsLine.FindStringSubmatch(readLine) // nodeName = "Sum:" or would be nodes // nodeName := match[1] - //Remove multiple whitespaces + // Remove multiple whitespaces dummy := removePattern.ReplaceAllString(match[2], " ") metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) split := strings.Split(metaStats, " ") @@ -222,7 +220,6 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) continue } - //mdStat["other"] = fmt.Sprintf("%f", f1+f2) m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2) } } diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index 8900519..4434d30 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -48,12 +48,13 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { return nil } // Metrics - var storageStat_array = [18]string{ + 
storageStat_array := [18]string{ "sum", "ack", "sChDrct", "getFSize", "sAttr", "statfs", "trunc", "close", "fsync", "ops-rd", "MiB-rd/s", "ops-wr", "MiB-wr/s", "gendbg", "hrtbeat", "remNode", - "storInf", "unlnk"} + "storInf", "unlnk", + } m.name = "BeegfsStorageCollector" if err := m.setup(); err != nil { @@ -72,7 +73,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { } } - //create map with possible variables + // Create map with possible variables m.matches = make(map[string]string) for _, value := range storageStat_array { if slices.Contains(m.config.ExcludeMetrics, value) { @@ -117,11 +118,10 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM if !m.init { return } - //get mounpoint - buffer, _ := os.ReadFile(string("/proc/mounts")) - mounts := strings.Split(string(buffer), "\n") + // Get mounpoint + buffer, _ := os.ReadFile("/proc/mounts") var mountpoints []string - for _, line := range mounts { + for line := range strings.Lines(string(buffer)) { if len(line) == 0 { continue } @@ -146,7 +146,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM // --nodetype=meta: The node type to query (meta, storage). 
// --interval: // --mount=/mnt/beeond/: Which mount point - //cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt") mountoption := "--mount=" + mountpoint cmd := exec.Command(m.config.Beegfs, "--clientstats", "--nodetype=storage", mountoption, "--allstats") @@ -172,7 +171,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM scanner := bufio.NewScanner(cmdStdout) sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) - //Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`) statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) singleSpacePattern := regexp.MustCompile(`\s+`) removePattern := regexp.MustCompile(`[\[|\]]`) @@ -187,7 +185,7 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM match := statsLine.FindStringSubmatch(readLine) // nodeName = "Sum:" or would be nodes // nodeName := match[1] - //Remove multiple whitespaces + // Remove multiple whitespaces dummy := removePattern.ReplaceAllString(match[2], " ") metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) split := strings.Split(metaStats, " ") @@ -198,7 +196,6 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM for i := 0; i <= len(split)-1; i += 2 { if _, ok := m.matches[split[i+1]]; ok { m.matches["beegfs_cstorage_"+split[i+1]] = split[i] - //m.matches[split[i+1]] = split[i] } else { f1, err := strconv.ParseFloat(m.matches["other"], 32) if err != nil { diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index c01356a..1bbb78a 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -20,7 +20,6 @@ import ( // Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": new(LikwidCollector), "loadavg": new(LoadavgCollector), "memstat": new(MemstatCollector), diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 090db4b..a79ca43 
100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -10,7 +10,6 @@ package collectors import ( "bufio" "encoding/json" - "fmt" "os" "strconv" diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 46f91f4..085e6e4 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -111,7 +111,8 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { cpu, _ := strconv.Atoi(cpustr) m.cputags[linefields[0]] = map[string]string{ "type": "hwthread", - "type-id": strconv.Itoa(cpu)} + "type-id": strconv.Itoa(cpu), + } m.olddata[linefields[0]] = make(map[string]int64) for k, v := range m.matches { m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 2d6061f..7699968 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -123,28 +123,30 @@ mountLoop: continue } tags := map[string]string{"type": "node", "device": linefields[0]} - total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) + total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000_000_000) if m.allowedMetrics["disk_total"] { y, err := lp.NewMessage( "disk_total", tags, m.meta, map[string]any{ - "value": total}, + "value": total, + }, time.Now()) if err == nil { y.AddMeta("unit", "GBytes") output <- y } } - free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) + free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000_000_000) if m.allowedMetrics["disk_free"] { y, err := lp.NewMessage( "disk_free", tags, m.meta, map[string]any{ - "value": free}, + "value": free, + }, time.Now()) if err == nil { y.AddMeta("unit", "GBytes") @@ -162,10 +164,12 @@ mountLoop: y, err := lp.NewMessage( "part_max_used", map[string]string{ - "type": "node"}, + "type": "node", + }, m.meta, map[string]any{ - "value": int(part_max_used)}, + "value": int(part_max_used), + }, time.Now()) if err == nil { y.AddMeta("unit", 
"percent") diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 1464683..2059011 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -8,19 +8,18 @@ package collectors import ( + "encoding/json" "fmt" "os" + "path/filepath" "slices" + "strconv" + "strings" + "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" "golang.org/x/sys/unix" - - "encoding/json" - "path/filepath" - "strconv" - "strings" - "time" ) const IB_BASEPATH = "/sys/class/infiniband/" @@ -59,7 +58,6 @@ type InfinibandCollector struct { // Init initializes the Infiniband collector by walking through files below IB_BASEPATH func (m *InfinibandCollector) Init(config json.RawMessage) error { - // Check if already initialized if m.init { return nil @@ -187,7 +185,6 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { // Read reads Infiniband counter files below IB_BASEPATH func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) { - // Check if already initialized if !m.init { return @@ -233,15 +230,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess // Send absolut values if m.config.SendAbsoluteValues { - if y, err := - lp.NewMessage( - counterDef.name, - info.tagSet, - m.meta, - map[string]any{ - "value": counterDef.currentState, - }, - now); err == nil { + if y, err := lp.NewMessage( + counterDef.name, + info.tagSet, + m.meta, + map[string]any{ + "value": counterDef.currentState, + }, + now); err == nil { y.AddMeta("unit", counterDef.unit) output <- y } @@ -251,15 +247,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess if m.config.SendDerivedValues { if counterDef.lastState >= 0 { rate := float64((counterDef.currentState - counterDef.lastState)) / timeDiff - if y, err := - lp.NewMessage( - counterDef.name+"_bw", - info.tagSet, - m.meta, - map[string]any{ - "value": rate, 
- }, - now); err == nil { + if y, err := lp.NewMessage( + counterDef.name+"_bw", + info.tagSet, + m.meta, + map[string]any{ + "value": rate, + }, + now); err == nil { y.AddMeta("unit", counterDef.unit+"/sec") output <- y @@ -281,28 +276,26 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess // Send total values if m.config.SendTotalValues { - if y, err := - lp.NewMessage( - "ib_total", - info.tagSet, - m.meta, - map[string]any{ - "value": ib_total, - }, - now); err == nil { + if y, err := lp.NewMessage( + "ib_total", + info.tagSet, + m.meta, + map[string]any{ + "value": ib_total, + }, + now); err == nil { y.AddMeta("unit", "bytes") output <- y } - if y, err := - lp.NewMessage( - "ib_total_pkts", - info.tagSet, - m.meta, - map[string]any{ - "value": ib_total_pkts, - }, - now); err == nil { + if y, err := lp.NewMessage( + "ib_total_pkts", + info.tagSet, + m.meta, + map[string]any{ + "value": ib_total_pkts, + }, + now); err == nil { y.AddMeta("unit", "packets") output <- y } diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index d56af2f..63f6253 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -93,7 +93,6 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { } func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { - // Setup ipmitool command command := exec.Command(cmd, "sensor") stdout, _ := command.StdoutPipe() @@ -152,7 +151,6 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMessage) { } func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { - // Setup ipmisensors command command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") stdout, _ := command.StdoutPipe() @@ -197,11 +195,9 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMessage) { cclog.ComponentError(m.name, fmt.Sprintf("readIpmiSensors(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg)))) return } - } 
func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMessage) { - // Check if already initialized if !m.init { return diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 3db84cf..b03bb18 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -609,18 +609,17 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv evset.metrics[tid][metric.Name] = value // Now we have the result, send it with the proper tags if !math.IsNaN(value) && metric.Publish { - y, err := - lp.NewMessage( - metric.Name, - map[string]string{ - "type": metric.Type, - }, - m.meta, - map[string]any{ - "value": value, - }, - now, - ) + y, err := lp.NewMessage( + metric.Name, + map[string]string{ + "type": metric.Type, + }, + m.meta, + map[string]any{ + "value": value, + }, + now, + ) if err == nil { if metric.Type != "node" { y.AddTag("type-id", strconv.Itoa(domain)) @@ -648,19 +647,18 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv } for coreID, value := range totalCoreValues { - y, err := - lp.NewMessage( - metric.Name, - map[string]string{ - "type": "core", - "type-id": strconv.Itoa(coreID), - }, - m.meta, - map[string]any{ - "value": value, - }, - now, - ) + y, err := lp.NewMessage( + metric.Name, + map[string]string{ + "type": "core", + "type-id": strconv.Itoa(coreID), + }, + m.meta, + map[string]any{ + "value": value, + }, + now, + ) if err != nil { continue } @@ -685,19 +683,18 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv } for socketID, value := range totalSocketValues { - y, err := - lp.NewMessage( - metric.Name, - map[string]string{ - "type": "socket", - "type-id": strconv.Itoa(socketID), - }, - m.meta, - map[string]any{ - "value": value, - }, - now, - ) + y, err := lp.NewMessage( + metric.Name, + map[string]string{ + "type": "socket", + "type-id": strconv.Itoa(socketID), + }, + m.meta, + map[string]any{ + "value": value, + }, + 
now, + ) if err != nil { continue } @@ -720,18 +717,17 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv } } - y, err := - lp.NewMessage( - metric.Name, - map[string]string{ - "type": "node", - }, - m.meta, - map[string]any{ - "value": totalNodeValue, - }, - now, - ) + y, err := lp.NewMessage( + metric.Name, + map[string]string{ + "type": "node", + }, + m.meta, + map[string]any{ + "value": totalNodeValue, + }, + now, + ) if err != nil { continue } @@ -778,18 +774,17 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter // Now we have the result, send it with the proper tags if !math.IsNaN(value) { if metric.Publish { - y, err := - lp.NewMessage( - metric.Name, - map[string]string{ - "type": metric.Type, - }, - m.meta, - map[string]any{ - "value": value, - }, - now, - ) + y, err := lp.NewMessage( + metric.Name, + map[string]string{ + "type": metric.Type, + }, + m.meta, + map[string]any{ + "value": value, + }, + now, + ) if err == nil { if metric.Type != "node" { y.AddTag("type-id", strconv.Itoa(domain)) diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index af7a2f1..a158e80 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -55,16 +55,19 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { } m.meta = map[string]string{ "source": m.name, - "group": "LOAD"} + "group": "LOAD", + } m.tags = map[string]string{"type": "node"} m.load_matches = []string{ "load_one", "load_five", - "load_fifteen"} + "load_fifteen", + } m.load_skips = make([]bool, len(m.load_matches)) m.proc_matches = []string{ "proc_run", - "proc_total"} + "proc_total", + } m.proc_skips = make([]bool, len(m.proc_matches)) for i, name := range m.load_matches { diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index c9eec97..f79c1fe 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -22,9 +22,11 @@ import ( lp 
"github.com/ClusterCockpit/cc-lib/v2/ccMessage" ) -const LUSTRE_SYSFS = `/sys/fs/lustre` -const LCTL_CMD = `lctl` -const LCTL_OPTION = `get_param` +const ( + LUSTRE_SYSFS = `/sys/fs/lustre` + LCTL_CMD = `lctl` + LCTL_OPTION = `get_param` +) type LustreCollectorConfig struct { LCtlCommand string `json:"lctl_command,omitempty"` diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 963fdc0..a2acdb1 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -24,8 +24,10 @@ import ( lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" ) -const MEMSTATFILE = "/proc/meminfo" -const NUMA_MEMSTAT_BASE = "/sys/devices/system/node" +const ( + MEMSTATFILE = "/proc/meminfo" + NUMA_MEMSTAT_BASE = "/sys/devices/system/node" +) type MemstatCollectorConfig struct { ExcludeMetrics []string `json:"exclude_metrics"` diff --git a/collectors/nfsiostatMetric.go b/collectors/nfsiostatMetric.go index ad14391..abb55a9 100644 --- a/collectors/nfsiostatMetric.go +++ b/collectors/nfsiostatMetric.go @@ -43,8 +43,10 @@ type NfsIOStatCollector struct { lastTimestamp time.Time } -var deviceRegex = regexp.MustCompile(`device (?P[^ ]+) mounted on (?P[^ ]+) with fstype nfs(?P\d*) statvers=[\d\.]+`) -var bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+)`) +var ( + deviceRegex = regexp.MustCompile(`device (?P[^ ]+) mounted on (?P[^ ]+) with fstype nfs(?P\d*) statvers=[\d\.]+`) + bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+)`) +) func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string { fields := make(map[string]string) @@ -149,7 +151,8 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessa m.tags, m.meta, map[string]any{ - "value": newVal}, + "value": newVal, + }, now) if err == nil { msg.AddTag("stype", "filesystem") diff --git a/collectors/nvidiaMetric.go 
b/collectors/nvidiaMetric.go index 13109d5..2f401ac 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -1226,7 +1226,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage) } // Actual read loop over all attached Nvidia GPUs - for i := 0; i < m.num_gpus; i++ { + for i := range m.num_gpus { readAll(&m.gpus[i], output) diff --git a/collectors/raplMetric.go b/collectors/raplMetric.go index 1bc6d55..c2fc245 100644 --- a/collectors/raplMetric.go +++ b/collectors/raplMetric.go @@ -49,7 +49,6 @@ type RAPLCollector struct { // Init initializes the running average power limit (RAPL) collector func (m *RAPLCollector) Init(config json.RawMessage) error { - // Check if already initialized if m.init { return nil @@ -91,19 +90,20 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { // readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath // See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes - readZoneInfo := func(zonePath string) (z struct { - name string // zones name e.g. psys, dram, core, uncore, package-0 - energyFilepath string // path to a file containing the zones current energy counter in micro joules - energy int64 // current reading of the energy counter in micro joules - energyTimestamp time.Time // timestamp when energy counter was read - maxEnergyRange int64 // Range of the above energy counter in micro-joules - ok bool // Are all information available? - }) { + readZoneInfo := func(zonePath string) ( + z struct { + name string // zones name e.g. 
psys, dram, core, uncore, package-0 + energyFilepath string // path to a file containing the zones current energy counter in micro joules + energy int64 // current reading of the energy counter in micro joules + energyTimestamp time.Time // timestamp when energy counter was read + maxEnergyRange int64 // Range of the above energy counter in micro-joules + ok bool // Are all information available? + }, + ) { // zones name e.g. psys, dram, core, uncore, package-0 foundName := false - if v, err := - os.ReadFile( - filepath.Join(zonePath, "name")); err == nil { + if v, err := os.ReadFile( + filepath.Join(zonePath, "name")); err == nil { foundName = true z.name = strings.TrimSpace(string(v)) } @@ -124,9 +124,8 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { // Range of the above energy counter in micro-joules foundMaxEnergyRange := false - if v, err := - os.ReadFile( - filepath.Join(zonePath, "max_energy_range_uj")); err == nil { + if v, err := os.ReadFile( + filepath.Join(zonePath, "max_energy_range_uj")); err == nil { if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil { foundMaxEnergyRange = true z.maxEnergyRange = i @@ -158,19 +157,18 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { !isNameExcluded[z.name] { // Add RAPL monitoring attributes for a zone - m.RAPLZoneInfo = - append( - m.RAPLZoneInfo, - RAPLZoneInfo{ - tags: map[string]string{ - "id": zoneID, - "zone_name": z.name, - }, - energyFilepath: z.energyFilepath, - energy: z.energy, - energyTimestamp: z.energyTimestamp, - maxEnergyRange: z.maxEnergyRange, - }) + m.RAPLZoneInfo = append( + m.RAPLZoneInfo, + RAPLZoneInfo{ + tags: map[string]string{ + "id": zoneID, + "zone_name": z.name, + }, + energyFilepath: z.energyFilepath, + energy: z.energy, + energyTimestamp: z.energyTimestamp, + maxEnergyRange: z.maxEnergyRange, + }) } // find all sub zones for the given zone @@ -187,27 +185,25 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { sz.ok 
&& !isIDExcluded[zoneID+":"+subZoneID] && !isNameExcluded[sz.name] { - m.RAPLZoneInfo = - append( - m.RAPLZoneInfo, - RAPLZoneInfo{ - tags: map[string]string{ - "id": zoneID + ":" + subZoneID, - "zone_name": z.name, - "sub_zone_name": sz.name, - }, - energyFilepath: sz.energyFilepath, - energy: sz.energy, - energyTimestamp: sz.energyTimestamp, - maxEnergyRange: sz.maxEnergyRange, - }) + m.RAPLZoneInfo = append( + m.RAPLZoneInfo, + RAPLZoneInfo{ + tags: map[string]string{ + "id": zoneID + ":" + subZoneID, + "zone_name": z.name, + "sub_zone_name": sz.name, + }, + energyFilepath: sz.energyFilepath, + energy: sz.energy, + energyTimestamp: sz.energyTimestamp, + maxEnergyRange: sz.maxEnergyRange, + }) } } } if m.RAPLZoneInfo == nil { return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath) - } // Initialized @@ -224,7 +220,6 @@ func (m *RAPLCollector) Init(config json.RawMessage) error { // Read reads running average power limit (RAPL) monitoring attributes for all initialized zones // See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMessage) { - for i := range m.RAPLZoneInfo { p := &m.RAPLZoneInfo[i] diff --git a/collectors/rocmsmiMetric.go b/collectors/rocmsmiMetric.go index c7e724e..cbfa929 100644 --- a/collectors/rocmsmiMetric.go +++ b/collectors/rocmsmiMetric.go @@ -58,15 +58,6 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error { if err := m.setup(); err != nil { return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } - // Define meta information sent with each metric - // (Can also be dynamic or this is the basic set with extension through AddMeta()) - //m.meta = map[string]string{"source": m.name, "group": "AMD"} - // Define tags sent with each metric - // The 'type' tag is always needed, it defines the granulatity of the metric - // node -> whole system - // socket -> CPU socket 
(requires socket ID as 'type-id' tag) - // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) - //m.tags = map[string]string{"type": "node"} // Read in the JSON configuration if len(config) > 0 { err = json.Unmarshal(config, &m.config) @@ -305,7 +296,6 @@ func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMessage } } } - } // Close metric collector: close network connection, close files, close libraries, ... diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index 5a5f37c..4a9d82f 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -101,7 +101,6 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMessage) // Send it to output channel output <- y } - } // Close metric collector: close network connection, close files, close libraries, ... diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go index 92bb4c9..313fcfc 100644 --- a/collectors/schedstatMetric.go +++ b/collectors/schedstatMetric.go @@ -138,7 +138,7 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa return } - //timestamps + // timestamps now := time.Now() tsdelta := now.Sub(m.lastTimestamp) @@ -166,7 +166,6 @@ func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMessa } m.lastTimestamp = now - } // Close metric collector: close network connection, close files, close libraries, ... 
diff --git a/collectors/slurmCgroupMetric.go b/collectors/slurmCgroupMetric.go index a8db0cd..f7ced2e 100644 --- a/collectors/slurmCgroupMetric.go +++ b/collectors/slurmCgroupMetric.go @@ -110,9 +110,11 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error { m.parallel = true m.meta = map[string]string{ "source": m.name, - "group": "SLURM"} + "group": "SLURM", + } m.tags = map[string]string{ - "type": "hwthread"} + "type": "hwthread", + } m.cpuUsed = make(map[int]bool) m.cgroupBase = defaultCgroupBase @@ -265,7 +267,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": memPerCore}, + "value": memPerCore, + }, timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y @@ -279,7 +282,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": maxMemPerCore}, + "value": maxMemPerCore, + }, timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y @@ -293,7 +297,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": limitPerCore}, + "value": limitPerCore, + }, timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y @@ -307,7 +312,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": cpuUserPerCore}, + "value": cpuUserPerCore, + }, timestamp); err == nil { y.AddMeta("unit", "%") output <- y @@ -321,7 +327,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": cpuSysPerCore}, + "value": cpuSysPerCore, + }, timestamp); err == nil { y.AddMeta("unit", "%") output <- y @@ -346,7 +353,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": 0}, + "value": 0, + }, timestamp); err == nil { 
y.AddMeta("unit", "Bytes") output <- y @@ -359,7 +367,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": 0}, + "value": 0, + }, timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y @@ -372,7 +381,8 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes coreTags, m.meta, map[string]any{ - "value": 0}, + "value": 0, + }, timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index bfc6a18..0b3ff57 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -182,7 +182,6 @@ func (m *TempCollector) Init(config json.RawMessage) error { } func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMessage) { - for _, sensor := range m.sensors { // Read sensor file buffer, err := os.ReadFile(sensor.file) @@ -239,7 +238,6 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMessage) { } } } - } func (m *TempCollector) Close() { diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 6875b0b..bec0e6c 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -18,8 +18,10 @@ import ( lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" ) -const MAX_NUM_PROCS = 10 -const DEFAULT_NUM_PROCS = 2 +const ( + MAX_NUM_PROCS = 10 + DEFAULT_NUM_PROCS = 2 +) type TopProcsCollectorConfig struct { Num_procs int `json:"num_procs"` @@ -87,7 +89,8 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag m.tags, m.meta, map[string]any{ - "value": lines[i]}, + "value": lines[i], + }, time.Now()) if err == nil { output <- y diff --git a/internal/metricAggregator/metricAggregator.go b/internal/metricAggregator/metricAggregator.go index 18a4a30..6d4320a 100644 --- a/internal/metricAggregator/metricAggregator.go +++ b/internal/metricAggregator/metricAggregator.go @@ -72,10 +72,12 @@ var 
metricCacheLanguage = gval.NewLanguage( gval.Function("getCpuList", getCpuListOfNode), gval.Function("getCpuListOfType", getCpuListOfType), ) + var language gval.Language = gval.NewLanguage( gval.Full(), metricCacheLanguage, ) + var evaluables = struct { mapping map[string]gval.Evaluable mutex sync.Mutex @@ -359,10 +361,9 @@ func EvalBoolCondition(condition string, params map[string]any) (bool, error) { evaluable, ok := evaluables.mapping[condition] evaluables.mutex.Unlock() if !ok { - newcond := + newcond := strings.ReplaceAll( strings.ReplaceAll( - strings.ReplaceAll( - condition, "'", "\""), "%", "\\") + condition, "'", "\""), "%", "\\") var err error evaluable, err = language.NewEvaluable(newcond) if err != nil { @@ -381,10 +382,9 @@ func EvalFloat64Condition(condition string, params map[string]float64) (float64, evaluable, ok := evaluables.mapping[condition] evaluables.mutex.Unlock() if !ok { - newcond := + newcond := strings.ReplaceAll( strings.ReplaceAll( - strings.ReplaceAll( - condition, "'", "\""), "%", "\\") + condition, "'", "\""), "%", "\\") var err error evaluable, err = language.NewEvaluable(newcond) if err != nil { diff --git a/internal/metricAggregator/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go index ca5ecf9..ed64cc8 100644 --- a/internal/metricAggregator/metricAggregatorFunctions.go +++ b/internal/metricAggregator/metricAggregatorFunctions.go @@ -35,7 +35,6 @@ func sumAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error // Sum up values func sumfunc(args any) (any, error) { - var err error switch values := args.(type) { case []float64: @@ -168,7 +167,7 @@ func medianfunc(args any) (any, error) { */ func lenfunc(args any) (any, error) { - var err error = nil + var err error length := 0 switch values := args.(type) { case []float64: diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go index 0a49c64..4acfa90 100644 --- 
a/internal/metricRouter/metricCache.go +++ b/internal/metricRouter/metricCache.go @@ -79,7 +79,6 @@ func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, // Start starts the metric cache func (c *metricCache) Start() { - c.tickchan = make(chan time.Time) c.ticker.AddChannel(c.tickchan) // Router cache is done @@ -171,7 +170,6 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage start = c.intervals[pindex].startstamp stop = c.intervals[pindex].stopstamp metrics = c.intervals[pindex].metrics - //return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics } else { metrics = make([]lp.CCMessage, 0) } diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index fc5e9ae..6087387 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -17,7 +17,6 @@ import ( "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" mp "github.com/ClusterCockpit/cc-lib/v2/messageProcessor" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" @@ -244,7 +243,6 @@ func (r *metricRouter) Start() { // Forward message received from collector channel coll_forward := func(p lp.CCMessage) { // receive from metric collector - //p.AddTag(r.config.HostnameTagName, r.hostname) if r.config.IntervalStamp { p.SetTime(r.timestamp) } @@ -292,7 +290,6 @@ func (r *metricRouter) Start() { } r.wg.Go(func() { - for { select { case <-r.done: diff --git a/pkg/ccTopology/ccTopology.go b/pkg/ccTopology/ccTopology.go index 8763835..2d8cf84 100644 --- a/pkg/ccTopology/ccTopology.go +++ b/pkg/ccTopology/ccTopology.go @@ -111,79 +111,76 @@ func fileToList(path string) []int { // init initializes the cache structure func init() { + getHWThreads := func() []int { + globPath := filepath.Join(SYSFS_CPUBASE, "cpu[0-9]*") + regexPath := filepath.Join(SYSFS_CPUBASE, 
"cpu([[:digit:]]+)") + regex := regexp.MustCompile(regexPath) - getHWThreads := - func() []int { - globPath := filepath.Join(SYSFS_CPUBASE, "cpu[0-9]*") - regexPath := filepath.Join(SYSFS_CPUBASE, "cpu([[:digit:]]+)") - regex := regexp.MustCompile(regexPath) + // File globbing for hardware threads + files, err := filepath.Glob(globPath) + if err != nil { + cclogger.ComponentError("CCTopology", "init:getHWThreads", err.Error()) + return nil + } - // File globbing for hardware threads - files, err := filepath.Glob(globPath) - if err != nil { - cclogger.ComponentError("CCTopology", "init:getHWThreads", err.Error()) + hwThreadIDs := make([]int, len(files)) + for i, file := range files { + // Extract hardware thread ID + matches := regex.FindStringSubmatch(file) + if len(matches) != 2 { + cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to extract hardware thread ID from ", file) return nil } - hwThreadIDs := make([]int, len(files)) - for i, file := range files { - // Extract hardware thread ID - matches := regex.FindStringSubmatch(file) - if len(matches) != 2 { - cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to extract hardware thread ID from ", file) - return nil - } - - // Convert hardware thread ID to int - id, err := strconv.Atoi(matches[1]) - if err != nil { - cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to convert to int hardware thread ID ", matches[1]) - return nil - } - - hwThreadIDs[i] = id - } - - // Sort hardware thread IDs - slices.Sort(hwThreadIDs) - return hwThreadIDs - } - - getNumaDomain := - func(basePath string) int { - globPath := filepath.Join(basePath, "node*") - regexPath := filepath.Join(basePath, "node([[:digit:]]+)") - regex := regexp.MustCompile(regexPath) - - // File globbing for NUMA node - files, err := filepath.Glob(globPath) - if err != nil { - cclogger.ComponentError("CCTopology", "init:getNumaDomain", err.Error()) - return -1 - } - - // Check, that exactly one NUMA domain was found 
- if len(files) != 1 { - cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Number of NUMA domains != 1: ", len(files)) - return -1 - } - - // Extract NUMA node ID - matches := regex.FindStringSubmatch(files[0]) - if len(matches) != 2 { - cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to extract NUMA node ID from: ", files[0]) - return -1 - } - + // Convert hardware thread ID to int id, err := strconv.Atoi(matches[1]) if err != nil { - cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to parse NUMA node ID from: ", matches[1]) - return -1 + cclogger.ComponentError("CCTopology", "init:getHWThreads: Failed to convert to int hardware thread ID ", matches[1]) + return nil } - return id + hwThreadIDs[i] = id } + // Sort hardware thread IDs + slices.Sort(hwThreadIDs) + return hwThreadIDs + } + + getNumaDomain := func(basePath string) int { + globPath := filepath.Join(basePath, "node*") + regexPath := filepath.Join(basePath, "node([[:digit:]]+)") + regex := regexp.MustCompile(regexPath) + + // File globbing for NUMA node + files, err := filepath.Glob(globPath) + if err != nil { + cclogger.ComponentError("CCTopology", "init:getNumaDomain", err.Error()) + return -1 + } + + // Check, that exactly one NUMA domain was found + if len(files) != 1 { + cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Number of NUMA domains != 1: ", len(files)) + return -1 + } + + // Extract NUMA node ID + matches := regex.FindStringSubmatch(files[0]) + if len(matches) != 2 { + cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to extract NUMA node ID from: ", files[0]) + return -1 + } + + id, err := strconv.Atoi(matches[1]) + if err != nil { + cclogger.ComponentError("CCTopology", "init:getNumaDomain", "Failed to parse NUMA node ID from: ", matches[1]) + return -1 + } + + return id + } + cache.HwthreadList = getHWThreads() cache.CoreList = make([]int, len(cache.HwthreadList)) cache.SocketList = make([]int, 
len(cache.HwthreadList)) @@ -218,16 +215,15 @@ func init() { // Lookup NUMA domain id cache.NumaDomainList[i] = getNumaDomain(cpuBase) - cache.CpuData[i] = - HwthreadEntry{ - CpuID: cache.HwthreadList[i], - SMT: cache.SMTList[i], - CoreCPUsList: coreCPUsList, - Socket: cache.SocketList[i], - NumaDomain: cache.NumaDomainList[i], - Die: cache.DieList[i], - Core: cache.CoreList[i], - } + cache.CpuData[i] = HwthreadEntry{ + CpuID: cache.HwthreadList[i], + SMT: cache.SMTList[i], + CoreCPUsList: coreCPUsList, + Socket: cache.SocketList[i], + NumaDomain: cache.NumaDomainList[i], + Die: cache.DieList[i], + Core: cache.CoreList[i], + } } slices.Sort(cache.HwthreadList)