Compare commits

..

31 Commits

Author SHA1 Message Date
Thomas Roehl
1ce40aea16 Add AppMetricReceiver 2022-11-29 13:44:20 +01:00
Holger Obermaier
5918f96fd8 Minimal formating changes 2022-11-24 09:48:44 +01:00
Holger Obermaier
8cb87a2165 Add IPMI receiver example configuration to receivers.json 2022-11-23 10:37:31 +01:00
Holger Obermaier
3e91a37dee remove prefix enumeration like 01-... 2022-11-22 17:02:29 +01:00
Holger Obermaier
ed68baeada Add IPMI metric: current 2022-11-22 15:32:41 +01:00
Holger Obermaier
888db31dbf Try to use common metric tags across hardware vendors 2022-11-22 15:09:56 +01:00
Holger Obermaier
c938d32629 Add go.mod to App dependency 2022-11-22 09:45:29 +01:00
Holger Obermaier
d5daf54d4f Update to latest version of included go modules 2022-11-22 09:42:04 +01:00
Holger Obermaier
18bffd7c14 Add documentaion for IPMI receiver 2022-11-21 13:58:30 +01:00
Holger Obermaier
bd0105b370 Added config option to add ipmi-sensors command line options 2022-11-21 13:02:46 +01:00
Holger Obermaier
b1a8674c4c Add receiver to gather remote IPMI sensor metrics 2022-11-18 16:55:11 +01:00
Holger Obermaier
234ad3c54e Fix kernel panic for receiver config with missing receiver type 2022-11-17 11:33:13 +01:00
Holger Obermaier
7bb80780e0 Fixed computing number of physical packages for non continous physical package IDs (e.g. on Ampere Altra Q80-30) 2022-11-16 14:58:11 +01:00
Holger Obermaier
e66d52bb32 * Corrected json config in numastatsMetric.md
* Added some debug output to numastatsMetric.go
2022-11-16 14:10:25 +01:00
Holger Obermaier
9840d0193d Do not mess up with the orignal configuration 2022-11-16 09:37:40 +01:00
Holger Obermaier
ce7eef8d30 Add running average power limit (RAPL) metric collector 2022-11-15 17:15:27 +01:00
Holger Obermaier
92e45ca62c Add running average power limit (RAPL) metric collector 2022-11-15 17:09:26 +01:00
Holger Obermaier
fd10a279fc Corrected some typos 2022-11-14 09:35:02 +01:00
Holger Obermaier
9e63d0ea59 Run ipmitool asynchron. Improved error handling. 2022-11-11 16:16:14 +01:00
Thomas Gruber
76bb033a88 Update README.md 2022-11-04 14:52:09 +01:00
Holger Obermaier
deb1bcfa2f Correct type: /proc/stats -> /proc/stat 2022-10-13 15:01:39 +02:00
Holger Obermaier
7a67d5e25f Check if at least one CPU with frequency information was detected 2022-10-13 14:53:55 +02:00
Thomas Gruber
9ae0806aa9 Add collector for monitoring the execution of cc-metric-collector itself (#81)
* Add collector to monitor execution of cc-metric-collector itself

* Register SelfCollector

* Fix import paths for moved packages
2022-10-10 12:18:52 +02:00
Thomas Gruber
4bd71224df move maybe-usable-by-other-cc-components to pkg. Fix all files to use the new paths (#88) 2022-10-10 11:53:11 +02:00
Thomas Roehl
6bf3bfd10a Use lower case for error strings in RocmSmiCollector 2022-10-09 17:05:49 +02:00
Thomas Gruber
0fbff00996 Replace ioutils with os and io (#87) 2022-10-09 17:03:38 +02:00
Thomas Roehl
8849824ba9 Remove useless prints from MemstatCollector 2022-10-09 02:56:15 +02:00
Thomas Roehl
ed511b7c09 Fix memstat collector with numa_stats option 2022-09-28 15:09:36 +02:00
Thomas Roehl
a0acf01dc3 Build DEB package for Ubuntu 20.04 for releases 2022-09-28 12:19:36 +02:00
Thomas Roehl
58461f1f72 Fix clock frequency coming from LikwidCollector and update docs 2022-09-09 20:01:21 +02:00
Thomas Röhl
c09d8fb118 InfiniBandCollector: Scale raw readings from octets to bytes 2022-09-09 19:27:20 +02:00
23 changed files with 484 additions and 867 deletions

View File

@@ -1,29 +0,0 @@
{
"title": "cc-metric-collector",
"description": "Monitoring agent for ClusterCockpit.",
"creators": [
{
"affiliation": "Regionales Rechenzentrum Erlangen, Friedrich-Alexander-Universität Erlangen-Nürnberg",
"name": "Thomas Gruber",
"orcid": "0000-0001-5560-6964"
},
{
"affiliation": "Steinbuch Centre for Computing, Karlsruher Institut für Technologie",
"name": "Holger Obermaier",
"orcid": "0000-0002-6830-6626"
}
],
"upload_type": "software",
"license": "MIT",
"access_right": "open",
"keywords": [
"performance-monitoring",
"cluster-monitoring",
"open-source"
],
"communities": [
{
"identifier": "clustercockpit"
}
]
}

View File

@@ -84,7 +84,7 @@ RPM: scripts/cc-metric-collector.spec
@COMMITISH="HEAD"
@VERS=$$(git describe --tags $${COMMITISH})
@VERS=$${VERS#v}
@VERS=$$(echo $${VERS} | sed -e s+'-'+'_'+g)
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
@eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}")
@PREFIX="$${NAME}-$${VERSION}"
@FORMAT="tar.gz"
@@ -96,8 +96,10 @@ RPM: scripts/cc-metric-collector.spec
@if [[ "$${GITHUB_ACTIONS}" == true ]]; then
@ RPMFILE="$${RPMDIR}/$${ARCH}/$${NVRA}.rpm"
@ SRPMFILE="$${SRPMDIR}/$${NVR}.src.rpm"
@ echo "SRPM=$${SRPMFILE}" >> $${GITHUB_OUTPUT}
@ echo "RPM=$${RPMFILE}" >> $${GITHUB_OUTPUT}
@ echo "RPM: $${RPMFILE}"
@ echo "SRPM: $${SRPMFILE}"
@ echo "::set-output name=SRPM::$${SRPMFILE}"
@ echo "::set-output name=RPM::$${RPMFILE}"
@fi
.PHONY: DEB
@@ -106,25 +108,21 @@ DEB: scripts/cc-metric-collector.deb.control $(APP)
@WORKSPACE=$${PWD}/.dpkgbuild
@DEBIANDIR=$${WORKSPACE}/debian
@DEBIANBINDIR=$${WORKSPACE}/DEBIAN
@mkdir --parents --verbose $${WORKSPACE} $${DEBIANBINDIR}
@mkdir --parents --verbose $$WORKSPACE $$DEBIANBINDIR
#@mkdir --parents --verbose $$DEBIANDIR
@CONTROLFILE="$${BASEDIR}/scripts/cc-metric-collector.deb.control"
@COMMITISH="HEAD"
@VERS=$$(git describe --tags --abbrev=0 $${COMMITISH})
@if [ -z "$${VERS}" ]; then VERS=${GITHUB_REF_NAME}; fi
@VERS=$${VERS#v}
@VERS=$$(echo $${VERS} | sed -e s+'-'+'_'+g)
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
@ARCH=$$(uname -m)
@ARCH=$$(echo $${ARCH} | sed -e s+'_'+'-'+g)
@if [ "$${ARCH}" = "x86-64" ]; then ARCH=amd64; fi
@ARCH=$$(echo $$ARCH | sed -e s+'_'+'-'+g)
@PREFIX="$${NAME}-$${VERSION}_$${ARCH}"
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$${WORKSPACE}"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
@SIZE="$$(awk -v size="$${SIZE_BYTES}" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
@sed -e s+"{VERSION}"+"$${VERS}"+g -e s+"{INSTALLED_SIZE}"+"$${SIZE}"+g -e s+"{ARCH}"+"$${ARCH}"+g $${CONTROLFILE} > $${DEBIANBINDIR}/control
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$$WORKSPACE"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
@SIZE="$$(awk -v size="$$SIZE_BYTES" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
#@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANDIR}/control
@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANBINDIR}/control
@make PREFIX=$${WORKSPACE} install
@DEB_FILE="cc-metric-collector_$${VERS}_$${ARCH}.deb"
@dpkg-deb -b $${WORKSPACE} "$${DEB_FILE}"
@if [ "$${GITHUB_ACTIONS}" = "true" ]; then
@ echo "DEB=$${DEB_FILE}" >> $${GITHUB_OUTPUT}
@fi
@dpkg-deb -b $${WORKSPACE} "$$DEB_FILE"
@rm -r "$${WORKSPACE}"

View File

@@ -8,10 +8,6 @@ There is a single timer loop that triggers all collectors serially, collects the
The receiver runs as a go routine side-by-side with the timer loop and asynchronously forwards received metrics to the sink.
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.7438287.svg)](https://doi.org/10.5281/zenodo.7438287)
# Configuration
Configuration is implemented using a single json document that is distributed over network and may be persisted as file.

View File

@@ -1,5 +1,5 @@
# LIKWID version
LIKWID_VERSION = 5.2.2
LIKWID_VERSION = 5.2.1
LIKWID_INSTALLED_FOLDER=$(shell dirname $(shell which likwid-topology 2>/dev/null) 2>/dev/null)
LIKWID_FOLDER="$(shell pwd)/likwid"

View File

@@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)

View File

@@ -15,7 +15,6 @@ import (
"math"
"os"
"os/signal"
"os/user"
"sort"
"strconv"
"strings"
@@ -29,15 +28,12 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/NVIDIA/go-nvml/pkg/dl"
"golang.design/x/thread"
fsnotify "gopkg.in/fsnotify.v0"
)
const (
LIKWID_LIB_NAME = "liblikwid.so"
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
LIKWID_DEF_ACCESSMODE = "direct"
LIKWID_DEF_LOCKFILE = "/var/run/likwid.lock"
)
type LikwidCollectorMetricConfig struct {
@@ -71,25 +67,22 @@ type LikwidCollectorConfig struct {
AccessMode string `json:"access_mode,omitempty"`
DaemonPath string `json:"accessdaemon_path,omitempty"`
LibraryPath string `json:"liblikwid_path,omitempty"`
LockfilePath string `json:"lockfile_path,omitempty"`
}
type LikwidCollector struct {
metricCollector
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
needs_reinit bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
measureThread thread.Thread
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
}
type LikwidMetric struct {
@@ -99,18 +92,6 @@ type LikwidMetric struct {
group_idx int
}
func checkMetricType(t string) bool {
valid := map[string]bool{
"node": true,
"socket": true,
"hwthread": true,
"core": true,
"memoryDomain": true,
}
_, ok := valid[t]
return ok
}
func eventsToEventStr(events map[string]string) string {
elist := make([]string, 0)
for k, v := range events {
@@ -198,11 +179,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
m.name = "LikwidCollector"
m.parallel = false
m.initialized = false
m.needs_reinit = true
m.running = false
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
m.config.LibraryPath = LIKWID_LIB_NAME
m.config.LockfilePath = LIKWID_DEF_LOCKFILE
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
@@ -260,16 +239,12 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
}
for _, metric := range evset.Metrics {
// Try to evaluate the metric
cclog.ComponentDebug(m.name, "Checking", metric.Name)
if !checkMetricType(metric.Type) {
cclog.ComponentError(m.name, "Metric", metric.Name, "uses invalid type", metric.Type)
metric.Calc = ""
} else if !testLikwidMetricFormula(metric.Calc, params) {
cclog.ComponentError(m.name, "Metric", metric.Name, "cannot be calculated with given counters")
metric.Calc = ""
} else {
if testLikwidMetricFormula(metric.Calc, params) {
// Add the computable metric to the parameter list for the global metrics
globalParams = append(globalParams, metric.Name)
totalMetrics++
} else {
metric.Calc = ""
}
}
} else {
@@ -279,14 +254,8 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
}
for _, metric := range m.config.Metrics {
// Try to evaluate the global metric
if !checkMetricType(metric.Type) {
cclog.ComponentError(m.name, "Metric", metric.Name, "uses invalid type", metric.Type)
metric.Calc = ""
} else if !testLikwidMetricFormula(metric.Calc, globalParams) {
cclog.ComponentError(m.name, "Metric", metric.Name, "cannot be calculated with given counters")
metric.Calc = ""
} else if !checkMetricType(metric.Type) {
cclog.ComponentError(m.name, "Metric", metric.Name, "has invalid type")
if !testLikwidMetricFormula(metric.Calc, globalParams) {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
metric.Calc = ""
} else {
totalMetrics++
@@ -299,194 +268,56 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, err.Error())
return err
}
ret := C.topology_init()
if ret != 0 {
err := errors.New("failed to initialize topology module")
cclog.ComponentError(m.name, err.Error())
return err
}
switch m.config.AccessMode {
case "direct":
C.HPMmode(0)
case "accessdaemon":
if len(m.config.DaemonPath) > 0 {
p := os.Getenv("PATH")
os.Setenv("PATH", m.config.DaemonPath+":"+p)
}
C.HPMmode(1)
for _, c := range m.cpulist {
C.HPMaddThread(c)
}
}
m.sock2tid = make(map[int]int)
tmp := make([]C.int, 1)
for _, sid := range topo.SocketList() {
cstr := C.CString(fmt.Sprintf("S%d:0", sid))
ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
if ret > 0 {
m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
}
C.free(unsafe.Pointer(cstr))
}
m.basefreq = getBaseFreq()
m.measureThread = thread.New()
m.init = true
return nil
}
// take a measurement for 'interval' seconds of event set index 'group'
func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
var ret C.int
var gid C.int = -1
sigchan := make(chan os.Signal, 1)
watcher, err := fsnotify.NewWatcher()
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
defer watcher.Close()
if len(m.config.LockfilePath) > 0 {
info, err := os.Stat(m.config.LockfilePath)
if err != nil {
return true, err
}
stat := info.Sys().(*syscall.Stat_t)
if stat.Uid != uint32(os.Getuid()) {
usr, err := user.LookupId(strconv.FormatUint(uint64(stat.Uid), 10))
if err == nil {
return true, fmt.Errorf("Access to performance counters locked by %s", usr.Username)
} else {
return true, fmt.Errorf("Access to performance counters locked by %d", stat.Uid)
}
}
err = watcher.Watch(m.config.LockfilePath)
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
}
m.lock.Lock()
defer m.lock.Unlock()
select {
case e := <-watcher.Event:
ret = -1
if !e.IsAttrib() {
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
}
default:
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
}
if ret != 0 {
return true, fmt.Errorf("failed to initialize library, error %d", ret)
}
signal.Notify(sigchan, os.Interrupt)
signal.Notify(sigchan, syscall.SIGCHLD)
select {
case <-sigchan:
gid = -1
case e := <-watcher.Event:
gid = -1
if !e.IsAttrib() {
gid = C.perfmon_addEventSet(evset.estr)
}
default:
gid = C.perfmon_addEventSet(evset.estr)
}
if gid < 0 {
return true, fmt.Errorf("failed to add events %s, error %d", evset.go_estr, gid)
} else {
evset.gid = gid
//m.likwidGroups[gid] = evset
}
select {
case <-sigchan:
ret = -1
case e := <-watcher.Event:
if !e.IsAttrib() {
ret = C.perfmon_setupCounters(gid)
}
default:
ret = C.perfmon_setupCounters(gid)
}
if ret != 0 {
return true, fmt.Errorf("failed to setup events '%s', error %d", evset.go_estr, ret)
}
select {
case <-sigchan:
ret = -1
case e := <-watcher.Event:
if !e.IsAttrib() {
ret = C.perfmon_startCounters()
}
default:
ret = C.perfmon_startCounters()
}
if ret != 0 {
return true, fmt.Errorf("failed to start events '%s', error %d", evset.go_estr, ret)
}
select {
case <-sigchan:
ret = -1
case e := <-watcher.Event:
if !e.IsAttrib() {
ret = C.perfmon_readCounters()
}
default:
ret = C.perfmon_readCounters()
}
if ret != 0 {
return true, fmt.Errorf("failed to read events '%s', error %d", evset.go_estr, ret)
}
time.Sleep(interval)
select {
case <-sigchan:
ret = -1
case e := <-watcher.Event:
if !e.IsAttrib() {
ret = C.perfmon_readCounters()
}
default:
ret = C.perfmon_readCounters()
}
if ret != 0 {
return true, fmt.Errorf("failed to read events '%s', error %d", evset.go_estr, ret)
}
for eidx, counter := range evset.eorder {
gctr := C.GoString(counter)
for _, tid := range m.cpu2tid {
res := C.perfmon_getLastResult(gid, C.int(eidx), C.int(tid))
fres := float64(res)
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
fres = 0.0
if m.initialized {
ret = C.perfmon_setupCounters(evset.gid)
if ret != 0 {
var err error = nil
var skip bool = false
if ret == -37 {
skip = true
} else {
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
}
evset.results[tid][gctr] = fres
m.lock.Unlock()
return skip, err
}
}
for _, tid := range m.cpu2tid {
evset.results[tid]["time"] = float64(C.perfmon_getLastTimeOfGroup(gid))
}
select {
case <-sigchan:
ret = -1
case e := <-watcher.Event:
if !e.IsAttrib() {
ret = C.perfmon_stopCounters()
ret = C.perfmon_startCounters()
if ret != 0 {
var err error = nil
var skip bool = false
if ret == -37 {
skip = true
} else {
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
}
m.lock.Unlock()
return skip, err
}
default:
m.running = true
time.Sleep(interval)
m.running = false
ret = C.perfmon_stopCounters()
}
if ret != 0 {
return true, fmt.Errorf("failed to stop events '%s', error %d", evset.go_estr, ret)
}
signal.Stop(sigchan)
select {
case e := <-watcher.Event:
if !e.IsAttrib() {
C.perfmon_finalize()
if ret != 0 {
var err error = nil
var skip bool = false
if ret == -37 {
skip = true
} else {
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
}
m.lock.Unlock()
return skip, err
}
default:
C.perfmon_finalize()
}
m.lock.Unlock()
return false, nil
}
@@ -494,8 +325,19 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
invClock := float64(1.0 / m.basefreq)
for _, tid := range m.cpu2tid {
evset.results[tid]["inverseClock"] = invClock
// Go over events and get the results
for eidx, counter := range evset.eorder {
gctr := C.GoString(counter)
for _, tid := range m.cpu2tid {
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
fres := float64(res)
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
fres = 0.0
}
evset.results[tid][gctr] = fres
evset.results[tid]["time"] = interval.Seconds()
evset.results[tid]["inverseClock"] = invClock
}
}
// Go over the event set metrics, derive the value out of the event:counter values and send it
@@ -541,7 +383,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
}
// Go over the global metrics, derive the value out of the event sets' metric values and send it
func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error {
for _, metric := range m.config.Metrics {
scopemap := m.cpu2tid
if metric.Type == "socket" {
@@ -551,7 +393,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
if tid >= 0 {
// Here we generate parameter list
params := make(map[string]interface{})
for _, evset := range groups {
for _, evset := range m.likwidGroups {
for mname, mres := range evset.metrics[tid] {
params[mname] = mres
}
@@ -565,7 +407,7 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
value = 0.0
}
//m.gmresults[tid][metric.Name] = value
m.gmresults[tid][metric.Name] = value
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@@ -589,53 +431,163 @@ func (m *LikwidCollector) calcGlobalMetrics(groups []LikwidEventsetConfig, inter
return nil
}
func (m *LikwidCollector) ReadThread(interval time.Duration, output chan lp.CCMetric) {
var err error = nil
groups := make([]LikwidEventsetConfig, 0)
for evidx, evset := range m.config.Eventsets {
e := genLikwidEventSet(evset)
e.internal = evidx
skip := false
if !skip {
// measure event set 'i' for 'interval' seconds
skip, err = m.takeMeasurement(evidx, e, interval)
if err != nil {
cclog.ComponentError(m.name, err.Error())
return
}
}
if !skip {
// read measurements and derive event set metrics
m.calcEventsetMetrics(e, interval, output)
}
groups = append(groups, e)
func (m *LikwidCollector) LateInit() error {
var ret C.int
if m.initialized {
return nil
}
// calculate global metrics
m.calcGlobalMetrics(groups, interval, output)
switch m.config.AccessMode {
case "direct":
C.HPMmode(0)
case "accessdaemon":
if len(m.config.DaemonPath) > 0 {
p := os.Getenv("PATH")
os.Setenv("PATH", m.config.DaemonPath+":"+p)
}
C.HPMmode(1)
}
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
ret = C.topology_init()
if ret != 0 {
err := errors.New("failed to initialize LIKWID topology")
cclog.ComponentError(m.name, err.Error())
return err
}
m.sock2tid = make(map[int]int)
tmp := make([]C.int, 1)
for _, sid := range topo.SocketList() {
cstr := C.CString(fmt.Sprintf("S%d:0", sid))
ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
if ret > 0 {
m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
}
C.free(unsafe.Pointer(cstr))
}
m.basefreq = getBaseFreq()
cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
if ret != 0 {
var err error = nil
C.topology_finalize()
if ret != -22 {
err = errors.New("failed to initialize LIKWID perfmon")
cclog.ComponentError(m.name, err.Error())
} else {
err = errors.New("access to LIKWID perfmon locked")
}
return err
}
// While adding the events, we test the metrics whether they can be computed at all
for i, evset := range m.config.Eventsets {
var gid C.int
if len(evset.Events) > 0 {
skip := false
likwidGroup := genLikwidEventSet(evset)
for _, g := range m.likwidGroups {
if likwidGroup.go_estr == g.go_estr {
skip = true
break
}
}
if skip {
continue
}
// Now we add the list of events to likwid
gid = C.perfmon_addEventSet(likwidGroup.estr)
if gid >= 0 {
likwidGroup.gid = gid
likwidGroup.internal = i
m.likwidGroups[gid] = likwidGroup
}
} else {
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
continue
}
}
// If no event set could be added, shut down LikwidCollector
if len(m.likwidGroups) == 0 {
C.perfmon_finalize()
C.topology_finalize()
err := errors.New("no LIKWID performance group initialized")
cclog.ComponentError(m.name, err.Error())
return err
}
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGCHLD)
signal.Notify(sigchan, os.Interrupt)
go func() {
<-sigchan
signal.Stop(sigchan)
m.initialized = false
}()
m.initialized = true
return nil
}
// main read function taking multiple measurement rounds, each 'interval' seconds long
func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) {
//var skip bool = false
//var err error
var skip bool = false
var err error
if !m.init {
return
}
m.measureThread.Call(func() {
m.ReadThread(interval, output)
})
if !m.initialized {
m.lock.Lock()
err = m.LateInit()
if err != nil {
m.lock.Unlock()
return
}
m.initialized = true
m.lock.Unlock()
}
if m.initialized && !skip {
for _, evset := range m.likwidGroups {
if !skip {
// measure event set 'i' for 'interval' seconds
skip, err = m.takeMeasurement(evset, interval)
if err != nil {
cclog.ComponentError(m.name, err.Error())
return
}
}
if !skip {
// read measurements and derive event set metrics
m.calcEventsetMetrics(evset, interval, output)
}
}
if !skip {
// use the event set metrics to derive the global metrics
m.calcGlobalMetrics(interval, output)
}
}
}
func (m *LikwidCollector) Close() {
if m.init {
m.init = false
cclog.ComponentDebug(m.name, "Closing ...")
m.lock.Lock()
m.measureThread.Terminate()
m.initialized = false
if m.initialized {
cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
C.perfmon_finalize()
m.initialized = false
}
m.lock.Unlock()
cclog.ComponentDebug(m.name, "Finalize LIKWID topology module")
C.topology_finalize()
cclog.ComponentDebug(m.name, "Closing done")
}
}

View File

@@ -10,7 +10,6 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
"liblikwid_path" : "/path/to/liblikwid.so",
"accessdaemon_path" : "/folder/that/contains/likwid-accessD",
"access_mode" : "direct or accessdaemon or perf_event",
"lockfile_path" : "/var/run/likwid.lock",
"eventsets": [
{
"events" : {
@@ -42,7 +41,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
The `likwid` configuration consists of two parts, the `eventsets` and `globalmetrics`:
- An event set list itself has two parts, the `events` and a set of derivable `metrics`. Each of the `events` is a `counter:event` pair in LIKWID's syntax. The `metrics` are a list of formulas to derive the metric value from the measurements of the `events`' values. Each metric has a name, the formula, a type and a publish flag. There is an optional `unit` field. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. You can optionally use `time` for the measurement time and `inverseClock` for `1.0/baseCpuFrequency`. The type tells the LikwidCollector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the LikwidCollector whether a metric should be sent to the router or is only used internally to compute a global metric.
- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a type and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
Additional options:
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
@@ -50,22 +49,21 @@ Additional options:
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`. The access mode `perf_event` is current untested.
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD` (like `/usr/local/sbin`)
- `liblikwid_path`: Location of `liblikwid.so` including file name like `/usr/local/lib/liblikwid.so`
- `lockfile_path`: Location of LIKWID's lock file if multiple tools should access the hardware counters. Default `/var/run/likwid.lock`
### Available metric types
### Available metric scopes
Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric.
- `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"`
- `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"`
**Note:** You cannot specify `socket` type for a metric that is measured at `hwthread` type, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the type of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
**Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
As a guideline:
- All counters `FIXCx`, `PMCy` and `TMAz` have the type `hwthread`
- All counters names containing `BOX` have the type `socket`
- All `PWRx` counters have type `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` type
- All `DFCx` counters have type `socket`
- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread`
- All counters names containing `BOX` have the scope `socket`
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope
- All `DFCx` counters have scope `socket`
### Help with the configuration
@@ -95,7 +93,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
"name": "Runtime (RDTSC) [s]",
"publish": true,
"unit": "seconds"
"type": "hwthread"
"scope": "hwthread"
},
{
"..." : "..."
@@ -247,7 +245,7 @@ METRICS -> "metrics": [
IPC PMC0/PMC1 -> {
-> "name" : "IPC",
-> "calc" : "PMC0/PMC1",
-> "type": "hwthread",
-> "scope": "hwthread",
-> "publish": true
-> }
-> ]

View File

@@ -1,166 +0,0 @@
package collectors
import (
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// These are the fields we read from the JSON configuration
type NfsIOStatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
UseServerAddressAsSType bool `json:"use_server_as_stype,omitempty"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type NfsIOStatCollector struct {
metricCollector
config NfsIOStatCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
data map[string]map[string]int64 // data storage for difference calculation
key string // which device info should be used as subtype ID? 'server' or 'mntpoint', see NfsIOStatCollectorConfig.UseServerAddressAsSType
}
var deviceRegex = regexp.MustCompile(`device (?P<server>[^ ]+) mounted on (?P<mntpoint>[^ ]+) with fstype nfs(?P<version>\d*) statvers=[\d\.]+`)
var bytesRegex = regexp.MustCompile(`\s+bytes:\s+(?P<nread>[^ ]+) (?P<nwrite>[^ ]+) (?P<dread>[^ ]+) (?P<dwrite>[^ ]+) (?P<nfsread>[^ ]+) (?P<nfswrite>[^ ]+) (?P<pageread>[^ ]+) (?P<pagewrite>[^ ]+)`)
func resolve_regex_fields(s string, regex *regexp.Regexp) map[string]string {
fields := make(map[string]string)
groups := regex.SubexpNames()
for _, match := range regex.FindAllStringSubmatch(s, -1) {
for groupIdx, group := range match {
if len(groups[groupIdx]) > 0 {
fields[groups[groupIdx]] = group
}
}
}
return fields
}
func (m *NfsIOStatCollector) readNfsiostats() map[string]map[string]int64 {
data := make(map[string]map[string]int64)
filename := "/proc/self/mountstats"
stats, err := os.ReadFile(filename)
if err != nil {
return data
}
lines := strings.Split(string(stats), "\n")
var current map[string]string = nil
for _, l := range lines {
// Is this a device line with mount point, remote target and NFS version?
dev := resolve_regex_fields(l, deviceRegex)
if len(dev) > 0 {
if _, ok := stringArrayContains(m.config.ExcludeFilesystem, dev[m.key]); !ok {
current = dev
if len(current["version"]) == 0 {
current["version"] = "3"
}
}
}
if len(current) > 0 {
// Byte line parsing (if found the device for it)
bytes := resolve_regex_fields(l, bytesRegex)
if len(bytes) > 0 {
data[current[m.key]] = make(map[string]int64)
for name, sval := range bytes {
if _, ok := stringArrayContains(m.config.ExcludeMetrics, name); !ok {
val, err := strconv.ParseInt(sval, 10, 64)
if err == nil {
data[current[m.key]][name] = val
}
}
}
current = nil
}
}
}
return data
}
func (m *NfsIOStatCollector) Init(config json.RawMessage) error {
var err error = nil
m.name = "NfsIOStatCollector"
m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "NFS", "unit": "bytes"}
m.tags = map[string]string{"type": "node"}
m.config.UseServerAddressAsSType = false
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
m.key = "mntpoint"
if m.config.UseServerAddressAsSType {
m.key = "server"
}
m.data = m.readNfsiostats()
m.init = true
return err
}
func (m *NfsIOStatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
timestamp := time.Now()
// Get the current values for all mountpoints
newdata := m.readNfsiostats()
for mntpoint, values := range newdata {
// Was the mount point already present in the last iteration
if old, ok := m.data[mntpoint]; ok {
// Calculate the difference of old and new values
for i := range values {
x := values[i] - old[i]
y, err := lp.New(fmt.Sprintf("nfsio_%s", i), m.tags, m.meta, map[string]interface{}{"value": x}, timestamp)
if err == nil {
if strings.HasPrefix(i, "page") {
y.AddMeta("unit", "4K_Pages")
}
y.AddTag("stype", "filesystem")
y.AddTag("stype-id", mntpoint)
// Send it to output channel
output <- y
}
// Update old to the new value for the next iteration
old[i] = values[i]
}
} else {
// First time we see this mount point, store all values
m.data[mntpoint] = values
}
}
// Reset entries that do not exist anymore
for mntpoint := range m.data {
found := false
for new := range newdata {
if new == mntpoint {
found = true
break
}
}
if !found {
m.data[mntpoint] = nil
}
}
}
func (m *NfsIOStatCollector) Close() {
// Unset flag
m.init = false
}

View File

@@ -1,27 +0,0 @@
## `nfsiostat` collector
```json
"nfsiostat": {
"exclude_metrics": [
"nfsio_oread"
],
"exclude_filesystems" : [
"/mnt",
],
"use_server_as_stype": false
}
```
The `nfsiostat` collector reads data from `/proc/self/mountstats` and outputs a handful **node** metrics for each NFS filesystem. If a metric or filesystem is not required, it can be excluded from forwarding it to the sink.
Metrics:
* `nfsio_nread`: Bytes transferred by normal `read()` calls
* `nfsio_nwrite`: Bytes transferred by normal `write()` calls
* `nfsio_oread`: Bytes transferred by `read()` calls with `O_DIRECT`
* `nfsio_owrite`: Bytes transferred by `write()` calls with `O_DIRECT`
* `nfsio_pageread`: Pages transferred by `read()` calls
* `nfsio_pagewrite`: Pages transferred by `write()` calls
* `nfsio_nfsread`: Bytes transferred for reading from the server
* `nfsio_nfswrite`: Pages transferred by writing to the server
The `nfsiostat` collector adds the mountpoint to the tags as `stype=filesystem,stype-id=<mountpoint>`. If the server address should be used instead of the mountpoint, use the `use_server_as_stype` config setting.

View File

@@ -37,9 +37,7 @@ $ install --mode 644 \
$ systemctl enable cc-metric-collector
```
## Packaging
### RPM
## RPM
In order to get a RPM packages for cc-metric-collector, just use:
@@ -49,7 +47,7 @@ $ make RPM
It uses the RPM SPEC file `scripts/cc-metric-collector.spec` and requires the RPM tools (`rpm` and `rpmspec`) and `git`.
### DEB
## DEB
In order to get very simple Debian packages for cc-metric-collector, just use:
@@ -60,15 +58,3 @@ $ make DEB
It uses the DEB control file `scripts/cc-metric-collector.control` and requires `dpkg-deb`, `awk`, `sed` and `git`. It creates only a binary deb package.
_This option is not well tested and therefore experimental_
### Customizing RPMs or DEB packages
If you want to customize the RPMs or DEB packages for your local system, use the following workflow.
- (if there is already a fork in the private account, delete it and wait until Github realizes the deletion)
- Fork the cc-metric-collector repository (if Github hasn't realized it, it creates a fork named cc-metric-collector2)
- Go to private cc-metric-collector repository and enable Github Actions
- Do changes to the scripts, code, ... Commit and push your changes.
- Tag the new commit with `v0.x.y-<myversion>` (`git tag v0.x.y-<myversion>`)
- Push tags to repository (`git push --tags`)
- Wait until the Release action finishes. It creates fresh RPMs and DEBs in your private repository on the Releases page.

13
go.mod
View File

@@ -8,21 +8,20 @@ require (
github.com/NVIDIA/go-nvml v0.11.6-0
github.com/PaesslerAG/gval v1.2.1
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.12.1
github.com/influxdata/influxdb-client-go/v2 v2.12.0
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/nats-io/nats.go v1.21.0
github.com/nats-io/nats.go v1.20.0
github.com/prometheus/client_golang v1.14.0
github.com/stmcginnis/gofish v0.13.0
github.com/tklauser/go-sysconf v0.3.11
golang.org/x/sys v0.3.0
golang.org/x/sys v0.2.0
)
require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/deepmap/oapi-codegen v1.12.4 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/deepmap/oapi-codegen v1.12.3 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@@ -36,6 +35,6 @@ require (
github.com/shopspring/decimal v1.3.1 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/net v0.2.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

View File

@@ -1,125 +0,0 @@
package hostlist
import (
"fmt"
"regexp"
"sort"
"strconv"
"strings"
)
func Expand(in string) (result []string, err error) {
// Create ranges regular expression
reStNumber := "[[:digit:]]+"
reStRange := reStNumber + "-" + reStNumber
reStOptionalNumberOrRange := "(" + reStNumber + ",|" + reStRange + ",)*"
reStNumberOrRange := "(" + reStNumber + "|" + reStRange + ")"
reStBraceLeft := "[[]"
reStBraceRight := "[]]"
reStRanges := reStBraceLeft +
reStOptionalNumberOrRange +
reStNumberOrRange +
reStBraceRight
reRanges := regexp.MustCompile(reStRanges)
// Create host list regular expression
reStDNSChars := "[a-zA-Z0-9-]+"
reStPrefix := "^(" + reStDNSChars + ")"
reStOptionalSuffix := "(" + reStDNSChars + ")?"
re := regexp.MustCompile(reStPrefix + "([[][0-9,-]+[]])?" + reStOptionalSuffix)
// Remove all delimiters from the input
in = strings.TrimLeft(in, ", ")
for len(in) > 0 {
if v := re.FindStringSubmatch(in); v != nil {
// Remove matched part from the input
lenPrefix := len(v[0])
in = in[lenPrefix:]
// Remove all delimiters from the input
in = strings.TrimLeft(in, ", ")
// matched prefix, range and suffix
hlPrefix := v[1]
hlRanges := v[2]
hlSuffix := v[3]
// Single node without ranges
if hlRanges == "" {
result = append(result, hlPrefix)
continue
}
// Node with ranges
if v := reRanges.FindStringSubmatch(hlRanges); v != nil {
// Remove braces
hlRanges = hlRanges[1 : len(hlRanges)-1]
// Split host ranges at ,
for _, hlRange := range strings.Split(hlRanges, ",") {
// Split host range at -
RangeStartEnd := strings.Split(hlRange, "-")
// Range is only a single number
if len(RangeStartEnd) == 1 {
result = append(result, hlPrefix+RangeStartEnd[0]+hlSuffix)
continue
}
// Range has a start and an end
widthRangeStart := len(RangeStartEnd[0])
widthRangeEnd := len(RangeStartEnd[1])
iStart, _ := strconv.ParseUint(RangeStartEnd[0], 10, 64)
iEnd, _ := strconv.ParseUint(RangeStartEnd[1], 10, 64)
if iStart > iEnd {
return nil, fmt.Errorf("single range start is greater than end: %s", hlRange)
}
// Create print format string for range numbers
doPadding := widthRangeStart == widthRangeEnd
widthPadding := widthRangeStart
var formatString string
if doPadding {
formatString = "%0" + fmt.Sprint(widthPadding) + "d"
} else {
formatString = "%d"
}
formatString = hlPrefix + formatString + hlSuffix
// Add nodes from this range
for i := iStart; i <= iEnd; i++ {
result = append(result, fmt.Sprintf(formatString, i))
}
}
} else {
return nil, fmt.Errorf("not at hostlist range: %s", hlRanges)
}
} else {
return nil, fmt.Errorf("not a hostlist: %s", in)
}
}
if result != nil {
// sort
sort.Strings(result)
// uniq
previous := 1
for current := 1; current < len(result); current++ {
if result[current-1] != result[current] {
if previous != current {
result[previous] = result[current]
}
previous++
}
}
result = result[:previous]
}
return
}

View File

@@ -1,126 +0,0 @@
package hostlist
import (
"testing"
)
func TestExpand(t *testing.T) {
// Compare two slices of strings
equal := func(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
type testDefinition struct {
input string
resultExpected []string
errorExpected bool
}
expandTests := []testDefinition{
{
// Single node
input: "n1",
resultExpected: []string{"n1"},
errorExpected: false,
},
{
// Single node, duplicated
input: "n1,n1",
resultExpected: []string{"n1"},
errorExpected: false,
},
{
// Single node with padding
input: "n[01]",
resultExpected: []string{"n01"},
errorExpected: false,
},
{
// Single node with suffix
input: "n[01]-p",
resultExpected: []string{"n01-p"},
errorExpected: false,
},
{
// Multiple nodes with a single range
input: "n[1-2]",
resultExpected: []string{"n1", "n2"},
errorExpected: false,
},
{
// Multiple nodes with a single range and a single index
input: "n[1-2,3]",
resultExpected: []string{"n1", "n2", "n3"},
errorExpected: false,
},
{
// Multiple nodes with different prefixes
input: "n[1-2],m[1,2]",
resultExpected: []string{"m1", "m2", "n1", "n2"},
errorExpected: false,
},
{
// Multiple nodes with different suffixes
input: "n[1-2]-p,n[1,2]-q",
resultExpected: []string{"n1-p", "n1-q", "n2-p", "n2-q"},
errorExpected: false,
},
{
// Multiple nodes with and without node ranges
input: " n09, n[01-04,06-07,09] , , n10,n04",
resultExpected: []string{"n01", "n02", "n03", "n04", "n06", "n07", "n09", "n10"},
errorExpected: false,
},
{
// Forbidden DNS character
input: "n@",
resultExpected: []string{},
errorExpected: true,
},
{
// Forbidden range
input: "n[1-2-2,3]",
resultExpected: []string{},
errorExpected: true,
},
{
// Forbidden range limits
input: "n[2-1]",
resultExpected: []string{},
errorExpected: true,
},
}
for _, expandTest := range expandTests {
result, err := Expand(expandTest.input)
hasError := err != nil
if hasError != expandTest.errorExpected && hasError {
t.Errorf("Expand('%s') failed: unexpected error '%v'",
expandTest.input, err)
continue
}
if hasError != expandTest.errorExpected && !hasError {
t.Errorf("Expand('%s') did not fail as expected: got result '%+v'",
expandTest.input, result)
continue
}
if !hasError && !equal(result, expandTest.resultExpected) {
t.Errorf("Expand('%s') failed: got result '%+v', expected result '%v'",
expandTest.input, result, expandTest.resultExpected)
continue
}
t.Logf("Checked hostlist.Expand('%s'): result = '%+v', err = '%v'",
expandTest.input, result, err)
}
}

View File

@@ -7,23 +7,23 @@
},
"redfish_recv": {
"type": "redfish",
"endpoint": "https://%h-bmc",
"client_config": [
{
"host_list": "my-host-1-[1-2]",
"hostname": "my-host-1",
"username": "username-1",
"password": "password-1"
"password": "password-1",
"endpoint": "https://my-endpoint-1"
},
{
"host_list": "my-host-2-[1,2]",
"hostname": "my-host-2",
"username": "username-2",
"password": "password-2"
"password": "password-2",
"endpoint": "https://my-endpoint-2"
}
]
},
"ipmi_recv": {
"type": "ipmi",
"endpoint": "ipmi-sensors://%h-ipmi",
"exclude_metrics": [
"fan_speed",
"voltage"
@@ -32,12 +32,18 @@
{
"username": "username-1",
"password": "password-1",
"host_list": "my-host-1-[1-2]"
"endpoint": "ipmi-sensors://my-endpoint-1",
"host_list": [
"my-host-1"
]
},
{
"username": "username-2",
"password": "password-2",
"host_list": "my-host-2-[1,2]"
"endpoint": "ipmi-sensors://my-endpoint-2",
"host_list": [
"my-host-2"
]
}
]
}

View File

@@ -0,0 +1,164 @@
package receivers
import (
"bufio"
"encoding/json"
"fmt"
"net"
"os"
"sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol"
)
// SampleReceiver configuration: receiver type, listen address, port
type AppMetricReceiverConfig struct {
Type string `json:"type"`
SocketFile string `json:"socket_file"`
}
type AppMetricReceiver struct {
receiver
config AppMetricReceiverConfig
// Storage for static information
meta map[string]string
// Use in case of own go routine
done chan bool
wg sync.WaitGroup
// Influx stuff
handler *influx.MetricHandler
parser *influx.Parser
// WaitGroup for individual connections
connWg sync.WaitGroup
listener net.Listener
}
func (r *AppMetricReceiver) newConnection(conn net.Conn) {
//defer conn.Close()
//defer wg.Done()
buffer, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
conn.Close()
return
}
metrics, err := r.parser.Parse(buffer)
if err != nil {
cclog.ComponentError(r.name, "failed to parse received metrics")
return
}
for _, m := range metrics {
y := lp.FromInfluxMetric(m)
for k, v := range r.meta {
y.AddMeta(k, v)
}
if r.sink != nil {
r.sink <- y
}
}
r.newConnection(conn)
}
func (r *AppMetricReceiver) newAccepter(listenSocket net.Listener) {
accept_loop:
for {
select {
case <-r.done:
break accept_loop
default:
conn, err := listenSocket.Accept()
if err == nil {
r.connWg.Add(1)
go func() {
r.newConnection(conn)
r.connWg.Done()
}()
}
}
}
r.wg.Done()
}
// Implement functions required for Receiver interface
// Start(), Close()
// See: metricReceiver.go
func (r *AppMetricReceiver) Start() {
var err error = nil
cclog.ComponentDebug(r.name, "START")
r.listener, err = net.Listen("unix", r.config.SocketFile)
if err != nil {
cclog.ComponentError(r.name, "failed to listen at socket", r.config.SocketFile)
}
if _, err := os.Stat(r.config.SocketFile); err != nil {
cclog.ComponentError(r.name, "failed to create socket", r.config.SocketFile)
}
r.done = make(chan bool)
r.wg.Add(1)
go r.newAccepter(r.listener)
}
// Close receiver: close network connection, close files, close libraries, ...
func (r *AppMetricReceiver) Close() {
cclog.ComponentDebug(r.name, "CLOSE")
if _, err := os.Stat(r.config.SocketFile); err == nil {
if err := os.RemoveAll(r.config.SocketFile); err != nil {
cclog.ComponentError(r.name, "Failed to remove UNIX socket", r.config.SocketFile)
}
}
// in case of own go routine, send the signal and wait
r.listener.Close()
r.done <- true
close(r.done)
r.connWg.Wait()
r.wg.Wait()
}
// New function to create a new instance of the receiver
// Initialize the receiver by giving it a name and reading in the config JSON
func NewAppMetricReceiver(name string, config json.RawMessage) (Receiver, error) {
r := new(AppMetricReceiver)
// Set name of SampleReceiver
// The name should be chosen in such a way that different instances of SampleReceiver can be distinguished
r.name = fmt.Sprintf("AppMetricReceiver(%s)", name)
// Set static information
r.meta = map[string]string{"source": r.name}
// Set defaults in r.config
// Allow overwriting these defaults by reading config JSON
r.config.SocketFile = "/tmp/cc.sock"
// Read the sample receiver specific JSON config
if len(config) > 0 {
err := json.Unmarshal(config, &r.config)
if err != nil {
cclog.ComponentError(r.name, "Error reading config:", err.Error())
return nil, err
}
}
if len(r.config.SocketFile) == 0 {
cclog.ComponentError(r.name, "Invalid socket_file setting:", r.config.SocketFile)
return nil, fmt.Errorf("invalid socket_file setting: %s", r.config.SocketFile)
}
// Check that all required fields in the configuration are set
// Use 'if len(r.config.Option) > 0' for strings
r.handler = influx.NewMetricHandler()
r.parser = influx.NewParser(r.handler)
r.parser.SetTimeFunc(DefaultTime)
return r, nil
}

View File

@@ -0,0 +1,23 @@
## `appmetrics` receiver
The `appmetrics` receiver can be used to submit metrics from an application into the monitoring system. It listens for incoming connections on a UNIX socket.
### Configuration structure
```json
{
"<name>": {
"type": "appmetrics",
"socket_file" : "/tmp/cc.sock",
}
}
```
- `type`: makes the receiver a `appmetrics` receiver
- `socket_file`: Listen UNIX socket
### Inputs from applications
Applcations can connect to the `appmetrics` socket and provide metric in the [InfluxDB line protocol](https://github.com/influxdata/line-protocol). It is currently not possible to submit meta information as the Influx line protocol does not know them.

View File

@@ -15,7 +15,6 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist"
)
type IPMIReceiverClientConfig struct {
@@ -320,12 +319,12 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ClientConfigs []struct {
Fanout int `json:"fanout,omitempty"` // Maximum number of simultaneous IPMI connections (default: 64)
DriverType string `json:"driver_type,omitempty"` // Out of band IPMI driver (default: LAN_2_0)
HostList string `json:"host_list"` // List of hosts with the same client configuration
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the IPMI service
Fanout int `json:"fanout,omitempty"` // Maximum number of simultaneous IPMI connections (default: 64)
DriverType string `json:"driver_type,omitempty"` // Out of band IPMI driver (default: LAN_2_0)
HostList []string `json:"host_list"` // List of hosts with the same client configuration
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the IPMI service
// Per client excluded metrics
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
@@ -352,9 +351,8 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
// Read the IPMI receiver specific JSON config
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&configJSON); err != nil {
err := json.Unmarshal(config, &configJSON)
if err != nil {
cclog.ComponentError(r.name, "Error reading config:", err.Error())
return nil, err
}
@@ -437,17 +435,10 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
return nil, err
}
// Create mapping between IPMI host name and node host name
// This also guaranties that all IPMI host names are unique
// Create mapping between ipmi hostname and node hostname
// This also guaranties that all ipmi hostnames are uniqu
ipmi2HostMapping := make(map[string]string)
hostList, err := hostlist.Expand(clientConfigJSON.HostList)
if err != nil {
err := fmt.Errorf("client config number %d failed to parse host list %s: %v",
i, clientConfigJSON.HostList, err)
cclog.ComponentError(r.name, err)
return nil, err
}
for _, host := range hostList {
for _, host := range clientConfigJSON.HostList {
ipmiHost := strings.Replace(host_pattern, "%h", host, -1)
ipmi2HostMapping[ipmiHost] = host
}

View File

@@ -12,14 +12,14 @@ The IPMI Receiver uses `ipmi-sensors` from the [FreeIPMI](https://www.gnu.org/so
"fanout": 256,
"username": "<Username>",
"password": "<Password>",
"endpoint": "ipmi-sensors://%h-bmc",
"endpoint": "ipmi-sensors://%h-p",
"exclude_metrics": [ "fan_speed", "voltage" ],
"client_config": [
{
"host_list": "n[1,2-4]"
"host_list": ["n1", "n2", "n3", "n4" ]
},
{
"host_list": "n[5-6]",
"host_list": [ "n5", "n6" ],
"driver_type": "LAN",
"cli_options": [ "--workaround-flags=..." ],
"password": "<Password 2>"

View File

@@ -11,9 +11,10 @@ import (
)
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
"ipmi": NewIPMIReceiver,
"nats": NewNatsReceiver,
"redfish": NewRedfishReceiver,
"ipmi": NewIPMIReceiver,
"nats": NewNatsReceiver,
"redfish": NewRedfishReceiver,
"appmetrics": NewAppMetricReceiver,
}
type receiveManager struct {

View File

@@ -1,11 +1,9 @@
package receivers
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
@@ -14,7 +12,6 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist"
// See: https://pkg.go.dev/github.com/stmcginnis/gofish
"github.com/stmcginnis/gofish"
@@ -349,17 +346,9 @@ func (r *RedfishReceiver) readProcessorMetrics(
// This property shall contain the temperature, in Celsius, of the processor.
TemperatureCelsius float32 `json:"TemperatureCelsius"`
}
body, err := io.ReadAll(resp.Body)
err = json.NewDecoder(resp.Body).Decode(&processorMetrics)
if err != nil {
return fmt.Errorf("unable to read JSON for processor metrics: %+w", err)
}
err = json.Unmarshal(body, &processorMetrics)
if err != nil {
return fmt.Errorf(
"unable to unmarshal JSON='%s' for processor metrics: %+w",
string(body),
err,
)
return fmt.Errorf("unable to decode JSON for processor metrics: %+w", err)
}
processorMetrics.SetClient(processor.Client)
@@ -391,9 +380,7 @@ func (r *RedfishReceiver) readProcessorMetrics(
namePower := "consumed_power"
if !clientConfig.isExcluded[namePower] &&
// Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null
processorMetrics.ConsumedPowerWatt != 65535 {
if !clientConfig.isExcluded[namePower] {
y, err := lp.New(namePower, tags, metaPower,
map[string]interface{}{
"value": processorMetrics.ConsumedPowerWatt,
@@ -643,10 +630,10 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ClientConfigs []struct {
HostList string `json:"host_list"` // List of hosts with the same client configuration
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the redfish service
HostList []string `json:"host_list"` // List of hosts with the same client configuration
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the redfish service
// Per client disable collection of power,processor or thermal metrics
DisablePowerMetrics bool `json:"disable_power_metrics"`
@@ -673,9 +660,8 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
// Read the redfish receiver specific JSON config
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&configJSON); err != nil {
err := json.Unmarshal(config, &configJSON)
if err != nil {
cclog.ComponentError(r.name, "Error reading config:", err.Error())
return nil, err
}
@@ -777,14 +763,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
isExcluded[key] = true
}
hostList, err := hostlist.Expand(clientConfigJSON.HostList)
if err != nil {
err := fmt.Errorf("client config number %d failed to parse host list %s: %v",
i, clientConfigJSON.HostList, err)
cclog.ComponentError(r.name, err)
return nil, err
}
for _, host := range hostList {
for _, host := range clientConfigJSON.HostList {
// Endpoint of the redfish service
endpoint := strings.Replace(endpoint_pattern, "%h", host, -1)

View File

@@ -8,22 +8,22 @@ The Redfish receiver uses the [Redfish (specification)](https://www.dmtf.org/sta
{
"<redfish receiver name>": {
"type": "redfish",
"username": "<Username>",
"password": "<Password>",
"username": "<user A>",
"password": "<password A>",
"endpoint": "https://%h-bmc",
"exclude_metrics": [ "min_consumed_watts" ],
"client_config": [
{
"host_list": "n[1,2-4]"
"host_list": [ "<host 1>", "<host 2>" ]
},
{
"host_list": "n5"
"host_list": [ "<host 3>", "<host 4>" ]
"disable_power_metrics": true
},
{
"host_list": "n6" ],
"username": "<Username 2>",
"password": "<Password 2>",
"host_list": [ "<host 5>" ],
"username": "<user B>",
"password": "<password B>",
"endpoint": "https://%h-BMC",
"disable_thermal_metrics": true
}

View File

@@ -10,8 +10,6 @@ BuildRequires: go-toolset
BuildRequires: systemd-rpm-macros
# for header downloads
BuildRequires: wget
# Recommended when using the sysusers_create_package macro
Requires(pre): /usr/bin/systemd-sysusers
Provides: %{name} = %{version}

View File

@@ -8,9 +8,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
{
"<name>": {
"type": "http",
"meta_as_tags" : [
"meta-key"
],
"meta_as_tags" : true,
"url" : "https://my-monitoring.example.com:1234/api/write",
"jwt" : "blabla.blabla.blabla",
"timeout": "5s",
@@ -22,7 +20,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
```
- `type`: makes the sink an `http` sink
- `meta_as_tags`: Move specific meta information to the tags in the output (optional)
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `url`: The full URL of the endpoint
- `jwt`: JSON web tokens for authentification (Using the *Bearer* scheme)
- `timeout`: General timeout for the HTTP client (default '5s')