Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop

This commit is contained in:
Thomas Roehl 2022-03-15 15:24:29 +01:00
commit 2b8266d1d2
23 changed files with 789 additions and 499 deletions

View File

@ -1,64 +0,0 @@
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
# Workflow name
name: AlmaLinux 8.5 RPM build
# Run on tag push
on:
push:
tags:
- '**'
jobs:
#
# Build on AlmaLinux 8.5 using go-toolset
#
AlmaLinux-RPM-build:
runs-on: ubuntu-latest
# See: https://hub.docker.com/_/almalinux
container: almalinux:8.5
steps:
# Use dnf to install development packages
- name: Install development packages
run: dnf --assumeyes group install "Development Tools" "RPM Development Tools"
# Checkout git repository and submodules
# fetch-depth must be 0 to use git describe
# See: https://github.com/marketplace/actions/checkout
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
# Use dnf to install build dependencies
- name: Install build dependencies
run: dnf --assumeyes builddep scripts/cc-metric-collector.spec
- name: RPM build MetricCollector
id: rpmbuild
run: make RPM
# See: https://github.com/actions/upload-artifact
- name: Save RPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector RPM for AlmaLinux 8.5
path: ${{ steps.rpmbuild.outputs.RPM }}
- name: Save SRPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector SRPM for AlmaLinux 8.5
path: ${{ steps.rpmbuild.outputs.SRPM }}
# See: https://github.com/softprops/action-gh-release
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
name: cc-metric-collector-${{github.ref_name}}
files: |
${{ steps.rpmbuild.outputs.RPM }}
${{ steps.rpmbuild.outputs.SRPM }}

View File

@ -1,64 +0,0 @@
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
# Workflow name
name: Red Hat Universal Base Image 8 RPM build
# Run on tag push
on:
push:
tags:
- '**'
jobs:
#
# Build on UBI 8 using go-toolset
#
UBI-8-RPM-build:
runs-on: ubuntu-latest
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065
steps:
# Use dnf to install development packages
- name: Install development packages
run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git
# Checkout git repository and submodules
# fetch-depth must be 0 to use git describe
# See: https://github.com/marketplace/actions/checkout
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
# Use dnf to install build dependencies
- name: Install build dependencies
run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec
- name: RPM build MetricCollector
id: rpmbuild
run: make RPM
# See: https://github.com/actions/upload-artifact
- name: Save RPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector RPM for UBI 8
path: ${{ steps.rpmbuild.outputs.RPM }}
- name: Save SRPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector SRPM for UBI 8
path: ${{ steps.rpmbuild.outputs.SRPM }}
# See: https://github.com/softprops/action-gh-release
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
name: cc-metric-collector-${{github.ref_name}}
files: |
${{ steps.rpmbuild.outputs.RPM }}
${{ steps.rpmbuild.outputs.SRPM }}

184
.github/workflows/Release.yml vendored Normal file
View File

@ -0,0 +1,184 @@
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
# Workflow name
name: Release
# Run on tag push
on:
push:
tags:
- '**'
jobs:
#
# Build on AlmaLinux 8.5 using go-toolset
#
AlmaLinux-RPM-build:
runs-on: ubuntu-latest
# See: https://hub.docker.com/_/almalinux
container: almalinux:8.5
# The job outputs link to the outputs of the 'rpmrename' step
# Only job outputs can be used in child jobs
outputs:
rpm : ${{steps.rpmrename.outputs.RPM}}
srpm : ${{steps.rpmrename.outputs.SRPM}}
steps:
# Use dnf to install development packages
- name: Install development packages
run: dnf --assumeyes group install "Development Tools" "RPM Development Tools"
# Checkout git repository and submodules
# fetch-depth must be 0 to use git describe
# See: https://github.com/marketplace/actions/checkout
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
# Use dnf to install build dependencies
- name: Install build dependencies
run: dnf --assumeyes builddep scripts/cc-metric-collector.spec
- name: RPM build MetricCollector
id: rpmbuild
run: make RPM
# AlmaLinux 8.5 is a derivate of RedHat Enterprise Linux 8 (UBI8),
# so the created RPM both contain the substring 'el8' in the RPM file names
# This step replaces the substring 'el8' to 'alma85'. It uses the move operation
# because it is unclear whether the default AlmaLinux 8.5 container contains the
# 'rename' command. This way we also get the new names for output.
- name: Rename RPMs (s/el8/alma85/)
id: rpmrename
run: |
OLD_RPM="${{steps.rpmbuild.outputs.RPM}}"
OLD_SRPM="${{steps.rpmbuild.outputs.SRPM}}"
NEW_RPM="${OLD_RPM/el8/alma85}"
NEW_SRPM=${OLD_SRPM/el8/alma85}
mv "${OLD_RPM}" "${NEW_RPM}"
mv "${OLD_SRPM}" "${NEW_SRPM}"
echo "::set-output name=SRPM::${NEW_SRPM}"
echo "::set-output name=RPM::${NEW_RPM}"
# See: https://github.com/actions/upload-artifact
- name: Save RPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector RPM for AlmaLinux 8.5
path: ${{ steps.rpmrename.outputs.RPM }}
- name: Save SRPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector SRPM for AlmaLinux 8.5
path: ${{ steps.rpmrename.outputs.SRPM }}
#
# Build on UBI 8 using go-toolset
#
UBI-8-RPM-build:
runs-on: ubuntu-latest
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065
# The job outputs link to the outputs of the 'rpmbuild' step
outputs:
rpm : ${{steps.rpmbuild.outputs.RPM}}
srpm : ${{steps.rpmbuild.outputs.SRPM}}
steps:
# Use dnf to install development packages
- name: Install development packages
run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git
# Checkout git repository and submodules
# fetch-depth must be 0 to use git describe
# See: https://github.com/marketplace/actions/checkout
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
# Use dnf to install build dependencies
- name: Install build dependencies
run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec
- name: RPM build MetricCollector
id: rpmbuild
run: make RPM
# See: https://github.com/actions/upload-artifact
- name: Save RPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector RPM for UBI 8
path: ${{ steps.rpmbuild.outputs.RPM }}
- name: Save SRPM as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector SRPM for UBI 8
path: ${{ steps.rpmbuild.outputs.SRPM }}
#
# Create release with fresh RPMs
#
Release:
runs-on: ubuntu-latest
# We need the RPMs, so add dependency
needs: [AlmaLinux-RPM-build, UBI-8-RPM-build]
steps:
# See: https://github.com/actions/download-artifact
- name: Download AlmaLinux 8.5 RPM
uses: actions/download-artifact@v2
with:
name: cc-metric-collector RPM for AlmaLinux 8.5
- name: Download AlmaLinux 8.5 SRPM
uses: actions/download-artifact@v2
with:
name: cc-metric-collector SRPM for AlmaLinux 8.5
- name: Download UBI 8 RPM
uses: actions/download-artifact@v2
with:
name: cc-metric-collector RPM for UBI 8
- name: Download UBI 8 SRPM
uses: actions/download-artifact@v2
with:
name: cc-metric-collector SRPM for UBI 8
# The download actions do not publish the name of the downloaded file,
# so we re-use the job outputs of the parent jobs. The files are all
# downloaded to the current folder.
# The gh-release action afterwards does not accept file lists but all
# files have to be listed at 'files'. The step creates one output per
# RPM package (2 per distro)
- name: Set RPM variables
id: files
run: |
ALMA_85_RPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.rpm}}")
ALMA_85_SRPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.srpm}}")
UBI_8_RPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.rpm}}")
UBI_8_SRPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.srpm}}")
echo "ALMA_85_RPM::${ALMA_85_RPM}"
echo "ALMA_85_SRPM::${ALMA_85_SRPM}"
echo "UBI_8_RPM::${UBI_8_RPM}"
echo "UBI_8_SRPM::${UBI_8_SRPM}"
echo "::set-output name=ALMA_85_RPM::${ALMA_85_RPM}"
echo "::set-output name=ALMA_85_SRPM::${ALMA_85_SRPM}"
echo "::set-output name=UBI_8_RPM::${UBI_8_RPM}"
echo "::set-output name=UBI_8_SRPM::${UBI_8_SRPM}"
# See: https://github.com/softprops/action-gh-release
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
name: cc-metric-collector-${{github.ref_name}}
files: |
${{ steps.files.outputs.ALMA_85_RPM }}
${{ steps.files.outputs.ALMA_85_SRPM }}
${{ steps.files.outputs.UBI_8_RPM }}
${{ steps.files.outputs.UBI_8_SRPM }}

