From 162cce0fdae24f2ac93ea98e52c3ae8ffe250640 Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Wed, 14 Dec 2022 17:02:39 +0100 Subject: [PATCH] Merge `develop` branch into `main` (#96) * InfiniBandCollector: Scale raw readings from octets to bytes * Fix clock frequency coming from LikwidCollector and update docs * Build DEB package for Ubuntu 20.04 for releases * Fix memstat collector with numa_stats option * Remove useless prints from MemstatCollector * Replace ioutils with os and io (#87) * Use lower case for error strings in RocmSmiCollector * move maybe-usable-by-other-cc-components to pkg. Fix all files to use the new paths (#88) * Add collector for monitoring the execution of cc-metric-collector itself (#81) * Add collector to monitor execution of cc-metric-collector itself * Register SelfCollector * Fix import paths for moved packages * Check if at least one CPU with frequency information was detected * Correct type: /proc/stats -> /proc/stat * Update README.md * Run ipmitool asynchron. Improved error handling. * Corrected some typos * Add running average power limit (RAPL) metric collector * Add running average power limit (RAPL) metric collector * Do not mess up with the orignal configuration * * Corrected json config in numastatsMetric.md * Added some debug output to numastatsMetric.go * Fixed computing number of physical packages for non continous physical package IDs (e.g. on Ampere Altra Q80-30) * Fix kernel panic for receiver config with missing receiver type * Add receiver to gather remote IPMI sensor metrics * Added config option to add ipmi-sensors command line options * Add documentaion for IPMI receiver * Update to latest version of included go modules * Add go.mod to App dependency * Try to use common metric tags across hardware vendors * Add IPMI metric: current * remove prefix enumeration like 01-... * Add IPMI receiver example configuration to receivers.json * Minimal formating changes * Add hostlist package * Added tests for hostlist Expand() * Use package hostlist to expand a host list * Use package hostlist to expand a host list * Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null * Updated to latest package versions * Do not allow unknown fields in JSON configuration file * Add workflow to customize packages to docs * NFS I/O Stats Collector (#91) * Initial version * Delete values for vanished mount points and comments * Fix for Likwid collector (#95) * Run LIKWID in separate thread and check metric type * Change LIKWID collector documentation to use 'type' instead of 'scope' * Re-initialize LIKWID after one read is missing due to lock toggle * Register cc-metric-collector at Zenodo (#93) * Add initial version of Zenodo project file * Orcid ID added * Update .zenodo.json Co-authored-by: Holger Obermaier * Update ipmiMetric.go Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Co-authored-by: Holger Obermaier --- .zenodo.json | 29 ++ Makefile | 2 +- collectors/README.md | 2 +- collectors/collectorManager.go | 1 + collectors/cpufreqCpuinfoMetric.go | 5 + collectors/cpufreqMetric.go | 30 +- collectors/cpufreqMetric.md | 4 +- collectors/cpustatMetric.md | 4 +- collectors/ipmiMetric.go | 97 ++++-- collectors/ipmiMetric.md | 5 +- collectors/likwidMetric.go | 247 ++++++++----- collectors/likwidMetric.md | 18 +- collectors/nfsiostatMetric.go | 166 +++++++++ collectors/nfsiostatMetric.md | 27 ++ collectors/numastatsMetric.go | 39 ++- collectors/numastatsMetric.md | 8 +- collectors/raplMetric.go | 262 ++++++++++++++ collectors/raplMetric.md | 18 + collectors/sampleMetric.go | 10 +- collectors/sampleTimerMetric.go | 6 +- docs/building.md | 20 +- go.mod | 33 +- pkg/hostlist/hostlist.go | 125 +++++++ pkg/hostlist/hostlist_test.go | 126 +++++++ receivers.json | 35 +- receivers/ipmiReceiver.go | 534 +++++++++++++++++++++++++++++ receivers/ipmiReceiver.md | 48 +++ receivers/receiveManager.go | 10 +- receivers/redfishReceiver.go | 41 ++- receivers/redfishReceiver.md | 14 +- 30 files changed, 1738 insertions(+), 228 deletions(-) create mode 100644 .zenodo.json create mode 100644 collectors/nfsiostatMetric.go create mode 100644 collectors/nfsiostatMetric.md create mode 100644 collectors/raplMetric.go create mode 100644 collectors/raplMetric.md create mode 100644 pkg/hostlist/hostlist.go create mode 100644 pkg/hostlist/hostlist_test.go create mode 100644 receivers/ipmiReceiver.go create mode 100644 receivers/ipmiReceiver.md diff --git a/.zenodo.json b/.zenodo.json new file mode 100644 index 0000000..482eb5e --- /dev/null +++ b/.zenodo.json @@ -0,0 +1,29 @@ +{ + "title": "cc-metric-collector", + "description": "Monitoring agent for ClusterCockpit.", + "creators": [ + { + "affiliation": "Regionales Rechenzentrum Erlangen, Friedrich-Alexander-Universität Erlangen-Nürnberg", + "name": "Thomas Gruber", + "orcid": "0000-0001-5560-6964" + }, + { + "affiliation": "Steinbuch Centre for Computing, Karlsruher Institut für Technologie", + "name": "Holger Obermaier", + "orcid": "0000-0002-6830-6626" + } + ], + "upload_type": "software", + "license": "MIT", + "access_right": "open", + "keywords": [ + "performance-monitoring", + "cluster-monitoring", + "open-source" + ], + "communities": [ + { + "identifier": "clustercockpit" + } + ] +} diff --git a/Makefile b/Makefile index d63176d..721e20f 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ GOBIN = $(shell which go) .PHONY: all all: $(APP) -$(APP): $(GOSRC) +$(APP): $(GOSRC) go.mod make -C collectors $(GOBIN) get $(GOBIN) build -o $(APP) $(GOSRC_APP) diff --git a/collectors/README.md b/collectors/README.md index 8002ed2..24119ad 100644 --- a/collectors/README.md +++ b/collectors/README.md @@ -51,7 +51,7 @@ A collector reads data from any source, parses it to metrics and submits these m * `Name() string`: Return the name of the collector * `Init(config json.RawMessage) error`: Initializes the collector using the given collector-specific config in JSON. Check if needed files/commands exists, ... * `Initialized() bool`: Check if a collector is successfully initialized -* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`. +* `Read(duration time.Duration, output chan ccMetric.CCMetric)`: Read, parse and submit data to the `output` channel as [`CCMetric`](../internal/ccMetric/README.md). If the collector has to measure anything for some duration, use the provided function argument `duration`. * `Close()`: Closes down the collector. It is recommanded to call `setup()` in the `Init()` function. diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index ea648ef..11b501a 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -36,6 +36,7 @@ var AvailableCollectors = map[string]MetricCollector{ "numastats": new(NUMAStatsCollector), "beegfs_meta": new(BeegfsMetaCollector), "beegfs_storage": new(BeegfsStorageCollector), + "rapl": new(RAPLCollector), "rocm_smi": new(RocmSmiCollector), "self": new(SelfCollector), "schedstat": new(SchedstatCollector), diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index a6716d3..85f6d9e 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -142,6 +142,11 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { } } + // Check if at least one CPU with frequency information was detected + if len(m.topology) == 0 { + return fmt.Errorf("No CPU frequency info found in %s", cpuInfoFile) + } + numPhysicalPackageID_int := maxPhysicalPackageID + 1 numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int) numNonHT := fmt.Sprint(numNonHT_int) diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index cf67457..d55b4c1 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -23,20 +23,18 @@ type CPUFreqCollectorTopology struct { numPhysicalPackages string // number of sockets / packages numPhysicalPackages_int int64 isHT bool - numNonHT string // number of non hyperthreading processors + numNonHT string // number of non hyper-threading processors numNonHT_int int64 scalingCurFreqFile string tagSet map[string]string } -// // CPUFreqCollector // a metric collector to measure the current frequency of the CPUs // as obtained from the hardware (in KHz) -// Only measure on the first hyper thread +// Only measure on the first hyper-thread // // See: https://www.kernel.org/doc/html/latest/admin-guide/pm/cpufreq.html -// type CPUFreqCollector struct { metricCollector topology []CPUFreqCollectorTopology @@ -126,7 +124,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { t.scalingCurFreqFile = scalingCurFreqFile } - // is processor a hyperthread? + // is processor a hyper-thread? coreSeenBefore := make(map[string]bool) for i := range m.topology { t := &m.topology[i] @@ -136,23 +134,20 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { coreSeenBefore[globalID] = true } - // number of non hyper thread cores and packages / sockets + // number of non hyper-thread cores and packages / sockets var numNonHT_int int64 = 0 - var maxPhysicalPackageID int64 = 0 + PhysicalPackageIDs := make(map[int64]struct{}) for i := range m.topology { t := &m.topology[i] - // Update maxPackageID - if t.physicalPackageID_int > maxPhysicalPackageID { - maxPhysicalPackageID = t.physicalPackageID_int - } - if !t.isHT { numNonHT_int++ } + + PhysicalPackageIDs[t.physicalPackageID_int] = struct{}{} } - numPhysicalPackageID_int := maxPhysicalPackageID + 1 + numPhysicalPackageID_int := int64(len(PhysicalPackageIDs)) numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int) numNonHT := fmt.Sprint(numNonHT_int) for i := range m.topology { @@ -168,6 +163,13 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { } } + // Initialized + cclog.ComponentDebug( + m.name, + "initialized", + numPhysicalPackageID_int, "physical packages,", + len(cpuDirs), "CPUs,", + numNonHT, "non-hyper-threading CPUs") m.init = true return nil } @@ -182,7 +184,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) for i := range m.topology { t := &m.topology[i] - // skip hyperthreads + // skip hyper-threads if t.isHT { continue } diff --git a/collectors/cpufreqMetric.md b/collectors/cpufreqMetric.md index 71a6446..14f9a00 100644 --- a/collectors/cpufreqMetric.md +++ b/collectors/cpufreqMetric.md @@ -1,4 +1,5 @@ ## `cpufreq_cpuinfo` collector + ```json "cpufreq": { "exclude_metrics": [] @@ -8,4 +9,5 @@ The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **hwthread** metrics. Metrics: -* `cpufreq` \ No newline at end of file + +* `cpufreq` diff --git a/collectors/cpustatMetric.md b/collectors/cpustatMetric.md index 8122afe..8963536 100644 --- a/collectors/cpustatMetric.md +++ b/collectors/cpustatMetric.md @@ -1,5 +1,6 @@ ## `cpustat` collector + ```json "cpustat": { "exclude_metrics": [ @@ -8,9 +9,10 @@ } ``` -The `cpustat` collector reads data from `/proc/stats` and outputs a handful **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink. +The `cpustat` collector reads data from `/proc/stat` and outputs a handful **node** and **hwthread** metrics. If a metric is not required, it can be excluded from forwarding it to the sink. Metrics: + * `cpu_user` * `cpu_nice` * `cpu_system` diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 32c4c45..d53d8af 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -1,51 +1,57 @@ package collectors import ( + "bufio" + "bytes" "encoding/json" "errors" + "fmt" + "io" "log" - "os" "os/exec" "strconv" "strings" "time" - + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) -const IPMITOOL_PATH = `ipmitool` const IPMISENSORS_PATH = `ipmi-sensors` -type IpmiCollectorConfig struct { - ExcludeDevices []string `json:"exclude_devices"` - IpmitoolPath string `json:"ipmitool_path"` - IpmisensorsPath string `json:"ipmisensors_path"` -} - type IpmiCollector struct { metricCollector - //tags map[string]string - //matches map[string]string - config IpmiCollectorConfig + config struct { + ExcludeDevices []string `json:"exclude_devices"` + IpmitoolPath string `json:"ipmitool_path"` + IpmisensorsPath string `json:"ipmisensors_path"` + } ipmitool string ipmisensors string } func (m *IpmiCollector) Init(config json.RawMessage) error { + // Check if already initialized + if m.init { + return nil + } + m.name = "IpmiCollector" m.setup() m.parallel = true - m.meta = map[string]string{"source": m.name, "group": "IPMI"} - m.config.IpmitoolPath = string(IPMITOOL_PATH) - m.config.IpmisensorsPath = string(IPMISENSORS_PATH) - m.ipmitool = "" - m.ipmisensors = "" + m.meta = map[string]string{ + "source": m.name, + "group": "IPMI", + } + // default path to IPMI tools + m.config.IpmitoolPath = "ipmitool" + m.config.IpmisensorsPath = "ipmi-sensors" if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { return err } } + // Check if executables ipmitool or ipmisensors are found p, err := exec.LookPath(m.config.IpmitoolPath) if err == nil { m.ipmitool = p @@ -62,25 +68,33 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { } func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { + + // Setup ipmitool command command := exec.Command(cmd, "sensor") - command.Wait() - stdout, err := command.Output() - if err != nil { - log.Print(err) + stdout, _ := command.StdoutPipe() + errBuf := new(bytes.Buffer) + command.Stderr = errBuf + + // start command + if err := command.Start(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("readIpmiTool(): Failed to start command \"%s\": %v", command.String(), err), + ) return } - ll := strings.Split(string(stdout), "\n") - - for _, line := range ll { - lv := strings.Split(line, "|") + // Read command output + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + lv := strings.Split(scanner.Text(), "|") if len(lv) < 3 { continue } - v, err := strconv.ParseFloat(strings.Trim(lv[1], " "), 64) + v, err := strconv.ParseFloat(strings.TrimSpace(lv[1]), 64) if err == nil { - name := strings.ToLower(strings.Replace(strings.Trim(lv[0], " "), " ", "_", -1)) - unit := strings.Trim(lv[2], " ") + name := strings.ToLower(strings.Replace(strings.TrimSpace(lv[0]), " ", "_", -1)) + unit := strings.TrimSpace(lv[2]) if unit == "Volts" { unit = "Volts" } else if unit == "degrees C" { @@ -98,6 +112,17 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { } } } + + // Wait for command end + if err := command.Wait(); err != nil { + errMsg, _ := io.ReadAll(errBuf) + cclog.ComponentError( + m.name, + fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err), + fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", string(errMsg)), + ) + return + } } func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { @@ -131,16 +156,16 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { } func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { + + // Check if already initialized + if !m.init { + return + } + if len(m.config.IpmitoolPath) > 0 { - _, err := os.Stat(m.config.IpmitoolPath) - if err == nil { - m.readIpmiTool(m.config.IpmitoolPath, output) - } + m.readIpmiTool(m.config.IpmitoolPath, output) } else if len(m.config.IpmisensorsPath) > 0 { - _, err := os.Stat(m.config.IpmisensorsPath) - if err == nil { - m.readIpmiSensors(m.config.IpmisensorsPath, output) - } + m.readIpmiSensors(m.config.IpmisensorsPath, output) } } diff --git a/collectors/ipmiMetric.md b/collectors/ipmiMetric.md index fe83759..3c976d2 100644 --- a/collectors/ipmiMetric.md +++ b/collectors/ipmiMetric.md @@ -8,9 +8,6 @@ } ``` -The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`). +The `ipmistat` collector reads data from `ipmitool` (`ipmitool sensor`) or `ipmi-sensors` (`ipmi-sensors --sdr-cache-recreate --comma-separated-output`). The metrics depend on the output of the underlying tools but contain temperature, power and energy metrics. - - - diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 265d84c..68749d2 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -28,6 +28,7 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" "github.com/NVIDIA/go-nvml/pkg/dl" + "golang.design/x/thread" ) const ( @@ -71,18 +72,20 @@ type LikwidCollectorConfig struct { type LikwidCollector struct { metricCollector - cpulist []C.int - cpu2tid map[int]int - sock2tid map[int]int - metrics map[C.int]map[string]int - groups []C.int - config LikwidCollectorConfig - gmresults map[int]map[string]float64 - basefreq float64 - running bool - initialized bool - likwidGroups map[C.int]LikwidEventsetConfig - lock sync.Mutex + cpulist []C.int + cpu2tid map[int]int + sock2tid map[int]int + metrics map[C.int]map[string]int + groups []C.int + config LikwidCollectorConfig + gmresults map[int]map[string]float64 + basefreq float64 + running bool + initialized bool + needs_reinit bool + likwidGroups map[C.int]LikwidEventsetConfig + lock sync.Mutex + measureThread thread.Thread } type LikwidMetric struct { @@ -92,6 +95,18 @@ type LikwidMetric struct { group_idx int } +func checkMetricType(t string) bool { + valid := map[string]bool{ + "node": true, + "socket": true, + "hwthread": true, + "core": true, + "memoryDomain": true, + } + _, ok := valid[t] + return ok +} + func eventsToEventStr(events map[string]string) string { elist := make([]string, 0) for k, v := range events { @@ -179,6 +194,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.name = "LikwidCollector" m.parallel = false m.initialized = false + m.needs_reinit = true m.running = false m.config.AccessMode = LIKWID_DEF_ACCESSMODE m.config.LibraryPath = LIKWID_LIB_NAME @@ -239,7 +255,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } for _, metric := range evset.Metrics { // Try to evaluate the metric - if testLikwidMetricFormula(metric.Calc, params) { + if testLikwidMetricFormula(metric.Calc, params) && checkMetricType(metric.Type) { // Add the computable metric to the parameter list for the global metrics globalParams = append(globalParams, metric.Name) totalMetrics++ @@ -257,6 +273,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { if !testLikwidMetricFormula(metric.Calc, globalParams) { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed") metric.Calc = "" + } else if !checkMetricType(metric.Type) { + cclog.ComponentError(m.name, "Metric", metric.Name, "has invalid type") + metric.Calc = "" } else { totalMetrics++ } @@ -268,6 +287,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { cclog.ComponentError(m.name, err.Error()) return err } + m.measureThread = thread.New() m.init = true return nil } @@ -281,6 +301,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t if ret != 0 { var err error = nil var skip bool = false + cclog.ComponentDebug(m.name, "Setup returns", ret) if ret == -37 { skip = true } else { @@ -289,6 +310,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t m.lock.Unlock() return skip, err } + m.running = true ret = C.perfmon_startCounters() if ret != 0 { var err error = nil @@ -301,7 +323,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t m.lock.Unlock() return skip, err } - m.running = true + ret = C.perfmon_readCounters() time.Sleep(interval) m.running = false ret = C.perfmon_stopCounters() @@ -316,6 +338,24 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t m.lock.Unlock() return skip, err } + m.running = false + runtime := float64(C.perfmon_getLastTimeOfGroup(evset.gid)) + // Go over events and get the results + for eidx, counter := range evset.eorder { + gctr := C.GoString(counter) + for _, tid := range m.cpu2tid { + res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid)) + fres := float64(res) + if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) { + cclog.ComponentDebug(m.name, "Sanitize", gctr, "to zero") + fres = 0.0 + } + evset.results[tid][gctr] = fres + } + } + for _, tid := range m.cpu2tid { + evset.results[tid]["time"] = runtime + } } m.lock.Unlock() return false, nil @@ -325,19 +365,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error { invClock := float64(1.0 / m.basefreq) - // Go over events and get the results - for eidx, counter := range evset.eorder { - gctr := C.GoString(counter) - for _, tid := range m.cpu2tid { - res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid)) - fres := float64(res) - if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) { - fres = 0.0 - } - evset.results[tid][gctr] = fres - evset.results[tid]["time"] = interval.Seconds() - evset.results[tid]["inverseClock"] = invClock - } + for _, tid := range m.cpu2tid { + evset.results[tid]["inverseClock"] = invClock } // Go over the event set metrics, derive the value out of the event:counter values and send it @@ -431,6 +460,28 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan return nil } +func (m *LikwidCollector) ReInit() error { + C.perfmon_finalize() + ret := C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) + if ret != 0 { + return nil + } + for i, evset := range m.config.Eventsets { + var gid C.int + if len(evset.Events) > 0 { + //skip := false + likwidGroup := genLikwidEventSet(evset) + gid = C.perfmon_addEventSet(likwidGroup.estr) + if gid >= 0 { + likwidGroup.gid = gid + likwidGroup.internal = i + m.likwidGroups[gid] = likwidGroup + } + } + } + return nil +} + func (m *LikwidCollector) LateInit() error { var ret C.int if m.initialized { @@ -445,6 +496,9 @@ func (m *LikwidCollector) LateInit() error { os.Setenv("PATH", m.config.DaemonPath+":"+p) } C.HPMmode(1) + for _, c := range m.cpulist { + C.HPMaddThread(c) + } } cclog.ComponentDebug(m.name, "initialize LIKWID topology") ret = C.topology_init() @@ -468,48 +522,53 @@ func (m *LikwidCollector) LateInit() error { m.basefreq = getBaseFreq() cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq) - cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") - ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) - if ret != 0 { - var err error = nil - C.topology_finalize() - if ret != -22 { - err = errors.New("failed to initialize LIKWID perfmon") - cclog.ComponentError(m.name, err.Error()) - } else { - err = errors.New("access to LIKWID perfmon locked") - } - return err + if m.needs_reinit { + m.ReInit() + m.needs_reinit = false } - // While adding the events, we test the metrics whether they can be computed at all - for i, evset := range m.config.Eventsets { - var gid C.int - if len(evset.Events) > 0 { - skip := false - likwidGroup := genLikwidEventSet(evset) - for _, g := range m.likwidGroups { - if likwidGroup.go_estr == g.go_estr { - skip = true - break - } - } - if skip { - continue - } - // Now we add the list of events to likwid - gid = C.perfmon_addEventSet(likwidGroup.estr) - if gid >= 0 { - likwidGroup.gid = gid - likwidGroup.internal = i - m.likwidGroups[gid] = likwidGroup - } - } else { - cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given") - continue - } + // cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") + // ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) + // if ret != 0 { + // var err error = nil + // C.topology_finalize() + // if ret != -22 { + // err = errors.New("failed to initialize LIKWID perfmon") + // cclog.ComponentError(m.name, err.Error()) + // } else { + // err = errors.New("access to LIKWID perfmon locked") + // } + // return err + // } - } + // // While adding the events, we test the metrics whether they can be computed at all + // for i, evset := range m.config.Eventsets { + // var gid C.int + // if len(evset.Events) > 0 { + // //skip := false + // likwidGroup := genLikwidEventSet(evset) + // // for _, g := range m.likwidGroups { + // // if likwidGroup.go_estr == g.go_estr { + // // skip = true + // // break + // // } + // // } + // // if skip { + // // continue + // // } + // // Now we add the list of events to likwid + // gid = C.perfmon_addEventSet(likwidGroup.estr) + // if gid >= 0 { + // likwidGroup.gid = gid + // likwidGroup.internal = i + // m.likwidGroups[gid] = likwidGroup + // } + // } else { + // cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given") + // continue + // } + + // } // If no event set could be added, shut down LikwidCollector if len(m.likwidGroups) == 0 { @@ -540,38 +599,48 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) return } - if !m.initialized { - m.lock.Lock() - err = m.LateInit() - if err != nil { + m.measureThread.Call(func() { + if !m.initialized { + m.lock.Lock() + err = m.LateInit() + if err != nil { + m.lock.Unlock() + cclog.ComponentError(m.name, "lateinit failed") + return + } + m.initialized = true m.lock.Unlock() - return + skip = true } - m.initialized = true - m.lock.Unlock() - } - if m.initialized && !skip { - for _, evset := range m.likwidGroups { - if !skip { - // measure event set 'i' for 'interval' seconds - skip, err = m.takeMeasurement(evset, interval) - if err != nil { - cclog.ComponentError(m.name, err.Error()) - return + if m.initialized && !skip { + time := interval + for _, evset := range m.likwidGroups { + if !skip { + // measure event set 'i' for 'interval' seconds + skip, err = m.takeMeasurement(evset, interval) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + return + } + } + + if !skip { + // read measurements and derive event set metrics + m.calcEventsetMetrics(evset, time, output) } } if !skip { - // read measurements and derive event set metrics - m.calcEventsetMetrics(evset, interval, output) + // use the event set metrics to derive the global metrics + m.calcGlobalMetrics(time, output) + } + if skip { + m.needs_reinit = true + m.initialized = false } } - if !skip { - // use the event set metrics to derive the global metrics - m.calcGlobalMetrics(interval, output) - } - } + }) } func (m *LikwidCollector) Close() { diff --git a/collectors/likwidMetric.md b/collectors/likwidMetric.md index 54640dc..c080027 100644 --- a/collectors/likwidMetric.md +++ b/collectors/likwidMetric.md @@ -41,7 +41,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li The `likwid` configuration consists of two parts, the `eventsets` and `globalmetrics`: - An event set list itself has two parts, the `events` and a set of derivable `metrics`. Each of the `events` is a `counter:event` pair in LIKWID's syntax. The `metrics` are a list of formulas to derive the metric value from the measurements of the `events`' values. Each metric has a name, the formula, a type and a publish flag. There is an optional `unit` field. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. You can optionally use `time` for the measurement time and `inverseClock` for `1.0/baseCpuFrequency`. The type tells the LikwidCollector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the LikwidCollector whether a metric should be sent to the router or is only used internally to compute a global metric. -- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field. +- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a type and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field. Additional options: - `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements @@ -50,20 +50,20 @@ Additional options: - `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD` (like `/usr/local/sbin`) - `liblikwid_path`: Location of `liblikwid.so` including file name like `/usr/local/lib/liblikwid.so` -### Available metric scopes +### Available metric types Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric. - `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"` - `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"` -**Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. +**Note:** You cannot specify `socket` type for a metric that is measured at `hwthread` type, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the type of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. As a guideline: -- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread` -- All counters names containing `BOX` have the scope `socket` -- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope -- All `DFCx` counters have scope `socket` +- All counters `FIXCx`, `PMCy` and `TMAz` have the type `hwthread` +- All counters names containing `BOX` have the type `socket` +- All `PWRx` counters have type `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` type +- All `DFCx` counters have type `socket` ### Help with the configuration @@ -93,7 +93,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP "name": "Runtime (RDTSC) [s]", "publish": true, "unit": "seconds" - "scope": "hwthread" + "type": "hwthread" }, { "..." : "..." @@ -245,7 +245,7 @@ METRICS -> "metrics": [ IPC PMC0/PMC1 -> { -> "name" : "IPC", -> "calc" : "PMC0/PMC1", - -> "scope": "hwthread", + -> "type": "hwthread", -> "publish": true -> } -> ] diff --git a/collectors/nfsiostatMetric.go b/collectors/nfsiostatMetric.go new file mode 100644 index 0000000..810215c --- /dev/null +++ b/collectors/nfsiostatMetric.go @@ -0,0 +1,166 @@ +package collectors + +import ( + "encoding/json" + "fmt" + "os" + "regexp" + "strconv" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" +) + +// These are the fields we read from the JSON configuration +type NfsIOStatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` + UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type NfsIOStatCollector struct { + metricCollector + config NfsIOStatCollectorConfig // the configuration structure + meta map[string]string // default meta information + tags map[string]string // default tags + data map[string]map[string]int64 // data storage for difference calculation + key string // which device info should be used as subtype ID? 'server' or 'mntpoint', see NfsIOStatCollectorConfig.UseServerAddressAsSType +} + +var deviceRegex = regexp.MustCompile(`device (?P[^ ]+) mounted on (?P[^ ]+) with fstype nfs(?P\d*) statvers=[\d\.]+`) +var bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+) (?P[^ ]+)`) + +func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string { + fields := make(map[string]string) + groups := regex.SubexpNames() + for _, match := range regex.FindAllStringSubmatch(s, -1) { + for groupIdx, group := range match { + if len(groups[groupIdx]) > 0 { + fields[groups[groupIdx]] = group + } + } + } + return fields +} + +func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 { + data := make(map[string]map[string]int64) + filename := "/proc/self/mountstats" + stats, err := os.ReadFile(filename) + if err != nil { + return data + } + + lines := strings.Split(string(stats), "\n") + var current map[string]string = nil + for _, l := range lines { + // Is this a device line with mount point, remote target and NFS version? + dev := resolve_regex_fields(l, deviceRegex) + if len(dev) > 0 { + if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok { + current = dev + if len(current["version"]) == 0 { + current["version"] = "3" + } + } + } + + if len(current) > 0 { + // Byte line parsing (if found the device for it) + bytes := resolve_regex_fields(l, bytesRegex) + if len(bytes) > 0 { + data[current[m.key]] = make(map[string]int64) + for name, sval := range bytes { + if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok { + val, err := strconv.ParseInt(sval, 10, 64) + if err == nil { + data[current[m.key]][name] = val + } + } + + } + current = nil + } + } + } + return data +} + +func (m *NfsIOStatCollector) Init(config json.RawMessage) error { + var err error = nil + m.name = "NfsIOStatCollector" + m.setup() + m.parallel = true + m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"} + m.tags = map[string]string{"type": "node"} + m.config.UseServerAddressAsSType = false + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + m.key = "mntpoint" + if m.config.UseServerAddressAsSType { + m.key = "server" + } + m.data = m.readNfsiostats() + m.init = true + return err +} + +func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMetric) { + timestamp := time.Now() + + // Get the current values for all mountpoints + newdata := m.readNfsiostats() + + for mntpoint, values := range newdata { + // Was the mount point already present in the last iteration + if old, ok := m.data[mntpoint]; ok { + // Calculate the difference of old and new values + for i := range values { + x := values[i] - old[i] + y, err := lp.New(fmt.Sprintf("nfsio_%s", i), m.tags, m.meta, map[string]interface{}{"value": x}, timestamp) + if err == nil { + if strings.HasPrefix(i, "page") { + y.AddMeta("unit", "4K_Pages") + } + y.AddTag("stype", "filesystem") + y.AddTag("stype-id", mntpoint) + // Send it to output channel + output <- y + } + // Update old to the new value for the next iteration + old[i] = values[i] + } + } else { + // First time we see this mount point, store all values + m.data[mntpoint] = values + } + } + // Reset entries that do not exist anymore + for mntpoint := range m.data { + found := false + for new := range newdata { + if new == mntpoint { + found = true + break + } + } + if !found { + m.data[mntpoint] = nil + } + } + +} + +func (m *NfsIOStatCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/nfsiostatMetric.md b/collectors/nfsiostatMetric.md new file mode 100644 index 0000000..7f374e7 --- /dev/null +++ b/collectors/nfsiostatMetric.md @@ -0,0 +1,27 @@ +## `nfsiostat` collector + +```json + "nfsiostat": { + "exclude_metrics": [ + "nfsio_oread" + ], + "exclude_filesystems" : [ + "/mnt", + ], + "use_server_as_stype": false + } +``` + +The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from forwarding it to the sink. + +Metrics: +* `nfsio_nread`: Bytes transferred by normal `read()` calls +* `nfsio_nwrite`: Bytes transferred by normal `write()` calls +* `nfsio_oread`: Bytes transferred by `read()` calls with `O_DIRECT` +* `nfsio_owrite`: Bytes transferred by `write()` calls with `O_DIRECT` +* `nfsio_pageread`: Pages transferred by `read()` calls +* `nfsio_pagewrite`: Pages transferred by `write()` calls +* `nfsio_nfsread`: Bytes transferred for reading from the server +* `nfsio_nfswrite`: Pages transferred by writing to the server + +The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting. \ No newline at end of file diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index 8eaac67..f79b45b 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -14,29 +14,38 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) -// -// Numa policy hit/miss statistics +// Non-Uniform Memory Access (NUMA) policy hit/miss statistics // // numa_hit: -// A process wanted to allocate memory from this node, and succeeded. +// +// A process wanted to allocate memory from this node, and succeeded. +// // numa_miss: -// A process wanted to allocate memory from another node, -// but ended up with memory from this node. +// +// A process wanted to allocate memory from another node, +// but ended up with memory from this node. +// // numa_foreign: -// A process wanted to allocate on this node, -// but ended up with memory from another node. +// +// A process wanted to allocate on this node, +// but ended up with memory from another node. +// // local_node: -// A process ran on this node's CPU, -// and got memory from this node. +// +// A process ran on this node's CPU, +// and got memory from this node. +// // other_node: -// A process ran on a different node's CPU -// and got memory from this node. +// +// A process ran on a different node's CPU +// and got memory from this node. +// // interleave_hit: -// Interleaving wanted to allocate from this node -// and succeeded. +// +// Interleaving wanted to allocate from this node +// and succeeded. // // See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html -// type NUMAStatsCollectorTopolgy struct { file string tagSet map[string]string @@ -82,6 +91,8 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { }) } + // Initialized + cclog.ComponentDebug(m.name, "initialized", len(m.topology), "NUMA domains") m.init = true return nil } diff --git a/collectors/numastatsMetric.md b/collectors/numastatsMetric.md index 8eb1a0c..cb9ab2f 100644 --- a/collectors/numastatsMetric.md +++ b/collectors/numastatsMetric.md @@ -1,15 +1,17 @@ ## `numastat` collector + ```json - "numastat": {} + "numastats": {} ``` -The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful **memoryDomain** metrics. See: https://www.kernel.org/doc/html/latest/admin-guide/numastat.html +The `numastat` collector reads data from `/sys/devices/system/node/node*/numastat` and outputs a handful **memoryDomain** metrics. See: Metrics: + * `numastats_numa_hit`: A process wanted to allocate memory from this node, and succeeded. * `numastats_numa_miss`: A process wanted to allocate memory from another node, but ended up with memory from this node. * `numastats_numa_foreign`: A process wanted to allocate on this node, but ended up with memory from another node. * `numastats_local_node`: A process ran on this node's CPU, and got memory from this node. * `numastats_other_node`: A process ran on a different node's CPU, and got memory from this node. -* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded. \ No newline at end of file +* `numastats_interleave_hit`: Interleaving wanted to allocate from this node and succeeded. diff --git a/collectors/raplMetric.go b/collectors/raplMetric.go new file mode 100644 index 0000000..57c456b --- /dev/null +++ b/collectors/raplMetric.go @@ -0,0 +1,262 @@ +package collectors + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" +) + +// running average power limit (RAPL) monitoring attributes for a zone +type RAPLZoneInfo struct { + // tags describing the RAPL zone: + // * zone_name, subzone_name: e.g. psys, dram, core, uncore, package-0 + // * zone_id: e.g. 0:1 (zone 0 sub zone 1) + tags map[string]string + energyFilepath string // path to a file containing the zones current energy counter in micro joules + energy int64 // current reading of the energy counter in micro joules + energyTimestamp time.Time // timestamp when energy counter was read + maxEnergyRange int64 // Range of the above energy counter in micro-joules +} + +type RAPLCollector struct { + metricCollector + config struct { + // Exclude IDs for RAPL zones, e.g. + // * 0 for zone 0 + // * 0:1 for zone 0 subzone 1 + ExcludeByID []string `json:"exclude_device_by_id,omitempty"` + // Exclude names for RAPL zones, e.g. psys, dram, core, uncore, package-0 + ExcludeByName []string `json:"exclude_device_by_name,omitempty"` + } + RAPLZoneInfo []RAPLZoneInfo + meta map[string]string // default meta information +} + +// Init initializes the running average power limit (RAPL) collector +func (m *RAPLCollector) Init(config json.RawMessage) error { + + // Check if already initialized + if m.init { + return nil + } + + var err error = nil + m.name = "RAPLCollector" + m.setup() + m.parallel = true + m.meta = map[string]string{ + "source": m.name, + "group": "energy", + "unit": "Watt", + } + + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + // Configure excluded RAPL zones + isIDExcluded := make(map[string]bool) + if m.config.ExcludeByID != nil { + for _, ID := range m.config.ExcludeByID { + isIDExcluded[ID] = true + } + } + isNameExcluded := make(map[string]bool) + if m.config.ExcludeByName != nil { + for _, name := range m.config.ExcludeByName { + isNameExcluded[name] = true + } + } + + // readZoneInfo reads RAPL monitoring attributes for a zone given by zonePath + // See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes + readZoneInfo := func(zonePath string) (z struct { + name string // zones name e.g. psys, dram, core, uncore, package-0 + energyFilepath string // path to a file containing the zones current energy counter in micro joules + energy int64 // current reading of the energy counter in micro joules + energyTimestamp time.Time // timestamp when energy counter was read + maxEnergyRange int64 // Range of the above energy counter in micro-joules + ok bool // Are all information available? + }) { + // zones name e.g. psys, dram, core, uncore, package-0 + foundName := false + if v, err := + os.ReadFile( + filepath.Join(zonePath, "name")); err == nil { + foundName = true + z.name = strings.TrimSpace(string(v)) + } + + // path to a file containing the zones current energy counter in micro joules + z.energyFilepath = filepath.Join(zonePath, "energy_uj") + + // current reading of the energy counter in micro joules + foundEnergy := false + if v, err := os.ReadFile(z.energyFilepath); err == nil { + // timestamp when energy counter was read + z.energyTimestamp = time.Now() + if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil { + foundEnergy = true + z.energy = i + } + } + + // Range of the above energy counter in micro-joules + foundMaxEnergyRange := false + if v, err := + os.ReadFile( + filepath.Join(zonePath, "max_energy_range_uj")); err == nil { + if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil { + foundMaxEnergyRange = true + z.maxEnergyRange = i + } + } + + // Are all information available? + z.ok = foundName && foundEnergy && foundMaxEnergyRange + + return + } + + powerCapPrefix := "/sys/devices/virtual/powercap" + controlType := "intel-rapl" + controlTypePath := filepath.Join(powerCapPrefix, controlType) + + // Find all RAPL zones + zonePrefix := filepath.Join(controlTypePath, controlType+":") + zonesPath, err := filepath.Glob(zonePrefix + "*") + if err != nil || zonesPath == nil { + return fmt.Errorf("unable to find any zones under %s", controlTypePath) + } + + for _, zonePath := range zonesPath { + zoneID := strings.TrimPrefix(zonePath, zonePrefix) + z := readZoneInfo(zonePath) + if z.ok && + !isIDExcluded[zoneID] && + !isNameExcluded[z.name] { + + // Add RAPL monitoring attributes for a zone + m.RAPLZoneInfo = + append( + m.RAPLZoneInfo, + RAPLZoneInfo{ + tags: map[string]string{ + "id": zoneID, + "zone_name": z.name, + }, + energyFilepath: z.energyFilepath, + energy: z.energy, + energyTimestamp: z.energyTimestamp, + maxEnergyRange: z.maxEnergyRange, + }) + } + + // find all sub zones for the given zone + subZonePrefix := filepath.Join(zonePath, controlType+":"+zoneID+":") + subZonesPath, err := filepath.Glob(subZonePrefix + "*") + if err != nil || subZonesPath == nil { + continue + } + + for _, subZonePath := range subZonesPath { + subZoneID := strings.TrimPrefix(subZonePath, subZonePrefix) + sz := readZoneInfo(subZonePath) + if len(zoneID) > 0 && len(z.name) > 0 && + sz.ok && + !isIDExcluded[zoneID+":"+subZoneID] && + !isNameExcluded[sz.name] { + m.RAPLZoneInfo = + append( + m.RAPLZoneInfo, + RAPLZoneInfo{ + tags: map[string]string{ + "id": zoneID + ":" + subZoneID, + "zone_name": z.name, + "sub_zone_name": sz.name, + }, + energyFilepath: sz.energyFilepath, + energy: sz.energy, + energyTimestamp: sz.energyTimestamp, + maxEnergyRange: sz.maxEnergyRange, + }) + } + } + } + + if m.RAPLZoneInfo == nil { + return fmt.Errorf("no running average power limit (RAPL) device found in %s", controlTypePath) + + } + + // Initialized + cclog.ComponentDebug( + m.name, + "initialized", + len(m.RAPLZoneInfo), + "zones with running average power limit (RAPL) monitoring attributes") + m.init = true + + return err +} + +// Read reads running average power limit (RAPL) monitoring attributes for all initialized zones +// See: https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes +func (m *RAPLCollector) Read(interval time.Duration, output chan lp.CCMetric) { + + for i := range m.RAPLZoneInfo { + p := &m.RAPLZoneInfo[i] + + // Read current value of the energy counter in micro joules + if v, err := os.ReadFile(p.energyFilepath); err == nil { + energyTimestamp := time.Now() + if i, err := strconv.ParseInt(strings.TrimSpace(string(v)), 10, 64); err == nil { + energy := i + + // Compute average power (Δ energy / Δ time) + energyDiff := energy - p.energy + if energyDiff < 0 { + // Handle overflow: + // ( p.maxEnergyRange - p.energy ) + energy + // = p.maxEnergyRange + ( energy - p.energy ) + // = p.maxEnergyRange + diffEnergy + energyDiff += p.maxEnergyRange + } + timeDiff := energyTimestamp.Sub(p.energyTimestamp) + averagePower := float64(energyDiff) / float64(timeDiff.Microseconds()) + + y, err := lp.New( + "rapl_average_power", + p.tags, + m.meta, + map[string]interface{}{"value": averagePower}, + energyTimestamp) + if err == nil { + output <- y + } + + // Save current energy counter state + p.energy = energy + p.energyTimestamp = energyTimestamp + } + } + } +} + +// Close closes running average power limit (RAPL) metric collector +func (m *RAPLCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/raplMetric.md b/collectors/raplMetric.md new file mode 100644 index 0000000..f857d7c --- /dev/null +++ b/collectors/raplMetric.md @@ -0,0 +1,18 @@ +# Running average power limit (RAPL) metric collector + +This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See . + +The Likwid metric collector provides similar functionality. + +## Configuration + +```json + "rapl": { + "exclude_device_by_id": ["0:1", "0:2"], + "exclude_device_by_name": ["psys"] + } +``` + +## Metrics + +* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index 056ed85..4c3ac66 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -17,9 +17,9 @@ type SampleCollectorConfig struct { // defined by metricCollector (name, init, ...) type SampleCollector struct { metricCollector - config SampleTimerCollectorConfig // the configuration structure - meta map[string]string // default meta information - tags map[string]string // default tags + config SampleCollectorConfig // the configuration structure + meta map[string]string // default meta information + tags map[string]string // default tags } // Functions to implement MetricCollector interface @@ -36,14 +36,14 @@ func (m *SampleCollector) Init(config json.RawMessage) error { // This is for later use, also call it early m.setup() // Tell whether the collector should be run in parallel with others (reading files, ...) - // or it should be run serially, mostly for collectors acutally doing measurements + // or it should be run serially, mostly for collectors actually doing measurements // because they should not measure the execution of the other collectors m.parallel = true // Define meta information sent with each metric // (Can also be dynamic or this is the basic set with extension through AddMeta()) m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} // Define tags sent with each metric - // The 'type' tag is always needed, it defines the granulatity of the metric + // The 'type' tag is always needed, it defines the granularity of the metric // node -> whole system // socket -> CPU socket (requires socket ID as 'type-id' tag) // die -> CPU die (requires CPU die ID as 'type-id' tag) diff --git a/collectors/sampleTimerMetric.go b/collectors/sampleTimerMetric.go index 8b09bc1..dfac808 100644 --- a/collectors/sampleTimerMetric.go +++ b/collectors/sampleTimerMetric.go @@ -38,7 +38,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error { // (Can also be dynamic or this is the basic set with extension through AddMeta()) m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} // Define tags sent with each metric - // The 'type' tag is always needed, it defines the granulatity of the metric + // The 'type' tag is always needed, it defines the granularity of the metric // node -> whole system // socket -> CPU socket (requires socket ID as 'type-id' tag) // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) @@ -60,7 +60,7 @@ func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error { // Storage for output channel m.output = nil - // Mangement channel for the timer function. + // Management channel for the timer function. m.done = make(chan bool) // Create the own ticker m.ticker = time.NewTicker(m.interval) @@ -94,7 +94,7 @@ func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) { value := 1.0 - // If you want to measure something for a specific amout of time, use interval + // If you want to measure something for a specific amount of time, use interval // start := readState() // time.Sleep(interval) // stop := readState() diff --git a/docs/building.md b/docs/building.md index 968c454..53a5fb0 100644 --- a/docs/building.md +++ b/docs/building.md @@ -37,7 +37,9 @@ $ install --mode 644 \ $ systemctl enable cc-metric-collector ``` -## RPM +## Packaging + +### RPM In order to get a RPM packages for cc-metric-collector, just use: @@ -47,7 +49,7 @@ $ make RPM It uses the RPM SPEC file `scripts/cc-metric-collector.spec` and requires the RPM tools (`rpm` and `rpmspec`) and `git`. -## DEB +### DEB In order to get very simple Debian packages for cc-metric-collector, just use: @@ -57,4 +59,16 @@ $ make DEB It uses the DEB control file `scripts/cc-metric-collector.control` and requires `dpkg-deb`, `awk`, `sed` and `git`. It creates only a binary deb package. -_This option is not well tested and therefore experimental_ \ No newline at end of file +_This option is not well tested and therefore experimental_ + +### Customizing RPMs or DEB packages + +If you want to customize the RPMs or DEB packages for your local system, use the following workflow. + +- (if there is already a fork in the private account, delete it and wait until Github realizes the deletion) +- Fork the cc-metric-collector repository (if Github hasn't realized it, it creates a fork named cc-metric-collector2) +- Go to private cc-metric-collector repository and enable Github Actions +- Do changes to the scripts, code, ... Commit and push your changes. +- Tag the new commit with `v0.x.y-` (`git tag v0.x.y-`) +- Push tags to repository (`git push --tags`) +- Wait until the Release action finishes. It creates fresh RPMs and DEBs in your private repository on the Releases page. diff --git a/go.mod b/go.mod index 30a961c..9a9bf4b 100644 --- a/go.mod +++ b/go.mod @@ -6,37 +6,36 @@ require ( github.com/ClusterCockpit/cc-units v0.3.0 github.com/ClusterCockpit/go-rocm-smi v0.3.0 github.com/NVIDIA/go-nvml v0.11.6-0 - github.com/PaesslerAG/gval v1.2.0 + github.com/PaesslerAG/gval v1.2.1 github.com/gorilla/mux v1.8.0 - github.com/influxdata/influxdb-client-go/v2 v2.9.0 - github.com/influxdata/influxdb-client-go/v2 v2.9.1 + github.com/influxdata/influxdb-client-go/v2 v2.12.1 github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf - github.com/nats-io/nats.go v1.16.0 - github.com/prometheus/client_golang v1.12.2 + github.com/nats-io/nats.go v1.21.0 + github.com/prometheus/client_golang v1.14.0 github.com/stmcginnis/gofish v0.13.0 - github.com/tklauser/go-sysconf v0.3.10 - golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e + github.com/tklauser/go-sysconf v0.3.11 + golang.org/x/sys v0.3.0 ) require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/deepmap/oapi-codegen v1.11.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/deepmap/oapi-codegen v1.12.4 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/nats-io/nats-server/v2 v2.8.4 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect - github.com/prometheus/procfs v0.7.3 // indirect + github.com/prometheus/procfs v0.8.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect - github.com/tklauser/numcpus v0.4.0 // indirect - golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect - golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect - google.golang.org/protobuf v1.28.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect + github.com/tklauser/numcpus v0.6.0 // indirect + golang.org/x/crypto v0.3.0 // indirect + golang.org/x/net v0.3.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/pkg/hostlist/hostlist.go b/pkg/hostlist/hostlist.go new file mode 100644 index 0000000..6ed5ae0 --- /dev/null +++ b/pkg/hostlist/hostlist.go @@ -0,0 +1,125 @@ +package hostlist + +import ( + "fmt" + "regexp" + "sort" + "strconv" + "strings" +) + +func Expand(in string) (result []string, err error) { + + // Create ranges regular expression + reStNumber := "[[:digit:]]+" + reStRange := reStNumber + "-" + reStNumber + reStOptionalNumberOrRange := "(" + reStNumber + ",|" + reStRange + ",)*" + reStNumberOrRange := "(" + reStNumber + "|" + reStRange + ")" + reStBraceLeft := "[[]" + reStBraceRight := "[]]" + reStRanges := reStBraceLeft + + reStOptionalNumberOrRange + + reStNumberOrRange + + reStBraceRight + reRanges := regexp.MustCompile(reStRanges) + + // Create host list regular expression + reStDNSChars := "[a-zA-Z0-9-]+" + reStPrefix := "^(" + reStDNSChars + ")" + reStOptionalSuffix := "(" + reStDNSChars + ")?" + re := regexp.MustCompile(reStPrefix + "([[][0-9,-]+[]])?" + reStOptionalSuffix) + + // Remove all delimiters from the input + in = strings.TrimLeft(in, ", ") + + for len(in) > 0 { + if v := re.FindStringSubmatch(in); v != nil { + + // Remove matched part from the input + lenPrefix := len(v[0]) + in = in[lenPrefix:] + + // Remove all delimiters from the input + in = strings.TrimLeft(in, ", ") + + // matched prefix, range and suffix + hlPrefix := v[1] + hlRanges := v[2] + hlSuffix := v[3] + + // Single node without ranges + if hlRanges == "" { + result = append(result, hlPrefix) + continue + } + + // Node with ranges + if v := reRanges.FindStringSubmatch(hlRanges); v != nil { + + // Remove braces + hlRanges = hlRanges[1 : len(hlRanges)-1] + + // Split host ranges at , + for _, hlRange := range strings.Split(hlRanges, ",") { + + // Split host range at - + RangeStartEnd := strings.Split(hlRange, "-") + + // Range is only a single number + if len(RangeStartEnd) == 1 { + result = append(result, hlPrefix+RangeStartEnd[0]+hlSuffix) + continue + } + + // Range has a start and an end + widthRangeStart := len(RangeStartEnd[0]) + widthRangeEnd := len(RangeStartEnd[1]) + iStart, _ := strconv.ParseUint(RangeStartEnd[0], 10, 64) + iEnd, _ := strconv.ParseUint(RangeStartEnd[1], 10, 64) + if iStart > iEnd { + return nil, fmt.Errorf("single range start is greater than end: %s", hlRange) + } + + // Create print format string for range numbers + doPadding := widthRangeStart == widthRangeEnd + widthPadding := widthRangeStart + var formatString string + if doPadding { + formatString = "%0" + fmt.Sprint(widthPadding) + "d" + } else { + formatString = "%d" + } + formatString = hlPrefix + formatString + hlSuffix + + // Add nodes from this range + for i := iStart; i <= iEnd; i++ { + result = append(result, fmt.Sprintf(formatString, i)) + } + } + } else { + return nil, fmt.Errorf("not at hostlist range: %s", hlRanges) + } + } else { + return nil, fmt.Errorf("not a hostlist: %s", in) + } + } + + if result != nil { + // sort + sort.Strings(result) + + // uniq + previous := 1 + for current := 1; current < len(result); current++ { + if result[current-1] != result[current] { + if previous != current { + result[previous] = result[current] + } + previous++ + } + } + result = result[:previous] + } + + return +} diff --git a/pkg/hostlist/hostlist_test.go b/pkg/hostlist/hostlist_test.go new file mode 100644 index 0000000..d5c57ad --- /dev/null +++ b/pkg/hostlist/hostlist_test.go @@ -0,0 +1,126 @@ +package hostlist + +import ( + "testing" +) + +func TestExpand(t *testing.T) { + + // Compare two slices of strings + equal := func(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true + } + + type testDefinition struct { + input string + resultExpected []string + errorExpected bool + } + + expandTests := []testDefinition{ + { + // Single node + input: "n1", + resultExpected: []string{"n1"}, + errorExpected: false, + }, + { + // Single node, duplicated + input: "n1,n1", + resultExpected: []string{"n1"}, + errorExpected: false, + }, + { + // Single node with padding + input: "n[01]", + resultExpected: []string{"n01"}, + errorExpected: false, + }, + { + // Single node with suffix + input: "n[01]-p", + resultExpected: []string{"n01-p"}, + errorExpected: false, + }, + { + // Multiple nodes with a single range + input: "n[1-2]", + resultExpected: []string{"n1", "n2"}, + errorExpected: false, + }, + { + // Multiple nodes with a single range and a single index + input: "n[1-2,3]", + resultExpected: []string{"n1", "n2", "n3"}, + errorExpected: false, + }, + { + // Multiple nodes with different prefixes + input: "n[1-2],m[1,2]", + resultExpected: []string{"m1", "m2", "n1", "n2"}, + errorExpected: false, + }, + { + // Multiple nodes with different suffixes + input: "n[1-2]-p,n[1,2]-q", + resultExpected: []string{"n1-p", "n1-q", "n2-p", "n2-q"}, + errorExpected: false, + }, + { + // Multiple nodes with and without node ranges + input: " n09, n[01-04,06-07,09] , , n10,n04", + resultExpected: []string{"n01", "n02", "n03", "n04", "n06", "n07", "n09", "n10"}, + errorExpected: false, + }, + { + // Forbidden DNS character + input: "n@", + resultExpected: []string{}, + errorExpected: true, + }, + { + // Forbidden range + input: "n[1-2-2,3]", + resultExpected: []string{}, + errorExpected: true, + }, + { + // Forbidden range limits + input: "n[2-1]", + resultExpected: []string{}, + errorExpected: true, + }, + } + + for _, expandTest := range expandTests { + result, err := Expand(expandTest.input) + + hasError := err != nil + if hasError != expandTest.errorExpected && hasError { + t.Errorf("Expand('%s') failed: unexpected error '%v'", + expandTest.input, err) + continue + } + if hasError != expandTest.errorExpected && !hasError { + t.Errorf("Expand('%s') did not fail as expected: got result '%+v'", + expandTest.input, result) + continue + } + if !hasError && !equal(result, expandTest.resultExpected) { + t.Errorf("Expand('%s') failed: got result '%+v', expected result '%v'", + expandTest.input, result, expandTest.resultExpected) + continue + } + + t.Logf("Checked hostlist.Expand('%s'): result = '%+v', err = '%v'", + expandTest.input, result, err) + } +} diff --git a/receivers.json b/receivers.json index cd78eb6..dac979e 100644 --- a/receivers.json +++ b/receivers.json @@ -1,25 +1,44 @@ { - "natsrecv" : { + "natsrecv": { "type": "nats", "address": "nats://my-url", - "port" : "4222", + "port": "4222", "database": "testcluster" }, "redfish_recv": { "type": "redfish", + "endpoint": "https://%h-bmc", "client_config": [ { - "hostname": "my-host-1", + "host_list": "my-host-1-[1-2]", "username": "username-1", - "password": "password-1", - "endpoint": "https://my-endpoint-1" + "password": "password-1" + }, + { + "host_list": "my-host-2-[1,2]", + "username": "username-2", + "password": "password-2" + } + ] + }, + "ipmi_recv": { + "type": "ipmi", + "endpoint": "ipmi-sensors://%h-ipmi", + "exclude_metrics": [ + "fan_speed", + "voltage" + ], + "client_config": [ + { + "username": "username-1", + "password": "password-1", + "host_list": "my-host-1-[1-2]" }, { - "hostname": "my-host-2", "username": "username-2", "password": "password-2", - "endpoint": "https://my-endpoint-2" + "host_list": "my-host-2-[1,2]" } ] } -} +} \ No newline at end of file diff --git a/receivers/ipmiReceiver.go b/receivers/ipmiReceiver.go new file mode 100644 index 0000000..e3d544a --- /dev/null +++ b/receivers/ipmiReceiver.go @@ -0,0 +1,534 @@ +package receivers + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + "github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist" +) + +type IPMIReceiverClientConfig struct { + + // Hostname the IPMI service belongs to + Protocol string // Protocol / tool to use for IPMI sensor reading + DriverType string // Out of band IPMI driver + Fanout int // Maximum number of simultaneous IPMI connections + NumHosts int // Number of remote IPMI devices with the same configuration + IPMIHosts string // List of remote IPMI devices to communicate with + IPMI2HostMapping map[string]string // Mapping between IPMI device name and host name + Username string // User name to authenticate with + Password string // Password to use for authentication + CLIOptions []string // Additional command line options for ipmi-sensors + isExcluded map[string]bool // is metric excluded +} + +type IPMIReceiver struct { + receiver + config struct { + Interval time.Duration + + // Client config for each IPMI hosts + ClientConfigs []IPMIReceiverClientConfig + } + + // Storage for static information + meta map[string]string + + done chan bool // channel to finish / stop IPMI receiver + wg sync.WaitGroup // wait group for IPMI receiver +} + +// doReadMetrics reads metrics from all configure IPMI hosts. +func (r *IPMIReceiver) doReadMetric() { + for i := range r.config.ClientConfigs { + clientConfig := &r.config.ClientConfigs[i] + var cmd_options []string + if clientConfig.Protocol == "ipmi-sensors" { + cmd_options = append(cmd_options, + "--always-prefix", + "--sdr-cache-recreate", + // Attempt to interpret OEM data, such as event data, sensor readings, or general extra info + "--interpret-oem-data", + // Ignore not-available (i.e. N/A) sensors in output + "--ignore-not-available-sensors", + // Ignore unrecognized sensor events + "--ignore-unrecognized-events", + // Output fields in comma separated format + "--comma-separated-output", + // Do not output column headers + "--no-header-output", + // Output non-abbreviated units (e.g. 'Amps' instead of 'A'). + // May aid in disambiguation of units (e.g. 'C' for Celsius or Coulombs). + "--non-abbreviated-units", + "--fanout", fmt.Sprint(clientConfig.Fanout), + "--driver-type", clientConfig.DriverType, + "--hostname", clientConfig.IPMIHosts, + "--username", clientConfig.Username, + "--password", clientConfig.Password, + ) + cmd_options := append(cmd_options, clientConfig.CLIOptions...) + + command := exec.Command("ipmi-sensors", cmd_options...) + stdout, _ := command.StdoutPipe() + errBuf := new(bytes.Buffer) + command.Stderr = errBuf + + // start command + if err := command.Start(); err != nil { + cclog.ComponentError( + r.name, + fmt.Sprintf("doReadMetric(): Failed to start command \"%s\": %v", command.String(), err), + ) + continue + } + + // Read command output + const ( + idxID = iota + idxName + idxType + idxReading + idxUnits + idxEvent + ) + numPrefixRegex := regexp.MustCompile("^[[:digit:]][[:digit:]]-(.*)$") + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + // Read host + v1 := strings.Split(scanner.Text(), ": ") + if len(v1) != 2 { + continue + } + host, ok := clientConfig.IPMI2HostMapping[v1[0]] + if !ok { + continue + } + + // Read sensors + v2 := strings.Split(v1[1], ",") + if len(v2) != 6 { + continue + } + // Skip sensors with non available sensor readings + if v2[idxReading] == "N/A" { + continue + } + + metric := strings.ToLower(v2[idxType]) + name := strings.ToLower( + strings.Replace( + strings.TrimSpace( + v2[idxName]), " ", "_", -1)) + // remove prefix enumeration like 01-... + if v := numPrefixRegex.FindStringSubmatch(name); v != nil { + name = v[1] + } + unit := v2[idxUnits] + if unit == "Watts" { + + // Power + metric = "power" + name = strings.TrimSuffix(name, "_power") + name = strings.TrimSuffix(name, "_pwr") + name = strings.TrimPrefix(name, "pwr_") + } else if metric == "voltage" && + unit == "Volts" { + + // Voltage + name = strings.TrimPrefix(name, "volt_") + } else if metric == "current" && + unit == "Amps" { + + // Current + unit = "Ampere" + } else if metric == "temperature" && + unit == "degrees C" { + + // Temperature + name = strings.TrimSuffix(name, "_temp") + unit = "degC" + } else if metric == "temperature" && + unit == "degrees F" { + + // Temperature + name = strings.TrimSuffix(name, "_temp") + unit = "degF" + } else if metric == "fan" && unit == "RPM" { + + // Fan speed + metric = "fan_speed" + name = strings.TrimSuffix(name, "_tach") + name = strings.TrimPrefix(name, "spd_") + } else if (metric == "cooling device" || + metric == "other units based sensor") && + name == "system_air_flow" && + unit == "CFM" { + + // Air flow + metric = "air_flow" + name = strings.TrimSuffix(name, "_air_flow") + unit = "CubicFeetPerMinute" + } else if (metric == "processor" || + metric == "other units based sensor") && + (name == "cpu_utilization" || + name == "io_utilization" || + name == "mem_utilization" || + name == "sys_utilization") && + (unit == "unspecified" || + unit == "%") { + + // Utilization + metric = "utilization" + name = strings.TrimSuffix(name, "_utilization") + unit = "percent" + } else { + if false { + // Debug output for unprocessed metrics + fmt.Printf( + "host: '%s', metric: '%s', name: '%s', unit: '%s'\n", + host, metric, name, unit) + } + continue + } + + // Skip excluded metrics + if clientConfig.isExcluded[metric] { + continue + } + + // Parse sensor value + value, err := strconv.ParseFloat(v2[idxReading], 64) + if err != nil { + continue + } + + y, err := lp.New( + metric, + map[string]string{ + "hostname": host, + "type": "node", + "name": name, + }, + map[string]string{ + "source": r.name, + "group": "IPMI", + "unit": unit, + }, + map[string]interface{}{ + "value": value, + }, + time.Now()) + if err == nil { + r.sink <- y + } + } + + // Wait for command end + if err := command.Wait(); err != nil { + errMsg, _ := io.ReadAll(errBuf) + cclog.ComponentError( + r.name, + fmt.Sprintf("doReadMetric(): Failed to wait for the end of command \"%s\": %v\n", + strings.Replace(command.String(), clientConfig.Password, "", -1), err), + fmt.Sprintf("doReadMetric(): command stderr: \"%s\"\n", string(errMsg)), + ) + } + } + } +} + +func (r *IPMIReceiver) Start() { + cclog.ComponentDebug(r.name, "START") + + // Start IPMI receiver + r.wg.Add(1) + go func() { + defer r.wg.Done() + + // Create ticker + ticker := time.NewTicker(r.config.Interval) + defer ticker.Stop() + + for { + r.doReadMetric() + + select { + case tickerTime := <-ticker.C: + // Check if we missed the ticker event + if since := time.Since(tickerTime); since > 5*time.Second { + cclog.ComponentInfo(r.name, "Missed ticker event for more then", since) + } + + // process ticker event -> continue + continue + case <-r.done: + // process done event + return + } + } + }() + + cclog.ComponentDebug(r.name, "STARTED") +} + +// Close receiver: close network connection, close files, close libraries, ... +func (r *IPMIReceiver) Close() { + cclog.ComponentDebug(r.name, "CLOSE") + + // Send the signal and wait + close(r.done) + r.wg.Wait() + + cclog.ComponentDebug(r.name, "DONE") +} + +// NewIPMIReceiver creates a new instance of the redfish receiver +// Initialize the receiver by giving it a name and reading in the config JSON +func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(IPMIReceiver) + + // Config options from config file + configJSON := struct { + Type string `json:"type"` + + // How often the IPMI sensor metrics should be read and send to the sink (default: 30 s) + IntervalString string `json:"interval,omitempty"` + + // Maximum number of simultaneous IPMI connections (default: 64) + Fanout int `json:"fanout,omitempty"` + + // Out of band IPMI driver (default: LAN_2_0) + DriverType string `json:"driver_type,omitempty"` + + // Default client username, password and endpoint + Username *string `json:"username"` // User name to authenticate with + Password *string `json:"password"` // Password to use for authentication + Endpoint *string `json:"endpoint"` // URL of the IPMI device + + // Globally excluded metrics + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + + ClientConfigs []struct { + Fanout int `json:"fanout,omitempty"` // Maximum number of simultaneous IPMI connections (default: 64) + DriverType string `json:"driver_type,omitempty"` // Out of band IPMI driver (default: LAN_2_0) + HostList string `json:"host_list"` // List of hosts with the same client configuration + Username *string `json:"username"` // User name to authenticate with + Password *string `json:"password"` // Password to use for authentication + Endpoint *string `json:"endpoint"` // URL of the IPMI service + + // Per client excluded metrics + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + + // Additional command line options for ipmi-sensors + CLIOptions []string `json:"cli_options,omitempty"` + } `json:"client_config"` + }{ + // Set defaults values + // Allow overwriting these defaults by reading config JSON + Fanout: 64, + DriverType: "LAN_2_0", + IntervalString: "30s", + } + + // Set name of IPMIReceiver + r.name = fmt.Sprintf("IPMIReceiver(%s)", name) + + // Create done channel + r.done = make(chan bool) + + // Set static information + r.meta = map[string]string{"source": r.name} + + // Read the IPMI receiver specific JSON config + if len(config) > 0 { + d := json.NewDecoder(bytes.NewReader(config)) + d.DisallowUnknownFields() + if err := d.Decode(&configJSON); err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return nil, err + } + } + + // Convert interval string representation to duration + var err error + r.config.Interval, err = time.ParseDuration(configJSON.IntervalString) + if err != nil { + err := fmt.Errorf( + "Failed to parse duration string interval='%s': %w", + configJSON.IntervalString, + err, + ) + cclog.Error(r.name, err) + return nil, err + } + + // Create client config from JSON config + totalNumHosts := 0 + for i := range configJSON.ClientConfigs { + clientConfigJSON := &configJSON.ClientConfigs[i] + + var endpoint string + if clientConfigJSON.Endpoint != nil { + endpoint = *clientConfigJSON.Endpoint + } else if configJSON.Endpoint != nil { + endpoint = *configJSON.Endpoint + } else { + err := fmt.Errorf("client config number %v requires endpoint", i) + cclog.ComponentError(r.name, err) + return nil, err + } + + fanout := configJSON.Fanout + if clientConfigJSON.Fanout != 0 { + fanout = clientConfigJSON.Fanout + } + + driverType := configJSON.DriverType + if clientConfigJSON.DriverType != "" { + driverType = clientConfigJSON.DriverType + } + if driverType != "LAN" && driverType != "LAN_2_0" { + err := fmt.Errorf("client config number %v has invalid driver type %s", i, driverType) + cclog.ComponentError(r.name, err) + return nil, err + } + + var protocol string + var host_pattern string + if e := strings.Split(endpoint, "://"); len(e) == 2 { + protocol = e[0] + host_pattern = e[1] + } else { + err := fmt.Errorf("client config number %v has invalid endpoint %s", i, endpoint) + cclog.ComponentError(r.name, err) + return nil, err + } + + var username string + if clientConfigJSON.Username != nil { + username = *clientConfigJSON.Username + } else if configJSON.Username != nil { + username = *configJSON.Username + } else { + err := fmt.Errorf("client config number %v requires username", i) + cclog.ComponentError(r.name, err) + return nil, err + } + + var password string + if clientConfigJSON.Password != nil { + password = *clientConfigJSON.Password + } else if configJSON.Password != nil { + password = *configJSON.Password + } else { + err := fmt.Errorf("client config number %v requires password", i) + cclog.ComponentError(r.name, err) + return nil, err + } + + // Create mapping between IPMI host name and node host name + // This also guaranties that all IPMI host names are unique + ipmi2HostMapping := make(map[string]string) + hostList, err := hostlist.Expand(clientConfigJSON.HostList) + if err != nil { + err := fmt.Errorf("client config number %d failed to parse host list %s: %v", + i, clientConfigJSON.HostList, err) + cclog.ComponentError(r.name, err) + return nil, err + } + for _, host := range hostList { + ipmiHost := strings.Replace(host_pattern, "%h", host, -1) + ipmi2HostMapping[ipmiHost] = host + } + + numHosts := len(ipmi2HostMapping) + totalNumHosts += numHosts + ipmiHostList := make([]string, 0, numHosts) + for ipmiHost := range ipmi2HostMapping { + ipmiHostList = append(ipmiHostList, ipmiHost) + } + + // Additional command line options + for _, v := range clientConfigJSON.CLIOptions { + switch { + case v == "-u" || strings.HasPrefix(v, "--username"): + err := fmt.Errorf("client config number %v: do not set username in cli_options. Use json config username instead", i) + cclog.ComponentError(r.name, err) + return nil, err + case v == "-p" || strings.HasPrefix(v, "--password"): + err := fmt.Errorf("client config number %v: do not set password in cli_options. Use json config password instead", i) + cclog.ComponentError(r.name, err) + return nil, err + case v == "-h" || strings.HasPrefix(v, "--hostname"): + err := fmt.Errorf("client config number %v: do not set hostname in cli_options. Use json config host_list instead", i) + cclog.ComponentError(r.name, err) + return nil, err + case v == "-D" || strings.HasPrefix(v, "--driver-type"): + err := fmt.Errorf("client config number %v: do not set driver type in cli_options. Use json config driver_type instead", i) + cclog.ComponentError(r.name, err) + return nil, err + case v == "-F" || strings.HasPrefix(v, " --fanout"): + err := fmt.Errorf("client config number %v: do not set fanout in cli_options. Use json config fanout instead", i) + cclog.ComponentError(r.name, err) + return nil, err + case v == "--always-prefix" || + v == "--sdr-cache-recreate" || + v == "--interpret-oem-data" || + v == "--ignore-not-available-sensors" || + v == "--ignore-unrecognized-events" || + v == "--comma-separated-output" || + v == "--no-header-output" || + v == "--non-abbreviated-units": + err := fmt.Errorf("client config number %v: Do not use option %s in cli_options, it is used internally", i, v) + cclog.ComponentError(r.name, err) + return nil, err + } + } + cliOptions := make([]string, 0) + cliOptions = append(cliOptions, clientConfigJSON.CLIOptions...) + + // Is metrics excluded globally or per client + isExcluded := make(map[string]bool) + for _, key := range clientConfigJSON.ExcludeMetrics { + isExcluded[key] = true + } + for _, key := range configJSON.ExcludeMetrics { + isExcluded[key] = true + } + + r.config.ClientConfigs = append( + r.config.ClientConfigs, + IPMIReceiverClientConfig{ + Protocol: protocol, + Fanout: fanout, + DriverType: driverType, + NumHosts: numHosts, + IPMIHosts: strings.Join(ipmiHostList, ","), + IPMI2HostMapping: ipmi2HostMapping, + Username: username, + Password: password, + CLIOptions: cliOptions, + isExcluded: isExcluded, + }) + } + + if totalNumHosts == 0 { + err := fmt.Errorf("at least one IPMI host config is required") + cclog.ComponentError(r.name, err) + return nil, err + } + + cclog.ComponentInfo(r.name, "monitoring", totalNumHosts, "IPMI hosts") + return r, nil +} diff --git a/receivers/ipmiReceiver.md b/receivers/ipmiReceiver.md new file mode 100644 index 0000000..7d7369f --- /dev/null +++ b/receivers/ipmiReceiver.md @@ -0,0 +1,48 @@ +## IPMI Receiver + +The IPMI Receiver uses `ipmi-sensors` from the [FreeIPMI](https://www.gnu.org/software/freeipmi/) project to read IPMI sensor readings and sensor data repository (SDR) information. The available metrics depend on the sensors provided by the hardware vendor but typically contain temperature, fan speed, voltage and power metrics. + +### Configuration structure + +```json +{ + "": { + "type": "ipmi", + "interval": "30s", + "fanout": 256, + "username": "", + "password": "", + "endpoint": "ipmi-sensors://%h-bmc", + "exclude_metrics": [ "fan_speed", "voltage" ], + "client_config": [ + { + "host_list": "n[1,2-4]" + }, + { + "host_list": "n[5-6]", + "driver_type": "LAN", + "cli_options": [ "--workaround-flags=..." ], + "password": "" + } + ] + } +} +``` + +Global settings: + +- `interval`: How often the IPMI sensor metrics should be read and send to the sink (default: 30 s) + +Global and per IPMI device settings (per IPMI device settings overwrite the global settings): + +- `exclude_metrics`: list of excluded metrics e.g. fan_speed, power, temperature, utilization, voltage +- `fanout`: Maximum number of simultaneous IPMI connections (default: 64) +- `driver_type`: Out of band IPMI driver (default: LAN_2_0) +- `username`: User name to authenticate with +- `password`: Password to use for authentication +- `endpoint`: URL of the IPMI device (placeholder `%h` gets replaced by the hostname) + +Per IPMI device settings: + +- `host_list`: List of hosts with the same client configuration +- `cli_options`: Additional command line options for ipmi-sensors diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index c47c3cc..31853fe 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -2,6 +2,7 @@ package receivers import ( "encoding/json" + "fmt" "os" "sync" @@ -10,6 +11,7 @@ import ( ) var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ + "ipmi": NewIPMIReceiver, "nats": NewNatsReceiver, "redfish": NewRedfishReceiver, } @@ -71,9 +73,13 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "JSON config error:", err.Error()) return err } + if config.Type == "" { + cclog.ComponentError("ReceiveManager", "SKIP", "JSON config for receiver", name, "does not contain a receiver type") + return fmt.Errorf("JSON config for receiver %s does not contain a receiver type", name) + } if _, found := AvailableReceivers[config.Type]; !found { - cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) - return err + cclog.ComponentError("ReceiveManager", "SKIP", "unknown receiver type:", config.Type) + return fmt.Errorf("unknown receiver type: %s", config.Type) } r, err := AvailableReceivers[config.Type](name, rawConfig) if err != nil { diff --git a/receivers/redfishReceiver.go b/receivers/redfishReceiver.go index de9744d..767a2de 100644 --- a/receivers/redfishReceiver.go +++ b/receivers/redfishReceiver.go @@ -1,9 +1,11 @@ package receivers import ( + "bytes" "crypto/tls" "encoding/json" "fmt" + "io" "net/http" "strconv" "strings" @@ -12,6 +14,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + "github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist" // See: https://pkg.go.dev/github.com/stmcginnis/gofish "github.com/stmcginnis/gofish" @@ -346,9 +349,17 @@ func (r *RedfishReceiver) readProcessorMetrics( // This property shall contain the temperature, in Celsius, of the processor. TemperatureCelsius float32 `json:"TemperatureCelsius"` } - err = json.NewDecoder(resp.Body).Decode(&processorMetrics) + body, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("unable to decode JSON for processor metrics: %+w", err) + return fmt.Errorf("unable to read JSON for processor metrics: %+w", err) + } + err = json.Unmarshal(body, &processorMetrics) + if err != nil { + return fmt.Errorf( + "unable to unmarshal JSON='%s' for processor metrics: %+w", + string(body), + err, + ) } processorMetrics.SetClient(processor.Client) @@ -380,7 +391,9 @@ func (r *RedfishReceiver) readProcessorMetrics( namePower := "consumed_power" - if !clientConfig.isExcluded[namePower] { + if !clientConfig.isExcluded[namePower] && + // Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null + processorMetrics.ConsumedPowerWatt != 65535 { y, err := lp.New(namePower, tags, metaPower, map[string]interface{}{ "value": processorMetrics.ConsumedPowerWatt, @@ -630,10 +643,10 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { ExcludeMetrics []string `json:"exclude_metrics,omitempty"` ClientConfigs []struct { - HostList []string `json:"host_list"` // List of hosts with the same client configuration - Username *string `json:"username"` // User name to authenticate with - Password *string `json:"password"` // Password to use for authentication - Endpoint *string `json:"endpoint"` // URL of the redfish service + HostList string `json:"host_list"` // List of hosts with the same client configuration + Username *string `json:"username"` // User name to authenticate with + Password *string `json:"password"` // Password to use for authentication + Endpoint *string `json:"endpoint"` // URL of the redfish service // Per client disable collection of power,processor or thermal metrics DisablePowerMetrics bool `json:"disable_power_metrics"` @@ -660,8 +673,9 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { // Read the redfish receiver specific JSON config if len(config) > 0 { - err := json.Unmarshal(config, &configJSON) - if err != nil { + d := json.NewDecoder(bytes.NewReader(config)) + d.DisallowUnknownFields() + if err := d.Decode(&configJSON); err != nil { cclog.ComponentError(r.name, "Error reading config:", err.Error()) return nil, err } @@ -763,7 +777,14 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { isExcluded[key] = true } - for _, host := range clientConfigJSON.HostList { + hostList, err := hostlist.Expand(clientConfigJSON.HostList) + if err != nil { + err := fmt.Errorf("client config number %d failed to parse host list %s: %v", + i, clientConfigJSON.HostList, err) + cclog.ComponentError(r.name, err) + return nil, err + } + for _, host := range hostList { // Endpoint of the redfish service endpoint := strings.Replace(endpoint_pattern, "%h", host, -1) diff --git a/receivers/redfishReceiver.md b/receivers/redfishReceiver.md index 1bc3ed8..7eb6f34 100644 --- a/receivers/redfishReceiver.md +++ b/receivers/redfishReceiver.md @@ -8,22 +8,22 @@ The Redfish receiver uses the [Redfish (specification)](https://www.dmtf.org/sta { "": { "type": "redfish", - "username": "", - "password": "", + "username": "", + "password": "", "endpoint": "https://%h-bmc", "exclude_metrics": [ "min_consumed_watts" ], "client_config": [ { - "host_list": [ "", "" ] + "host_list": "n[1,2-4]" }, { - "host_list": [ "", "" ] + "host_list": "n5" "disable_power_metrics": true }, { - "host_list": [ "" ], - "username": "", - "password": "", + "host_list": "n6" ], + "username": "", + "password": "", "endpoint": "https://%h-BMC", "disable_thermal_metrics": true }