diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index fedcc8b..1d712b7 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -49,7 +49,7 @@ jobs: # Running the linter requires likwid.h, which gets downloaded in the build step - name: Static Analysis with GolangCI-Lint and Upload Report with reviewdog run: | - golangci-lint run --enable modernize,staticcheck,govet | reviewdog -f=golangci-lint -name "Check golangci-lint on build-latest" -reporter=github-check -filter-mode=nofilter -fail-level none + golangci-lint run --enable errorlint,govet,misspell,modernize,prealloc,staticcheck,unconvert,wastedassign | reviewdog -f=golangci-lint -name "Check golangci-lint on build-latest" -reporter=github-check -filter-mode=nofilter -fail-level none env: REVIEWDOG_GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/cc-metric-collector.go b/cc-metric-collector.go index 994edda..fc07c99 100644 --- a/cc-metric-collector.go +++ b/cc-metric-collector.go @@ -50,30 +50,6 @@ type RuntimeConfig struct { Sync sync.WaitGroup } -//// Structure of the configuration file -//type GlobalConfig struct { -// Sink sinks.SinkConfig `json:"sink"` -// Interval int `json:"interval"` -// Duration int `json:"duration"` -// Collectors []string `json:"collectors"` -// Receiver receivers.ReceiverConfig `json:"receiver"` -// DefTags map[string]string `json:"default_tags"` -// CollectConfigs map[string]json.RawMessage `json:"collect_config"` -//} - -//// Load JSON configuration file -//func LoadConfiguration(file string, config *GlobalConfig) error { -// configFile, err := os.Open(file) -// defer configFile.Close() -// if err != nil { -// fmt.Println(err.Error()) -// return err -// } -// jsonParser := json.NewDecoder(configFile) -// err = jsonParser.Decode(config) -// return err -//} - func ReadCli() map[string]string { var m map[string]string cfg := flag.String("config", "./config.json", "Path to configuration file") @@ -93,22 +69,6 @@ func ReadCli() map[string]string { return m } -//func SetLogging(logfile string) error { -// var file *os.File -// var err error -// if logfile != "stderr" { -// file, err = os.OpenFile(logfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) -// if err != nil { -// log.Fatal(err) -// return err -// } -// } else { -// file = os.Stderr -// } -// log.SetOutput(file) -// return nil -//} - // General shutdownHandler function that gets executed in case of interrupt or graceful shutdownHandler func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { defer config.Sync.Done() @@ -216,11 +176,6 @@ func mainFunc() int { return 1 } - // Set log file - // if logfile := rcfg.CliArgs["logfile"]; logfile != "stderr" { - // cclog.SetOutput(logfile) - // } - // Creat new multi channel ticker rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval) diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index 5d44878..7d79346 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -78,7 +78,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { } } - //create map with possible variables + // Create map with possible variables m.matches = make(map[string]string) for _, value := range nodeMdstat_array { if slices.Contains(m.config.ExcludeMetrics, value) { @@ -104,7 +104,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { // Beegfs file system statistics can only be queried by user root user, err := user.Current() if err != nil { - return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %v", err) + return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %w", err) } if user.Uid != "0" { return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root") @@ -113,7 +113,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { // Check if beegfs-ctl is in executable search path _, err = exec.LookPath(m.config.Beegfs) if err != nil { - return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) + return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err) } m.init = true return nil @@ -123,7 +123,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess if !m.init { return } - //get mounpoint + // Get mounpoint buffer, _ := os.ReadFile(string("/proc/mounts")) mounts := strings.Split(string(buffer), "\n") var mountpoints []string @@ -164,12 +164,15 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMess cmd.Stderr = cmdStderr err := cmd.Run() if err != nil { - fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) - fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) - data, _ := io.ReadAll(cmdStderr) - fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stderr: \"%s\"\n", string(data)) - data, _ = io.ReadAll(cmdStdout) - fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stdout: \"%s\"\n", string(data)) + dataStdErr, _ := io.ReadAll(cmdStderr) + dataStdOut, _ := io.ReadAll(cmdStdout) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err), + fmt.Sprintf("Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()), + fmt.Sprintf("Read(): command stderr: \"%s\"\n", string(dataStdErr)), + fmt.Sprintf("Read(): command stdout: \"%s\"\n", string(dataStdOut)), + ) return } // Read I/O statistics diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index feb8604..8d46e35 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -97,7 +97,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { // Beegfs file system statistics can only be queried by user root user, err := user.Current() if err != nil { - return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %v", err) + return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %w", err) } if user.Uid != "0" { return fmt.Errorf("BeegfsStorageCollector.Init(): BeeGFS file system statistics can only be queried by user root") @@ -106,7 +106,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { // Check if beegfs-ctl is in executable search path _, err = exec.LookPath(m.config.Beegfs) if err != nil { - return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) + return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %w", m.config.Beegfs, err) } m.init = true return nil @@ -156,12 +156,15 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM cmd.Stderr = cmdStderr err := cmd.Run() if err != nil { - fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) - fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) - data, _ := io.ReadAll(cmdStderr) - fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stderr: \"%s\"\n", string(data)) - data, _ = io.ReadAll(cmdStdout) - fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stdout: \"%s\"\n", string(data)) + dataStdErr, _ := io.ReadAll(cmdStderr) + dataStdOut, _ := io.ReadAll(cmdStdout) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err), + fmt.Sprintf("Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()), + fmt.Sprintf("Read(): command stderr: \"%s\"\n", string(dataStdErr)), + fmt.Sprintf("Read(): command stdout: \"%s\"\n", string(dataStdOut)), + ) return } // Read I/O statistics diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 4b2e184..dcdba00 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -35,7 +35,7 @@ type CPUFreqCpuInfoCollector struct { topology []CPUFreqCpuInfoCollectorTopology } -func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { +func (m *CPUFreqCpuInfoCollector) Init(_ json.RawMessage) error { // Check if already initialized if m.init { return nil @@ -55,7 +55,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { const cpuInfoFile = "/proc/cpuinfo" file, err := os.Open(cpuInfoFile) if err != nil { - return fmt.Errorf("failed to open file '%s': %v", cpuInfoFile, err) + return fmt.Errorf("%s Init(): failed to open file '%s': %w", m.name, cpuInfoFile, err) } // Collect topology information from file cpuinfo @@ -123,7 +123,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { // Check if at least one CPU with frequency information was detected if len(m.topology) == 0 { - return fmt.Errorf("no CPU frequency info found in %s", cpuInfoFile) + return fmt.Errorf("%s Init(): no CPU frequency info found in %s", m.name, cpuInfoFile) } m.init = true diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index ca2190f..2b46cbc 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -76,15 +76,15 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { scalingCurFreqFile := filepath.Join("/sys/devices/system/cpu", fmt.Sprintf("cpu%d", c.CpuID), "cpufreq/scaling_cur_freq") err := unix.Access(scalingCurFreqFile, unix.R_OK) if err != nil { - return fmt.Errorf("unable to access file '%s': %v", scalingCurFreqFile, err) + return fmt.Errorf("unable to access file '%s': %w", scalingCurFreqFile, err) } m.topology = append(m.topology, CPUFreqCollectorTopology{ tagSet: map[string]string{ "type": "hwthread", - "type-id": fmt.Sprint(c.CpuID), - "package_id": fmt.Sprint(c.Socket), + "type-id": strconv.Itoa(c.CpuID), + "package_id": strconv.Itoa(c.Socket), }, scalingCurFreqFile: scalingCurFreqFile, }, diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index b21d60c..c5b369c 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -108,7 +108,9 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { } else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { cpustr := strings.TrimLeft(linefields[0], "cpu") cpu, _ := strconv.Atoi(cpustr) - m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.cputags[linefields[0]] = map[string]string{ + "type": "hwthread", + "type-id": strconv.Itoa(cpu)} m.olddata[linefields[0]] = make(map[string]int64) for k, v := range m.matches { m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) @@ -191,7 +193,7 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage num_cpus_metric, err := lp.NewMessage("num_cpus", m.nodetags, m.meta, - map[string]any{"value": int(num_cpus)}, + map[string]any{"value": num_cpus}, now, ) if err == nil { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 52917c5..595680c 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -18,6 +18,7 @@ import ( "strings" "time" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" influx "github.com/influxdata/line-protocol" ) @@ -43,12 +44,14 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { var err error m.name = "CustomCmdCollector" m.parallel = true - m.meta = map[string]string{"source": m.name, "group": "Custom"} + m.meta = map[string]string{ + "source": m.name, + "group": "Custom", + } if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { - log.Print(err.Error()) - return err + return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) } } if err := m.setup(); err != nil { @@ -57,13 +60,15 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { for _, c := range m.config.Commands { cmdfields := strings.Fields(c) command := exec.Command(cmdfields[0], cmdfields[1:]...) - if err := command.Wait(); err != nil { - log.Print(err) - continue - } _, err = command.Output() if err == nil { m.commands = append(m.commands, c) + } else { + cclog.ComponentWarn( + m.name, + fmt.Sprintf("%s Init(): Execution of command \"%s\" failed: %v", m.name, command.String(), err), + ) + continue } } for _, f := range m.config.Files { @@ -71,7 +76,10 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { if err == nil { m.files = append(m.files, f) } else { - log.Print(err.Error()) + cclog.ComponentWarn( + m.name, + fmt.Sprintf("%s Init(): Reading of file \"%s\" failed: %v", m.name, f, err), + ) continue } } diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 84c2680..69050e7 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -124,7 +124,13 @@ mountLoop: tags := map[string]string{"type": "node", "device": linefields[0]} total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) if m.allowedMetrics["disk_total"] { - y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]any{"value": total}, time.Now()) + y, err := lp.NewMessage( + "disk_total", + tags, + m.meta, + map[string]any{ + "value": total}, + time.Now()) if err == nil { y.AddMeta("unit", "GBytes") output <- y @@ -132,7 +138,13 @@ mountLoop: } free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) if m.allowedMetrics["disk_free"] { - y, err := lp.NewMessage("disk_free", tags, m.meta, map[string]any{"value": free}, time.Now()) + y, err := lp.NewMessage( + "disk_free", + tags, + m.meta, + map[string]any{ + "value": free}, + time.Now()) if err == nil { y.AddMeta("unit", "GBytes") output <- y @@ -146,7 +158,14 @@ mountLoop: } } if m.allowedMetrics["part_max_used"] { - y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]any{"value": int(part_max_used)}, time.Now()) + y, err := lp.NewMessage( + "part_max_used", + map[string]string{ + "type": "node"}, + m.meta, + map[string]any{ + "value": int(part_max_used)}, + time.Now()) if err == nil { y.AddMeta("unit", "percent") output <- y diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index f83328f..68a5adb 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -14,7 +14,6 @@ import ( "errors" "fmt" "io" - "log" "os/exec" "os/user" "slices" @@ -324,8 +323,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { - log.Print(err.Error()) - return err + return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) } } m.meta = map[string]string{ @@ -366,7 +364,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { // when using sudo, the full path of mmpmon must be specified because // exec.LookPath will not work as mmpmon is not executable as user if m.config.Sudo && !strings.HasPrefix(m.config.Mmpmon, "/") { - return fmt.Errorf("when using sudo, mmpmon_path must be provided and an absolute path: %s", m.config.Mmpmon) + return fmt.Errorf("%s Init(): when using sudo, mmpmon_path must be provided and an absolute path: %s", m.name, m.config.Mmpmon) } // Check if mmpmon is in executable search path @@ -379,7 +377,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { p = m.config.Mmpmon } else { cclog.ComponentError(m.name, fmt.Sprintf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)) - return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) + return fmt.Errorf("%s Init(): failed to find mmpmon binary '%s': %w", m.name, m.config.Mmpmon, err) } } m.config.Mmpmon = p @@ -564,30 +562,30 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { // compute total metrics (map[...] will return 0 if key not found) // bytes read and written if br, br_ok := newstate["_br_"]; br_ok { - newstate["bytesTotal"] = newstate["bytesTotal"] + br + newstate["bytesTotal"] += br } if bw, bw_ok := newstate["_bw_"]; bw_ok { - newstate["bytesTotal"] = newstate["bytesTotal"] + bw + newstate["bytesTotal"] += bw } // read and write count if rdc, rdc_ok := newstate["_rdc_"]; rdc_ok { - newstate["iops"] = newstate["iops"] + rdc + newstate["iops"] += rdc } if wc, wc_ok := newstate["_wc_"]; wc_ok { - newstate["iops"] = newstate["iops"] + wc + newstate["iops"] += wc } // meta operations if oc, oc_ok := newstate["_oc_"]; oc_ok { - newstate["metaops"] = newstate["metaops"] + oc + newstate["metaops"] += oc } if cc, cc_ok := newstate["_cc_"]; cc_ok { - newstate["metaops"] = newstate["metaops"] + cc + newstate["metaops"] += cc } if dir, dir_ok := newstate["_dir_"]; dir_ok { - newstate["metaops"] = newstate["metaops"] + dir + newstate["metaops"] += dir } if iu, iu_ok := newstate["_iu_"]; iu_ok { - newstate["metaops"] = newstate["metaops"] + iu + newstate["metaops"] += iu } // send desired metrics for this filesystem for _, metric := range m.definitions { @@ -620,13 +618,13 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { case "derivative": if vnew_ok && vold_ok && timeDiff > 0 { value = float64(vnew-vold) / timeDiff - if value.(float64) < 0 { - value = 0 + if value.(float64) < 0.0 { + value = 0.0 } value_ok = true } else if vold_ok { // if the difference is not computable, return 0 - value = 0 + value = 0.0 value_ok = true } } diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index f7499d9..0fef1f1 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -90,10 +90,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*") ibDirs, err := filepath.Glob(globPattern) if err != nil { - return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err) + return fmt.Errorf("%s Init(): unable to glob files with pattern %s: %w", m.name, globPattern, err) } if ibDirs == nil { - return fmt.Errorf("unable to find any directories with pattern %s", globPattern) + return fmt.Errorf("%s Init(): unable to find any directories with pattern %s", m.name, globPattern) } for _, path := range ibDirs { @@ -157,7 +157,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { for _, counter := range portCounterFiles { err := unix.Access(counter.path, unix.R_OK) if err != nil { - return fmt.Errorf("unable to access %s: %v", counter.path, err) + return fmt.Errorf("%s Init(): unable to access %s: %w", m.name, counter.path, err) } } @@ -177,7 +177,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { } if len(m.info) == 0 { - return fmt.Errorf("found no IB devices") + return fmt.Errorf("%s Init(): found no IB devices", m.name) } m.init = true diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 1f6d354..e8bcce0 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -24,7 +24,7 @@ import ( "os" "os/signal" "os/user" - "sort" + "slices" "strconv" "strings" "sync" @@ -125,22 +125,14 @@ func checkMetricType(t string) bool { return ok } -func eventsToEventStr(events map[string]string) string { - elist := make([]string, 0) - for k, v := range events { - elist = append(elist, fmt.Sprintf("%s:%s", v, k)) - } - return strings.Join(elist, ",") -} - func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig { - tmplist := make([]string, 0) - clist := make([]string, 0) + clist := make([]string, 0, len(input.Events)) for k := range input.Events { clist = append(clist, k) } - sort.Strings(clist) - elist := make([]*C.char, 0) + slices.Sort(clist) + tmplist := make([]string, 0, len(clist)) + elist := make([]*C.char, 0, len(clist)) for _, k := range clist { v := input.Events[k] tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k)) @@ -217,7 +209,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { - return err + return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) } } lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS) @@ -226,13 +218,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } err := lib.Open() if err != nil { - return fmt.Errorf("error opening %s: %v", m.config.LibraryPath, err) + return fmt.Errorf("error opening %s: %w", m.config.LibraryPath, err) } if m.config.ForceOverwrite { cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") if err := os.Setenv("LIKWID_FORCE", "1"); err != nil { - return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %v", err) + return fmt.Errorf("error setting environment variable LIKWID_FORCE=1: %w", err) } } if err := m.setup(); err != nil { @@ -327,7 +319,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { p = m.config.DaemonPath } if err := os.Setenv("PATH", p); err != nil { - return fmt.Errorf("error setting environment variable PATH=%s: %v", p, err) + return fmt.Errorf("error setting environment variable PATH=%s: %w", p, err) } } C.HPMmode(1) @@ -381,7 +373,6 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { // take a measurement for 'interval' seconds of event set index 'group' func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, interval time.Duration) (bool, error) { var ret C.int - var gid C.int = -1 sigchan := make(chan os.Signal, 1) // Watch changes for the lock file () @@ -406,10 +397,10 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, // Create the lock file if it does not exist file, createErr := os.Create(m.config.LockfilePath) if createErr != nil { - return true, fmt.Errorf("failed to create lock file: %v", createErr) + return true, fmt.Errorf("failed to create lock file: %w", createErr) } if err := file.Close(); err != nil { - return true, fmt.Errorf("failed to close lock file: %v", err) + return true, fmt.Errorf("failed to close lock file: %w", err) } info, err = os.Stat(m.config.LockfilePath) // Recheck the file after creation } @@ -462,6 +453,7 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, signal.Notify(sigchan, syscall.SIGCHLD) // Add an event string to LIKWID + var gid C.int select { case <-sigchan: gid = -1 @@ -631,7 +623,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv ) if err == nil { if metric.Type != "node" { - y.AddTag("type-id", fmt.Sprintf("%d", domain)) + y.AddTag("type-id", strconv.Itoa(domain)) } if len(metric.Unit) > 0 { y.AddMeta("unit", metric.Unit) @@ -661,7 +653,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv metric.Name, map[string]string{ "type": "core", - "type-id": fmt.Sprintf("%d", coreID), + "type-id": strconv.Itoa(coreID), }, m.meta, map[string]any{ @@ -698,7 +690,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv metric.Name, map[string]string{ "type": "socket", - "type-id": fmt.Sprintf("%d", socketID), + "type-id": strconv.Itoa(socketID), }, m.meta, map[string]any{ @@ -800,7 +792,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter ) if err == nil { if metric.Type != "node" { - y.AddTag("type-id", fmt.Sprintf("%d", domain)) + y.AddTag("type-id", strconv.Itoa(domain)) } if len(metric.Unit) > 0 { y.AddMeta("unit", metric.Unit) @@ -816,7 +808,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter } func (m *LikwidCollector) ReadThread(interval time.Duration, output chan lp.CCMessage) { - var err error = nil + var err error groups := make([]LikwidEventsetConfig, 0) for evidx, evset := range m.config.Eventsets { diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index e3b7e87..56a5633 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -159,7 +159,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { file: f, tags: map[string]string{ "type": "memoryDomain", - "type-id": fmt.Sprintf("%d", id), + "type-id": strconv.Itoa(id), }, } m.nodefiles[id] = f diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index 072b21b..28b5ddb 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -10,7 +10,6 @@ package collectors import ( "encoding/json" "fmt" - "log" "slices" // "os" @@ -49,7 +48,7 @@ func (m *nfsCollector) initStats() error { // Wait for cmd end if err := cmd.Wait(); err != nil { - return fmt.Errorf("initStats(): %w", err) + return fmt.Errorf("%s initStats(): %w", m.name, err) } buffer, err := cmd.Output() @@ -81,7 +80,7 @@ func (m *nfsCollector) updateStats() error { // Wait for cmd end if err := cmd.Wait(); err != nil { - return fmt.Errorf("updateStats(): %w", err) + return fmt.Errorf("%s updateStats(): %w", m.name, err) } buffer, err := cmd.Output() @@ -114,8 +113,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { - log.Print(err.Error()) - return err + return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) } } m.meta = map[string]string{ @@ -128,11 +126,11 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { // Check if nfsstat is in executable search path _, err := exec.LookPath(m.config.Nfsstats) if err != nil { - return fmt.Errorf("NfsCollector.Init(): Failed to find nfsstat binary '%s': %v", m.config.Nfsstats, err) + return fmt.Errorf("%s Init(): Failed to find nfsstat binary '%s': %w", m.name, m.config.Nfsstats, err) } m.data = make(map[string]NfsCollectorData) if err := m.initStats(); err != nil { - return fmt.Errorf("NfsCollector.Init(): %w", err) + return fmt.Errorf("%s Init(): %w", m.name, err) } m.init = true m.parallel = true @@ -152,7 +150,7 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMessage) { ) return } - prefix := "" + var prefix string switch m.version { case "v3": prefix = "nfs3" diff --git a/collectors/nfsiostatMetric.go b/collectors/nfsiostatMetric.go index 76ef237..c2d1c25 100644 --- a/collectors/nfsiostatMetric.go +++ b/collectors/nfsiostatMetric.go @@ -143,7 +143,13 @@ func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMessa if old, ok := m.data[mntpoint]; ok { for name, newVal := range values { if m.config.SendAbsoluteValues { - msg, err := lp.NewMessage(fmt.Sprintf("nfsio_%s", name), m.tags, m.meta, map[string]any{"value": newVal}, now) + msg, err := lp.NewMessage( + "nfsio_"+name, + m.tags, + m.meta, + map[string]any{ + "value": newVal}, + now) if err == nil { msg.AddTag("stype", "filesystem") msg.AddTag("stype-id", mntpoint) diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index b528918..cb37d46 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -84,7 +84,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { - return fmt.Errorf("unable to unmarshal numastat configuration: %s", err.Error()) + return fmt.Errorf("%s Init(): unable to unmarshal numastat configuration: %w", m.name, err) } } @@ -93,10 +93,10 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { globPattern := base + "[0-9]*" dirs, err := filepath.Glob(globPattern) if err != nil { - return fmt.Errorf("unable to glob files with pattern '%s'", globPattern) + return fmt.Errorf("%s Init(): unable to glob files with pattern '%s'", m.name, globPattern) } if dirs == nil { - return fmt.Errorf("unable to find any files with pattern '%s'", globPattern) + return fmt.Errorf("%s Init(): unable to find any files with pattern '%s'", m.name, globPattern) } m.topology = make([]NUMAStatsCollectorTopolgy, 0, len(dirs)) for _, dir := range dirs { diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 6a06daa..ee92c51 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -14,6 +14,7 @@ import ( "log" "maps" "slices" + "strconv" "strings" "time" @@ -112,7 +113,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { for i := range num_gpus { // Skip excluded devices by ID - str_i := fmt.Sprintf("%d", i) + str_i := strconv.Itoa(i) if slices.Contains(m.config.ExcludeDevices, str_i) { cclog.ComponentDebug(m.name, "Skipping excluded device", str_i) continue @@ -239,7 +240,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err if !device.excludeMetrics["nv_fb_mem_total"] { t := float64(total) / (1024 * 1024) - y, err := lp.NewMessage("nv_fb_mem_total", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_fb_mem_total", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "MByte") output <- y @@ -248,7 +249,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err if !device.excludeMetrics["nv_fb_mem_used"] { f := float64(used) / (1024 * 1024) - y, err := lp.NewMessage("nv_fb_mem_used", device.tags, device.meta, map[string]any{"value": f}, time.Now()) + y, err := lp.NewMetric("nv_fb_mem_used", device.tags, device.meta, f, time.Now()) if err == nil { y.AddMeta("unit", "MByte") output <- y @@ -257,7 +258,7 @@ func readMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) err if v2 && !device.excludeMetrics["nv_fb_mem_reserved"] { r := float64(reserved) / (1024 * 1024) - y, err := lp.NewMessage("nv_fb_mem_reserved", device.tags, device.meta, map[string]any{"value": r}, time.Now()) + y, err := lp.NewMetric("nv_fb_mem_reserved", device.tags, device.meta, r, time.Now()) if err == nil { y.AddMeta("unit", "MByte") output <- y @@ -276,7 +277,7 @@ func readBarMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) } if !device.excludeMetrics["nv_bar1_mem_total"] { t := float64(meminfo.Bar1Total) / (1024 * 1024) - y, err := lp.NewMessage("nv_bar1_mem_total", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_bar1_mem_total", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "MByte") output <- y @@ -284,7 +285,7 @@ func readBarMemoryInfo(device *NvidiaCollectorDevice, output chan lp.CCMessage) } if !device.excludeMetrics["nv_bar1_mem_used"] { t := float64(meminfo.Bar1Used) / (1024 * 1024) - y, err := lp.NewMessage("nv_bar1_mem_used", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_bar1_mem_used", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "MByte") output <- y @@ -318,14 +319,14 @@ func readUtilization(device *NvidiaCollectorDevice, output chan lp.CCMessage) er util, ret := nvml.DeviceGetUtilizationRates(device.device) if ret == nvml.SUCCESS { if !device.excludeMetrics["nv_util"] { - y, err := lp.NewMessage("nv_util", device.tags, device.meta, map[string]any{"value": float64(util.Gpu)}, time.Now()) + y, err := lp.NewMetric("nv_util", device.tags, device.meta, float64(util.Gpu), time.Now()) if err == nil { y.AddMeta("unit", "%") output <- y } } if !device.excludeMetrics["nv_mem_util"] { - y, err := lp.NewMessage("nv_mem_util", device.tags, device.meta, map[string]any{"value": float64(util.Memory)}, time.Now()) + y, err := lp.NewMetric("nv_mem_util", device.tags, device.meta, float64(util.Memory), time.Now()) if err == nil { y.AddMeta("unit", "%") output <- y @@ -345,7 +346,7 @@ func readTemp(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { // * NVML_TEMPERATURE_COUNT temp, ret := nvml.DeviceGetTemperature(device.device, nvml.TEMPERATURE_GPU) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_temp", device.tags, device.meta, map[string]any{"value": float64(temp)}, time.Now()) + y, err := lp.NewMetric("nv_temp", device.tags, device.meta, float64(temp), time.Now()) if err == nil { y.AddMeta("unit", "degC") output <- y @@ -368,7 +369,7 @@ func readFan(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { // This value may exceed 100% in certain cases. fan, ret := nvml.DeviceGetFanSpeed(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_fan", device.tags, device.meta, map[string]any{"value": float64(fan)}, time.Now()) + y, err := lp.NewMetric("nv_fan", device.tags, device.meta, float64(fan), time.Now()) if err == nil { y.AddMeta("unit", "%") output <- y @@ -378,27 +379,6 @@ func readFan(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { return nil } -// func readFans(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { -// if !device.excludeMetrics["nv_fan"] { -// numFans, ret := nvml.DeviceGetNumFans(device.device) -// if ret == nvml.SUCCESS { -// for i := 0; i < numFans; i++ { -// fan, ret := nvml.DeviceGetFanSpeed_v2(device.device, i) -// if ret == nvml.SUCCESS { -// y, err := lp.NewMessage("nv_fan", device.tags, device.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) -// if err == nil { -// y.AddMeta("unit", "%") -// y.AddTag("stype", "fan") -// y.AddTag("stype-id", fmt.Sprintf("%d", i)) -// output <- y -// } -// } -// } -// } -// } -// return nil -// } - func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { if !device.excludeMetrics["nv_ecc_mode"] { // Retrieves the current and pending ECC modes for the device. @@ -415,17 +395,17 @@ func readEccMode(device *NvidiaCollectorDevice, output chan lp.CCMessage) error var err error switch ecc_pend { case nvml.FEATURE_DISABLED: - y, err = lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]any{"value": "OFF"}, time.Now()) + y, err = lp.NewMetric("nv_ecc_mode", device.tags, device.meta, "OFF", time.Now()) case nvml.FEATURE_ENABLED: - y, err = lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]any{"value": "ON"}, time.Now()) + y, err = lp.NewMetric("nv_ecc_mode", device.tags, device.meta, "ON", time.Now()) default: - y, err = lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]any{"value": "UNKNOWN"}, time.Now()) + y, err = lp.NewMetric("nv_ecc_mode", device.tags, device.meta, "UNKNOWN", time.Now()) } if err == nil { output <- y } case nvml.ERROR_NOT_SUPPORTED: - y, err := lp.NewMessage("nv_ecc_mode", device.tags, device.meta, map[string]any{"value": "N/A"}, time.Now()) + y, err := lp.NewMetric("nv_ecc_mode", device.tags, device.meta, "N/A", time.Now()) if err == nil { output <- y } @@ -445,7 +425,7 @@ func readPerfState(device *NvidiaCollectorDevice, output chan lp.CCMessage) erro // 32: Unknown performance state. pState, ret := nvml.DeviceGetPerformanceState(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_perf_state", device.tags, device.meta, map[string]any{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) + y, err := lp.NewMetric("nv_perf_state", device.tags, device.meta, fmt.Sprintf("P%d", int(pState)), time.Now()) if err == nil { output <- y } @@ -471,7 +451,7 @@ func readPowerUsage(device *NvidiaCollectorDevice, output chan lp.CCMessage) err if mode == nvml.FEATURE_ENABLED { power, ret := nvml.DeviceGetPowerUsage(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_power_usage", device.tags, device.meta, map[string]any{"value": float64(power) / 1000}, time.Now()) + y, err := lp.NewMetric("nv_power_usage", device.tags, device.meta, float64(power)/1000, time.Now()) if err == nil { y.AddMeta("unit", "watts") output <- y @@ -497,7 +477,12 @@ func readEnergyConsumption(device *NvidiaCollectorDevice, output chan lp.CCMessa if ret == nvml.SUCCESS { if device.lastEnergyReading != 0 { if !device.excludeMetrics["nv_energy"] { - y, err := lp.NewMetric("nv_energy", device.tags, device.meta, (energy-device.lastEnergyReading)/1000, now) + y, err := lp.NewMetric( + "nv_energy", + device.tags, + device.meta, + (energy-device.lastEnergyReading)/1000, + now) if err == nil { y.AddMeta("unit", "Joules") output <- y @@ -539,7 +524,7 @@ func readClocks(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { if !device.excludeMetrics["nv_graphics_clock"] { graphicsClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_GRAPHICS) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_graphics_clock", device.tags, device.meta, map[string]any{"value": float64(graphicsClock)}, time.Now()) + y, err := lp.NewMetric("nv_graphics_clock", device.tags, device.meta, float64(graphicsClock), time.Now()) if err == nil { y.AddMeta("unit", "MHz") output <- y @@ -550,7 +535,7 @@ func readClocks(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { if !device.excludeMetrics["nv_sm_clock"] { smCock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_SM) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_sm_clock", device.tags, device.meta, map[string]any{"value": float64(smCock)}, time.Now()) + y, err := lp.NewMetric("nv_sm_clock", device.tags, device.meta, float64(smCock), time.Now()) if err == nil { y.AddMeta("unit", "MHz") output <- y @@ -561,7 +546,7 @@ func readClocks(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { if !device.excludeMetrics["nv_mem_clock"] { memClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_MEM) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_mem_clock", device.tags, device.meta, map[string]any{"value": float64(memClock)}, time.Now()) + y, err := lp.NewMetric("nv_mem_clock", device.tags, device.meta, float64(memClock), time.Now()) if err == nil { y.AddMeta("unit", "MHz") output <- y @@ -571,7 +556,7 @@ func readClocks(device *NvidiaCollectorDevice, output chan lp.CCMessage) error { if !device.excludeMetrics["nv_video_clock"] { memClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_VIDEO) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_video_clock", device.tags, device.meta, map[string]any{"value": float64(memClock)}, time.Now()) + y, err := lp.NewMetric("nv_video_clock", device.tags, device.meta, float64(memClock), time.Now()) if err == nil { y.AddMeta("unit", "MHz") output <- y @@ -652,7 +637,7 @@ func readEccErrors(device *NvidiaCollectorDevice, output chan lp.CCMessage) erro // i.e. the total set of errors across the entire device. ecc_db, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_UNCORRECTED, nvml.AGGREGATE_ECC) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_ecc_uncorrected_error", device.tags, device.meta, map[string]any{"value": float64(ecc_db)}, time.Now()) + y, err := lp.NewMetric("nv_ecc_uncorrected_error", device.tags, device.meta, float64(ecc_db), time.Now()) if err == nil { output <- y } @@ -661,7 +646,7 @@ func readEccErrors(device *NvidiaCollectorDevice, output chan lp.CCMessage) erro if !device.excludeMetrics["nv_ecc_corrected_error"] { ecc_sb, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_CORRECTED, nvml.AGGREGATE_ECC) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_ecc_corrected_error", device.tags, device.meta, map[string]any{"value": float64(ecc_sb)}, time.Now()) + y, err := lp.NewMetric("nv_ecc_corrected_error", device.tags, device.meta, float64(ecc_sb), time.Now()) if err == nil { output <- y } @@ -680,7 +665,7 @@ func readPowerLimit(device *NvidiaCollectorDevice, output chan lp.CCMessage) err // If the card's total power draw reaches this limit the power management algorithm kicks in. pwr_limit, ret := nvml.DeviceGetPowerManagementLimit(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_power_max_limit", device.tags, device.meta, map[string]any{"value": float64(pwr_limit) / 1000}, time.Now()) + y, err := lp.NewMetric("nv_power_max_limit", device.tags, device.meta, float64(pwr_limit)/1000, time.Now()) if err == nil { y.AddMeta("unit", "watts") output <- y @@ -707,7 +692,7 @@ func readEncUtilization(device *NvidiaCollectorDevice, output chan lp.CCMessage) // Note: On MIG-enabled GPUs, querying encoder utilization is not currently supported. enc_util, _, ret := nvml.DeviceGetEncoderUtilization(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_encoder_util", device.tags, device.meta, map[string]any{"value": float64(enc_util)}, time.Now()) + y, err := lp.NewMetric("nv_encoder_util", device.tags, device.meta, float64(enc_util), time.Now()) if err == nil { y.AddMeta("unit", "%") output <- y @@ -734,7 +719,7 @@ func readDecUtilization(device *NvidiaCollectorDevice, output chan lp.CCMessage) // Note: On MIG-enabled GPUs, querying encoder utilization is not currently supported. dec_util, _, ret := nvml.DeviceGetDecoderUtilization(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_decoder_util", device.tags, device.meta, map[string]any{"value": float64(dec_util)}, time.Now()) + y, err := lp.NewMetric("nv_decoder_util", device.tags, device.meta, float64(dec_util), time.Now()) if err == nil { y.AddMeta("unit", "%") output <- y @@ -761,13 +746,13 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e corrected, uncorrected, pending, failure, ret := nvml.DeviceGetRemappedRows(device.device) if ret == nvml.SUCCESS { if !device.excludeMetrics["nv_remapped_rows_corrected"] { - y, err := lp.NewMessage("nv_remapped_rows_corrected", device.tags, device.meta, map[string]any{"value": float64(corrected)}, time.Now()) + y, err := lp.NewMetric("nv_remapped_rows_corrected", device.tags, device.meta, float64(corrected), time.Now()) if err == nil { output <- y } } if !device.excludeMetrics["nv_remapped_rows_uncorrected"] { - y, err := lp.NewMessage("nv_remapped_rows_corrected", device.tags, device.meta, map[string]any{"value": float64(uncorrected)}, time.Now()) + y, err := lp.NewMetric("nv_remapped_rows_corrected", device.tags, device.meta, float64(uncorrected), time.Now()) if err == nil { output <- y } @@ -777,7 +762,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e if pending { p = 1 } - y, err := lp.NewMessage("nv_remapped_rows_pending", device.tags, device.meta, map[string]any{"value": p}, time.Now()) + y, err := lp.NewMetric("nv_remapped_rows_pending", device.tags, device.meta, p, time.Now()) if err == nil { output <- y } @@ -787,7 +772,7 @@ func readRemappedRows(device *NvidiaCollectorDevice, output chan lp.CCMessage) e if failure { f = 1 } - y, err := lp.NewMessage("nv_remapped_rows_failure", device.tags, device.meta, map[string]any{"value": f}, time.Now()) + y, err := lp.NewMetric("nv_remapped_rows_failure", device.tags, device.meta, f, time.Now()) if err == nil { output <- y } @@ -821,7 +806,7 @@ func readProcessCounts(device *NvidiaCollectorDevice, output chan lp.CCMessage) // Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode. procList, ret := nvml.DeviceGetComputeRunningProcesses(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_compute_processes", device.tags, device.meta, map[string]any{"value": len(procList)}, time.Now()) + y, err := lp.NewMetric("nv_compute_processes", device.tags, device.meta, len(procList), time.Now()) if err == nil { output <- y } @@ -850,7 +835,7 @@ func readProcessCounts(device *NvidiaCollectorDevice, output chan lp.CCMessage) // Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode. procList, ret := nvml.DeviceGetGraphicsRunningProcesses(device.device) if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_graphics_processes", device.tags, device.meta, map[string]any{"value": len(procList)}, time.Now()) + y, err := lp.NewMetric("nv_graphics_processes", device.tags, device.meta, len(procList), time.Now()) if err == nil { output <- y } @@ -880,7 +865,7 @@ func readProcessCounts(device *NvidiaCollectorDevice, output chan lp.CCMessage) // // Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode. // procList, ret := nvml.DeviceGetMPSComputeRunningProcesses(device.device) // if ret == nvml.SUCCESS { - // y, err := lp.NewMessage("nv_mps_compute_processes", device.tags, device.meta, map[string]interface{}{"value": len(procList)}, time.Now()) + // y, err := lp.NewMetric("nv_mps_compute_processes", device.tags, device.meta, len(procList), time.Now()) // if err == nil { // output <- y // } @@ -908,7 +893,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_POWER) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_power", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_power", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -920,7 +905,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_THERMAL) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_thermal", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_thermal", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -932,7 +917,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_SYNC_BOOST) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_sync_boost", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_sync_boost", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -944,7 +929,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_BOARD_LIMIT) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_board_limit", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_board_limit", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -956,7 +941,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_LOW_UTILIZATION) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_low_util", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_low_util", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -968,7 +953,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_RELIABILITY) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_reliability", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_reliability", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -980,7 +965,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_TOTAL_APP_CLOCKS) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_below_app_clock", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_below_app_clock", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -992,7 +977,7 @@ func readViolationStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_TOTAL_BASE_CLOCKS) if ret == nvml.SUCCESS { t := float64(violTime.ViolationTime) * 1e-9 - y, err := lp.NewMessage("nv_violation_below_base_clock", device.tags, device.meta, map[string]any{"value": t}, time.Now()) + y, err := lp.NewMetric("nv_violation_below_base_clock", device.tags, device.meta, t, time.Now()) if err == nil { y.AddMeta("unit", "sec") output <- y @@ -1022,12 +1007,12 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er if !device.excludeMetrics["nv_nvlink_crc_errors"] { // Data link receive data CRC error counter count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_CRC_DATA) - aggregate_crc_errors = aggregate_crc_errors + count + aggregate_crc_errors += count if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_nvlink_crc_errors", device.tags, device.meta, map[string]any{"value": count}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_crc_errors", device.tags, device.meta, count, time.Now()) if err == nil { y.AddTag("stype", "nvlink") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } @@ -1035,12 +1020,12 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er if !device.excludeMetrics["nv_nvlink_ecc_errors"] { // Data link receive data ECC error counter count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_ECC_DATA) - aggregate_ecc_errors = aggregate_ecc_errors + count + aggregate_ecc_errors += count if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_nvlink_ecc_errors", device.tags, device.meta, map[string]any{"value": count}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_ecc_errors", device.tags, device.meta, count, time.Now()) if err == nil { y.AddTag("stype", "nvlink") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } @@ -1048,12 +1033,12 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er if !device.excludeMetrics["nv_nvlink_replay_errors"] { // Data link transmit replay error counter count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_REPLAY) - aggregate_replay_errors = aggregate_replay_errors + count + aggregate_replay_errors += count if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_nvlink_replay_errors", device.tags, device.meta, map[string]any{"value": count}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_replay_errors", device.tags, device.meta, count, time.Now()) if err == nil { y.AddTag("stype", "nvlink") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } @@ -1061,12 +1046,12 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er if !device.excludeMetrics["nv_nvlink_recovery_errors"] { // Data link transmit recovery error counter count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_RECOVERY) - aggregate_recovery_errors = aggregate_recovery_errors + count + aggregate_recovery_errors += count if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_nvlink_recovery_errors", device.tags, device.meta, map[string]any{"value": count}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_recovery_errors", device.tags, device.meta, count, time.Now()) if err == nil { y.AddTag("stype", "nvlink") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } @@ -1074,12 +1059,12 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er if !device.excludeMetrics["nv_nvlink_crc_flit_errors"] { // Data link receive flow control digit CRC error counter count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_CRC_FLIT) - aggregate_crc_flit_errors = aggregate_crc_flit_errors + count + aggregate_crc_flit_errors += count if ret == nvml.SUCCESS { - y, err := lp.NewMessage("nv_nvlink_crc_flit_errors", device.tags, device.meta, map[string]any{"value": count}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_crc_flit_errors", device.tags, device.meta, count, time.Now()) if err == nil { y.AddTag("stype", "nvlink") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } @@ -1091,7 +1076,7 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er // Export aggegated values if !device.excludeMetrics["nv_nvlink_crc_errors"] { // Data link receive data CRC error counter - y, err := lp.NewMessage("nv_nvlink_crc_errors_sum", device.tags, device.meta, map[string]any{"value": aggregate_crc_errors}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_crc_errors_sum", device.tags, device.meta, aggregate_crc_errors, time.Now()) if err == nil { y.AddTag("stype", "nvlink") output <- y @@ -1099,7 +1084,7 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er } if !device.excludeMetrics["nv_nvlink_ecc_errors"] { // Data link receive data ECC error counter - y, err := lp.NewMessage("nv_nvlink_ecc_errors_sum", device.tags, device.meta, map[string]any{"value": aggregate_ecc_errors}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_ecc_errors_sum", device.tags, device.meta, aggregate_ecc_errors, time.Now()) if err == nil { y.AddTag("stype", "nvlink") output <- y @@ -1107,7 +1092,7 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er } if !device.excludeMetrics["nv_nvlink_replay_errors"] { // Data link transmit replay error counter - y, err := lp.NewMessage("nv_nvlink_replay_errors_sum", device.tags, device.meta, map[string]any{"value": aggregate_replay_errors}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_replay_errors_sum", device.tags, device.meta, aggregate_replay_errors, time.Now()) if err == nil { y.AddTag("stype", "nvlink") output <- y @@ -1115,7 +1100,7 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er } if !device.excludeMetrics["nv_nvlink_recovery_errors"] { // Data link transmit recovery error counter - y, err := lp.NewMessage("nv_nvlink_recovery_errors_sum", device.tags, device.meta, map[string]any{"value": aggregate_recovery_errors}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_recovery_errors_sum", device.tags, device.meta, aggregate_recovery_errors, time.Now()) if err == nil { y.AddTag("stype", "nvlink") output <- y @@ -1123,7 +1108,7 @@ func readNVLinkStats(device *NvidiaCollectorDevice, output chan lp.CCMessage) er } if !device.excludeMetrics["nv_nvlink_crc_flit_errors"] { // Data link receive flow control digit CRC error counter - y, err := lp.NewMessage("nv_nvlink_crc_flit_errors_sum", device.tags, device.meta, map[string]any{"value": aggregate_crc_flit_errors}, time.Now()) + y, err := lp.NewMetric("nv_nvlink_crc_flit_errors_sum", device.tags, device.meta, aggregate_crc_flit_errors, time.Now()) if err == nil { y.AddTag("stype", "nvlink") output <- y @@ -1302,7 +1287,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMessage) } } if _, ok := migDevice.tags["stype-id"]; !ok { - migDevice.tags["stype-id"] = fmt.Sprintf("%d", j) + migDevice.tags["stype-id"] = strconv.Itoa(j) } maps.Copy(migDevice.meta, m.gpus[i].meta) if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices { diff --git a/collectors/rocmsmiMetric.go b/collectors/rocmsmiMetric.go index c64a726..04b32b9 100644 --- a/collectors/rocmsmiMetric.go +++ b/collectors/rocmsmiMetric.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "slices" + "strconv" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -91,7 +92,7 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error { m.devices = make([]RocmSmiCollectorDevice, 0) for i := range numDevs { - str_i := fmt.Sprintf("%d", i) + str_i := strconv.Itoa(i) if slices.Contains(m.config.ExcludeDevices, str_i) { continue } @@ -297,7 +298,7 @@ func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMessage y, err := lp.NewMessage("rocm_temp_hbm", dev.tags, dev.meta, map[string]any{"value": value}, timestamp) if err == nil { y.AddTag("stype", "device") - y.AddTag("stype-id", fmt.Sprintf("%d", i)) + y.AddTag("stype-id", strconv.Itoa(i)) output <- y } } diff --git a/collectors/sampleTimerMetric.go b/collectors/sampleTimerMetric.go index 0f00f50..ca16ee5 100644 --- a/collectors/sampleTimerMetric.go +++ b/collectors/sampleTimerMetric.go @@ -37,11 +37,11 @@ type SampleTimerCollector struct { } func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error { - var err error = nil + var err error // Always set the name early in Init() to use it in cclog.Component* functions m.name = "SampleTimerCollector" // This is for later use, also call it early - if err := m.setup(); err != nil { + if err = m.setup(); err != nil { return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } // Define meta information sent with each metric diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go index f3dc2ae..64ebd12 100644 --- a/collectors/schedstatMetric.go +++ b/collectors/schedstatMetric.go @@ -53,7 +53,7 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error { return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } // Tell whether the collector should be run in parallel with others (reading files, ...) - // or it should be run serially, mostly for collectors acutally doing measurements + // or it should be run serially, mostly for collectors actually doing measurements // because they should not measure the execution of the other collectors m.parallel = true // Define meta information sent with each metric @@ -90,7 +90,7 @@ func (m *SchedstatCollector) Init(config json.RawMessage) error { waiting, _ := strconv.ParseInt(linefields[8], 10, 64) m.cputags[linefields[0]] = map[string]string{ "type": "hwthread", - "type-id": fmt.Sprintf("%d", cpu), + "type-id": strconv.Itoa(cpu), } m.olddata[linefields[0]] = map[string]int64{ "running": running, diff --git a/collectors/slurmCgroupMetric.go b/collectors/slurmCgroupMetric.go index 8a2c492..6c0a1c0 100644 --- a/collectors/slurmCgroupMetric.go +++ b/collectors/slurmCgroupMetric.go @@ -79,9 +79,10 @@ func ParseCPUs(cpuset string) ([]int, error) { } func GetAllCPUs() ([]int, error) { - data, err := os.ReadFile("/sys/devices/system/cpu/online") + cpuOnline := "/sys/devices/system/cpu/online" + data, err := os.ReadFile(cpuOnline) if err != nil { - return nil, fmt.Errorf("failed to read /sys/devices/system/cpu/online: %v", err) + return nil, fmt.Errorf("failed to read file \"%s\": %w", cpuOnline, err) } return ParseCPUs(strings.TrimSpace(string(data))) } @@ -106,16 +107,18 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error { return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } m.parallel = true - m.meta = map[string]string{"source": m.name, "group": "SLURM"} - m.tags = map[string]string{"type": "hwthread"} + m.meta = map[string]string{ + "source": m.name, + "group": "SLURM"} + m.tags = map[string]string{ + "type": "hwthread"} m.cpuUsed = make(map[int]bool) m.cgroupBase = defaultCgroupBase if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { - cclog.ComponentError(m.name, "Error reading config:", err.Error()) - return err + return fmt.Errorf("%s Init(): Error reading JSON config: %w", m.name, err) } m.excludeMetrics = make(map[string]struct{}) for _, metric := range m.config.ExcludeMetrics { @@ -130,19 +133,16 @@ func (m *SlurmCgroupCollector) Init(config json.RawMessage) error { if !m.useSudo { user, err := user.Current() if err != nil { - cclog.ComponentError(m.name, "Failed to get current user:", err.Error()) - return err + return fmt.Errorf("%s Init(): Failed to get current user: %w", m.name, err) } if user.Uid != "0" { - cclog.ComponentError(m.name, "Reading cgroup files requires root privileges (or enable use_sudo in config)") - return fmt.Errorf("not root") + return fmt.Errorf("%s Init(): Reading cgroup files requires root privileges (or enable use_sudo in config)", m.name) } } m.allCPUs, err = GetAllCPUs() if err != nil { - cclog.ComponentError(m.name, "Error reading online CPUs:", err.Error()) - return err + return fmt.Errorf("%s Init(): Error reading online CPUs: %w", m.name, err) } m.init = true @@ -159,7 +159,9 @@ func (m *SlurmCgroupCollector) ReadJobData(jobdir string) (SlurmJobData, error) CpuSet: []int{}, } - cg := func(f string) string { return filepath.Join(m.cgroupBase, jobdir, f) } + cg := func(f string) string { + return filepath.Join(m.cgroupBase, jobdir, f) + } memUsage, err := m.readFile(cg("memory.current")) if err == nil { @@ -208,8 +210,8 @@ func (m *SlurmCgroupCollector) ReadJobData(jobdir string) (SlurmJobData, error) } } if usageUsec > 0 { - jobdata.CpuUsageUser = (userUsec * 100 / usageUsec) - jobdata.CpuUsageSys = (systemUsec * 100 / usageUsec) + jobdata.CpuUsageUser = (userUsec * 100.0 / usageUsec) + jobdata.CpuUsageSys = (systemUsec * 100.0 / usageUsec) } } @@ -252,12 +254,18 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes for _, cpu := range jobdata.CpuSet { coreTags := map[string]string{ "type": "hwthread", - "type-id": fmt.Sprintf("%d", cpu), + "type-id": strconv.Itoa(cpu), } if coreCount > 0 && !m.isExcluded("job_mem_used") { memPerCore := jobdata.MemoryUsage / coreCount - if y, err := lp.NewMessage("job_mem_used", coreTags, m.meta, map[string]any{"value": memPerCore}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_mem_used", + coreTags, + m.meta, + map[string]any{ + "value": memPerCore}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } @@ -265,7 +273,13 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes if coreCount > 0 && !m.isExcluded("job_max_mem_used") { maxMemPerCore := jobdata.MaxMemoryUsage / coreCount - if y, err := lp.NewMessage("job_max_mem_used", coreTags, m.meta, map[string]any{"value": maxMemPerCore}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_max_mem_used", + coreTags, + m.meta, + map[string]any{ + "value": maxMemPerCore}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } @@ -273,7 +287,13 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes if coreCount > 0 && !m.isExcluded("job_mem_limit") { limitPerCore := jobdata.LimitMemoryUsage / coreCount - if y, err := lp.NewMessage("job_mem_limit", coreTags, m.meta, map[string]any{"value": limitPerCore}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_mem_limit", + coreTags, + m.meta, + map[string]any{ + "value": limitPerCore}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } @@ -281,7 +301,13 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes if coreCount > 0 && !m.isExcluded("job_user_cpu") { cpuUserPerCore := jobdata.CpuUsageUser / coreCount - if y, err := lp.NewMessage("job_user_cpu", coreTags, m.meta, map[string]any{"value": cpuUserPerCore}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_user_cpu", + coreTags, + m.meta, + map[string]any{ + "value": cpuUserPerCore}, + timestamp); err == nil { y.AddMeta("unit", "%") output <- y } @@ -289,7 +315,13 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes if coreCount > 0 && !m.isExcluded("job_sys_cpu") { cpuSysPerCore := jobdata.CpuUsageSys / coreCount - if y, err := lp.NewMessage("job_sys_cpu", coreTags, m.meta, map[string]any{"value": cpuSysPerCore}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_sys_cpu", + coreTags, + m.meta, + map[string]any{ + "value": cpuSysPerCore}, + timestamp); err == nil { y.AddMeta("unit", "%") output <- y } @@ -304,25 +336,43 @@ func (m *SlurmCgroupCollector) Read(interval time.Duration, output chan lp.CCMes if !m.cpuUsed[cpu] { coreTags := map[string]string{ "type": "hwthread", - "type-id": fmt.Sprintf("%d", cpu), + "type-id": strconv.Itoa(cpu), } if !m.isExcluded("job_mem_used") { - if y, err := lp.NewMessage("job_mem_used", coreTags, m.meta, map[string]any{"value": 0}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_mem_used", + coreTags, + m.meta, + map[string]any{ + "value": 0}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } } if !m.isExcluded("job_max_mem_used") { - if y, err := lp.NewMessage("job_max_mem_used", coreTags, m.meta, map[string]any{"value": 0}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_max_mem_used", + coreTags, + m.meta, + map[string]any{ + "value": 0}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } } if !m.isExcluded("job_mem_limit") { - if y, err := lp.NewMessage("job_mem_limit", coreTags, m.meta, map[string]any{"value": 0}, timestamp); err == nil { + if y, err := lp.NewMessage( + "job_mem_limit", + coreTags, + m.meta, + map[string]any{ + "value": 0}, + timestamp); err == nil { y.AddMeta("unit", "Bytes") output <- y } diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index 26e4573..7ed5d43 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -64,7 +64,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { - return err + return fmt.Errorf("%s Init(): failed to unmarshal JSON config: %w", m.name, err) } } @@ -80,10 +80,10 @@ func (m *TempCollector) Init(config json.RawMessage) error { globPattern := filepath.Join("/sys/class/hwmon", "*", "temp*_input") inputFiles, err := filepath.Glob(globPattern) if err != nil { - return fmt.Errorf("unable to glob files with pattern '%s': %v", globPattern, err) + return fmt.Errorf("%s Init(): unable to glob files with pattern '%s': %w", m.name, globPattern, err) } if inputFiles == nil { - return fmt.Errorf("unable to find any files with pattern '%s'", globPattern) + return fmt.Errorf("%s Init(): unable to find any files with pattern '%s'", m.name, globPattern) } // Get sensor name for each temperature sensor file @@ -172,7 +172,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { // Empty sensors map if len(m.sensors) == 0 { - return fmt.Errorf("no temperature sensors found") + return fmt.Errorf("%s Init(): no temperature sensors found", m.name) } // Finished initialization diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index aa9dcf2..f44b7db 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -81,7 +81,13 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMessag lines := strings.Split(string(stdout), "\n") for i := 1; i < m.config.Num_procs+1; i++ { name := fmt.Sprintf("topproc%d", i) - y, err := lp.NewMessage(name, m.tags, m.meta, map[string]any{"value": string(lines[i])}, time.Now()) + y, err := lp.NewMessage( + name, + m.tags, + m.meta, + map[string]any{ + "value": lines[i]}, + time.Now()) if err == nil { output <- y } diff --git a/internal/metricAggregator/metricAggregator.go b/internal/metricAggregator/metricAggregator.go index 4c2d6b1..18a4a30 100644 --- a/internal/metricAggregator/metricAggregator.go +++ b/internal/metricAggregator/metricAggregator.go @@ -137,7 +137,6 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics matches := make([]lp.CCMessage, 0) for _, m := range metrics { vars["metric"] = m - //value, err := gval.Evaluate(f.Condition, vars, c.language) value, err := f.gvalCond.EvalBool(context.Background(), vars) if err != nil { cclog.ComponentError("MetricCache", "COLLECT", f.Name, "COND", f.Condition, ":", err.Error()) @@ -171,22 +170,22 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics // Check, that only values of one type were collected countValueTypes := 0 if len(valuesFloat64) > 0 { - countValueTypes += 1 + countValueTypes++ } if len(valuesFloat32) > 0 { - countValueTypes += 1 + countValueTypes++ } if len(valuesInt) > 0 { - countValueTypes += 1 + countValueTypes++ } if len(valuesInt32) > 0 { - countValueTypes += 1 + countValueTypes++ } if len(valuesInt64) > 0 { - countValueTypes += 1 + countValueTypes++ } if len(valuesBool) > 0 { - countValueTypes += 1 + countValueTypes++ } if countValueTypes > 1 { cclog.ComponentError("MetricCache", "Collected values of different types") @@ -337,7 +336,9 @@ func (c *metricAggregator) DeleteAggregation(name string) error { if i == -1 { return fmt.Errorf("no aggregation for metric name %s", name) } - c.functions = slices.Delete(c.functions, i, i) + copy(c.functions[i:], c.functions[i+1:]) + c.functions[len(c.functions)-1] = nil + c.functions = c.functions[:len(c.functions)-1] return nil } diff --git a/internal/metricAggregator/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go index d0a72c5..ca5ecf9 100644 --- a/internal/metricAggregator/metricAggregatorFunctions.go +++ b/internal/metricAggregator/metricAggregatorFunctions.go @@ -12,6 +12,7 @@ import ( "fmt" "regexp" "slices" + "strconv" "strings" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" @@ -208,7 +209,7 @@ func infunc(a any, b any) (any, error) { case []int: return slices.Contains(total, match), nil case string: - smatch := fmt.Sprintf("%d", match) + smatch := strconv.Itoa(match) return strings.Contains(total, smatch), nil } diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go index 90c4674..f5f9222 100644 --- a/internal/metricRouter/metricCache.go +++ b/internal/metricRouter/metricCache.go @@ -137,12 +137,12 @@ func (c *metricCache) Add(metric lp.CCMessage) { p := c.intervals[c.curPeriod] if p.numMetrics < p.sizeMetrics { p.metrics[p.numMetrics] = metric - p.numMetrics = p.numMetrics + 1 + p.numMetrics++ p.stopstamp = metric.Time() } else { p.metrics = append(p.metrics, metric) - p.numMetrics = p.numMetrics + 1 - p.sizeMetrics = p.sizeMetrics + 1 + p.numMetrics++ + p.sizeMetrics++ p.stopstamp = metric.Time() } c.lock.Unlock() diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 993733a..6c7398a 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -186,10 +186,6 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout return fmt.Errorf("MessageProcessor AddAddTagsByCondition() failed: %w", err) } - // r.config.dropMetrics = make(map[string]bool) - // for _, mname := range r.config.DropMetrics { - // r.config.dropMetrics[mname] = true - // } return nil } @@ -208,7 +204,7 @@ func getParamMap(point lp.CCMessage) map[string]any { return params } -// DoAddTags adds a tag when condition is fullfiled +// DoAddTags adds a tag when condition is fulfilled func (r *metricRouter) DoAddTags(point lp.CCMessage) { var conditionMatches bool for _, m := range r.config.AddTags { @@ -230,83 +226,6 @@ func (r *metricRouter) DoAddTags(point lp.CCMessage) { } } -// DoDelTags removes a tag when condition is fullfiled -// func (r *metricRouter) DoDelTags(point lp.CCMessage) { -// var conditionMatches bool -// for _, m := range r.config.DelTags { -// if m.Condition == "*" { -// // Condition is always matched -// conditionMatches = true -// } else { -// // Evaluate condition -// var err error -// conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point)) -// if err != nil { -// cclog.ComponentError("MetricRouter", err.Error()) -// conditionMatches = false -// } -// } -// if conditionMatches { -// point.RemoveTag(m.Key) -// } -// } -// } - -// Conditional test whether a metric should be dropped -// func (r *metricRouter) dropMetric(point lp.CCMessage) bool { -// // Simple drop check -// if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok { -// return conditionMatches -// } - -// // Checking the dropping conditions -// for _, m := range r.config.DropMetricsIf { -// conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point)) -// if err != nil { -// cclog.ComponentError("MetricRouter", err.Error()) -// conditionMatches = false -// } -// if conditionMatches { -// return conditionMatches -// } -// } - -// // No dropping condition met -// return false -// } - -// func (r *metricRouter) prepareUnit(point lp.CCMessage) bool { -// if r.config.NormalizeUnits { -// if in_unit, ok := point.GetMeta("unit"); ok { -// u := units.NewUnit(in_unit) -// if u.Valid() { -// point.AddMeta("unit", u.Short()) -// } -// } -// } -// if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok { - -// newPrefix := units.NewPrefix(newP) - -// if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { -// u := units.NewUnit(in_unit) -// if u.Valid() { -// cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name()) -// conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) -// if conv != nil && out_unit.Valid() { -// if val, ok := point.GetField("value"); ok { -// point.AddField("value", conv(val)) -// point.AddMeta("unit", out_unit.Short()) -// } -// } -// } - -// } -// } - -// return true -// } - // Start starts the metric router func (r *metricRouter) Start() { // start timer if configured @@ -322,28 +241,7 @@ func (r *metricRouter) Start() { cclog.ComponentDebug("MetricRouter", "DONE") } - // Forward takes a received metric, adds or deletes tags - // and forwards it to the output channels - // forward := func(point lp.CCMessage) { - // cclog.ComponentDebug("MetricRouter", "FORWARD", point) - // r.DoAddTags(point) - // r.DoDelTags(point) - // name := point.Name() - // if new, ok := r.config.RenameMetrics[name]; ok { - // point.SetName(new) - // point.AddMeta("oldname", name) - // r.DoAddTags(point) - // r.DoDelTags(point) - // } - - // r.prepareUnit(point) - - // for _, o := range r.outputs { - // o <- point - // } - // } - - // Foward message received from collector channel + // Forward message received from collector channel coll_forward := func(p lp.CCMessage) { // receive from metric collector //p.AddTag(r.config.HostnameTagName, r.hostname) @@ -356,11 +254,6 @@ func (r *metricRouter) Start() { o <- m } } - // if !r.dropMetric(p) { - // for _, o := range r.outputs { - // o <- point - // } - // } // even if the metric is dropped, it is stored in the cache for // aggregations if r.config.NumCacheIntervals > 0 { @@ -380,9 +273,6 @@ func (r *metricRouter) Start() { o <- m } } - // if !r.dropMetric(p) { - // forward(p) - // } } // Forward message received from cache channel diff --git a/pkg/ccTopology/ccTopology.go b/pkg/ccTopology/ccTopology.go index 569ab56..0b1bf17 100644 --- a/pkg/ccTopology/ccTopology.go +++ b/pkg/ccTopology/ccTopology.go @@ -51,14 +51,13 @@ var cache struct { func fileToInt(path string) int { buffer, err := os.ReadFile(path) if err != nil { - log.Print(err) - cclogger.ComponentError("ccTopology", "fileToInt", "Reading", path, ":", err.Error()) + cclogger.ComponentError("ccTopology", fmt.Sprintf("fileToInt(): Reading \"%s\": %v", path, err)) return -1 } stringBuffer := strings.TrimSpace(string(buffer)) id, err := strconv.Atoi(stringBuffer) if err != nil { - cclogger.ComponentError("ccTopology", "fileToInt", "Parsing", path, ":", stringBuffer, err.Error()) + cclogger.ComponentError("ccTopology", fmt.Sprintf("fileToInt(): Parsing \"%s\": %v", stringBuffer, err)) return -1 } return id @@ -304,20 +303,19 @@ func GetTypeList(topology_type string) []int { } func GetTypeId(hwt HwthreadEntry, topology_type string) (int, error) { - var err error = nil switch topology_type { case "node": - return 0, err + return 0, nil case "socket": - return hwt.Socket, err + return hwt.Socket, nil case "die": - return hwt.Die, err + return hwt.Die, nil case "memoryDomain": - return hwt.NumaDomain, err + return hwt.NumaDomain, nil case "core": - return hwt.Core, err + return hwt.Core, nil case "hwthread": - return hwt.CpuID, err + return hwt.CpuID, nil } return -1, fmt.Errorf("unknown topology type '%s'", topology_type) } diff --git a/pkg/multiChanTicker/multiChanTicker.go b/pkg/multiChanTicker/multiChanTicker.go index d208d33..86f3376 100644 --- a/pkg/multiChanTicker/multiChanTicker.go +++ b/pkg/multiChanTicker/multiChanTicker.go @@ -21,7 +21,7 @@ type multiChanTicker struct { type MultiChanTicker interface { Init(duration time.Duration) - AddChannel(chan time.Time) + AddChannel(channel chan time.Time) Close() }