View File

@ -15,6 +15,8 @@ COMPONENT_DIRS := collectors \
internal/ccTopology \ internal/ccTopology \
internal/multiChanTicker internal/multiChanTicker
BINDIR = bin
.PHONY: all .PHONY: all
all: $(APP) all: $(APP)
@ -24,15 +26,27 @@ $(APP): $(GOSRC)
go get go get
go build -o $(APP) $(GOSRC_APP) go build -o $(APP) $(GOSRC_APP)
install: $(APP)
@WORKSPACE=$(PREFIX)
@if [ -z "$${WORKSPACE}" ]; then exit 1; fi
@mkdir --parents --verbose $${WORKSPACE}/usr/$(BINDIR)
@install -Dpm 755 $(APP) $${WORKSPACE}/usr/$(BINDIR)/$(APP)
@mkdir --parents --verbose $${WORKSPACE}/etc/cc-metric-collector $${WORKSPACE}/etc/default $${WORKSPACE}/etc/systemd/system $${WORKSPACE}/etc/init.d
@install -Dpm 600 config.json $${WORKSPACE}/etc/cc-metric-collector/cc-metric-collector.json
@sed -i -e s+"\"./"+"\"/etc/cc-metric-collector/"+g $${WORKSPACE}/etc/cc-metric-collector/cc-metric-collector.json
@install -Dpm 600 sinks.json $${WORKSPACE}/etc/cc-metric-collector/sinks.json
@install -Dpm 600 collectors.json $${WORKSPACE}/etc/cc-metric-collector/collectors.json
@install -Dpm 600 router.json $${WORKSPACE}/etc/cc-metric-collector/router.json
@install -Dpm 600 receivers.json $${WORKSPACE}/etc/cc-metric-collector/receivers.json
@install -Dpm 600 scripts/cc-metric-collector.config $${WORKSPACE}/etc/default/cc-metric-collector
@install -Dpm 644 scripts/cc-metric-collector.service $${WORKSPACE}/etc/systemd/system/cc-metric-collector.service
@install -Dpm 644 scripts/cc-metric-collector.init $${WORKSPACE}/etc/init.d/cc-metric-collector
.PHONY: clean .PHONY: clean
.ONESHELL: .ONESHELL:
clean: clean:
@for COMP in $(COMPONENT_DIRS) @for COMP in $(COMPONENT_DIRS); do if [ -e $$COMP/Makefile ]; then make -C $$COMP clean; fi; done
do
if [[ -e $$COMP/Makefile ]]; then
make -C $$COMP clean
fi
done
rm -f $(APP) rm -f $(APP)
.PHONY: fmt .PHONY: fmt
@ -69,7 +83,7 @@ RPM: scripts/cc-metric-collector.spec
@COMMITISH="HEAD" @COMMITISH="HEAD"
@VERS=$$(git describe --tags $${COMMITISH}) @VERS=$$(git describe --tags $${COMMITISH})
@VERS=$${VERS#v} @VERS=$${VERS#v}
@VERS=$${VERS//-/_} @VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
@eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}") @eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}")
@PREFIX="$${NAME}-$${VERSION}" @PREFIX="$${NAME}-$${VERSION}"
@FORMAT="tar.gz" @FORMAT="tar.gz"
@ -86,3 +100,28 @@ RPM: scripts/cc-metric-collector.spec
@ echo "::set-output name=SRPM::$${SRPMFILE}" @ echo "::set-output name=SRPM::$${SRPMFILE}"
@ echo "::set-output name=RPM::$${RPMFILE}" @ echo "::set-output name=RPM::$${RPMFILE}"
@fi @fi
.PHONY: DEB
DEB: scripts/cc-metric-collector.deb.control $(APP)
@BASEDIR=$${PWD}
@WORKSPACE=$${PWD}/.dpkgbuild
@DEBIANDIR=$${WORKSPACE}/debian
@DEBIANBINDIR=$${WORKSPACE}/DEBIAN
@mkdir --parents --verbose $$WORKSPACE $$DEBIANBINDIR
#@mkdir --parents --verbose $$DEBIANDIR
@CONTROLFILE="$${BASEDIR}/scripts/cc-metric-collector.deb.control"
@COMMITISH="HEAD"
@VERS=$$(git describe --tags --abbrev=0 $${COMMITISH})
@VERS=$${VERS#v}
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
@ARCH=$$(uname -m)
@ARCH=$$(echo $$ARCH | sed -e s+'_'+'-'+g)
@PREFIX="$${NAME}-$${VERSION}_$${ARCH}"
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$$WORKSPACE"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
@SIZE="$$(awk -v size="$$SIZE_BYTES" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
#@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANDIR}/control
@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANBINDIR}/control
@make PREFIX=$${WORKSPACE} install
@DEB_FILE="cc-metric-collector_$${VERS}_$${ARCH}.deb"
@dpkg-deb -b $${WORKSPACE} "$$DEB_FILE"
@rm -r "$${WORKSPACE}"

View File

