mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-06 13:35:55 +02:00
Merge branch 'develop' into derived_metrics
This commit is contained in:
commit
e1f4b0e12c
64
.github/workflows/AlmaLinux.yml
vendored
64
.github/workflows/AlmaLinux.yml
vendored
@ -1,64 +0,0 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: AlmaLinux 8.5 RPM build
|
||||
|
||||
# Run on tag push
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
|
||||
#
|
||||
# Build on AlmaLinux 8.5 using go-toolset
|
||||
#
|
||||
AlmaLinux-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://hub.docker.com/_/almalinux
|
||||
container: almalinux:8.5
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes group install "Development Tools" "RPM Development Tools"
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmbuild.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmbuild.outputs.SRPM }}
|
||||
|
||||
# See: https://github.com/softprops/action-gh-release
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.rpmbuild.outputs.RPM }}
|
||||
${{ steps.rpmbuild.outputs.SRPM }}
|
64
.github/workflows/RedHatUniversalBaseImage.yml
vendored
64
.github/workflows/RedHatUniversalBaseImage.yml
vendored
@ -1,64 +0,0 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: Red Hat Universal Base Image 8 RPM build
|
||||
|
||||
# Run on tag push
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
|
||||
#
|
||||
# Build on UBI 8 using go-toolset
|
||||
#
|
||||
UBI-8-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
||||
container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.SRPM }}
|
||||
|
||||
# See: https://github.com/softprops/action-gh-release
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.rpmbuild.outputs.RPM }}
|
||||
${{ steps.rpmbuild.outputs.SRPM }}
|
184
.github/workflows/Release.yml
vendored
Normal file
184
.github/workflows/Release.yml
vendored
Normal file
@ -0,0 +1,184 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: Release
|
||||
|
||||
# Run on tag push
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
|
||||
#
|
||||
# Build on AlmaLinux 8.5 using go-toolset
|
||||
#
|
||||
AlmaLinux-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://hub.docker.com/_/almalinux
|
||||
container: almalinux:8.5
|
||||
# The job outputs link to the outputs of the 'rpmrename' step
|
||||
# Only job outputs can be used in child jobs
|
||||
outputs:
|
||||
rpm : ${{steps.rpmrename.outputs.RPM}}
|
||||
srpm : ${{steps.rpmrename.outputs.SRPM}}
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes group install "Development Tools" "RPM Development Tools"
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# AlmaLinux 8.5 is a derivate of RedHat Enterprise Linux 8 (UBI8),
|
||||
# so the created RPM both contain the substring 'el8' in the RPM file names
|
||||
# This step replaces the substring 'el8' to 'alma85'. It uses the move operation
|
||||
# because it is unclear whether the default AlmaLinux 8.5 container contains the
|
||||
# 'rename' command. This way we also get the new names for output.
|
||||
- name: Rename RPMs (s/el8/alma85/)
|
||||
id: rpmrename
|
||||
run: |
|
||||
OLD_RPM="${{steps.rpmbuild.outputs.RPM}}"
|
||||
OLD_SRPM="${{steps.rpmbuild.outputs.SRPM}}"
|
||||
NEW_RPM="${OLD_RPM/el8/alma85}"
|
||||
NEW_SRPM=${OLD_SRPM/el8/alma85}
|
||||
mv "${OLD_RPM}" "${NEW_RPM}"
|
||||
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
||||
echo "::set-output name=SRPM::${NEW_SRPM}"
|
||||
echo "::set-output name=RPM::${NEW_RPM}"
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmrename.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmrename.outputs.SRPM }}
|
||||
|
||||
#
|
||||
# Build on UBI 8 using go-toolset
|
||||
#
|
||||
UBI-8-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
||||
container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065
|
||||
# The job outputs link to the outputs of the 'rpmbuild' step
|
||||
outputs:
|
||||
rpm : ${{steps.rpmbuild.outputs.RPM}}
|
||||
srpm : ${{steps.rpmbuild.outputs.SRPM}}
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.SRPM }}
|
||||
|
||||
#
|
||||
# Create release with fresh RPMs
|
||||
#
|
||||
Release:
|
||||
runs-on: ubuntu-latest
|
||||
# We need the RPMs, so add dependency
|
||||
needs: [AlmaLinux-RPM-build, UBI-8-RPM-build]
|
||||
|
||||
steps:
|
||||
# See: https://github.com/actions/download-artifact
|
||||
- name: Download AlmaLinux 8.5 RPM
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 8.5
|
||||
- name: Download AlmaLinux 8.5 SRPM
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 8.5
|
||||
|
||||
- name: Download UBI 8 RPM
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for UBI 8
|
||||
- name: Download UBI 8 SRPM
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for UBI 8
|
||||
|
||||
# The download actions do not publish the name of the downloaded file,
|
||||
# so we re-use the job outputs of the parent jobs. The files are all
|
||||
# downloaded to the current folder.
|
||||
# The gh-release action afterwards does not accept file lists but all
|
||||
# files have to be listed at 'files'. The step creates one output per
|
||||
# RPM package (2 per distro)
|
||||
- name: Set RPM variables
|
||||
id: files
|
||||
run: |
|
||||
ALMA_85_RPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.rpm}}")
|
||||
ALMA_85_SRPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.srpm}}")
|
||||
UBI_8_RPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.rpm}}")
|
||||
UBI_8_SRPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.srpm}}")
|
||||
echo "ALMA_85_RPM::${ALMA_85_RPM}"
|
||||
echo "ALMA_85_SRPM::${ALMA_85_SRPM}"
|
||||
echo "UBI_8_RPM::${UBI_8_RPM}"
|
||||
echo "UBI_8_SRPM::${UBI_8_SRPM}"
|
||||
echo "::set-output name=ALMA_85_RPM::${ALMA_85_RPM}"
|
||||
echo "::set-output name=ALMA_85_SRPM::${ALMA_85_SRPM}"
|
||||
echo "::set-output name=UBI_8_RPM::${UBI_8_RPM}"
|
||||
echo "::set-output name=UBI_8_SRPM::${UBI_8_SRPM}"
|
||||
|
||||
# See: https://github.com/softprops/action-gh-release
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.files.outputs.ALMA_85_RPM }}
|
||||
${{ steps.files.outputs.ALMA_85_SRPM }}
|
||||
${{ steps.files.outputs.UBI_8_RPM }}
|
||||
${{ steps.files.outputs.UBI_8_SRPM }}
|
53
Makefile
53
Makefile
@ -15,6 +15,8 @@ COMPONENT_DIRS := collectors \
|
||||
internal/ccTopology \
|
||||
internal/multiChanTicker
|
||||
|
||||
BINDIR = bin
|
||||
|
||||
|
||||
.PHONY: all
|
||||
all: $(APP)
|
||||
@ -24,15 +26,27 @@ $(APP): $(GOSRC)
|
||||
go get
|
||||
go build -o $(APP) $(GOSRC_APP)
|
||||
|
||||
install: $(APP)
|
||||
@WORKSPACE=$(PREFIX)
|
||||
@if [ -z "$${WORKSPACE}" ]; then exit 1; fi
|
||||
@mkdir --parents --verbose $${WORKSPACE}/usr/$(BINDIR)
|
||||
@install -Dpm 755 $(APP) $${WORKSPACE}/usr/$(BINDIR)/$(APP)
|
||||
@mkdir --parents --verbose $${WORKSPACE}/etc/cc-metric-collector $${WORKSPACE}/etc/default $${WORKSPACE}/etc/systemd/system $${WORKSPACE}/etc/init.d
|
||||
@install -Dpm 600 config.json $${WORKSPACE}/etc/cc-metric-collector/cc-metric-collector.json
|
||||
@sed -i -e s+"\"./"+"\"/etc/cc-metric-collector/"+g $${WORKSPACE}/etc/cc-metric-collector/cc-metric-collector.json
|
||||
@install -Dpm 600 sinks.json $${WORKSPACE}/etc/cc-metric-collector/sinks.json
|
||||
@install -Dpm 600 collectors.json $${WORKSPACE}/etc/cc-metric-collector/collectors.json
|
||||
@install -Dpm 600 router.json $${WORKSPACE}/etc/cc-metric-collector/router.json
|
||||
@install -Dpm 600 receivers.json $${WORKSPACE}/etc/cc-metric-collector/receivers.json
|
||||
@install -Dpm 600 scripts/cc-metric-collector.config $${WORKSPACE}/etc/default/cc-metric-collector
|
||||
@install -Dpm 644 scripts/cc-metric-collector.service $${WORKSPACE}/etc/systemd/system/cc-metric-collector.service
|
||||
@install -Dpm 644 scripts/cc-metric-collector.init $${WORKSPACE}/etc/init.d/cc-metric-collector
|
||||
|
||||
|
||||
.PHONY: clean
|
||||
.ONESHELL:
|
||||
clean:
|
||||
@for COMP in $(COMPONENT_DIRS)
|
||||
do
|
||||
if [[ -e $$COMP/Makefile ]]; then
|
||||
make -C $$COMP clean
|
||||
fi
|
||||
done
|
||||
@for COMP in $(COMPONENT_DIRS); do if [ -e $$COMP/Makefile ]; then make -C $$COMP clean; fi; done
|
||||
rm -f $(APP)
|
||||
|
||||
.PHONY: fmt
|
||||
@ -69,7 +83,7 @@ RPM: scripts/cc-metric-collector.spec
|
||||
@COMMITISH="HEAD"
|
||||
@VERS=$$(git describe --tags $${COMMITISH})
|
||||
@VERS=$${VERS#v}
|
||||
@VERS=$${VERS//-/_}
|
||||
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
|
||||
@eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}")
|
||||
@PREFIX="$${NAME}-$${VERSION}"
|
||||
@FORMAT="tar.gz"
|
||||
@ -86,3 +100,28 @@ RPM: scripts/cc-metric-collector.spec
|
||||
@ echo "::set-output name=SRPM::$${SRPMFILE}"
|
||||
@ echo "::set-output name=RPM::$${RPMFILE}"
|
||||
@fi
|
||||
|
||||
.PHONY: DEB
|
||||
DEB: scripts/cc-metric-collector.deb.control $(APP)
|
||||
@BASEDIR=$${PWD}
|
||||
@WORKSPACE=$${PWD}/.dpkgbuild
|
||||
@DEBIANDIR=$${WORKSPACE}/debian
|
||||
@DEBIANBINDIR=$${WORKSPACE}/DEBIAN
|
||||
@mkdir --parents --verbose $$WORKSPACE $$DEBIANBINDIR
|
||||
#@mkdir --parents --verbose $$DEBIANDIR
|
||||
@CONTROLFILE="$${BASEDIR}/scripts/cc-metric-collector.deb.control"
|
||||
@COMMITISH="HEAD"
|
||||
@VERS=$$(git describe --tags --abbrev=0 $${COMMITISH})
|
||||
@VERS=$${VERS#v}
|
||||
@VERS=$$(echo $$VERS | sed -e s+'-'+'_'+g)
|
||||
@ARCH=$$(uname -m)
|
||||
@ARCH=$$(echo $$ARCH | sed -e s+'_'+'-'+g)
|
||||
@PREFIX="$${NAME}-$${VERSION}_$${ARCH}"
|
||||
@SIZE_BYTES=$$(du -bcs --exclude=.dpkgbuild "$$WORKSPACE"/ | awk '{print $$1}' | head -1 | sed -e 's/^0\+//')
|
||||
@SIZE="$$(awk -v size="$$SIZE_BYTES" 'BEGIN {print (size/1024)+1}' | awk '{print int($$0)}')"
|
||||
#@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANDIR}/control
|
||||
@sed -e s+"{VERSION}"+"$$VERS"+g -e s+"{INSTALLED_SIZE}"+"$$SIZE"+g -e s+"{ARCH}"+"$$ARCH"+g $$CONTROLFILE > $${DEBIANBINDIR}/control
|
||||
@make PREFIX=$${WORKSPACE} install
|
||||
@DEB_FILE="cc-metric-collector_$${VERS}_$${ARCH}.deb"
|
||||
@dpkg-deb -b $${WORKSPACE} "$$DEB_FILE"
|
||||
@rm -r "$${WORKSPACE}"
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -28,48 +27,6 @@ import (
|
||||
"github.com/NVIDIA/go-nvml/pkg/dl"
|
||||
)
|
||||
|
||||
type MetricScope string
|
||||
|
||||
const (
|
||||
METRIC_SCOPE_HWTHREAD = iota
|
||||
METRIC_SCOPE_CORE
|
||||
METRIC_SCOPE_LLC
|
||||
METRIC_SCOPE_NUMA
|
||||
METRIC_SCOPE_DIE
|
||||
METRIC_SCOPE_SOCKET
|
||||
METRIC_SCOPE_NODE
|
||||
)
|
||||
|
||||
func (ms MetricScope) String() string {
|
||||
return string(ms)
|
||||
}
|
||||
|
||||
func (ms MetricScope) Likwid() string {
|
||||
LikwidDomains := map[string]string{
|
||||
"cpu": "",
|
||||
"core": "",
|
||||
"llc": "C",
|
||||
"numadomain": "M",
|
||||
"die": "D",
|
||||
"socket": "S",
|
||||
"node": "N",
|
||||
}
|
||||
return LikwidDomains[string(ms)]
|
||||
}
|
||||
|
||||
func (ms MetricScope) Granularity() int {
|
||||
for i, g := range GetAllMetricScopes() {
|
||||
if ms == g {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func GetAllMetricScopes() []MetricScope {
|
||||
return []MetricScope{"cpu" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"}
|
||||
}
|
||||
|
||||
const (
|
||||
LIKWID_LIB_NAME = "liblikwid.so"
|
||||
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
|
||||
@ -77,18 +34,16 @@ const (
|
||||
)
|
||||
|
||||
type LikwidCollectorMetricConfig struct {
|
||||
Name string `json:"name"` // Name of the metric
|
||||
Calc string `json:"calc"` // Calculation for the metric using
|
||||
//Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median)
|
||||
Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function
|
||||
Publish bool `json:"publish"`
|
||||
granulatity MetricScope
|
||||
Name string `json:"name"` // Name of the metric
|
||||
Calc string `json:"calc"` // Calculation for the metric using
|
||||
Type string `json:"type"` // Metric type (aka node, socket, cpu, ...)
|
||||
Publish bool `json:"publish"`
|
||||
Unit string `json:"unit"` // Unit of metric if any
|
||||
}
|
||||
|
||||
type LikwidCollectorEventsetConfig struct {
|
||||
Events map[string]string `json:"events"`
|
||||
granulatity map[string]MetricScope
|
||||
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
|
||||
Events map[string]string `json:"events"`
|
||||
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
|
||||
}
|
||||
|
||||
type LikwidCollectorConfig struct {
|
||||
@ -98,28 +53,28 @@ type LikwidCollectorConfig struct {
|
||||
InvalidToZero bool `json:"invalid_to_zero,omitempty"`
|
||||
AccessMode string `json:"access_mode,omitempty"`
|
||||
DaemonPath string `json:"accessdaemon_path,omitempty"`
|
||||
LibraryPath string `json:"liblikwid_path,omitempty"`
|
||||
}
|
||||
|
||||
type LikwidCollector struct {
|
||||
metricCollector
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
scopeRespTids map[MetricScope]map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
results map[int]map[int]map[string]interface{}
|
||||
mresults map[int]map[int]map[string]float64
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
results map[int]map[int]map[string]interface{}
|
||||
mresults map[int]map[int]map[string]float64
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
}
|
||||
|
||||
type LikwidMetric struct {
|
||||
name string
|
||||
search string
|
||||
scope MetricScope
|
||||
scope string
|
||||
group_idx int
|
||||
}
|
||||
|
||||
@ -131,152 +86,43 @@ func eventsToEventStr(events map[string]string) string {
|
||||
return strings.Join(elist, ",")
|
||||
}
|
||||
|
||||
func getGranularity(counter, event string) MetricScope {
|
||||
if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") {
|
||||
return "cpu"
|
||||
} else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") {
|
||||
return "socket"
|
||||
} else if strings.HasPrefix(counter, "PWR") {
|
||||
if event == "RAPL_CORE_ENERGY" {
|
||||
return "cpu"
|
||||
} else {
|
||||
return "socket"
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
func getBaseFreq() float64 {
|
||||
var freq float64 = math.NaN()
|
||||
C.power_init(0)
|
||||
info := C.get_powerInfo()
|
||||
if float64(info.baseFrequency) != 0 {
|
||||
freq = float64(info.baseFrequency) * 1e3
|
||||
freq = float64(info.baseFrequency) * 1e6
|
||||
} else {
|
||||
buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
|
||||
if err == nil {
|
||||
data := strings.Replace(string(buffer), "\n", "", -1)
|
||||
x, err := strconv.ParseInt(data, 0, 64)
|
||||
if err == nil {
|
||||
freq = float64(x) * 1e3
|
||||
freq = float64(x) * 1e6
|
||||
}
|
||||
}
|
||||
}
|
||||
return freq
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) initGranularity() {
|
||||
splitRegex := regexp.MustCompile("[+-/*()]")
|
||||
for _, evset := range m.config.Eventsets {
|
||||
evset.granulatity = make(map[string]MetricScope)
|
||||
for counter, event := range evset.Events {
|
||||
gran := getGranularity(counter, event)
|
||||
if gran.Granularity() >= 0 {
|
||||
evset.granulatity[counter] = gran
|
||||
}
|
||||
}
|
||||
for i, metric := range evset.Metrics {
|
||||
s := splitRegex.Split(metric.Calc, -1)
|
||||
gran := MetricScope("cpu")
|
||||
evset.Metrics[i].granulatity = gran
|
||||
for _, x := range s {
|
||||
if _, ok := evset.Events[x]; ok {
|
||||
if evset.granulatity[x].Granularity() > gran.Granularity() {
|
||||
gran = evset.granulatity[x]
|
||||
}
|
||||
}
|
||||
}
|
||||
evset.Metrics[i].granulatity = gran
|
||||
}
|
||||
}
|
||||
for i, metric := range m.config.Metrics {
|
||||
s := splitRegex.Split(metric.Calc, -1)
|
||||
gran := MetricScope("cpu")
|
||||
m.config.Metrics[i].granulatity = gran
|
||||
for _, x := range s {
|
||||
for _, evset := range m.config.Eventsets {
|
||||
for _, m := range evset.Metrics {
|
||||
if m.Name == x && m.granulatity.Granularity() > gran.Granularity() {
|
||||
gran = m.granulatity
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m.config.Metrics[i].granulatity = gran
|
||||
}
|
||||
}
|
||||
|
||||
type TopoResolveFunc func(cpuid int) int
|
||||
|
||||
func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int {
|
||||
get_cpus := func(scope MetricScope) map[int]int {
|
||||
var slist []int
|
||||
var cpu C.int
|
||||
var input func(index int) string
|
||||
switch scope {
|
||||
case "node":
|
||||
slist = []int{0}
|
||||
input = func(index int) string { return "N:0" }
|
||||
case "socket":
|
||||
input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
|
||||
slist = topo.SocketList()
|
||||
// case "numadomain":
|
||||
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
|
||||
// slist = topo.NumaNodeList()
|
||||
// cclog.Debug(scope, " ", input(0), " ", slist)
|
||||
// case "die":
|
||||
// input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) }
|
||||
// slist = topo.DieList()
|
||||
// case "llc":
|
||||
// input = fmt.Sprintf("%s%d:0", scope.Likwid(), s)
|
||||
// slist = topo.LLCacheList()
|
||||
case "cpu":
|
||||
input = func(index int) string { return fmt.Sprintf("%d", index) }
|
||||
slist = topo.CpuList()
|
||||
case "hwthread":
|
||||
input = func(index int) string { return fmt.Sprintf("%d", index) }
|
||||
slist = topo.CpuList()
|
||||
}
|
||||
outmap := make(map[int]int)
|
||||
for _, s := range slist {
|
||||
t := C.CString(input(s))
|
||||
clen := C.cpustr_to_cpulist(t, &cpu, 1)
|
||||
if int(clen) == 1 {
|
||||
outmap[s] = m.cpu2tid[int(cpu)]
|
||||
} else {
|
||||
cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s)))
|
||||
outmap[s] = -1
|
||||
}
|
||||
C.free(unsafe.Pointer(t))
|
||||
}
|
||||
return outmap
|
||||
}
|
||||
|
||||
scopes := GetAllMetricScopes()
|
||||
complete := make(map[MetricScope]map[int]int)
|
||||
for _, s := range scopes {
|
||||
complete[s] = get_cpus(s)
|
||||
}
|
||||
return complete
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
var ret C.int
|
||||
m.name = "LikwidCollector"
|
||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||
m.config.LibraryPath = LIKWID_LIB_NAME
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
lib := dl.New(LIKWID_LIB_NAME, LIKWID_LIB_DL_FLAGS)
|
||||
lib := dl.New(m.config.LibraryPath, LIKWID_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", LIKWID_LIB_NAME)
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", m.config.LibraryPath)
|
||||
}
|
||||
err := lib.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %v", LIKWID_LIB_NAME, err)
|
||||
return fmt.Errorf("error opening %s: %v", m.config.LibraryPath, err)
|
||||
}
|
||||
|
||||
if m.config.ForceOverwrite {
|
||||
@ -306,10 +152,6 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Determine which counter works at which level. PMC*: cpu, *BOX*: socket, ...
|
||||
m.initGranularity()
|
||||
// Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist)
|
||||
m.scopeRespTids = m.getResponsiblities()
|
||||
switch m.config.AccessMode {
|
||||
case "direct":
|
||||
C.HPMmode(0)
|
||||
@ -336,29 +178,36 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
globalParams["inverseClock"] = float64(1.0)
|
||||
// While adding the events, we test the metrics whether they can be computed at all
|
||||
for i, evset := range m.config.Eventsets {
|
||||
estr := eventsToEventStr(evset.Events)
|
||||
// Generate parameter list for the metric computing test
|
||||
params := make(map[string]interface{})
|
||||
params["time"] = float64(1.0)
|
||||
params["inverseClock"] = float64(1.0)
|
||||
for counter := range evset.Events {
|
||||
params[counter] = float64(1.0)
|
||||
}
|
||||
for _, metric := range evset.Metrics {
|
||||
// Try to evaluate the metric
|
||||
_, err := agg.EvalFloat64Condition(metric.Calc, params)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||
continue
|
||||
var gid C.int
|
||||
var cstr *C.char
|
||||
if len(evset.Events) > 0 {
|
||||
estr := eventsToEventStr(evset.Events)
|
||||
// Generate parameter list for the metric computing test
|
||||
params := make(map[string]interface{})
|
||||
params["time"] = float64(1.0)
|
||||
params["inverseClock"] = float64(1.0)
|
||||
for counter := range evset.Events {
|
||||
params[counter] = float64(1.0)
|
||||
}
|
||||
// If the metric is not in the parameter list for the global metrics, add it
|
||||
if _, ok := globalParams[metric.Name]; !ok {
|
||||
globalParams[metric.Name] = float64(1.0)
|
||||
for _, metric := range evset.Metrics {
|
||||
// Try to evaluate the metric
|
||||
_, err := agg.EvalFloat64Condition(metric.Calc, params)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||
continue
|
||||
}
|
||||
// If the metric is not in the parameter list for the global metrics, add it
|
||||
if _, ok := globalParams[metric.Name]; !ok {
|
||||
globalParams[metric.Name] = float64(1.0)
|
||||
}
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
cstr = C.CString(estr)
|
||||
gid = C.perfmon_addEventSet(cstr)
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
continue
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
cstr := C.CString(estr)
|
||||
gid := C.perfmon_addEventSet(cstr)
|
||||
if gid >= 0 {
|
||||
m.groups = append(m.groups, gid)
|
||||
}
|
||||
@ -434,15 +283,9 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
// Go over events and get the results
|
||||
for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
|
||||
ctr := C.perfmon_getCounterName(gid, eidx)
|
||||
ev := C.perfmon_getEventName(gid, eidx)
|
||||
gctr := C.GoString(ctr)
|
||||
gev := C.GoString(ev)
|
||||
// MetricScope for the counter (and if needed the event)
|
||||
scope := getGranularity(gctr, gev)
|
||||
// Get the map scope-id -> tids
|
||||
// This way we read less counters like only the responsible hardware thread for a socket
|
||||
scopemap := m.scopeRespTids[scope]
|
||||
for _, tid := range scopemap {
|
||||
|
||||
for _, tid := range m.cpu2tid {
|
||||
if tid >= 0 {
|
||||
m.results[group][tid]["time"] = interval.Seconds()
|
||||
m.results[group][tid]["inverseClock"] = invClock
|
||||
@ -456,7 +299,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
for _, metric := range evset.Metrics {
|
||||
// The metric scope is determined in the Init() function
|
||||
// Get the map scope-id -> tids
|
||||
scopemap := m.scopeRespTids[metric.Scope]
|
||||
scopemap := m.cpu2tid
|
||||
if metric.Type == "socket" {
|
||||
scopemap = m.sock2tid
|
||||
}
|
||||
for domain, tid := range scopemap {
|
||||
if tid >= 0 {
|
||||
value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
|
||||
@ -474,13 +320,15 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
// Now we have the result, send it with the proper tags
|
||||
if !math.IsNaN(value) {
|
||||
if metric.Publish {
|
||||
tags := map[string]string{"type": metric.Scope.String()}
|
||||
if metric.Scope != "node" {
|
||||
tags["type-id"] = fmt.Sprintf("%d", domain)
|
||||
}
|
||||
fields := map[string]interface{}{"value": value}
|
||||
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
|
||||
y, err := lp.New(metric.Name, map[string]string{"type": metric.Type}, m.meta, fields, time.Now())
|
||||
if err == nil {
|
||||
if metric.Type != "node" {
|
||||
y.AddTag("type-id", fmt.Sprintf("%d", domain))
|
||||
}
|
||||
if len(metric.Unit) > 0 {
|
||||
y.AddMeta("unit", metric.Unit)
|
||||
}
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@ -495,7 +343,10 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
// Go over the global metrics, derive the value out of the event sets' metric values and send it
|
||||
func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error {
|
||||
for _, metric := range m.config.Metrics {
|
||||
scopemap := m.scopeRespTids[metric.Scope]
|
||||
scopemap := m.cpu2tid
|
||||
if metric.Type == "socket" {
|
||||
scopemap = m.sock2tid
|
||||
}
|
||||
for domain, tid := range scopemap {
|
||||
if tid >= 0 {
|
||||
// Here we generate parameter list
|
||||
@ -521,13 +372,16 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
// Now we have the result, send it with the proper tags
|
||||
if !math.IsNaN(value) {
|
||||
if metric.Publish {
|
||||
tags := map[string]string{"type": metric.Scope.String()}
|
||||
if metric.Scope != "node" {
|
||||
tags["type-id"] = fmt.Sprintf("%d", domain)
|
||||
}
|
||||
tags := map[string]string{"type": metric.Type}
|
||||
fields := map[string]interface{}{"value": value}
|
||||
y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now())
|
||||
if err == nil {
|
||||
if metric.Type != "node" {
|
||||
y.AddTag("type-id", fmt.Sprintf("%d", domain))
|
||||
}
|
||||
if len(metric.Unit) > 0 {
|
||||
y.AddMeta("unit", metric.Unit)
|
||||
}
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
|
@ -4,14 +4,17 @@
|
||||
The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.
|
||||
|
||||
The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
|
||||
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
|
||||
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases.
|
||||
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
|
||||
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`publish=false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
|
||||
|
||||
Additional options:
|
||||
- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode)
|
||||
- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin`
|
||||
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
|
||||
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`.
|
||||
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`
|
||||
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD`
|
||||
- `liblikwid_path`: Location of `liblikwid.so`
|
||||
|
||||
### Available metric scopes
|
||||
|
||||
@ -54,7 +57,8 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
|
||||
"calc": "time",
|
||||
"name": "Runtime (RDTSC) [s]",
|
||||
"publish": true,
|
||||
"scope": "hwthread"
|
||||
"unit": "seconds"
|
||||
"scope": "cpu"
|
||||
},
|
||||
{
|
||||
"..." : "..."
|
||||
@ -104,25 +108,28 @@ $ chwon $CCUSER /var/run/likwid.lock
|
||||
{
|
||||
"name": "ipc",
|
||||
"calc": "PMC0/PMC1",
|
||||
"scope": "cpu",
|
||||
"type": "cpu",
|
||||
"publish": true
|
||||
},
|
||||
{
|
||||
"name": "flops_any",
|
||||
"calc": "0.000001*PMC2/time",
|
||||
"scope": "cpu",
|
||||
"unit": "MFlops/s",
|
||||
"type": "cpu",
|
||||
"publish": true
|
||||
},
|
||||
{
|
||||
"name": "clock_mhz",
|
||||
"name": "clock",
|
||||
"calc": "0.000001*(FIXC1/FIXC2)/inverseClock",
|
||||
"scope": "cpu",
|
||||
"type": "cpu",
|
||||
"unit": "MHz",
|
||||
"publish": true
|
||||
},
|
||||
{
|
||||
"name": "mem1",
|
||||
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
|
||||
"scope": "socket",
|
||||
"unit": "Mbyte/s",
|
||||
"type": "socket",
|
||||
"publish": false
|
||||
}
|
||||
]
|
||||
@ -140,19 +147,22 @@ $ chwon $CCUSER /var/run/likwid.lock
|
||||
{
|
||||
"name": "pwr_core",
|
||||
"calc": "PWR0/time",
|
||||
"scope": "socket",
|
||||
"unit": "Watt"
|
||||
"type": "socket",
|
||||
"publish": true
|
||||
},
|
||||
{
|
||||
"name": "pwr_pkg",
|
||||
"calc": "PWR1/time",
|
||||
"scope": "socket",
|
||||
"type": "socket",
|
||||
"unit": "Watt"
|
||||
"publish": true
|
||||
},
|
||||
{
|
||||
"name": "mem2",
|
||||
"calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time",
|
||||
"scope": "socket",
|
||||
"unit": "Mbyte/s",
|
||||
"type": "socket",
|
||||
"publish": false
|
||||
}
|
||||
]
|
||||
@ -162,7 +172,8 @@ $ chwon $CCUSER /var/run/likwid.lock
|
||||
{
|
||||
"name": "mem_bw",
|
||||
"calc": "mem1+mem2",
|
||||
"scope": "socket",
|
||||
"type": "socket",
|
||||
"unit": "Mbyte/s",
|
||||
"publish": true
|
||||
}
|
||||
]
|
||||
@ -198,3 +209,4 @@ IPC PMC0/PMC1 -> {
|
||||
-> ]
|
||||
```
|
||||
|
||||
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.
|
@ -32,11 +32,12 @@ type MemstatCollectorNode struct {
|
||||
|
||||
type MemstatCollector struct {
|
||||
metricCollector
|
||||
stats map[string]int64
|
||||
tags map[string]string
|
||||
matches map[string]string
|
||||
config MemstatCollectorConfig
|
||||
nodefiles map[int]MemstatCollectorNode
|
||||
stats map[string]int64
|
||||
tags map[string]string
|
||||
matches map[string]string
|
||||
config MemstatCollectorConfig
|
||||
nodefiles map[int]MemstatCollectorNode
|
||||
sendMemUsed bool
|
||||
}
|
||||
|
||||
func getStats(filename string) map[string]float64 {
|
||||
@ -77,7 +78,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"}
|
||||
m.stats = make(map[string]int64)
|
||||
m.matches = make(map[string]string)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
@ -99,6 +100,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
m.matches[k] = v
|
||||
}
|
||||
}
|
||||
m.sendMemUsed = false
|
||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
|
||||
m.sendMemUsed = true
|
||||
}
|
||||
if len(m.matches) == 0 {
|
||||
return errors.New("no metrics to collect")
|
||||
}
|
||||
@ -152,23 +157,26 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if v, ok := stats[match]; ok {
|
||||
value = v
|
||||
}
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
|
||||
if freeVal, free := stats["MemFree"]; free {
|
||||
if bufVal, buffers := stats["Buffers"]; buffers {
|
||||
if cacheVal, cached := stats["Cached"]; cached {
|
||||
memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal)
|
||||
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
if m.sendMemUsed {
|
||||
memUsed := 0.0
|
||||
if totalVal, total := stats["MemTotal"]; total {
|
||||
if freeVal, free := stats["MemFree"]; free {
|
||||
if bufVal, buffers := stats["Buffers"]; buffers {
|
||||
if cacheVal, cached := stats["Cached"]; cached {
|
||||
memUsed = totalVal - (freeVal + bufVal + cacheVal)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,11 +22,12 @@ type NetstatCollectorConfig struct {
|
||||
}
|
||||
|
||||
type NetstatCollectorMetric struct {
|
||||
name string
|
||||
index int
|
||||
tags map[string]string
|
||||
rate_tags map[string]string
|
||||
lastValue int64
|
||||
name string
|
||||
index int
|
||||
tags map[string]string
|
||||
meta map[string]string
|
||||
meta_rates map[string]string
|
||||
lastValue int64
|
||||
}
|
||||
|
||||
type NetstatCollector struct {
|
||||
@ -40,29 +41,25 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
m.name = "NetstatCollector"
|
||||
m.setup()
|
||||
m.lastTimestamp = time.Now()
|
||||
m.meta = map[string]string{
|
||||
"source": m.name,
|
||||
"group": "Network",
|
||||
}
|
||||
|
||||
const (
|
||||
fieldInterface = iota
|
||||
fieldReceiveBytes = iota
|
||||
fieldReceivePackets = iota
|
||||
fieldReceiveErrs = iota
|
||||
fieldReceiveDrop = iota
|
||||
fieldReceiveFifo = iota
|
||||
fieldReceiveFrame = iota
|
||||
fieldReceiveCompressed = iota
|
||||
fieldReceiveMulticast = iota
|
||||
fieldTransmitBytes = iota
|
||||
fieldTransmitPackets = iota
|
||||
fieldTransmitErrs = iota
|
||||
fieldTransmitDrop = iota
|
||||
fieldTransmitFifo = iota
|
||||
fieldTransmitColls = iota
|
||||
fieldTransmitCarrier = iota
|
||||
fieldTransmitCompressed = iota
|
||||
fieldInterface = iota
|
||||
fieldReceiveBytes
|
||||
fieldReceivePackets
|
||||
fieldReceiveErrs
|
||||
fieldReceiveDrop
|
||||
fieldReceiveFifo
|
||||
fieldReceiveFrame
|
||||
fieldReceiveCompressed
|
||||
fieldReceiveMulticast
|
||||
fieldTransmitBytes
|
||||
fieldTransmitPackets
|
||||
fieldTransmitErrs
|
||||
fieldTransmitDrop
|
||||
fieldTransmitFifo
|
||||
fieldTransmitColls
|
||||
fieldTransmitCarrier
|
||||
fieldTransmitCompressed
|
||||
)
|
||||
|
||||
m.matches = make(map[string][]NetstatCollectorMetric)
|
||||
@ -104,39 +101,44 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Check if device is a included device
|
||||
if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
|
||||
tags_unit_byte := map[string]string{"device": dev, "type": "node", "unit": "bytes"}
|
||||
tags_unit_byte_per_sec := map[string]string{"device": dev, "type": "node", "unit": "bytes/sec"}
|
||||
tags_unit_pkts := map[string]string{"device": dev, "type": "node", "unit": "packets"}
|
||||
tags_unit_pkts_per_sec := map[string]string{"device": dev, "type": "node", "unit": "packets/sec"}
|
||||
tags := map[string]string{"device": dev, "type": "node"}
|
||||
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
||||
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
|
||||
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
|
||||
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
|
||||
|
||||
m.matches[dev] = []NetstatCollectorMetric{
|
||||
{
|
||||
name: "net_bytes_in",
|
||||
index: fieldReceiveBytes,
|
||||
lastValue: -1,
|
||||
tags: tags_unit_byte,
|
||||
rate_tags: tags_unit_byte_per_sec,
|
||||
name: "net_bytes_in",
|
||||
index: fieldReceiveBytes,
|
||||
lastValue: -1,
|
||||
tags: tags,
|
||||
meta: meta_unit_byte,
|
||||
meta_rates: meta_unit_byte_per_sec,
|
||||
},
|
||||
{
|
||||
name: "net_pkts_in",
|
||||
index: fieldReceivePackets,
|
||||
lastValue: -1,
|
||||
tags: tags_unit_pkts,
|
||||
rate_tags: tags_unit_pkts_per_sec,
|
||||
name: "net_pkts_in",
|
||||
index: fieldReceivePackets,
|
||||
lastValue: -1,
|
||||
tags: tags,
|
||||
meta: meta_unit_pkts,
|
||||
meta_rates: meta_unit_pkts_per_sec,
|
||||
},
|
||||
{
|
||||
name: "net_bytes_out",
|
||||
index: fieldTransmitBytes,
|
||||
lastValue: -1,
|
||||
tags: tags_unit_byte,
|
||||
rate_tags: tags_unit_byte_per_sec,
|
||||
name: "net_bytes_out",
|
||||
index: fieldTransmitBytes,
|
||||
lastValue: -1,
|
||||
tags: tags,
|
||||
meta: meta_unit_byte,
|
||||
meta_rates: meta_unit_byte_per_sec,
|
||||
},
|
||||
{
|
||||
name: "net_pkts_out",
|
||||
index: fieldTransmitPackets,
|
||||
lastValue: -1,
|
||||
tags: tags_unit_pkts,
|
||||
rate_tags: tags_unit_pkts_per_sec,
|
||||
name: "net_pkts_out",
|
||||
index: fieldTransmitPackets,
|
||||
lastValue: -1,
|
||||
tags: tags,
|
||||
meta: meta_unit_pkts,
|
||||
meta_rates: meta_unit_pkts_per_sec,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -194,14 +196,14 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
continue
|
||||
}
|
||||
if m.config.SendAbsoluteValues {
|
||||
if y, err := lp.New(metric.name, metric.tags, m.meta, map[string]interface{}{"value": v}, now); err == nil {
|
||||
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.config.SendDerivedValues {
|
||||
if metric.lastValue >= 0 {
|
||||
rate := float64(v-metric.lastValue) / timeDiff
|
||||
if y, err := lp.New(metric.name+"_bw", metric.rate_tags, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
|
||||
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ type metricRouterTagConfig struct {
|
||||
|
||||
// Metric router configuration
|
||||
type metricRouterConfig struct {
|
||||
HostnameTagName string `json:"hostname_tag"` // Key name used when adding the hostname to a metric (default 'hostname')
|
||||
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
|
||||
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
|
||||
IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
|
||||
@ -33,6 +34,7 @@ type metricRouterConfig struct {
|
||||
RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value
|
||||
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
|
||||
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
|
||||
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
|
||||
dropMetrics map[string]bool // Internal map for O(1) lookup
|
||||
}
|
||||
|
||||
@ -76,7 +78,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
r.cache_input = make(chan lp.CCMetric)
|
||||
r.wg = wg
|
||||
r.ticker = ticker
|
||||
r.maxForward = ROUTER_MAX_FORWARD
|
||||
r.config.MaxForward = ROUTER_MAX_FORWARD
|
||||
r.config.HostnameTagName = "hostname"
|
||||
|
||||
// Set hostname
|
||||
hostname, err := os.Hostname()
|
||||
@ -100,6 +103,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
return err
|
||||
}
|
||||
r.maxForward = r.config.MaxForward
|
||||
if r.config.NumCacheIntervals > 0 {
|
||||
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
|
||||
if err != nil {
|
||||
@ -244,8 +248,10 @@ func (r *metricRouter) Start() {
|
||||
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
|
||||
r.DoAddTags(point)
|
||||
r.DoDelTags(point)
|
||||
if new, ok := r.config.RenameMetrics[point.Name()]; ok {
|
||||
name := point.Name()
|
||||
if new, ok := r.config.RenameMetrics[name]; ok {
|
||||
point.SetName(new)
|
||||
point.AddMeta("oldname", name)
|
||||
}
|
||||
r.DoAddTags(point)
|
||||
r.DoDelTags(point)
|
||||
@ -258,7 +264,7 @@ func (r *metricRouter) Start() {
|
||||
// Foward message received from collector channel
|
||||
coll_forward := func(p lp.CCMetric) {
|
||||
// receive from metric collector
|
||||
p.AddTag("hostname", r.hostname)
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
if r.config.IntervalStamp {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
@ -287,7 +293,7 @@ func (r *metricRouter) Start() {
|
||||
cache_forward := func(p lp.CCMetric) {
|
||||
// receive from metric collector
|
||||
if !r.dropMetric(p) {
|
||||
p.AddTag("hostname", r.hostname)
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
forward(p)
|
||||
}
|
||||
}
|
||||
@ -309,19 +315,19 @@ func (r *metricRouter) Start() {
|
||||
|
||||
case p := <-r.coll_input:
|
||||
coll_forward(p)
|
||||
for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ {
|
||||
for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
|
||||
coll_forward(<-r.coll_input)
|
||||
}
|
||||
|
||||
case p := <-r.recv_input:
|
||||
recv_forward(p)
|
||||
for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ {
|
||||
for i := 0; len(r.recv_input) > 0 && i < (r.maxForward-1); i++ {
|
||||
recv_forward(<-r.recv_input)
|
||||
}
|
||||
|
||||
case p := <-r.cache_input:
|
||||
cache_forward(p)
|
||||
for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ {
|
||||
for i := 0; len(r.cache_input) > 0 && i < (r.maxForward-1); i++ {
|
||||
cache_forward(<-r.cache_input)
|
||||
}
|
||||
}
|
||||
|
12
scripts/cc-metric-collector.deb.control
Normal file
12
scripts/cc-metric-collector.deb.control
Normal file
@ -0,0 +1,12 @@
|
||||
Package: cc-metric-collector
|
||||
Version: {VERSION}
|
||||
Installed-Size: {INSTALLED_SIZE}
|
||||
Architecture: {ARCH}
|
||||
Maintainer: thomas.gruber@fau.de
|
||||
Depends: libc6 (>= 2.2.1)
|
||||
Build-Depends: debhelper-compat (= 13), git, golang-go
|
||||
Description: Metric collection daemon from the ClusterCockpit suite
|
||||
Homepage: https://github.com/ClusterCockpit/cc-metric-collector
|
||||
Source: cc-metric-collector
|
||||
Rules-Requires-Root: no
|
||||
|
@ -148,10 +148,14 @@ type GangliaMetricConfig struct {
|
||||
Unit string
|
||||
Group string
|
||||
Value string
|
||||
Name string
|
||||
}
|
||||
|
||||
func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
mname := GangliaMetricRename(point.Name())
|
||||
if oldname, ok := point.GetMeta("oldname"); ok {
|
||||
mname = GangliaMetricRename(oldname)
|
||||
}
|
||||
for _, group := range CommonGangliaMetrics {
|
||||
for _, metric := range group.Metrics {
|
||||
if metric.Name == mname {
|
||||
@ -187,6 +191,7 @@ func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
Tmax: metric.Tmax,
|
||||
Unit: metric.Unit,
|
||||
Value: valueStr,
|
||||
Name: GangliaMetricRename(mname),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -198,10 +203,15 @@ func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
Tmax: 0,
|
||||
Unit: "",
|
||||
Value: "",
|
||||
Name: "",
|
||||
}
|
||||
}
|
||||
|
||||
func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
mname := GangliaMetricRename(point.Name())
|
||||
if oldname, ok := point.GetMeta("oldname"); ok {
|
||||
mname = GangliaMetricRename(oldname)
|
||||
}
|
||||
group := ""
|
||||
if g, ok := point.GetMeta("group"); ok {
|
||||
group = g
|
||||
@ -254,5 +264,6 @@ func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
Tmax: DEFAULT_GANGLIA_METRIC_TMAX,
|
||||
Unit: unit,
|
||||
Value: valueStr,
|
||||
Name: GangliaMetricRename(mname),
|
||||
}
|
||||
}
|
||||
|
@ -39,16 +39,13 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
//var tagsstr []string
|
||||
var argstr []string
|
||||
|
||||
// Get metric name
|
||||
metricname := GangliaMetricRename(point.Name())
|
||||
|
||||
// Get metric config (type, value, ... in suitable format)
|
||||
conf := GetCommonGangliaConfig(point)
|
||||
if len(conf.Type) == 0 {
|
||||
conf = GetGangliaConfig(point)
|
||||
}
|
||||
if len(conf.Type) == 0 {
|
||||
return fmt.Errorf("metric %s has no 'value' field", metricname)
|
||||
return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
|
||||
}
|
||||
|
||||
if s.config.AddGangliaGroup {
|
||||
@ -70,7 +67,7 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
if s.config.AddTypeToName {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
|
||||
} else {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", metricname))
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name))
|
||||
}
|
||||
argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
|
||||
argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
@ -26,15 +27,21 @@ type InfluxAsyncSinkConfig struct {
|
||||
// Maximum number of points sent to server in single request. Default 5000
|
||||
BatchSize uint `json:"batch_size,omitempty"`
|
||||
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
|
||||
FlushInterval uint `json:"flush_interval,omitempty"`
|
||||
FlushInterval uint `json:"flush_interval,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
}
|
||||
|
||||
type InfluxAsyncSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) connect() error {
|
||||
@ -63,6 +70,11 @@ func (s *InfluxAsyncSink) connect() error {
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
@ -99,6 +111,33 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
|
||||
// Default retry intervals (in seconds)
|
||||
// 1 2
|
||||
// 2 4
|
||||
// 4 8
|
||||
// 8 16
|
||||
// 16 32
|
||||
// 32 64
|
||||
// 64 128
|
||||
// 128 256
|
||||
// 256 512
|
||||
// 512 1024
|
||||
// 1024 2048
|
||||
// 2048 4096
|
||||
// 4096 8192
|
||||
// 8192 16384
|
||||
// 16384 32768
|
||||
// 32768 65536
|
||||
// 65536 131072
|
||||
// 131072 262144
|
||||
// 262144 524288
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
@ -114,6 +153,16 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
toUint := func(duration string, def uint) uint {
|
||||
t, err := time.ParseDuration(duration)
|
||||
if err == nil {
|
||||
return uint(t.Milliseconds())
|
||||
}
|
||||
return def
|
||||
}
|
||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
|
@ -18,6 +18,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
||||
"organization": "myorg",
|
||||
"ssl": true,
|
||||
"batch_size": 200,
|
||||
"retry_interval" : "1s",
|
||||
"retry_exponential_base" : 2,
|
||||
"max_retries": 20,
|
||||
"max_retry_time" : "168h"
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -31,4 +35,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
||||
- `password`: Password for basic authentification
|
||||
- `organization`: Organization in the InfluxDB
|
||||
- `ssl`: Use SSL connection
|
||||
- `batch_size`: batch up metrics internally, default 100
|
||||
- `batch_size`: batch up metrics internally, default 100
|
||||
- `retry_interval`: Base retry interval for failed write requests, default 1s
|
||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||
- `max_retries`: Maximal number of retry attempts
|
||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||
|
||||
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
@ -15,21 +16,29 @@ import (
|
||||
|
||||
type InfluxSinkConfig struct {
|
||||
defaultSinkConfig
|
||||
Host string `json:"host,omitempty"`
|
||||
Port string `json:"port,omitempty"`
|
||||
Database string `json:"database,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Organization string `json:"organization,omitempty"`
|
||||
SSL bool `json:"ssl,omitempty"`
|
||||
RetentionPol string `json:"retention_policy,omitempty"`
|
||||
Host string `json:"host,omitempty"`
|
||||
Port string `json:"port,omitempty"`
|
||||
Database string `json:"database,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Organization string `json:"organization,omitempty"`
|
||||
SSL bool `json:"ssl,omitempty"`
|
||||
RetentionPol string `json:"retention_policy,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
|
||||
}
|
||||
|
||||
type InfluxSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPIBlocking
|
||||
config InfluxSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPIBlocking
|
||||
config InfluxSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
//influxMaxRetryDelay uint
|
||||
}
|
||||
|
||||
func (s *InfluxSink) connect() error {
|
||||
@ -52,6 +61,12 @@ func (s *InfluxSink) connect() error {
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
@ -91,6 +106,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
@ -99,6 +121,16 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
toUint := func(duration string, def uint) uint {
|
||||
t, err := time.ParseDuration(duration)
|
||||
if err == nil {
|
||||
return uint(t.Milliseconds())
|
||||
}
|
||||
return def
|
||||
}
|
||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
|
@ -17,6 +17,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
"password" : "examplepw",
|
||||
"organization": "myorg",
|
||||
"ssl": true,
|
||||
"retry_interval" : "1s",
|
||||
"retry_exponential_base" : 2,
|
||||
"max_retries": 20,
|
||||
"max_retry_time" : "168h"
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -29,4 +33,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
- `user`: Username for basic authentification
|
||||
- `password`: Password for basic authentification
|
||||
- `organization`: Organization in the InfluxDB
|
||||
- `ssl`: Use SSL connection
|
||||
- `ssl`: Use SSL connection
|
||||
- `retry_interval`: Base retry interval for failed write requests, default 1s
|
||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||
- `max_retries`: Maximal number of retry attempts
|
||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||
|
||||
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
@ -124,24 +124,21 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
return s.cstrCache[key]
|
||||
}
|
||||
|
||||
// Get metric name
|
||||
metricname := GangliaMetricRename(point.Name())
|
||||
|
||||
conf := GetCommonGangliaConfig(point)
|
||||
if len(conf.Type) == 0 {
|
||||
conf = GetGangliaConfig(point)
|
||||
}
|
||||
if len(conf.Type) == 0 {
|
||||
return fmt.Errorf("metric %s has no 'value' field", metricname)
|
||||
return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
|
||||
}
|
||||
|
||||
if s.config.AddTypeToName {
|
||||
metricname = GangliaMetricName(point)
|
||||
conf.Name = GangliaMetricName(point)
|
||||
}
|
||||
|
||||
c_value = C.CString(conf.Value)
|
||||
c_type = lookup(conf.Type)
|
||||
c_name = lookup(metricname)
|
||||
c_name = lookup(conf.Name)
|
||||
|
||||
// Add unit
|
||||
unit := ""
|
||||
|
Loading…
x
Reference in New Issue
Block a user