Compare commits


2 Commits

Author          SHA1        Message                                                            Date
Thomas Roehl    813b59b16e  Required LIKWID 5.3.0                                              2024-04-10 20:00:58 +02:00
Thomas Roehl    303fe1d80f  Add collector for always running energy measurements with LIKWID  2024-04-10 19:57:08 +02:00
14 changed files with 488 additions and 761 deletions


@@ -45,10 +45,10 @@ jobs:
- name: Install build dependencies
run: |
dnf --assumeyes install \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.20.6-2.module_el8+658+f14b2092.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.20.6-1.module_el8+602+8bb8a8d6.x86_64.rpm
- name: RPM build MetricCollector
id: rpmbuild
@@ -115,10 +115,10 @@ jobs:
- name: Install build dependencies
run: |
dnf --assumeyes --disableplugin=subscription-manager install \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.20.6-2.module_el8+658+f14b2092.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.20.6-1.module_el8+602+8bb8a8d6.x86_64.rpm
- name: RPM build MetricCollector
id: rpmbuild


@@ -10,6 +10,32 @@ on:
jobs:
#
# Job build-1-20
# Build on latest Ubuntu using golang version 1.20
#
build-1-20:
runs-on: ubuntu-latest
steps:
# See: https://github.com/marketplace/actions/checkout
# Checkout git repository and submodules
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v4
with:
go-version: '1.20'
- name: Build MetricCollector
run: make
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json
#
# Job build-1-21
# Build on latest Ubuntu using golang version 1.21
@@ -36,32 +62,6 @@ jobs:
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json
#
# Job build-1-22
# Build on latest Ubuntu using golang version 1.22
#
build-1-22:
runs-on: ubuntu-latest
steps:
# See: https://github.com/marketplace/actions/checkout
# Checkout git repository and submodules
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v4
with:
go-version: '1.22'
- name: Build MetricCollector
run: make
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json
#
# Build on AlmaLinux 8 using go-toolset
#
@@ -92,10 +92,10 @@ jobs:
- name: Install build dependencies
run: |
dnf --assumeyes install \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.20.6-2.module_el8+658+f14b2092.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.20.6-1.module_el8+602+8bb8a8d6.x86_64.rpm
- name: RPM build MetricCollector
id: rpmbuild
@@ -130,10 +130,10 @@ jobs:
- name: Install build dependencies
run: |
dnf --assumeyes --disableplugin=subscription-manager install \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.20.6-2.module_el8+658+f14b2092.x86_64.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.20.6-2.module_el8+658+f14b2092.noarch.rpm \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.20.6-1.module_el8+602+8bb8a8d6.x86_64.rpm
- name: RPM build MetricCollector
id: rpmbuild
@@ -174,4 +174,4 @@ jobs:
run: |
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
make DEB
make DEB


@@ -1,5 +1,5 @@
# LIKWID version
LIKWID_VERSION := 5.2.2
LIKWID_VERSION := 5.3.0
LIKWID_INSTALLED_FOLDER := $(shell dirname $$(which likwid-topology 2>/dev/null) 2>/dev/null)
LIKWID_FOLDER := $(CURDIR)/likwid


@@ -41,6 +41,7 @@ var AvailableCollectors = map[string]MetricCollector{
"self": new(SelfCollector),
"schedstat": new(SchedstatCollector),
"nfsiostat": new(NfsIOStatCollector),
"likwidenergy": new(LikwidEnergyCollector),
}
// Metric collector manager data structure


@@ -99,7 +99,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
continue
}
output <- lp.FromInfluxMetric(c)
y := lp.FromInfluxMetric(c)
if err == nil {
output <- y
}
}
}
for _, file := range m.files {
@@ -118,7 +121,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
if skip {
continue
}
output <- lp.FromInfluxMetric(f)
y := lp.FromInfluxMetric(f)
if err == nil {
output <- y
}
}
}
}


@@ -0,0 +1,281 @@
package collectors
/*
#cgo CFLAGS: -I./likwid
#cgo LDFLAGS: -Wl,--unresolved-symbols=ignore-in-object-files
#include <stdlib.h>
#include <likwid.h>
*/
import "C"
import (
"encoding/json"
"fmt"
"os"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
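// Default settings used when the JSON configuration omits the corresponding
// entries. The library is loaded at runtime via dlopen(), so the bare file name
// only resolves if liblikwid.so is in the loader search path.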
const (
LIKWIDENERGY_LIB_NAME = "liblikwid.so"
LIKWIDENERGY_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
LIKWIDENERGY_DEF_ACCESSMODE = "direct"
LIKWIDENERGY_DEF_LOCKFILE = "/var/run/likwid.lock"
)
// These are the fields we read from the JSON configuration
type LikwidEnergyCollectorConfig struct {
AccessMode string `json:"access_mode,omitempty"`
DaemonPath string `json:"accessdaemon_path,omitempty"`
LibraryPath string `json:"liblikwid_path,omitempty"`
LockfilePath string `json:"lockfile_path,omitempty"`
SendDiff bool `json:"send_difference,omitempty"`
SendAbs bool `json:"send_absolute,omitempty"`
}
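// LikwidEnergyDomainEntry holds the per-entity measurement state of a RAPL domain:
// the hardware thread used for reading, the last raw register value and the
// accumulated total since collector start.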
type LikwidEnergyDomainEntry struct {
readcpu int
value uint32
total uint64
tags map[string]string
}
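// LikwidEnergyDomain describes one RAPL domain (pkg, dram, core, ...): its metric name,
// the topological granularity it is measured at, LIKWID's domain type ID, the energy
// unit (Joules per raw counter increment) and the per-entity entries.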
type LikwidEnergyDomain struct {
values map[int]LikwidEnergyDomainEntry
granularity string
metricname string
domaintype int
energyUnit float64
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type LikwidEnergyCollector struct {
metricCollector
config LikwidEnergyCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
domains map[int]LikwidEnergyDomain
}
// Init initializes the LIKWID energy collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *LikwidEnergyCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "LikwidEnergyCollector"
// This is for later use, also call it early
m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "LIKWID", "unit": "Joules"}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// die -> CPU die (requires CPU die ID as 'type-id' tag)
// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag)
// llc -> Last level cache (requires last level cache ID as 'type-id' tag)
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> An accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{}
// Read in the JSON configuration
m.config.AccessMode = LIKWIDENERGY_DEF_ACCESSMODE
m.config.LibraryPath = LIKWIDENERGY_LIB_NAME
m.config.LockfilePath = LIKWIDENERGY_DEF_LOCKFILE
m.config.SendAbs = true
m.config.SendDiff = true
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
cclog.ComponentDebug(m.name, "Opening ", m.config.LibraryPath)
lib := dl.New(m.config.LibraryPath, LIKWIDENERGY_LIB_DL_FLAGS)
if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
}
err = lib.Open()
if err != nil {
return fmt.Errorf("error opening %s: %v", m.config.LibraryPath, err)
}
cclog.ComponentDebug(m.name, "Init topology ", m.config.AccessMode)
ret := C.topology_init()
if ret != 0 {
return fmt.Errorf("error initializing topology: %d", ret)
}
cclog.ComponentDebug(m.name, "Setting accessmode ", m.config.AccessMode)
switch m.config.AccessMode {
case "direct":
C.HPMmode(0)
case "accessdaemon":
if len(m.config.DaemonPath) > 0 {
p := os.Getenv("PATH")
os.Setenv("PATH", m.config.DaemonPath+":"+p)
}
C.HPMmode(1)
retCode := C.HPMinit()
if retCode != 0 {
err := fmt.Errorf("C.HPMinit() failed with return code %v", retCode)
cclog.ComponentError(m.name, err.Error())
}
}
initCpus := make([]int, 0)
ret = C.HPMaddThread(0)
if ret != 0 {
return fmt.Errorf("error initializing access: %d", ret)
}
initCpus = append(initCpus, 0)
cinfo := C.get_cpuInfo()
domainnames := make(map[int]string)
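// Translate LIKWID's RAPL domain type IDs into readable names. Intel exposes the
// classic RAPL domains, AMD Zen (family 0x17/0x19) provides core and package
// (or L3) energy counters depending on the model.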
if cinfo.isIntel == C.int(1) {
domainnames[0] = "pkg"
domainnames[1] = "pp0"
domainnames[2] = "pp1"
domainnames[3] = "dram"
domainnames[4] = "platform"
} else {
switch cinfo.family {
case 0x17:
domainnames[0] = "core"
domainnames[1] = "pkg"
case 0x19:
switch cinfo.model {
case 0x01, 0x21, 0x50:
domainnames[0] = "core"
domainnames[1] = "pkg"
case 0x61, 0x11:
domainnames[0] = "core"
domainnames[1] = "l3"
}
}
}
// Set up everything that the collector requires during the Read() execution
// Check files required, test execution of some commands, create data structure
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
cclog.ComponentDebug(m.name, "Initializing Power module")
ret = C.power_init(0)
if ret == C.int(0) {
cclog.ComponentPrint(m.name, "No RAPL support")
}
m.domains = make(map[int]LikwidEnergyDomain)
Pinfo := C.get_powerInfo()
for i := 0; i < int(Pinfo.numDomains); i++ {
d := Pinfo.domains[C.int(i)]
name := domainnames[int(d._type)]
domain := LikwidEnergyDomain{
values: make(map[int]LikwidEnergyDomainEntry),
metricname: fmt.Sprintf("likwidenergy_%s", strings.ToLower(name)),
granularity: "socket",
domaintype: int(d._type),
energyUnit: float64(C.power_getEnergyUnit(C.int(d._type))),
}
if name == "core" {
domain.granularity = "core"
}
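// Iterate over all entities at this granularity (sockets, or cores for the AMD
// per-core domain) and take an initial reading as the starting point.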
for _, c := range topo.GetTypeList(domain.granularity) {
clist := topo.GetSocketHwthreads(c)
if len(clist) > 0 {
var cur C.PowerData
if _, ok := intArrayContains(initCpus, clist[0]); !ok {
initCpus = append(initCpus, clist[0])
C.HPMaddThread(C.int(clist[0]))
}
cclog.ComponentDebug(m.name, "Reading current value on CPU ", clist[0], " for ", domain.metricname, "on", domain.granularity, c)
ret = C.power_start(&cur, C.int(clist[0]), C.PowerType(domain.domaintype))
cclog.ComponentDebug(m.name, "Reading ", uint64(cur.before))
if ret == 0 {
domain.values[c] = LikwidEnergyDomainEntry{
readcpu: clist[0],
value: uint32(cur.before),
total: uint64(cur.before),
tags: map[string]string{
"type": domain.granularity,
"type-id": fmt.Sprintf("%d", c),
},
}
}
}
}
cclog.ComponentDebug(m.name, "Adding domain ", domain.metricname, " with granularity ", domain.granularity)
m.domains[domain.domaintype] = domain
}
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
// Read collects all metrics of this collector
// and sends them through the output channel to the collector manager
func (m *LikwidEnergyCollector) Read(interval time.Duration, output chan lp.CCMetric) {
// Use a single timestamp for all metrics of this read cycle
timestamp := time.Now()
for dt, domain := range m.domains {
for i, entry := range domain.values {
var cur C.PowerData
ret := C.power_start(&cur, C.int(entry.readcpu), C.PowerType(dt))
if ret == 0 {
now := uint32(cur.before)
diff := now - entry.value
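// The RAPL energy registers are 32 bit wide and wrap around; reconstruct the
// difference across an overflow if the new reading is smaller than the last one.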
if now < entry.value {
diff = (^uint32(0)) - entry.value
diff += now
}
if m.config.SendDiff {
y, err := lp.New(domain.metricname, entry.tags, m.meta, map[string]interface{}{"value": float64(diff) * domain.energyUnit}, timestamp)
if err == nil {
for k, v := range m.tags {
y.AddTag(k, v)
}
// Send it to output channel
output <- y
}
}
if m.config.SendAbs {
total := float64(entry.total + uint64(diff))
y, err := lp.New(fmt.Sprintf("%s_abs", domain.metricname), entry.tags, m.meta, map[string]interface{}{"value": total * domain.energyUnit}, timestamp)
if err == nil {
for k, v := range m.tags {
y.AddTag(k, v)
}
// Send it to output channel
output <- y
}
}
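// Remember the raw reading and add the consumed energy to the running total.
// Go map values are copies, so the updated entry has to be written back.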
entry.value = uint32(cur.before)
entry.total += uint64(diff)
domain.values[i] = entry
}
}
}
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *LikwidEnergyCollector) Close() {
C.power_finalize()
C.topology_finalize()
// Unset flag
m.init = false
}


@@ -0,0 +1,20 @@
## `likwidenergy` collector
In contrast to the more general [`likwid` collector](./likwidMetric.md), this collector only reads the RAPL counters to provide energy metrics. Unlike the `likwid` collector, it keeps the energy counters running for the whole collector lifetime instead of only during a measurement interval. It covers all RAPL domains (`PKG`, `DRAM`, `PP0`, `PP1`, ...). Depending on whether a domain is measured per socket, per L3 segment or per core, the metrics are read and sent at that granularity.
```json
{
"likwidenergy" : {
"liblikwid_path" : "/path/to/liblikwid.so",
"accessdaemon_path" : "/folder/that/contains/likwid-accessD",
"access_mode" : "direct or accessdaemon",
"send_difference": true,
"send_absolute": true
}
}
```
The first three entries (`liblikwid_path`, `accessdaemon_path` and `access_mode`) configure the access to the RAPL counters. The access mode `perf_event` is currently not supported.
With `send_difference`, the difference to the previous measurement is sent as a metric. With `send_absolute`, the absolute energy consumed since the start of the collector is sent; the counters are read once at initialization and the absolute values are updated after each measurement.
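How `send_difference` and `send_absolute` relate is easiest to see as plain counter arithmetic. The following standalone sketch is not the collector code itself; the energy unit is an example value (the real unit is queried from LIKWID per domain). It mirrors the per-read update: the unsigned 32-bit subtraction implicitly handles a register wrap-around, and the difference is accumulated into the absolute total.

```go
package main

import "fmt"

// accumulate mirrors the collector's per-read update: it returns the energy
// consumed since the previous reading and the new running total, handling a
// wrap-around of the 32-bit RAPL register via unsigned arithmetic.
func accumulate(prevRaw, nowRaw uint32, total uint64) (uint32, uint64) {
	diff := nowRaw - prevRaw // uint32 subtraction wraps implicitly
	return diff, total + uint64(diff)
}

func main() {
	const energyUnit = 6.103515625e-05 // Joules per counter increment (example value)
	prev, total := uint32(4294960000), uint64(123456)

	// A reading taken after the 32-bit register overflowed
	now := uint32(2000)
	diff, total := accumulate(prev, now, total)

	fmt.Printf("send_difference -> %.3f J\n", float64(diff)*energyUnit)
	fmt.Printf("send_absolute   -> %.3f J\n", float64(total)*energyUnit)
}
```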

go.mod

@@ -1,6 +1,6 @@
module github.com/ClusterCockpit/cc-metric-collector
go 1.21
go 1.20
require (
github.com/ClusterCockpit/cc-units v0.4.0
@@ -12,30 +12,30 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/nats-io/nats.go v1.33.1
github.com/prometheus/client_golang v1.19.0
github.com/nats-io/nats.go v1.32.0
github.com/prometheus/client_golang v1.18.0
github.com/stmcginnis/gofish v0.15.0
github.com/tklauser/go-sysconf v0.3.13
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/sys v0.18.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/sys v0.16.0
)
require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oapi-codegen/runtime v1.1.1 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.49.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
)

go.sum

@@ -7,6 +7,7 @@ github.com/NVIDIA/go-nvml v0.12.0-2 h1:Sg239yy7jmopu/cuvYauoMj9fOpcGMngxVxxS1EBX
github.com/NVIDIA/go-nvml v0.12.0-2/go.mod h1:7ruy85eOM73muOc/I37euONSwEyFqZsv5ED9AogD4G0=
github.com/PaesslerAG/gval v1.2.2 h1:Y7iBzhgE09IGTt5QgGQ2IdaYYYOU134YGHBThD+wm9E=
github.com/PaesslerAG/gval v1.2.2/go.mod h1:XRFLwvmkTEdYziLdaCeCa5ImcGVrfQbeNUbVR+C6xac=
github.com/PaesslerAG/jsonpath v0.1.0 h1:gADYeifvlqK3R3i2cR5B4DGgxLXIPb3TRTH1mGi0jPI=
github.com/PaesslerAG/jsonpath v0.1.0/go.mod h1:4BzmtoM/PI8fPO4aQGIusjGxGir2BzcV0grWtFzq1Y8=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
@@ -18,16 +19,20 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM=
@@ -35,20 +40,24 @@ github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0s
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU=
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE=
github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -56,15 +65,18 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.49.0 h1:ToNTdK4zSnPVJmh698mGFkDor9wBI/iGaJy5dbH1EgI=
github.com/prometheus/common v0.49.0/go.mod h1:Kxm+EULxRbUkjGU6WFsQqo3ORzB4tyKvlWFOE9mB2sE=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.46.0 h1:doXzt5ybi1HBKpsZOL0sSkaNHJJqkyfEWZGGqqScV0Y=
github.com/prometheus/common v0.46.0/go.mod h1:Tp0qkxpb9Jsg54QMe+EAmqXkSV7Evdy1BTn+g2pa/hQ=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
@@ -76,6 +88,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
@@ -83,20 +96,21 @@ github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1 h1:P7S/GeHBAFEZIYp0ePPs2kHXoazz8q2KsyxHyQVGCJg=
golang.design/x/thread v0.0.0-20210122121316-335e9adffdf1/go.mod h1:9CWpnTUmlQkfdpdutA1nNf4iE5lAVt3QZOu0Z6hahBE=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20210122093101-04d7465088b8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=


@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"net/http"
"strconv"
"strings"
@@ -33,14 +32,10 @@ type RedfishReceiverClientConfig struct {
doPowerMetric bool
doProcessorMetrics bool
doSensors bool
doThermalMetrics bool
skipProcessorMetricsURL map[string]bool
// readSensorURLs stores for each chassis ID a list of sensor URLs to read
readSensorURLs map[string][]string
gofish gofish.ClientConfig
}
@@ -61,226 +56,7 @@ type RedfishReceiver struct {
wg sync.WaitGroup // wait group for redfish receiver
}
// deleteEmptyTags removes tags or meta data tags with empty value
func deleteEmptyTags(tags map[string]string) {
maps.DeleteFunc(
tags,
func(key string, value string) bool {
return value == ""
},
)
}
// setMetricValue sets the value entry in the fields map
func setMetricValue(value any) map[string]interface{} {
return map[string]interface{}{
"value": value,
}
}
// sendMetric sends the metric through the sink channel
func (r *RedfishReceiver) sendMetric(name string, tags map[string]string, meta map[string]string, value any, timestamp time.Time) {
deleteEmptyTags(tags)
deleteEmptyTags(meta)
y, err := lp.New(name, tags, meta, setMetricValue(value), timestamp)
if err == nil {
r.sink <- y
}
}
// readSensors reads sensors from a redfish device
// See: https://redfish.dmtf.org/schemas/v1/Sensor.json
// Redfish URI: /redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}
func (r *RedfishReceiver) readSensors(
clientConfig *RedfishReceiverClientConfig,
chassis *redfish.Chassis) error {
writeTemperatureSensor := func(sensor *redfish.Sensor) {
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource
"sensor_id": sensor.ID,
// The area or device to which this sensor measurement applies
"temperature_physical_context": string(sensor.PhysicalContext),
// Name
"temperature_name": sensor.Name,
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "Temperature",
"unit": "degC",
}
r.sendMetric("temperature", tags, meta, sensor.Reading, time.Now())
}
writeFanSpeedSensor := func(sensor *redfish.Sensor) {
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource
"sensor_id": sensor.ID,
// The area or device to which this sensor measurement applies
"fan_physical_context": string(sensor.PhysicalContext),
// Name
"fan_name": sensor.Name,
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "FanSpeed",
"unit": string(sensor.ReadingUnits),
}
r.sendMetric("fan_speed", tags, meta, sensor.Reading, time.Now())
}
writePowerSensor := func(sensor *redfish.Sensor) {
// Set tags
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource
"sensor_id": sensor.ID,
// The area or device to which this sensor measurement applies
"power_physical_context": string(sensor.PhysicalContext),
// Name
"power_name": sensor.Name,
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "Energy",
"unit": "watts",
}
r.sendMetric("power", tags, meta, sensor.Reading, time.Now())
}
if _, ok := clientConfig.readSensorURLs[chassis.ID]; !ok {
// First time run of read sensors for this chassis
clientConfig.readSensorURLs[chassis.ID] = make([]string, 0)
// Get sensor information for this chassis
sensors, err := chassis.Sensors()
if err != nil {
return fmt.Errorf("readSensors: chassis.Sensors() failed: %v", err)
}
// Skip empty sensors information
if sensors == nil {
return nil
}
for _, sensor := range sensors {
// Skip all sensors which are not in enabled state or which are unhealthy
if sensor.Status.State != common.EnabledState || sensor.Status.Health != common.OKHealth {
continue
}
// Skip sensors with missing readings units or type
if sensor.ReadingUnits == "" || sensor.ReadingType == "" {
continue
}
// Power readings
if (sensor.ReadingType == redfish.PowerReadingType && sensor.ReadingUnits == "Watts") ||
(sensor.ReadingType == redfish.CurrentReadingType && sensor.ReadingUnits == "Watts") {
if clientConfig.isExcluded["power"] {
continue
}
clientConfig.readSensorURLs[chassis.ID] = append(clientConfig.readSensorURLs[chassis.ID], sensor.ODataID)
writePowerSensor(sensor)
continue
}
// Fan speed readings
if (sensor.ReadingType == redfish.AirFlowReadingType && sensor.ReadingUnits == "RPM") ||
(sensor.ReadingType == redfish.AirFlowReadingType && sensor.ReadingUnits == "Percent") {
// Skip, when fan_speed metric is excluded
if clientConfig.isExcluded["fan_speed"] {
continue
}
clientConfig.readSensorURLs[chassis.ID] = append(clientConfig.readSensorURLs[chassis.ID], sensor.ODataID)
writeFanSpeedSensor(sensor)
}
// Temperature readings
if sensor.ReadingType == redfish.TemperatureReadingType && sensor.ReadingUnits == "C" {
if clientConfig.isExcluded["temperature"] {
continue
}
clientConfig.readSensorURLs[chassis.ID] = append(clientConfig.readSensorURLs[chassis.ID], sensor.ODataID)
writeTemperatureSensor(sensor)
continue
}
}
} else {
common.CollectCollection(
func(uri string) {
sensor, err := redfish.GetSensor(chassis.GetClient(), uri)
if err != nil {
cclog.ComponentError(r.name, "redfish.GetSensor() for uri '", uri, "' failed")
}
// Power readings
if (sensor.ReadingType == redfish.PowerReadingType && sensor.ReadingUnits == "Watts") ||
(sensor.ReadingType == redfish.CurrentReadingType && sensor.ReadingUnits == "Watts") {
writePowerSensor(sensor)
return
}
// Fan speed readings
if (sensor.ReadingType == redfish.AirFlowReadingType && sensor.ReadingUnits == "RPM") ||
(sensor.ReadingType == redfish.AirFlowReadingType && sensor.ReadingUnits == "Percent") {
writeFanSpeedSensor(sensor)
return
}
// Temperature readings
if sensor.ReadingType == redfish.TemperatureReadingType && sensor.ReadingUnits == "C" {
writeTemperatureSensor(sensor)
return
}
},
clientConfig.readSensorURLs[chassis.ID])
}
return nil
}
// readThermalMetrics reads thermal metrics from a redfish device
// See: https://redfish.dmtf.org/schemas/v1/Thermal.json
// Redfish URI: /redfish/v1/Chassis/{ChassisId}/Thermal
// -> deprecated in favor of the ThermalSubsystem schema
// -> on Lenovo servers /redfish/v1/Chassis/{ChassisId}/ThermalSubsystem/ThermalMetrics links to /redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}
func (r *RedfishReceiver) readThermalMetrics(
clientConfig *RedfishReceiverClientConfig,
chassis *redfish.Chassis) error {
@@ -330,6 +106,13 @@ func (r *RedfishReceiver) readThermalMetrics(
"temperature_name": temperature.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
@@ -340,7 +123,14 @@ func (r *RedfishReceiver) readThermalMetrics(
// ReadingCelsius shall be the current value of the temperature sensor's reading.
value := temperature.ReadingCelsius
r.sendMetric("temperature", tags, meta, value, timestamp)
y, err := lp.New("temperature", tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
for _, fan := range thermal.Fans {
@@ -374,6 +164,13 @@ func (r *RedfishReceiver) readThermalMetrics(
"fan_name": fan.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
@@ -381,16 +178,23 @@ func (r *RedfishReceiver) readThermalMetrics(
"unit": string(fan.ReadingUnits),
}
r.sendMetric("fan_speed", tags, meta, fan.Reading, timestamp)
// Reading shall be the current value of the fan sensor's reading
value := fan.Reading
y, err := lp.New("fan_speed", tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
return nil
}
// readPowerMetrics reads power metrics from a redfish device
// See: https://redfish.dmtf.org/schemas/v1/Power.json
// Redfish URI: /redfish/v1/Chassis/{ChassisId}/Power
// -> deprecated in favor of the PowerSubsystem schema
func (r *RedfishReceiver) readPowerMetrics(
clientConfig *RedfishReceiverClientConfig,
chassis *redfish.Chassis) error {
@@ -470,6 +274,13 @@ func (r *RedfishReceiver) readPowerMetrics(
"power_control_name": pc.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
@@ -478,8 +289,23 @@ func (r *RedfishReceiver) readPowerMetrics(
"unit": "watts",
}
// Delete empty meta data tags
for key, value := range meta {
if value == "" {
delete(meta, key)
}
}
for name, value := range metrics {
r.sendMetric(name, tags, meta, value, timestamp)
y, err := lp.New(name, tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
}
@@ -488,7 +314,6 @@ func (r *RedfishReceiver) readPowerMetrics(
// readProcessorMetrics reads processor metrics from a redfish device
// See: https://redfish.dmtf.org/schemas/v1/ProcessorMetrics.json
// Redfish URI: /redfish/v1/Systems/{ComputerSystemId}/Processors/{ProcessorId}/ProcessorMetrics
func (r *RedfishReceiver) readProcessorMetrics(
clientConfig *RedfishReceiverClientConfig,
processor *redfish.Processor) error {
@@ -549,6 +374,13 @@ func (r *RedfishReceiver) readProcessorMetrics(
"processor_id": processor.ID,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
metaPower := map[string]string{
"source": r.name,
@@ -561,7 +393,14 @@ func (r *RedfishReceiver) readProcessorMetrics(
if !clientConfig.isExcluded[namePower] &&
// Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null
processorMetrics.ConsumedPowerWatt != 65535 {
r.sendMetric(namePower, tags, metaPower, processorMetrics.ConsumedPowerWatt, timestamp)
y, err := lp.New(namePower, tags, metaPower,
map[string]interface{}{
"value": processorMetrics.ConsumedPowerWatt,
},
timestamp)
if err == nil {
r.sink <- y
}
}
// Set meta data tags
metaThermal := map[string]string{
@@ -573,7 +412,14 @@ func (r *RedfishReceiver) readProcessorMetrics(
nameThermal := "temperature"
if !clientConfig.isExcluded[nameThermal] {
r.sendMetric(nameThermal, tags, metaThermal, processorMetrics.TemperatureCelsius, timestamp)
y, err := lp.New(nameThermal, tags, metaThermal,
map[string]interface{}{
"value": processorMetrics.TemperatureCelsius,
},
timestamp)
if err == nil {
r.sink <- y
}
}
return nil
}
@@ -606,8 +452,7 @@ func (r *RedfishReceiver) readMetrics(clientConfig *RedfishReceiverClientConfig)
// Get all chassis managed by this service
isChassisListRequired :=
clientConfig.doSensors ||
clientConfig.doThermalMetrics ||
clientConfig.doThermalMetrics ||
clientConfig.doPowerMetric
var chassisList []*redfish.Chassis
if isChassisListRequired {
@@ -627,16 +472,6 @@ func (r *RedfishReceiver) readMetrics(clientConfig *RedfishReceiverClientConfig)
}
}
// Read sensors
if clientConfig.doSensors {
for _, chassis := range chassisList {
err := r.readSensors(clientConfig, chassis)
if err != nil {
return err
}
}
}
// read thermal metrics
if clientConfig.doThermalMetrics {
for _, chassis := range chassisList {
@@ -801,7 +636,6 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
// Globally disable collection of power, processor or thermal metrics
DisablePowerMetrics bool `json:"disable_power_metrics"`
DisableProcessorMetrics bool `json:"disable_processor_metrics"`
DisableSensors bool `json:"disable_sensors"`
DisableThermalMetrics bool `json:"disable_thermal_metrics"`
// Globally excluded metrics
@@ -816,7 +650,6 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
// Per client disable collection of power,processor or thermal metrics
DisablePowerMetrics bool `json:"disable_power_metrics"`
DisableProcessorMetrics bool `json:"disable_processor_metrics"`
DisableSensors bool `json:"disable_sensors"`
DisableThermalMetrics bool `json:"disable_thermal_metrics"`
// Per client excluded metrics
@@ -933,9 +766,6 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
doProcessorMetrics :=
!(configJSON.DisableProcessorMetrics ||
clientConfigJSON.DisableProcessorMetrics)
doSensors :=
!(configJSON.DisableSensors ||
clientConfigJSON.DisableSensors)
doThermalMetrics :=
!(configJSON.DisableThermalMetrics ||
clientConfigJSON.DisableThermalMetrics)
@@ -968,10 +798,8 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
isExcluded: isExcluded,
doPowerMetric: doPowerMetric,
doProcessorMetrics: doProcessorMetrics,
doSensors: doSensors,
doThermalMetrics: doThermalMetrics,
skipProcessorMetricsURL: make(map[string]bool),
readSensorURLs: map[string][]string{},
gofish: gofish.ClientConfig{
Username: username,
Password: password,


@@ -17,17 +17,15 @@ The Redfish receiver uses the [Redfish (specification)](https://www.dmtf.org/sta
"host_list": "n[1,2-4]"
},
{
"host_list": "n5",
"disable_power_metrics": true,
"disable_processor_metrics": true,
"disable_thermal_metrics": true
"host_list": "n5"
"disable_power_metrics": true
},
{
"host_list": "n6" ],
"username": "<Username 2>",
"password": "<Password 2>",
"endpoint": "https://%h-BMC",
"disable_sensor_metrics": true
"disable_thermal_metrics": true
}
]
}
@@ -43,18 +41,9 @@ Global settings:
Global and per redfish device settings (per redfish device settings overwrite the global settings):
- `disable_power_metrics`:
disable collection of power metrics
(`/redfish/v1/Chassis/{ChassisId}/Power`)
- `disable_processor_metrics`:
disable collection of processor metrics
(`/redfish/v1/Systems/{ComputerSystemId}/Processors/{ProcessorId}/ProcessorMetrics`)
- `disable_sensors`:
disable collection of fan, power and thermal sensor metrics
(`/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}`)
- `disable_thermal_metrics`:
disable collection of thermal metrics
(`/redfish/v1/Chassis/{ChassisId}/Thermal`)
- `disable_power_metrics`: disable collection of power metrics
- `disable_processor_metrics`: disable collection of processor metrics
- `disable_thermal_metrics`: disable collection of thermal metrics
- `exclude_metrics`: list of excluded metrics
- `username`: User name to authenticate with
- `password`: Password to use for authentication


@@ -1,372 +0,0 @@
package sinks
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/go-mqtt/mqtt"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
"golang.org/x/exp/slices"
)
type MqttSinkConfig struct {
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
// See: metricSink.go
defaultSinkConfig
// Additional config options, for MqttSink
ClientID string `json:"client_id"`
PersistenceDirectory string `json:"persistence_directory,omitempty"`
// Maximum number of points sent to server in single request.
// Default: 1000
BatchSize int `json:"batch_size,omitempty"`
// Time interval for delayed sending of metrics.
// If the buffers are already filled before the end of this interval,
// the metrics are sent without further delay.
// Default: 1s
FlushInterval string `json:"flush_delay,omitempty"`
flushDelay time.Duration
DialProtocol string `json:"dial_protocol"`
Hostname string `json:"hostname"`
Port int `json:"port"`
PauseTimeout string `json:"pause_timeout"`
pauseTimeout time.Duration
KeepAlive uint16 `json:"keep_alive_seconds"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type MqttSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink
config MqttSinkConfig // entry point to the MqttSinkConfig
// influx line protocol encoder
encoder influx.Encoder
// number of records stored in the encoder
numRecordsInEncoder int
// List of tags and meta data tags which should be used as tags
extended_tag_list []key_value_pair
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
// so this encoderLock has to protect the encoder and numRecordsInEncoder
encoderLock sync.Mutex
// timer to run Flush()
flushTimer *time.Timer
// Lock to assure that only one timer is running at a time
timerLock sync.Mutex
// WaitGroup to ensure only one send operation is running at a time
sendWaitGroup sync.WaitGroup
client *mqtt.Client
mqttconfig mqtt.Config
}
// Implement functions required for Sink interface
// Write(...), Flush(), Close()
// See: metricSink.go
// Code to submit a single CCMetric to the sink
func (s *MqttSink) Write(m lp.CCMetric) error {
// Lock for encoder usage
s.encoderLock.Lock()
// Encode measurement name
s.encoder.StartLine(m.Name())
// copy tags and meta data which should be used as tags
s.extended_tag_list = s.extended_tag_list[:0]
for key, value := range m.Tags() {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
for _, key := range s.config.MetaAsTags {
if value, ok := m.GetMeta(key); ok {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
}
// Encode tags (they must be in lexical order)
slices.SortFunc(
s.extended_tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range s.extended_tag_list {
s.encoder.AddTag(
s.extended_tag_list[i].key,
s.extended_tag_list[i].value,
)
}
// Encode fields
for key, value := range m.Fields() {
s.encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
s.encoder.EndLine(m.Time())
// Check for encoder errors
if err := s.encoder.Err(); err != nil {
// Unlock encoder usage
s.encoderLock.Unlock()
return fmt.Errorf("encoding failed: %v", err)
}
s.numRecordsInEncoder++
if s.config.flushDelay == 0 {
// Unlock encoder usage
s.encoderLock.Unlock()
// Directly flush if no flush delay is configured
return s.Flush()
} else if s.numRecordsInEncoder == s.config.BatchSize {
// Unlock encoder usage
s.encoderLock.Unlock()
// Stop flush timer
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
s.timerLock.Unlock()
}
}
// Flush if batch size is reached
return s.Flush()
} else if s.timerLock.TryLock() {
// Setup flush timer when flush delay is configured
// and no other timer is already running
if s.flushTimer != nil {
// Restarting existing flush timer
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
s.flushTimer.Reset(s.config.flushDelay)
} else {
// Creating and starting flush timer
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
s.flushTimer = time.AfterFunc(
s.config.flushDelay,
func() {
defer s.timerLock.Unlock()
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
}
})
}
}
// Unlock encoder usage
s.encoderLock.Unlock()
return nil
}
// If the sink uses batched sends internally, you can tell to flush its buffers
func (s *MqttSink) Flush() error {
// Lock for encoder usage
// Own lock for as short as possible: the time it takes to clone the buffer.
s.encoderLock.Lock()
buf := slices.Clone(s.encoder.Bytes())
numRecordsInBuf := s.numRecordsInEncoder
s.encoder.Reset()
s.numRecordsInEncoder = 0
// Unlock encoder usage
s.encoderLock.Unlock()
if len(buf) == 0 {
return nil
}
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
// Asynchronous send of the encoded metrics
s.sendWaitGroup.Add(1)
go func() {
defer s.sendWaitGroup.Done()
//startTime := time.Now()
for {
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID)
switch {
case err == nil:
return
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
return
case errors.Is(err, mqtt.ErrMax):
time.Sleep(s.config.pauseTimeout)
default:
time.Sleep(s.config.pauseTimeout)
continue
}
for err := range exchange {
if errors.Is(err, mqtt.ErrClosed) {
return
}
}
return
}
}()
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *MqttSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
// Stop existing timer and immediately flush
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
s.timerLock.Unlock()
}
}
// Flush
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
}
// Wait for send operations to finish
s.sendWaitGroup.Wait()
s.client.Close()
s.client = nil
}
// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
s := new(MqttSink)
// Set name of sampleSink
// The name should be chosen in such a way that different instances of MqttSink can be distinguished
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here
// Set defaults in s.config
// Allow overwriting these defaults by reading config JSON
s.config.PauseTimeout = "4s"
s.config.pauseTimeout = time.Duration(4) * time.Second
s.config.DialProtocol = "tcp"
s.config.Hostname = "localhost"
s.config.Port = 1883
// Read in the config JSON
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&s.config); err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Check if all required fields in the config are set
// E.g. use 'len(s.config.Option) > 0' for string settings
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil {
s.config.pauseTimeout = t
} else {
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
s.config.flushDelay = t
} else {
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
switch s.config.DialProtocol {
case "tcp", "udp":
default:
err := errors.New("failed to parse dial protocol, allowed: tcp, udp")
cclog.ComponentError(s.name, err.Error())
return nil, err
}
var persistence mqtt.Persistence
if len(s.config.PersistenceDirectory) > 0 {
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
} else {
tmpdir, err := os.MkdirTemp("", "mqtt")
if err == nil {
persistence = mqtt.FileSystem(tmpdir)
}
}
// Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s)
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
s.mqttconfig = mqtt.Config{
Dialer: dialer,
PauseTimeout: s.config.pauseTimeout,
KeepAlive: uint16(s.config.KeepAlive),
}
if len(s.config.Username) > 0 {
s.mqttconfig.UserName = s.config.Username
}
if len(s.config.Password) > 0 {
s.mqttconfig.Password = []byte(s.config.Password)
}
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
if err != nil {
return nil, err
}
s.client = client
// Return (nil, meaningful error message) in case of errors
return s, nil
}


@@ -1,39 +0,0 @@
## `mqtt` sink
The `mqtt` sink publishes all metrics to an MQTT broker.
### Configuration structure
```json
{
"<name>": {
"type": "mqtt",
"client_id" : "myid",
"persistence_directory": "/tmp",
"batch_size": 1000,
"flush_delay": "1s",
"dial_protocol": "tcp",
"host": "dbhost.example.com",
"port": 1883,
"user": "exampleuser",
"password" : "examplepw",
"pause_timeout": "1s",
"keep_alive_seconds": 10,
"meta_as_tags" : [],
}
}
```
- `type`: makes the sink an `mqtt` sink
- `client_id`: MQTT clients use a client_id to talk to the broker
- `persistence_directory`: MQTT stores messages temporarily on disk if the broker is not available. The folder needs to be writable (default: `/tmp`)
- `pause_timeout`: Wait time when publishing fails
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
- `flush_delay`: Group incoming metrics into a single batch
- `batch_size`: Maximum batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
- `hostname`: Hostname of the MQTT broker
- `port`: Port number of the MQTT broker
- `username`: Username for authentication
- `password`: Password for authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)


@@ -21,7 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
"influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink,
"http": NewHttpSink,
"mqtt": NewMqttSink,
}
// Metric collector manager data structure