@ -12,6 +12,12 @@
"proc_total" "proc_total"
] ]
}, },
"netstat": {
"include_devices": [
"enp5s0"
],
"send_derived_values": true
},
"numastats": {}, "numastats": {},
"nvidia": {}, "nvidia": {},
"tempstat": { "tempstat": {

View File

@ -17,7 +17,12 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
const DEFAULT_GPFS_CMD = `mmpmon` const DEFAULT_GPFS_CMD = "mmpmon"
type GpfsCollectorLastState struct {
bytesRead int64
bytesWritten int64
}
type GpfsCollector struct { type GpfsCollector struct {
metricCollector metricCollector
@ -25,8 +30,11 @@ type GpfsCollector struct {
config struct { config struct {
Mmpmon string `json:"mmpmon_path,omitempty"` Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
SendBandwidths bool `json:"send_bandwidths"`
} }
skipFS map[string]struct{} skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
} }
func (m *GpfsCollector) Init(config json.RawMessage) error { func (m *GpfsCollector) Init(config json.RawMessage) error {
@ -40,7 +48,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
m.setup() m.setup()
// Set default mmpmon binary // Set default mmpmon binary
m.config.Mmpmon = string(DEFAULT_GPFS_CMD) m.config.Mmpmon = DEFAULT_GPFS_CMD
// Read JSON configuration // Read JSON configuration
if len(config) > 0 { if len(config) > 0 {
@ -89,6 +97,13 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
return return
} }
// Current time stamp
now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
// mmpmon: // mmpmon:
// -p: generate output that can be parsed // -p: generate output that can be parsed
// -s: suppress the prompt on input // -s: suppress the prompt on input
@ -148,6 +163,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
} }
m.tags["filesystem"] = filesystem m.tags["filesystem"] = filesystem
if _, ok := m.lastState[filesystem]; !ok {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: -1,
bytesWritten: -1,
}
}
// return code // return code
rc, err := strconv.Atoi(key_value["_rc_"]) rc, err := strconv.Atoi(key_value["_rc_"])
@ -191,6 +212,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y output <- y
} }
if m.config.SendBandwidths {
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
output <- y
}
}
}
// bytes written // bytes written
bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64) bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)
@ -203,6 +232,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y output <- y
} }
if m.config.SendBandwidths {
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
output <- y
}
}
}
if m.config.SendBandwidths {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: bytesRead,
bytesWritten: bytesWritten,
}
}
// number of opens // number of opens
numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64) numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)

View File

@ -5,7 +5,8 @@
"mmpmon_path": "/path/to/mmpmon", "mmpmon_path": "/path/to/mmpmon",
"exclude_filesystem": [ "exclude_filesystem": [
"fs1" "fs1"
] ],
"send_bandwidths" : true
} }
``` ```
@ -18,13 +19,16 @@ in the configuration.
The path to the `mmpmon` command can be configured with the `mmpmon_path` option The path to the `mmpmon` command can be configured with the `mmpmon_path` option
in the configuration. If nothing is set, the collector searches in `$PATH` for `mmpmon`. in the configuration. If nothing is set, the collector searches in `$PATH` for `mmpmon`.
Metrics: Metrics:
* `bytes_read` * `gpfs_bytes_read`
* `gpfs_bytes_written` * `gpfs_bytes_written`
* `gpfs_num_opens` * `gpfs_num_opens`
* `gpfs_num_closes` * `gpfs_num_closes`
* `gpfs_num_reads` * `gpfs_num_reads`
* `gpfs_num_readdirs` * `gpfs_num_readdirs`
* `gpfs_num_inode_updates` * `gpfs_num_inode_updates`
* `gpfs_bw_read` (if `send_bandwidths == true`)
* `gpfs_bw_write` (if `send_bandwidths == true`)
The collector adds a `filesystem` tag to all metrics The collector adds a `filesystem` tag to all metrics

View File

@ -16,7 +16,7 @@ import (
"time" "time"
) )
const IB_BASEPATH = `/sys/class/infiniband/` const IB_BASEPATH = "/sys/class/infiniband/"
type InfinibandCollectorInfo struct { type InfinibandCollectorInfo struct {
LID string // IB local Identifier (LID) LID string // IB local Identifier (LID)
@ -24,14 +24,18 @@ type InfinibandCollectorInfo struct {
port string // IB device port port string // IB device port
portCounterFiles map[string]string // mapping counter name -> sysfs file portCounterFiles map[string]string // mapping counter name -> sysfs file
tagSet map[string]string // corresponding tag list tagSet map[string]string // corresponding tag list
lastState map[string]int64 // State from last measurement
} }
type InfinibandCollector struct { type InfinibandCollector struct {
metricCollector metricCollector
config struct { config struct {
ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0 ExcludeDevices []string `json:"exclude_devices,omitempty"` // IB device to exclude e.g. mlx5_0
SendAbsoluteValues bool `json:"send_abs_values"` // Send absolut values as read from sys filesystem
SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. rates
} }
info []*InfinibandCollectorInfo info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
} }
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH // Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@ -49,6 +53,11 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"source": m.name, "source": m.name,
"group": "Network", "group": "Network",
} }
// Set default configuration,
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@ -60,10 +69,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*") globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*")
ibDirs, err := filepath.Glob(globPattern) ibDirs, err := filepath.Glob(globPattern)
if err != nil { if err != nil {
return fmt.Errorf("Unable to glob files with pattern %s: %v", globPattern, err) return fmt.Errorf("unable to glob files with pattern %s: %v", globPattern, err)
} }
if ibDirs == nil { if ibDirs == nil {
return fmt.Errorf("Unable to find any directories with pattern %s", globPattern) return fmt.Errorf("unable to find any directories with pattern %s", globPattern)
} }
for _, path := range ibDirs { for _, path := range ibDirs {
@ -106,10 +115,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
for _, counterFile := range portCounterFiles { for _, counterFile := range portCounterFiles {
err := unix.Access(counterFile, unix.R_OK) err := unix.Access(counterFile, unix.R_OK)
if err != nil { if err != nil {
return fmt.Errorf("Unable to access %s: %v", counterFile, err) return fmt.Errorf("unable to access %s: %v", counterFile, err)
} }
} }
// Initialize last state
lastState := make(map[string]int64)
for counter := range portCounterFiles {
lastState[counter] = -1
}
m.info = append(m.info, m.info = append(m.info,
&InfinibandCollectorInfo{ &InfinibandCollectorInfo{
LID: LID, LID: LID,
@ -122,11 +137,12 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
"port": port, "port": port,
"lid": LID, "lid": LID,
}, },
lastState: lastState,
}) })
} }
if len(m.info) == 0 { if len(m.info) == 0 {
return fmt.Errorf("Found no IB devices") return fmt.Errorf("found no IB devices")
} }
m.init = true m.init = true
@ -141,9 +157,17 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
return return
} }
// Current time stamp
now := time.Now() now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
for _, info := range m.info { for _, info := range m.info {
for counterName, counterFile := range info.portCounterFiles { for counterName, counterFile := range info.portCounterFiles {
// Read counter file
line, err := ioutil.ReadFile(counterFile) line, err := ioutil.ReadFile(counterFile)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
@ -152,6 +176,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
continue continue
} }
data := strings.TrimSpace(string(line)) data := strings.TrimSpace(string(line))
// convert counter to int64
v, err := strconv.ParseInt(data, 10, 64) v, err := strconv.ParseInt(data, 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
@ -159,11 +185,27 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err)) fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err))
continue continue
} }
// Send absolut values
if m.config.SendAbsoluteValues {
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y output <- y
} }
} }
// Send derived values
if m.config.SendDerivedValues {
if info.lastState[counterName] >= 0 {
rate := float64((v - info.lastState[counterName])) / timeDiff
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
}
}
// Save current state
info.lastState[counterName] = v
}
}
} }
} }

View File

@ -5,7 +5,9 @@
"ibstat": { "ibstat": {
"exclude_devices": [ "exclude_devices": [
"mlx4" "mlx4"
] ],
"send_abs_values": true,
"send_derived_values": true
} }
``` ```
@ -22,5 +24,9 @@ Metrics:
* `ib_xmit` * `ib_xmit`
* `ib_recv_pkts` * `ib_recv_pkts`
* `ib_xmit_pkts` * `ib_xmit_pkts`
* `ib_recv_bw` (if `send_derived_values == true`)
* `ib_xmit_bw` (if `send_derived_values == true`)
* `ib_recv_pkts_bw` (if `send_derived_values == true`)
* `ib_xmit_pkts_bw` (if `send_derived_values == true`)
The collector adds a `device` tag to all metrics The collector adds a `device` tag to all metrics

View File

@ -15,7 +15,6 @@ import (
"io/ioutil" "io/ioutil"
"math" "math"
"os" "os"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -28,48 +27,6 @@ import (
"github.com/NVIDIA/go-nvml/pkg/dl" "github.com/NVIDIA/go-nvml/pkg/dl"
) )
type MetricScope string
const (
METRIC_SCOPE_HWTHREAD = iota
METRIC_SCOPE_CORE
METRIC_SCOPE_LLC
METRIC_SCOPE_NUMA
METRIC_SCOPE_DIE
METRIC_SCOPE_SOCKET
METRIC_SCOPE_NODE
)
func (ms MetricScope) String() string {
return string(ms)
}
func (ms MetricScope) Likwid() string {
LikwidDomains := map[string]string{
"cpu": "",
"core": "",
"llc": "C",
"numadomain": "M",
"die": "D",
"socket": "S",
"node": "N",
}
return LikwidDomains[string(ms)]
}
func (ms MetricScope) Granularity() int {
for i, g := range GetAllMetricScopes() {
if ms == g {
return i
}
}
return -1
}
func GetAllMetricScopes() []MetricScope {
return []MetricScope{"cpu" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"}
}
const ( const (
LIKWID_LIB_NAME = "liblikwid.so" LIKWID_LIB_NAME = "liblikwid.so"
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
@ -79,15 +36,13 @@ const (
type LikwidCollectorMetricConfig struct { type LikwidCollectorMetricConfig struct {
Name string `json:"name"` // Name of the metric Name string `json:"name"` // Name of the metric
Calc string `json:"calc"` // Calculation for the metric using Calc string `json:"calc"` // Calculation for the metric using
//Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) Type string `json:"type"` // Metric type (aka node, socket, cpu, ...)
Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function
Publish bool `json:"publish"` Publish bool `json:"publish"`
granulatity MetricScope Unit string `json:"unit"` // Unit of metric if any
} }
type LikwidCollectorEventsetConfig struct { type LikwidCollectorEventsetConfig struct {
Events map[string]string `json:"events"` Events map[string]string `json:"events"`
granulatity map[string]MetricScope
Metrics []LikwidCollectorMetricConfig `json:"metrics"` Metrics []LikwidCollectorMetricConfig `json:"metrics"`
} }
@ -98,6 +53,7 @@ type LikwidCollectorConfig struct {
InvalidToZero bool `json:"invalid_to_zero,omitempty"` InvalidToZero bool `json:"invalid_to_zero,omitempty"`
AccessMode string `json:"access_mode,omitempty"` AccessMode string `json:"access_mode,omitempty"`
DaemonPath string `json:"accessdaemon_path,omitempty"` DaemonPath string `json:"accessdaemon_path,omitempty"`
LibraryPath string `json:"liblikwid_path,omitempty"`
} }
type LikwidCollector struct { type LikwidCollector struct {
@ -105,7 +61,6 @@ type LikwidCollector struct {
cpulist []C.int cpulist []C.int
cpu2tid map[int]int cpu2tid map[int]int
sock2tid map[int]int sock2tid map[int]int
scopeRespTids map[MetricScope]map[int]int
metrics map[C.int]map[string]int metrics map[C.int]map[string]int
groups []C.int groups []C.int
config LikwidCollectorConfig config LikwidCollectorConfig
@ -119,7 +74,7 @@ type LikwidCollector struct {
type LikwidMetric struct { type LikwidMetric struct {
name string name string
search string search string
scope MetricScope scope string
group_idx int group_idx int
} }
@ -131,152 +86,43 @@ func eventsToEventStr(events map[string]string) string {
return strings.Join(elist, ",") return strings.Join(elist, ",")
} }
func getGranularity(counter, event string) MetricScope {
if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") {
return "cpu"
} else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") {
return "socket"
} else if strings.HasPrefix(counter, "PWR") {
if event == "RAPL_CORE_ENERGY" {
return "cpu"
} else {
return "socket"
}
}
return "unknown"
}
func getBaseFreq() float64 { func getBaseFreq() float64 {
var freq float64 = math.NaN() var freq float64 = math.NaN()
C.power_init(0) C.power_init(0)
info := C.get_powerInfo() info := C.get_powerInfo()
if float64(info.baseFrequency) != 0 { if float64(info.baseFrequency) != 0 {
freq = float64(info.baseFrequency) * 1e3 freq = float64(info.baseFrequency) * 1e6
} else { } else {
buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit") buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
if err == nil { if err == nil {
data := strings.Replace(string(buffer), "\n", "", -1) data := strings.Replace(string(buffer), "\n", "", -1)
x, err := strconv.ParseInt(data, 0, 64) x, err := strconv.ParseInt(data, 0, 64)
if err == nil { if err == nil {
freq = float64(x) * 1e3 freq = float64(x) * 1e6
} }
} }
} }
return freq return freq
} }
func (m *LikwidCollector) initGranularity() {
splitRegex := regexp.MustCompile("[+-/*()]")
for _, evset := range m.config.Eventsets {
evset.granulatity = make(map[string]MetricScope)
for counter, event := range evset.Events {
gran := getGranularity(counter, event)
if gran.Granularity() >= 0 {
evset.granulatity[counter] = gran
}
}
for i, metric := range evset.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("cpu")
evset.Metrics[i].granulatity = gran
for _, x := range s {
if _, ok := evset.Events[x]; ok {
if evset.granulatity[x].Granularity() > gran.Granularity() {
gran = evset.granulatity[x]
}
}
}
evset.Metrics[i].granulatity = gran
}
}
for i, metric := range m.config.Metrics {
s := splitRegex.Split(metric.Calc, -1)
gran := MetricScope("cpu")
m.config.Metrics[i].granulatity = gran
for _, x := range s {
for _, evset := range m.config.Eventsets {
for _, m := range evset.Metrics {
if m.Name == x && m.granulatity.Granularity() > gran.Granularity() {
gran = m.granulatity
}
}
}
}
m.config.Metrics[i].granulatity = gran
}
}
type TopoResolveFunc func(cpuid int) int
func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int {
get_cpus := func(scope MetricScope) map[int]int {
var slist []int
var cpu C.int
var input func(index int) string
switch scope {
case "node":
slist = []int{0}
input = func(index int) string { return "N:0" }
case "socket":
input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
slist = topo.SocketList()
// case "numadomain":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.NumaNodeList()
// cclog.Debug(scope, " ", input(0), " ", slist)
// case "die":
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
// slist = topo.DieList()
// case "llc":
// input = fmt.Sprintf("%s%d:0", scope.Likwid(), s)
// slist = topo.LLCacheList()
case "cpu":
input = func(index int) string { return fmt.Sprintf("%d", index) }
slist = topo.CpuList()
case "hwthread":
input = func(index int) string { return fmt.Sprintf("%d", index) }
slist = topo.CpuList()
}
outmap := make(map[int]int)
for _, s := range slist {
t := C.CString(input(s))
clen := C.cpustr_to_cpulist(t, &cpu, 1)
if int(clen) == 1 {
outmap[s] = m.cpu2tid[int(cpu)]
} else {
cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s)))
outmap[s] = -1
}
C.free(unsafe.Pointer(t))
}
return outmap
}
scopes := GetAllMetricScopes()
complete := make(map[MetricScope]map[int]int)
for _, s := range scopes {
complete[s] = get_cpus(s)
}
return complete
}
func (m *LikwidCollector) Init(config json.RawMessage) error { func (m *LikwidCollector) Init(config json.RawMessage) error {
var ret C.int var ret C.int
m.name = "LikwidCollector" m.name = "LikwidCollector"
m.config.AccessMode = LIKWID_DEF_ACCESSMODE m.config.AccessMode = LIKWID_DEF_ACCESSMODE
m.config.LibraryPath = LIKWID_LIB_NAME
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
return err return err
} }
} }
lib := dl.New(LIKWID_LIB_NAME, LIKWID_LIB_DL_FLAGS) lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
if lib == nil { if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", LIKWID_LIB_NAME) return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
} }
err := lib.Open() err := lib.Open()
if err != nil { if err != nil {
return fmt.Errorf("error opening %s: %v", LIKWID_LIB_NAME, err) return fmt.Errorf("error opening %s: %v", m.config.LibraryPath, err)
} }
if m.config.ForceOverwrite { if m.config.ForceOverwrite {
@ -306,10 +152,6 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
return err return err
} }
// Determine which counter works at which level. PMC*: cpu, *BOX*: socket, ...
m.initGranularity()
// Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist)
m.scopeRespTids = m.getResponsiblities()
switch m.config.AccessMode { switch m.config.AccessMode {
case "direct": case "direct":
C.HPMmode(0) C.HPMmode(0)
@ -336,6 +178,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
globalParams["inverseClock"] = float64(1.0) globalParams["inverseClock"] = float64(1.0)
// While adding the events, we test the metrics whether they can be computed at all // While adding the events, we test the metrics whether they can be computed at all
for i, evset := range m.config.Eventsets { for i, evset := range m.config.Eventsets {
var gid C.int
var cstr *C.char
if len(evset.Events) > 0 {
estr := eventsToEventStr(evset.Events) estr := eventsToEventStr(evset.Events)
// Generate parameter list for the metric computing test // Generate parameter list for the metric computing test
params := make(map[string]interface{}) params := make(map[string]interface{})
@ -357,8 +202,12 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
} }
} }
// Now we add the list of events to likwid // Now we add the list of events to likwid
cstr := C.CString(estr) cstr = C.CString(estr)
gid := C.perfmon_addEventSet(cstr) gid = C.perfmon_addEventSet(cstr)
} else {
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
continue
}
if gid >= 0 { if gid >= 0 {
m.groups = append(m.groups, gid) m.groups = append(m.groups, gid)
} }
@ -434,15 +283,9 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Go over events and get the results // Go over events and get the results
for eidx = 0; int(eidx) < len(evset.Events); eidx++ { for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
ctr := C.perfmon_getCounterName(gid, eidx) ctr := C.perfmon_getCounterName(gid, eidx)
ev := C.perfmon_getEventName(gid, eidx)
gctr := C.GoString(ctr) gctr := C.GoString(ctr)
gev := C.GoString(ev)
// MetricScope for the counter (and if needed the event) for _, tid := range m.cpu2tid {
scope := getGranularity(gctr, gev)
// Get the map scope-id -> tids
// This way we read less counters like only the responsible hardware thread for a socket
scopemap := m.scopeRespTids[scope]
for _, tid := range scopemap {
if tid >= 0 { if tid >= 0 {
m.results[group][tid]["time"] = interval.Seconds() m.results[group][tid]["time"] = interval.Seconds()
m.results[group][tid]["inverseClock"] = invClock m.results[group][tid]["inverseClock"] = invClock
@ -456,7 +299,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
for _, metric := range evset.Metrics { for _, metric := range evset.Metrics {
// The metric scope is determined in the Init() function // The metric scope is determined in the Init() function
// Get the map scope-id -> tids // Get the map scope-id -> tids
scopemap := m.scopeRespTids[metric.Scope] scopemap := m.cpu2tid
if metric.Type == "socket" {
scopemap = m.sock2tid
}
for domain, tid := range scopemap { for domain, tid := range scopemap {
if tid >= 0 { if tid >= 0 {
value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid]) value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
@ -474,13 +320,15 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Now we have the result, send it with the proper tags // Now we have the result, send it with the proper tags
if !math.IsNaN(value) { if !math.IsNaN(value) {
if metric.Publish { if metric.Publish {
tags := map[string]string{"type": metric.Scope.String()}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
fields := map[string]interface{}{"value": value} fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) y, err := lp.New(metric.Name, map[string]string{"type": metric.Type}, m.meta, fields, time.Now())
if err == nil { if err == nil {
if metric.Type != "node" {
y.AddTag("type-id", fmt.Sprintf("%d", domain))
}
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
output <- y output <- y
} }
} }
@ -495,7 +343,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
// Go over the global metrics, derive the value out of the event sets' metric values and send it // Go over the global metrics, derive the value out of the event sets' metric values and send it
func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error { func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error {
for _, metric := range m.config.Metrics { for _, metric := range m.config.Metrics {
scopemap := m.scopeRespTids[metric.Scope] scopemap := m.cpu2tid
if metric.Type == "socket" {
scopemap = m.sock2tid
}
for domain, tid := range scopemap { for domain, tid := range scopemap {
if tid >= 0 { if tid >= 0 {
// Here we generate parameter list // Here we generate parameter list
@ -521,13 +372,16 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
// Now we have the result, send it with the proper tags // Now we have the result, send it with the proper tags
if !math.IsNaN(value) { if !math.IsNaN(value) {
if metric.Publish { if metric.Publish {
tags := map[string]string{"type": metric.Scope.String()} tags := map[string]string{"type": metric.Type}
if metric.Scope != "node" {
tags["type-id"] = fmt.Sprintf("%d", domain)
}
fields := map[string]interface{}{"value": value} fields := map[string]interface{}{"value": value}
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
if err == nil { if err == nil {
if metric.Type != "node" {
y.AddTag("type-id", fmt.Sprintf("%d", domain))
}
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
output <- y output <- y
} }
} }

View File

@ -4,14 +4,17 @@
The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration. The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.
The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics": The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router. - An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. - The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`publish=false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
Additional options: Additional options:
- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode) - `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode)
- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin` - `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin`
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements - `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`. - `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`.
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD`
- `liblikwid_path`: Location of `liblikwid.so`
### Available metric scopes ### Available metric scopes
@ -54,7 +57,8 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
"calc": "time", "calc": "time",
"name": "Runtime (RDTSC) [s]", "name": "Runtime (RDTSC) [s]",
"publish": true, "publish": true,
"scope": "hwthread" "unit": "seconds"
"scope": "cpu"
}, },
{ {
"..." : "..." "..." : "..."
@ -104,25 +108,28 @@ $ chwon $CCUSER /var/run/likwid.lock
{ {
"name": "ipc", "name": "ipc",
"calc": "PMC0/PMC1", "calc": "PMC0/PMC1",
"scope": "cpu", "type": "cpu",
"publish": true "publish": true
}, },
{ {
"name": "flops_any", "name": "flops_any",
"calc": "0.000001*PMC2/time", "calc": "0.000001*PMC2/time",
"scope": "cpu", "unit": "MFlops/s",
"type": "cpu",
"publish": true "publish": true
}, },
{ {
"name": "clock_mhz", "name": "clock",
"calc": "0.000001*(FIXC1/FIXC2)/inverseClock", "calc": "0.000001*(FIXC1/FIXC2)/inverseClock",
"scope": "cpu", "type": "cpu",
"unit": "MHz",
"publish": true "publish": true
}, },
{ {
"name": "mem1", "name": "mem1",
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time", "calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
"scope": "socket", "unit": "Mbyte/s",
"type": "socket",
"publish": false "publish": false
} }
] ]
@ -140,19 +147,22 @@ $ chwon $CCUSER /var/run/likwid.lock
{ {
"name": "pwr_core", "name": "pwr_core",
"calc": "PWR0/time", "calc": "PWR0/time",
"scope": "socket", "unit": "Watt"
"type": "socket",
"publish": true "publish": true
}, },
{ {
"name": "pwr_pkg", "name": "pwr_pkg",
"calc": "PWR1/time", "calc": "PWR1/time",
"scope": "socket", "type": "socket",
"unit": "Watt"
"publish": true "publish": true
}, },
{ {
"name": "mem2", "name": "mem2",
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time", "calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
"scope": "socket", "unit": "Mbyte/s",
"type": "socket",
"publish": false "publish": false
} }
] ]
@ -162,7 +172,8 @@ $ chwon $CCUSER /var/run/likwid.lock
{ {
"name": "mem_bw", "name": "mem_bw",
"calc": "mem1+mem2", "calc": "mem1+mem2",
"scope": "socket", "type": "socket",
"unit": "Mbyte/s",
"publish": true "publish": true
} }
] ]
@ -198,3 +209,4 @@ IPC PMC0/PMC1 -> {
-> ] -> ]
``` ```
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.

View File

@ -23,6 +23,8 @@ type LustreCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics"` ExcludeMetrics []string `json:"exclude_metrics"`
SendAllMetrics bool `json:"send_all_metrics"` SendAllMetrics bool `json:"send_all_metrics"`
Sudo bool `json:"use_sudo"` Sudo bool `json:"use_sudo"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
} }
type LustreCollector struct { type LustreCollector struct {
@ -33,6 +35,7 @@ type LustreCollector struct {
config LustreCollectorConfig config LustreCollectorConfig
lctl string lctl string
sudoCmd string sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
} }
func (m *LustreCollector) getDeviceDataCommand(device string) []string { func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@ -165,6 +168,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
} }
} }
} }
m.lastTimestamp = time.Now()
m.init = true m.init = true
return nil return nil
} }
@ -173,6 +177,8 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init { if !m.init {
return return
} }
now := time.Now()
tdiff := now.Sub(m.lastTimestamp)
for device, devData := range m.stats { for device, devData := range m.stats {
stats := m.getDeviceDataCommand(device) stats := m.getDeviceDataCommand(device)
processed := []string{} processed := []string{}
@ -183,14 +189,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if fields, ok := m.matches[lf[0]]; ok { if fields, ok := m.matches[lf[0]]; ok {
for name, idx := range fields { for name, idx := range fields {
x, err := strconv.ParseInt(lf[idx], 0, 64) x, err := strconv.ParseInt(lf[idx], 0, 64)
if err != nil { if err == nil {
continue
}
value := x - devData[name] value := x - devData[name]
devData[name] = x devData[name] = x
if value < 0 { if value < 0 {
value = 0 value = 0
} }
if m.config.SendAbsoluteValues {
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil { if err == nil {
y.AddTag("device", device) y.AddTag("device", device)
@ -203,6 +208,19 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
} }
} }
} }
if m.config.SendDerivedValues && strings.Contains(name, "bytes") {
y, err := lp.New(name+"_bw", m.tags, m.meta, map[string]interface{}{"value": float64(value) / tdiff.Seconds()}, time.Now())
if err == nil {
y.AddTag("device", device)
y.AddMeta("unit", "Bytes/sec")
output <- y
if m.config.SendAllMetrics {
processed = append(processed, name)
}
}
}
}
}
} }
} }
} }
@ -221,6 +239,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
} }
} }
} }
m.lastTimestamp = now
} }
func (m *LustreCollector) Close() { func (m *LustreCollector) Close() {

View File

@ -9,21 +9,26 @@
"exclude_metrics": [ "exclude_metrics": [
"setattr", "setattr",
"getattr" "getattr"
] ],
"send_abs_values" : true,
"send_derived_values" : true
} }
``` ```
The `lustrestat` collector reads from the procfs stat files for Lustre like `/proc/fs/lustre/llite/lnec-XXXXXX/stats`. The `lustrestat` collector reads from the procfs stat files for Lustre like `/proc/fs/lustre/llite/lnec-XXXXXX/stats`.
Metrics: Metrics:
* `read_bytes` * `lustre_read_bytes`
* `read_requests` * `lustre_read_requests`
* `write_bytes` * `lustre_write_bytes`
* `write_requests` * `lustre_write_requests`
* `open` * `lustre_open`
* `close` * `lustre_close`
* `getattr` * `lustre_getattr`
* `setattr` * `lustre_setattr`
* `statfs` * `lustre_statfs`
* `inode_permission` * `lustre_inode_permission`
* `lustre_read_bytes_bw` (if `send_derived_values == true`)
* `lustre_write_bytes_bw` (if `send_derived_values == true`)
This collector adds an `device` tag.

View File

@ -13,22 +13,27 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
const NETSTATFILE = `/proc/net/dev` const NETSTATFILE = "/proc/net/dev"
type NetstatCollectorConfig struct { type NetstatCollectorConfig struct {
IncludeDevices []string `json:"include_devices"` IncludeDevices []string `json:"include_devices"`
SendAbsoluteValues bool `json:"send_abs_values"`
SendDerivedValues bool `json:"send_derived_values"`
} }
type NetstatCollectorMetric struct { type NetstatCollectorMetric struct {
name string
index int index int
lastValue float64 tags map[string]string
meta map[string]string
meta_rates map[string]string
lastValue int64
} }
type NetstatCollector struct { type NetstatCollector struct {
metricCollector metricCollector
config NetstatCollectorConfig config NetstatCollectorConfig
matches map[string]map[string]NetstatCollectorMetric matches map[string][]NetstatCollectorMetric
devtags map[string]map[string]string
lastTimestamp time.Time lastTimestamp time.Time
} }
@ -36,15 +41,33 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector" m.name = "NetstatCollector"
m.setup() m.setup()
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()
m.meta = map[string]string{"source": m.name, "group": "Network"}
m.devtags = make(map[string]map[string]string) const (
nameIndexMap := map[string]int{ fieldInterface = iota
"net_bytes_in": 1, fieldReceiveBytes
"net_pkts_in": 2, fieldReceivePackets
"net_bytes_out": 9, fieldReceiveErrs
"net_pkts_out": 10, fieldReceiveDrop
} fieldReceiveFifo
m.matches = make(map[string]map[string]NetstatCollectorMetric) fieldReceiveFrame
fieldReceiveCompressed
fieldReceiveMulticast
fieldTransmitBytes
fieldTransmitPackets
fieldTransmitErrs
fieldTransmitDrop
fieldTransmitFifo
fieldTransmitColls
fieldTransmitCarrier
fieldTransmitCompressed
)
m.matches = make(map[string][]NetstatCollectorMetric)
// Set default configuration,
m.config.SendAbsoluteValues = true
m.config.SendDerivedValues = false
// Read configuration file, allow overwriting default config
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@ -52,7 +75,9 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
return err return err
} }
} }
file, err := os.Open(string(NETSTATFILE))
// Check access to net statistic file
file, err := os.Open(NETSTATFILE)
if err != nil { if err != nil {
cclog.ComponentError(m.name, err.Error()) cclog.ComponentError(m.name, err.Error())
return err return err
@ -62,23 +87,65 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
l := scanner.Text() l := scanner.Text()
// Skip lines with no net device entry
if !strings.Contains(l, ":") { if !strings.Contains(l, ":") {
continue continue
} }
// Split line into fields
f := strings.Fields(l) f := strings.Fields(l)
// Get net device entry
dev := strings.Trim(f[0], ": ") dev := strings.Trim(f[0], ": ")
// Check if device is a included device
if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok { if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
m.matches[dev] = make(map[string]NetstatCollectorMetric) tags := map[string]string{"device": dev, "type": "node"}
for name, idx := range nameIndexMap { meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
m.matches[dev][name] = NetstatCollectorMetric{ meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
index: idx, meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
lastValue: 0, meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
m.matches[dev] = []NetstatCollectorMetric{
{
name: "net_bytes_in",
index: fieldReceiveBytes,
lastValue: -1,
tags: tags,
meta: meta_unit_byte,
meta_rates: meta_unit_byte_per_sec,
},
{
name: "net_pkts_in",
index: fieldReceivePackets,
lastValue: -1,
tags: tags,
meta: meta_unit_pkts,
meta_rates: meta_unit_pkts_per_sec,
},
{
name: "net_bytes_out",
index: fieldTransmitBytes,
lastValue: -1,
tags: tags,
meta: meta_unit_byte,
meta_rates: meta_unit_byte_per_sec,
},
{
name: "net_pkts_out",
index: fieldTransmitPackets,
lastValue: -1,
tags: tags,
meta: meta_unit_pkts,
meta_rates: meta_unit_pkts_per_sec,
},
} }
} }
m.devtags[dev] = map[string]string{"device": dev, "type": "node"}
} }
}
if len(m.devtags) == 0 { if len(m.matches) == 0 {
return errors.New("no devices to collector metrics found") return errors.New("no devices to collector metrics found")
} }
m.init = true m.init = true
@ -89,50 +156,62 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init { if !m.init {
return return
} }
// Current time stamp
now := time.Now() now := time.Now()
// time difference to last time stamp
timeDiff := now.Sub(m.lastTimestamp).Seconds()
// Save current timestamp
m.lastTimestamp = now
file, err := os.Open(string(NETSTATFILE)) file, err := os.Open(string(NETSTATFILE))
if err != nil { if err != nil {
cclog.ComponentError(m.name, err.Error()) cclog.ComponentError(m.name, err.Error())
return return
} }
defer file.Close() defer file.Close()
tdiff := now.Sub(m.lastTimestamp)
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
l := scanner.Text() l := scanner.Text()
// Skip lines with no net device entry
if !strings.Contains(l, ":") { if !strings.Contains(l, ":") {
continue continue
} }
// Split line into fields
f := strings.Fields(l) f := strings.Fields(l)
// Get net device entry
dev := strings.Trim(f[0], ":") dev := strings.Trim(f[0], ":")
// Check if device is a included device
if devmetrics, ok := m.matches[dev]; ok { if devmetrics, ok := m.matches[dev]; ok {
for name, data := range devmetrics { for i := range devmetrics {
v, err := strconv.ParseFloat(f[data.index], 64) metric := &devmetrics[i]
if err == nil {
vdiff := v - data.lastValue // Read value
value := vdiff / tdiff.Seconds() v, err := strconv.ParseInt(f[metric.index], 10, 64)
if data.lastValue == 0 { if err != nil {
value = 0 continue
}
data.lastValue = v
y, err := lp.New(name, m.devtags[dev], m.meta, map[string]interface{}{"value": value}, now)
if err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "bytes/sec")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "packets/sec")
} }
if m.config.SendAbsoluteValues {
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y output <- y
} }
devmetrics[name] = data }
if m.config.SendDerivedValues {
if metric.lastValue >= 0 {
rate := float64(v-metric.lastValue) / timeDiff
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
}
}
metric.lastValue = v
} }
} }
} }
} }
m.lastTimestamp = time.Now()
} }
func (m *NetstatCollector) Close() { func (m *NetstatCollector) Close() {

View File

@ -5,17 +5,23 @@
"netstat": { "netstat": {
"include_devices": [ "include_devices": [
"eth0" "eth0"
] ],
"send_abs_values" : true,
"send_derived_values" : true
} }
``` ```
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list. The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list.
Metrics: Metrics:
* `net_bytes_in` (`unit=bytes/sec`) * `net_bytes_in` (`unit=bytes`)
* `net_bytes_out` (`unit=bytes/sec`) * `net_bytes_out` (`unit=bytes`)
* `net_pkts_in` (`unit=packets/sec`) * `net_pkts_in` (`unit=packets`)
* `net_pkts_out` (`unit=packets/sec`) * `net_pkts_out` (`unit=packets`)
* `net_bytes_in_bw` (`unit=bytes/sec` if `send_derived_values == true`)
* `net_bytes_out_bw` (`unit=bytes/sec` if `send_derived_values == true`)
* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
The device name is added as tag `device`. The device name is added as tag `device`.

View File

@ -0,0 +1,12 @@
Package: cc-metric-collector
Version: {VERSION}
Installed-Size: {INSTALLED_SIZE}
Architecture: {ARCH}
Maintainer: thomas.gruber@fau.de
Depends: libc6 (>= 2.2.1)
Build-Depends: debhelper-compat (= 13), git, golang-go
Description: Metric collection daemon from the ClusterCockpit suite
Homepage: https://github.com/ClusterCockpit/cc-metric-collector
Source: cc-metric-collector
Rules-Requires-Root: no

View File

@ -148,10 +148,14 @@ type GangliaMetricConfig struct {
Unit string Unit string
Group string Group string
Value string Value string
Name string
} }
func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig { func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
mname := GangliaMetricRename(point.Name()) mname := GangliaMetricRename(point.Name())
if oldname, ok := point.GetMeta("oldname"); ok {
mname = GangliaMetricRename(oldname)
}
for _, group := range CommonGangliaMetrics { for _, group := range CommonGangliaMetrics {
for _, metric := range group.Metrics { for _, metric := range group.Metrics {
if metric.Name == mname { if metric.Name == mname {
@ -187,6 +191,7 @@ func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
Tmax: metric.Tmax, Tmax: metric.Tmax,
Unit: metric.Unit, Unit: metric.Unit,
Value: valueStr, Value: valueStr,
Name: GangliaMetricRename(mname),
} }
} }
} }
@ -198,10 +203,15 @@ func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
Tmax: 0, Tmax: 0,
Unit: "", Unit: "",
Value: "", Value: "",
Name: "",
} }
} }
func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig { func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
mname := GangliaMetricRename(point.Name())
if oldname, ok := point.GetMeta("oldname"); ok {
mname = GangliaMetricRename(oldname)
}
group := "" group := ""
if g, ok := point.GetMeta("group"); ok { if g, ok := point.GetMeta("group"); ok {
group = g group = g
@ -254,5 +264,6 @@ func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
Tmax: DEFAULT_GANGLIA_METRIC_TMAX, Tmax: DEFAULT_GANGLIA_METRIC_TMAX,
Unit: unit, Unit: unit,
Value: valueStr, Value: valueStr,
Name: GangliaMetricRename(mname),
} }
} }

View File

@ -39,16 +39,13 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
//var tagsstr []string //var tagsstr []string
var argstr []string var argstr []string
// Get metric name
metricname := GangliaMetricRename(point.Name())
// Get metric config (type, value, ... in suitable format) // Get metric config (type, value, ... in suitable format)
conf := GetCommonGangliaConfig(point) conf := GetCommonGangliaConfig(point)
if len(conf.Type) == 0 { if len(conf.Type) == 0 {
conf = GetGangliaConfig(point) conf = GetGangliaConfig(point)
} }
if len(conf.Type) == 0 { if len(conf.Type) == 0 {
return fmt.Errorf("metric %s has no 'value' field", metricname) return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
} }
if s.config.AddGangliaGroup { if s.config.AddGangliaGroup {
@ -70,7 +67,7 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
if s.config.AddTypeToName { if s.config.AddTypeToName {
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point))) argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
} else { } else {
argstr = append(argstr, fmt.Sprintf("--name=%s", metricname)) argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name))
} }
argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope)) argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value)) argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
@ -27,6 +28,10 @@ type InfluxAsyncSinkConfig struct {
BatchSize uint `json:"batch_size,omitempty"` BatchSize uint `json:"batch_size,omitempty"`
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
FlushInterval uint `json:"flush_interval,omitempty"` FlushInterval uint `json:"flush_interval,omitempty"`
InfluxRetryInterval string `json:"retry_interval"`
InfluxExponentialBase uint `json:"retry_exponential_base"`
InfluxMaxRetries uint `json:"max_retries"`
InfluxMaxRetryTime string `json:"max_retry_time"`
} }
type InfluxAsyncSink struct { type InfluxAsyncSink struct {
@ -35,6 +40,8 @@ type InfluxAsyncSink struct {
writeApi influxdb2Api.WriteAPI writeApi influxdb2Api.WriteAPI
errors <-chan error errors <-chan error
config InfluxAsyncSinkConfig config InfluxAsyncSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
} }
func (s *InfluxAsyncSink) connect() error { func (s *InfluxAsyncSink) connect() error {
@ -63,6 +70,11 @@ func (s *InfluxAsyncSink) connect() error {
InsecureSkipVerify: true, InsecureSkipVerify: true,
}, },
) )
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background()) ok, err := s.client.Ping(context.Background())
@ -99,6 +111,33 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
// Set default for maximum number of points sent to server in single request. // Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100 s.config.BatchSize = 100
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 20
s.config.InfluxExponentialBase = 2
// Default retry intervals (in seconds)
// 1 2
// 2 4
// 4 8
// 8 16
// 16 32
// 32 64
// 64 128
// 128 256
// 256 512
// 512 1024
// 1024 2048
// 2048 4096
// 4096 8192
// 8192 16384
// 16384 32768
// 32768 65536
// 65536 131072
// 131072 262144
// 262144 524288
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
@ -114,6 +153,16 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
} }
toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration)
if err == nil {
return uint(t.Milliseconds())
}
return def
}
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Connect to InfluxDB server // Connect to InfluxDB server
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)

View File

@ -18,6 +18,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
"organization": "myorg", "organization": "myorg",
"ssl": true, "ssl": true,
"batch_size": 200, "batch_size": 200,
"retry_interval" : "1s",
"retry_exponential_base" : 2,
"max_retries": 20,
"max_retry_time" : "168h"
} }
} }
``` ```
@ -32,3 +36,9 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
- `organization`: Organization in the InfluxDB - `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection - `ssl`: Use SSL connection
- `batch_size`: batch up metrics internally, default 100 - `batch_size`: batch up metrics internally, default 100
- `retry_interval`: Base retry interval for failed write requests, default 1s
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
- `max_retries`: Maximal number of retry attempts
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
@ -23,6 +24,11 @@ type InfluxSinkConfig struct {
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"` RetentionPol string `json:"retention_policy,omitempty"`
InfluxRetryInterval string `json:"retry_interval"`
InfluxExponentialBase uint `json:"retry_exponential_base"`
InfluxMaxRetries uint `json:"max_retries"`
InfluxMaxRetryTime string `json:"max_retry_time"`
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
} }
type InfluxSink struct { type InfluxSink struct {
@ -30,6 +36,9 @@ type InfluxSink struct {
client influxdb2.Client client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
//influxMaxRetryDelay uint
} }
func (s *InfluxSink) connect() error { func (s *InfluxSink) connect() error {
@ -52,6 +61,12 @@ func (s *InfluxSink) connect() error {
InsecureSkipVerify: true, InsecureSkipVerify: true,
}, },
) )
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background()) ok, err := s.client.Ping(context.Background())
@ -91,6 +106,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 20
s.config.InfluxExponentialBase = 2
if len(s.config.Host) == 0 || if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 || len(s.config.Port) == 0 ||
len(s.config.Database) == 0 || len(s.config.Database) == 0 ||
@ -99,6 +121,16 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
return nil, errors.New("not all configuration variables set required by InfluxSink") return nil, errors.New("not all configuration variables set required by InfluxSink")
} }
toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration)
if err == nil {
return uint(t.Milliseconds())
}
return def
}
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Connect to InfluxDB server // Connect to InfluxDB server
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)

View File

@ -17,6 +17,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
"password" : "examplepw", "password" : "examplepw",
"organization": "myorg", "organization": "myorg",
"ssl": true, "ssl": true,
"retry_interval" : "1s",
"retry_exponential_base" : 2,
"max_retries": 20,
"max_retry_time" : "168h"
} }
} }
``` ```
@ -30,3 +34,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
- `password`: Password for basic authentification - `password`: Password for basic authentification
- `organization`: Organization in the InfluxDB - `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection - `ssl`: Use SSL connection
- `retry_interval`: Base retry interval for failed write requests, default 1s
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
- `max_retries`: Maximal number of retry attempts
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)

View File

@ -124,24 +124,21 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
return s.cstrCache[key] return s.cstrCache[key]
} }
// Get metric name
metricname := GangliaMetricRename(point.Name())
conf := GetCommonGangliaConfig(point) conf := GetCommonGangliaConfig(point)
if len(conf.Type) == 0 { if len(conf.Type) == 0 {
conf = GetGangliaConfig(point) conf = GetGangliaConfig(point)
} }
if len(conf.Type) == 0 { if len(conf.Type) == 0 {
return fmt.Errorf("metric %s has no 'value' field", metricname) return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
} }
if s.config.AddTypeToName { if s.config.AddTypeToName {
metricname = GangliaMetricName(point) conf.Name = GangliaMetricName(point)
} }
c_value = C.CString(conf.Value) c_value = C.CString(conf.Value)
c_type = lookup(conf.Type) c_type = lookup(conf.Type)
c_name = lookup(metricname) c_name = lookup(conf.Name)
// Add unit // Add unit
unit := "" unit := ""