mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-19 11:21:41 +02:00
Compare commits
43 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ff08eaeb43 | ||
|
64c41be34c | ||
|
f4af520b2a | ||
|
31f10b3163 | ||
|
9ece27eec6 | ||
|
fdbdb79527 | ||
|
948c34d74d | ||
|
60de21c41e | ||
|
276c00442a | ||
|
c61b8d2877 | ||
|
6023abd028 | ||
|
0753c81156 | ||
|
092e7f6a71 | ||
|
f7e8b52667 | ||
|
02baef8c71 | ||
|
33d954f767 | ||
|
a5325a6535 | ||
|
d40163cf8f | ||
|
33fec95eac | ||
|
2c08e53be4 | ||
|
a2f9b23e85 | ||
|
d98076c792 | ||
|
a203370aaa | ||
|
f099a311a0 | ||
|
fe3a8d59b0 | ||
|
bac1f18b1d | ||
|
c8bca59de4 | ||
|
16c03d2aa2 | ||
|
2f044f4b58 | ||
|
f911ff802c | ||
|
2f36375470 | ||
|
6843902909 | ||
|
73981527d3 | ||
|
d542f32baa | ||
|
6b6566b0aa | ||
|
3598aed090 | ||
|
24e12ccc57 | ||
|
18a226183c | ||
|
9cfbe10247 | ||
|
66275ecf74 | ||
|
b4cc6d54ea | ||
|
45714fe337 | ||
|
a97c705f4c |
64
.github/workflows/AlmaLinux.yml
vendored
Normal file
64
.github/workflows/AlmaLinux.yml
vendored
Normal file
@@ -0,0 +1,64 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: AlmaLinux 8.5 RPM build
|
||||
|
||||
# Run on tag push
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
|
||||
#
|
||||
# Build on AlmaLinux 8.5 using go-toolset
|
||||
#
|
||||
AlmaLinux-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://hub.docker.com/_/almalinux
|
||||
container: almalinux:8.5
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes group install "Development Tools" "RPM Development Tools"
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmbuild.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 8.5
|
||||
path: ${{ steps.rpmbuild.outputs.SRPM }}
|
||||
|
||||
# See: https://github.com/softprops/action-gh-release
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.rpmbuild.outputs.RPM }}
|
||||
${{ steps.rpmbuild.outputs.SRPM }}
|
64
.github/workflows/RedHatUniversalBaseImage.yml
vendored
Normal file
64
.github/workflows/RedHatUniversalBaseImage.yml
vendored
Normal file
@@ -0,0 +1,64 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: Red Hat Universal Base Image 8 RPM build
|
||||
|
||||
# Run on tag push
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
|
||||
#
|
||||
# Build on UBI 8 using go-toolset
|
||||
#
|
||||
UBI-8-RPM-build:
|
||||
runs-on: ubuntu-latest
|
||||
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
||||
container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
- name: Install development packages
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git
|
||||
|
||||
# Checkout git repository and submodules
|
||||
# fetch-depth must be 0 to use git describe
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
fetch-depth: 0
|
||||
|
||||
# Use dnf to install build dependencies
|
||||
- name: Install build dependencies
|
||||
run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec
|
||||
|
||||
- name: RPM build MetricCollector
|
||||
id: rpmbuild
|
||||
run: make RPM
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector RPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.RPM }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: cc-metric-collector SRPM for UBI 8
|
||||
path: ${{ steps.rpmbuild.outputs.SRPM }}
|
||||
|
||||
# See: https://github.com/softprops/action-gh-release
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.rpmbuild.outputs.RPM }}
|
||||
${{ steps.rpmbuild.outputs.SRPM }}
|
58
.github/workflows/rpmbuild.yml
vendored
58
.github/workflows/rpmbuild.yml
vendored
@@ -1,58 +0,0 @@
|
||||
name: Run RPM Build
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '**'
|
||||
|
||||
jobs:
|
||||
build-alma-8_5:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: TomTheBear/rpmbuild@alma8.5
|
||||
id: rpm
|
||||
name: Build RPM package on AlmaLinux 8.5
|
||||
with:
|
||||
spec_file: "./scripts/cc-metric-collector.spec"
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v1.0.0
|
||||
with:
|
||||
name: cc-metric-collector RPM AlmaLinux 8.5
|
||||
path: ${{ steps.rpm.outputs.rpm_path }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v1.0.0
|
||||
with:
|
||||
name: cc-metric-collector SRPM AlmaLinux 8.5
|
||||
path: ${{ steps.rpm.outputs.source_rpm_path }}
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
with:
|
||||
name: cc-metric-collector-${{github.ref_name}}
|
||||
files: |
|
||||
${{ steps.rpm.outputs.source_rpm_path }}
|
||||
${{ steps.rpm.outputs.rpm_path }}
|
||||
build-rhel-ubi8:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: TomTheBear/rpmbuild@rh-ubi8
|
||||
id: rpm
|
||||
name: Build RPM package on Red Hat Universal Base Image 8
|
||||
with:
|
||||
spec_file: "./scripts/cc-metric-collector.spec"
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v1.0.0
|
||||
with:
|
||||
name: cc-metric-collector RPM Red Hat Universal Base Image 8
|
||||
path: ${{ steps.rpm.outputs.rpm_path }}
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v1.0.0
|
||||
with:
|
||||
name: cc-metric-collector SRPM Red Hat Universal Base Image 8
|
||||
path: ${{ steps.rpm.outputs.source_rpm_path }}
|
||||
- name: Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
with:
|
||||
files: |
|
||||
${{ steps.rpm.outputs.source_rpm_path }}
|
||||
${{ steps.rpm.outputs.rpm_path }}
|
36
.github/workflows/runonce.yml
vendored
36
.github/workflows/runonce.yml
vendored
@@ -1,46 +1,68 @@
|
||||
# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions
|
||||
|
||||
# Workflow name
|
||||
name: Run Test
|
||||
|
||||
# Run on event push
|
||||
on: push
|
||||
|
||||
jobs:
|
||||
#
|
||||
# Job build-1-17
|
||||
# Build on latest Ubuntu using golang version 1.17
|
||||
#
|
||||
build-1-17:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
# Checkout git repository and submodules
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
|
||||
# See: https://github.com/marketplace/actions/setup-go-environment
|
||||
- name: Setup Golang
|
||||
uses: actions/setup-go@v2.1.5
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.17.6'
|
||||
go-version: '^1.17.7'
|
||||
|
||||
# Install libganglia
|
||||
- name: Setup Ganglia
|
||||
run: sudo apt install ganglia-monitor libganglia1
|
||||
|
||||
- name: Build MetricCollector
|
||||
run: make
|
||||
|
||||
- name: Run MetricCollector
|
||||
- name: Run MetricCollector once
|
||||
run: ./cc-metric-collector --once --config .github/ci-config.json
|
||||
|
||||
#
|
||||
# Job build-1-16
|
||||
# Build on latest Ubuntu using golang version 1.16
|
||||
#
|
||||
build-1-16:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
# See: https://github.com/marketplace/actions/checkout
|
||||
# Checkout git repository and submodules
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
submodules: recursive
|
||||
|
||||
# See: https://github.com/marketplace/actions/setup-go-environment
|
||||
- name: Setup Golang
|
||||
uses: actions/setup-go@v2.1.5
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16.7' # The version AlmaLinux 8.5 uses
|
||||
|
||||
# Install libganglia
|
||||
- name: Setup Ganglia
|
||||
run: sudo apt install ganglia-monitor libganglia1
|
||||
|
||||
- name: Build MetricCollector
|
||||
run: make
|
||||
|
||||
- name: Run MetricCollectorlibganglia1
|
||||
- name: Run MetricCollector once
|
||||
run: ./cc-metric-collector --once --config .github/ci-config.json
|
||||
|
41
Makefile
41
Makefile
@@ -1,5 +1,5 @@
|
||||
APP = cc-metric-collector
|
||||
GOSRC_APP := metric-collector.go
|
||||
GOSRC_APP := cc-metric-collector.go
|
||||
GOSRC_COLLECTORS := $(wildcard collectors/*.go)
|
||||
GOSRC_SINKS := $(wildcard sinks/*.go)
|
||||
GOSRC_RECEIVERS := $(wildcard receivers/*.go)
|
||||
@@ -21,13 +21,18 @@ all: $(APP)
|
||||
|
||||
$(APP): $(GOSRC)
|
||||
make -C collectors
|
||||
make -C sinks
|
||||
go get
|
||||
go build -o $(APP) $(GOSRC_APP)
|
||||
|
||||
.PHONY: clean
|
||||
.ONESHELL:
|
||||
clean:
|
||||
@for COMP in $(COMPONENT_DIRS); do if [ -e $$COMP/Makefile ]; then make -C $$COMP clean; fi; done
|
||||
@for COMP in $(COMPONENT_DIRS)
|
||||
do
|
||||
if [[ -e $$COMP/Makefile ]]; then
|
||||
make -C $$COMP clean
|
||||
fi
|
||||
done
|
||||
rm -f $(APP)
|
||||
|
||||
.PHONY: fmt
|
||||
@@ -51,3 +56,33 @@ vet:
|
||||
staticcheck:
|
||||
go install honnef.co/go/tools/cmd/staticcheck@latest
|
||||
$$(go env GOPATH)/bin/staticcheck ./...
|
||||
|
||||
.ONESHELL:
|
||||
.PHONY: RPM
|
||||
RPM: scripts/cc-metric-collector.spec
|
||||
@WORKSPACE="$${PWD}"
|
||||
@SPECFILE="$${WORKSPACE}/scripts/cc-metric-collector.spec"
|
||||
# Setup RPM build tree
|
||||
@eval $$(rpm --eval "ARCH='%{_arch}' RPMDIR='%{_rpmdir}' SOURCEDIR='%{_sourcedir}' SPECDIR='%{_specdir}' SRPMDIR='%{_srcrpmdir}' BUILDDIR='%{_builddir}'")
|
||||
@mkdir --parents --verbose "$${RPMDIR}" "$${SOURCEDIR}" "$${SPECDIR}" "$${SRPMDIR}" "$${BUILDDIR}"
|
||||
# Create source tarball
|
||||
@COMMITISH="HEAD"
|
||||
@VERS=$$(git describe --tags $${COMMITISH})
|
||||
@VERS=$${VERS#v}
|
||||
@VERS=$${VERS//-/_}
|
||||
@eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}")
|
||||
@PREFIX="$${NAME}-$${VERSION}"
|
||||
@FORMAT="tar.gz"
|
||||
@SRCFILE="$${SOURCEDIR}/$${PREFIX}.$${FORMAT}"
|
||||
@git archive --verbose --format "$${FORMAT}" --prefix="$${PREFIX}/" --output="$${SRCFILE}" $${COMMITISH}
|
||||
# Build RPM and SRPM
|
||||
@rpmbuild -ba --define="VERS $${VERS}" --rmsource --clean "$${SPECFILE}"
|
||||
# Report RPMs and SRPMs when in GitHub Workflow
|
||||
@if [[ "$${GITHUB_ACTIONS}" == true ]]; then
|
||||
@ RPMFILE="$${RPMDIR}/$${ARCH}/$${NVRA}.rpm"
|
||||
@ SRPMFILE="$${SRPMDIR}/$${NVR}.src.rpm"
|
||||
@ echo "RPM: $${RPMFILE}"
|
||||
@ echo "SRPM: $${SRPMFILE}"
|
||||
@ echo "::set-output name=SRPM::$${SRPMFILE}"
|
||||
@ echo "::set-output name=RPM::$${RPMFILE}"
|
||||
@fi
|
||||
|
34
README.md
34
README.md
@@ -56,7 +56,41 @@ Usage of metric-collector:
|
||||
-once
|
||||
Run all collectors only once
|
||||
```
|
||||
# Scenarios
|
||||
|
||||
The metric collector was designed with flexibility in mind, so it can be used in many scenarios. Here are a few:
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
subgraph a ["Cluster A"]
|
||||
nodeA[NodeA with CC collector]
|
||||
nodeB[NodeB with CC collector]
|
||||
nodeC[NodeC with CC collector]
|
||||
end
|
||||
a --> db[(Database)]
|
||||
db <--> ccweb("Webfrontend")
|
||||
```
|
||||
|
||||
``` mermaid
|
||||
flowchart TD
|
||||
subgraph a [ClusterA]
|
||||
direction LR
|
||||
nodeA[NodeA with CC collector]
|
||||
nodeB[NodeB with CC collector]
|
||||
nodeC[NodeC with CC collector]
|
||||
end
|
||||
subgraph b [ClusterB]
|
||||
direction LR
|
||||
nodeD[NodeD with CC collector]
|
||||
nodeE[NodeE with CC collector]
|
||||
nodeF[NodeF with CC collector]
|
||||
end
|
||||
a --> ccrecv{"CC collector as receiver"}
|
||||
b --> ccrecv
|
||||
ccrecv --> db[("Database1")]
|
||||
ccrecv -.-> db2[("Database2")]
|
||||
db <-.-> ccweb("Webfrontend")
|
||||
```
|
||||
|
||||
# Contributing
|
||||
The ClusterCockpit ecosystem is designed to be used by different HPC computing centers. Since configurations and setups differ between the centers, the centers likely have to put some work into the cc-metric-collector to gather all desired metrics.
|
||||
|
@@ -55,16 +55,13 @@ $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION): $(BUILD_FOLDER)/likwid-$(LIKWID_VERSIO
|
||||
tar -C $(BUILD_FOLDER) -xf $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz
|
||||
|
||||
$(INSTALL_FOLDER)/liblikwid.a: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION) $(INSTALL_FOLDER)
|
||||
sed -i -e s+"PREFIX ?= .*"+"PREFIX = $(LIKWID_BASE)"+g \
|
||||
-e s+"SHARED_LIBRARY = .*"+"SHARED_LIBRARY = false"+g \
|
||||
-e s+"ACCESSMODE = .*"+"ACCESSMODE = $(ACCESSMODE)"+g \
|
||||
-e s+"INSTALLED_ACCESSDAEMON = .*"+"INSTALLED_ACCESSDAEMON = $(DAEMON_INSTALLDIR)/likwid-accessD"+g \
|
||||
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/config.mk
|
||||
cd $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION) && make
|
||||
cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/liblikwid.a $(INSTALL_FOLDER)
|
||||
cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/ext/hwloc/liblikwid-hwloc.a $(INSTALL_FOLDER)
|
||||
cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $(INSTALL_FOLDER)
|
||||
cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $(INSTALL_FOLDER)
|
||||
cd "$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)" && make "PREFIX=$(LIKWID_BASE)" "SHARED_LIBRARY=false" "ACCESSMODE=$(ACCESSMODE)" "INSTALLED_ACCESSDAEMON=$(DAEMON_INSTALLDIR)/likwid-accessD"
|
||||
cp \
|
||||
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/liblikwid.a \
|
||||
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/ext/hwloc/liblikwid-hwloc.a \
|
||||
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h \
|
||||
$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h \
|
||||
$(INSTALL_FOLDER)
|
||||
|
||||
$(DAEMON_INSTALLDIR)/likwid-accessD: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD
|
||||
sudo -u $(DAEMON_USER) -g $(DAEMON_GROUP) install -m 4775 $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD $(DAEMON_INSTALLDIR)/likwid-accessD
|
||||
|
@@ -102,7 +102,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
|
||||
part_max_used = perc
|
||||
}
|
||||
}
|
||||
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": part_max_used}, time.Now())
|
||||
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "percent")
|
||||
output <- y
|
||||
|
@@ -2,7 +2,7 @@ package collectors
|
||||
|
||||
/*
|
||||
#cgo CFLAGS: -I./likwid
|
||||
#cgo LDFLAGS: -L./likwid -llikwid -llikwid-hwloc -lm -Wl,--unresolved-symbols=ignore-in-object-files
|
||||
#cgo LDFLAGS: -Wl,--unresolved-symbols=ignore-in-object-files
|
||||
#include <stdlib.h>
|
||||
#include <likwid.h>
|
||||
*/
|
||||
@@ -71,8 +71,9 @@ func GetAllMetricScopes() []MetricScope {
|
||||
}
|
||||
|
||||
const (
|
||||
LIKWID_LIB_NAME = "liblikwid.so"
|
||||
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
|
||||
LIKWID_LIB_NAME = "liblikwid.so"
|
||||
LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
|
||||
LIKWID_DEF_ACCESSMODE = "direct"
|
||||
)
|
||||
|
||||
type LikwidCollectorMetricConfig struct {
|
||||
@@ -95,6 +96,8 @@ type LikwidCollectorConfig struct {
|
||||
Metrics []LikwidCollectorMetricConfig `json:"globalmetrics,omitempty"`
|
||||
ForceOverwrite bool `json:"force_overwrite,omitempty"`
|
||||
InvalidToZero bool `json:"invalid_to_zero,omitempty"`
|
||||
AccessMode string `json:"access_mode,omitempty"`
|
||||
DaemonPath string `json:"accessdaemon_path,omitempty"`
|
||||
}
|
||||
|
||||
type LikwidCollector struct {
|
||||
@@ -260,6 +263,7 @@ func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int {
|
||||
func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
var ret C.int
|
||||
m.name = "LikwidCollector"
|
||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
@@ -270,6 +274,11 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
if lib == nil {
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", LIKWID_LIB_NAME)
|
||||
}
|
||||
err := lib.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %v", LIKWID_LIB_NAME, err)
|
||||
}
|
||||
|
||||
if m.config.ForceOverwrite {
|
||||
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1")
|
||||
os.Setenv("LIKWID_FORCE", "1")
|
||||
@@ -301,6 +310,16 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
m.initGranularity()
|
||||
// Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist)
|
||||
m.scopeRespTids = m.getResponsiblities()
|
||||
switch m.config.AccessMode {
|
||||
case "direct":
|
||||
C.HPMmode(0)
|
||||
case "accessdaemon":
|
||||
if len(m.config.DaemonPath) > 0 {
|
||||
p := os.Getenv("PATH")
|
||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
||||
}
|
||||
C.HPMmode(1)
|
||||
}
|
||||
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
|
@@ -4,10 +4,12 @@
|
||||
The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.
|
||||
|
||||
The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
|
||||
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. A counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
|
||||
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
|
||||
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases.
|
||||
|
||||
Additional options:
|
||||
- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode)
|
||||
- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin`
|
||||
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
|
||||
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`.
|
||||
|
||||
@@ -26,6 +28,56 @@ As a guideline:
|
||||
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope
|
||||
- All `DFCx` counters have scope `socket`
|
||||
|
||||
### Help with the configuration
|
||||
|
||||
The configuration for the `likwid` collector is quite complicated. Most users don't use LIKWID with the event:counter notation but rely on the performance groups defined by the LIKWID team for each architecture. In order to help with the `likwid` collector configuration, we included a script `scripts/likwid_perfgroup_to_cc_config.py` that creates the configuration of an `eventset` from a performance group (using a LIKWID installation in `$PATH`):
|
||||
```
|
||||
$ likwid-perfctr -i
|
||||
[...]
|
||||
short name: ICX
|
||||
[...]
|
||||
$ likwid-perfctr -a
|
||||
[...]
|
||||
MEM_DP
|
||||
MEM
|
||||
FLOPS_SP
|
||||
CLOCK
|
||||
[...]
|
||||
$ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
|
||||
{
|
||||
"events": {
|
||||
"FIXC0": "INSTR_RETIRED_ANY",
|
||||
"..." : "..."
|
||||
},
|
||||
"metrics" : [
|
||||
{
|
||||
"calc": "time",
|
||||
"name": "Runtime (RDTSC) [s]",
|
||||
"publish": true,
|
||||
"scope": "hwthread"
|
||||
},
|
||||
{
|
||||
"..." : "..."
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
You can copy this JSON and add it to the `eventsets` list. If you specify multiple event sets, you can add globally derived metrics in the extra `global_metrics` section with the metric names as variables.
|
||||
|
||||
### Mixed usage between daemon and users
|
||||
|
||||
LIKWID checks the file `/var/run/likwid.lock` before performing any interfering operations. Who is allowed to access the counters is determined by the owner of the file. If it does not exist, it is created for the current user. So, if you want to temporarly allow counter access to a user (e.g. in a job):
|
||||
|
||||
Before (SLURM prolog, ...)
|
||||
```
|
||||
$ chwon $JOBUSER /var/run/likwid.lock
|
||||
```
|
||||
|
||||
After (SLURM epilog, ...)
|
||||
```
|
||||
$ chwon $CCUSER /var/run/likwid.lock
|
||||
```
|
||||
|
||||
### Example configuration
|
||||
|
||||
|
@@ -22,6 +22,7 @@ type LustreCollectorConfig struct {
|
||||
LCtlCommand string `json:"lctl_command"`
|
||||
ExcludeMetrics []string `json:"exclude_metrics"`
|
||||
SendAllMetrics bool `json:"send_all_metrics"`
|
||||
Sudo bool `json:"use_sudo"`
|
||||
}
|
||||
|
||||
type LustreCollector struct {
|
||||
@@ -31,11 +32,17 @@ type LustreCollector struct {
|
||||
stats map[string]map[string]int64
|
||||
config LustreCollectorConfig
|
||||
lctl string
|
||||
sudoCmd string
|
||||
}
|
||||
|
||||
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
||||
var command *exec.Cmd
|
||||
statsfile := fmt.Sprintf("llite.%s.stats", device)
|
||||
command := exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
||||
if m.config.Sudo {
|
||||
command = exec.Command(m.sudoCmd, m.lctl, LCTL_OPTION, statsfile)
|
||||
} else {
|
||||
command = exec.Command(m.lctl, LCTL_OPTION, statsfile)
|
||||
}
|
||||
command.Wait()
|
||||
stdout, _ := command.Output()
|
||||
return strings.Split(string(stdout), "\n")
|
||||
@@ -109,7 +116,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
return err
|
||||
}
|
||||
if user.Uid != "0" {
|
||||
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root:", err.Error())
|
||||
cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -136,6 +143,12 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
m.lctl = p
|
||||
if m.config.Sudo {
|
||||
p, err := exec.LookPath("sudo")
|
||||
if err != nil {
|
||||
m.sudoCmd = p
|
||||
}
|
||||
}
|
||||
|
||||
devices := m.getDevices()
|
||||
if len(devices) == 0 {
|
||||
|
@@ -1,35 +1,76 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
const MEMSTATFILE = `/proc/meminfo`
|
||||
const MEMSTATFILE = "/proc/meminfo"
|
||||
const NUMA_MEMSTAT_BASE = "/sys/devices/system/node"
|
||||
|
||||
type MemstatCollectorConfig struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics"`
|
||||
NodeStats bool `json:"node_stats,omitempty"`
|
||||
NumaStats bool `json:"numa_stats,omitempty"`
|
||||
}
|
||||
|
||||
type MemstatCollectorNode struct {
|
||||
file string
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type MemstatCollector struct {
|
||||
metricCollector
|
||||
stats map[string]int64
|
||||
tags map[string]string
|
||||
matches map[string]string
|
||||
config MemstatCollectorConfig
|
||||
stats map[string]int64
|
||||
tags map[string]string
|
||||
matches map[string]string
|
||||
config MemstatCollectorConfig
|
||||
nodefiles map[int]MemstatCollectorNode
|
||||
}
|
||||
|
||||
func getStats(filename string) map[string]float64 {
|
||||
stats := make(map[string]float64)
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
scanner := bufio.NewScanner(file)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
linefields := strings.Fields(line)
|
||||
if len(linefields) == 3 {
|
||||
v, err := strconv.ParseFloat(linefields[1], 64)
|
||||
if err == nil {
|
||||
stats[strings.Trim(linefields[0], ":")] = v
|
||||
}
|
||||
} else if len(linefields) == 5 {
|
||||
v, err := strconv.ParseFloat(linefields[3], 64)
|
||||
if err == nil {
|
||||
stats[strings.Trim(linefields[0], ":")] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
var err error
|
||||
m.name = "MemstatCollector"
|
||||
m.config.NodeStats = true
|
||||
m.config.NumaStats = false
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
@@ -40,7 +81,8 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
m.stats = make(map[string]int64)
|
||||
m.matches = make(map[string]string)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
matches := map[string]string{`MemTotal`: "mem_total",
|
||||
matches := map[string]string{
|
||||
"MemTotal": "mem_total",
|
||||
"SwapTotal": "swap_total",
|
||||
"SReclaimable": "mem_sreclaimable",
|
||||
"Slab": "mem_slab",
|
||||
@@ -48,7 +90,9 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
"Buffers": "mem_buffers",
|
||||
"Cached": "mem_cached",
|
||||
"MemAvailable": "mem_available",
|
||||
"SwapFree": "swap_free"}
|
||||
"SwapFree": "swap_free",
|
||||
"MemShared": "mem_shared",
|
||||
}
|
||||
for k, v := range matches {
|
||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, k)
|
||||
if !skip {
|
||||
@@ -56,13 +100,44 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
if len(m.matches) == 0 {
|
||||
return errors.New("No metrics to collect")
|
||||
return errors.New("no metrics to collect")
|
||||
}
|
||||
m.setup()
|
||||
_, err = ioutil.ReadFile(string(MEMSTATFILE))
|
||||
if err == nil {
|
||||
m.init = true
|
||||
|
||||
if m.config.NodeStats {
|
||||
if stats := getStats(MEMSTATFILE); len(stats) == 0 {
|
||||
return fmt.Errorf("cannot read data from file %s", MEMSTATFILE)
|
||||
}
|
||||
}
|
||||
|
||||
if m.config.NumaStats {
|
||||
globPattern := filepath.Join(NUMA_MEMSTAT_BASE, "node[0-9]*", "meminfo")
|
||||
regex := regexp.MustCompile(filepath.Join(NUMA_MEMSTAT_BASE, "node(\\d+)", "meminfo"))
|
||||
files, err := filepath.Glob(globPattern)
|
||||
if err == nil {
|
||||
m.nodefiles = make(map[int]MemstatCollectorNode)
|
||||
for _, f := range files {
|
||||
if stats := getStats(f); len(stats) == 0 {
|
||||
return fmt.Errorf("cannot read data from file %s", f)
|
||||
}
|
||||
rematch := regex.FindStringSubmatch(f)
|
||||
if len(rematch) == 2 {
|
||||
id, err := strconv.Atoi(rematch[1])
|
||||
if err == nil {
|
||||
f := MemstatCollectorNode{
|
||||
file: f,
|
||||
tags: map[string]string{
|
||||
"type": "memoryDomain",
|
||||
"type-id": fmt.Sprintf("%d", id),
|
||||
},
|
||||
}
|
||||
m.nodefiles[id] = f
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -71,56 +146,41 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
return
|
||||
}
|
||||
|
||||
buffer, err := ioutil.ReadFile(string(MEMSTATFILE))
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return
|
||||
}
|
||||
|
||||
ll := strings.Split(string(buffer), "\n")
|
||||
for _, line := range ll {
|
||||
ls := strings.Split(line, `:`)
|
||||
if len(ls) > 1 {
|
||||
lv := strings.Fields(ls[1])
|
||||
m.stats[ls[0]], err = strconv.ParseInt(lv[0], 0, 64)
|
||||
sendStats := func(stats map[string]float64, tags map[string]string) {
|
||||
for match, name := range m.matches {
|
||||
var value float64 = 0
|
||||
if v, ok := stats[match]; ok {
|
||||
value = v
|
||||
}
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, exists := m.stats[`MemTotal`]; !exists {
|
||||
err = errors.New("Parse error")
|
||||
log.Print(err)
|
||||
return
|
||||
}
|
||||
|
||||
for match, name := range m.matches {
|
||||
if _, exists := m.stats[match]; !exists {
|
||||
err = fmt.Errorf("Parse error for %s : %s", match, name)
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[match]) * 1.0e-3)}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
|
||||
if _, free := m.stats[`MemFree`]; free {
|
||||
if _, buffers := m.stats[`Buffers`]; buffers {
|
||||
if _, cached := m.stats[`Cached`]; cached {
|
||||
memUsed := m.stats[`MemTotal`] - (m.stats[`MemFree`] + m.stats[`Buffers`] + m.stats[`Cached`])
|
||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used")
|
||||
y, err := lp.New("mem_used", m.tags, m.meta, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now())
|
||||
if err == nil && !skip {
|
||||
output <- y
|
||||
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
|
||||
if freeVal, free := stats["MemFree"]; free {
|
||||
if bufVal, buffers := stats["Buffers"]; buffers {
|
||||
if cacheVal, cached := stats["Cached"]; cached {
|
||||
memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal)
|
||||
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if _, found := m.stats[`MemShared`]; found {
|
||||
_, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_shared")
|
||||
y, err := lp.New("mem_shared", m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now())
|
||||
if err == nil && !skip {
|
||||
output <- y
|
||||
|
||||
if m.config.NodeStats {
|
||||
nodestats := getStats(MEMSTATFILE)
|
||||
sendStats(nodestats, m.tags)
|
||||
}
|
||||
|
||||
if m.config.NumaStats {
|
||||
for _, nodeConf := range m.nodefiles {
|
||||
stats := getStats(nodeConf.file)
|
||||
sendStats(stats, nodeConf.tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -13,29 +13,30 @@ import (
|
||||
)
|
||||
|
||||
type MetricCollector interface {
|
||||
Name() string
|
||||
Init(config json.RawMessage) error
|
||||
Initialized() bool
|
||||
Read(duration time.Duration, output chan lp.CCMetric)
|
||||
Close()
|
||||
Name() string // Name of the metric collector
|
||||
Init(config json.RawMessage) error // Initialize metric collector
|
||||
Initialized() bool // Is metric collector initialized?
|
||||
Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector
|
||||
Close() // Close / finish metric collector
|
||||
}
|
||||
|
||||
type metricCollector struct {
|
||||
name string
|
||||
init bool
|
||||
meta map[string]string
|
||||
name string // name of the metric
|
||||
init bool // is metric collector initialized?
|
||||
meta map[string]string // static meta data tags
|
||||
}
|
||||
|
||||
// Name() returns the name of the metric collector
|
||||
// Name returns the name of the metric collector
|
||||
func (c *metricCollector) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
// Setup is for future use
|
||||
func (c *metricCollector) setup() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initialized() indicates whether the metric collector has been initialized.
|
||||
// Initialized indicates whether the metric collector has been initialized
|
||||
func (c *metricCollector) Initialized() bool {
|
||||
return c.init
|
||||
}
|
||||
@@ -64,6 +65,7 @@ func stringArrayContains(array []string, str string) (int, bool) {
|
||||
return -1, false
|
||||
}
|
||||
|
||||
// SocketList returns the list of physical sockets as read from /proc/cpuinfo
|
||||
func SocketList() []int {
|
||||
buffer, err := ioutil.ReadFile("/proc/cpuinfo")
|
||||
if err != nil {
|
||||
@@ -89,6 +91,7 @@ func SocketList() []int {
|
||||
return packs
|
||||
}
|
||||
|
||||
// CpuList returns the list of physical CPUs (in contrast to logical CPUs) as read from /proc/cpuinfo
|
||||
func CpuList() []int {
|
||||
buffer, err := ioutil.ReadFile("/proc/cpuinfo")
|
||||
if err != nil {
|
||||
@@ -117,8 +120,8 @@ func CpuList() []int {
|
||||
// RemoveFromStringList removes the string r from the array of strings s
|
||||
// If r is not contained in the array an error is returned
|
||||
func RemoveFromStringList(s []string, r string) ([]string, error) {
|
||||
for i, item := range s {
|
||||
if r == item {
|
||||
for i := range s {
|
||||
if r == s[i] {
|
||||
return append(s[:i], s[i+1:]...), nil
|
||||
}
|
||||
}
|
||||
|
92
collectors/sampleMetric.go
Normal file
92
collectors/sampleMetric.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
// These are the fields we read from the JSON configuration
|
||||
type SampleCollectorConfig struct {
|
||||
Interval string `json:"interval"`
|
||||
}
|
||||
|
||||
// This contains all variables we need during execution and the variables
|
||||
// defined by metricCollector (name, init, ...)
|
||||
type SampleCollector struct {
|
||||
metricCollector
|
||||
config SampleTimerCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
}
|
||||
|
||||
// Functions to implement MetricCollector interface
|
||||
// Init(...), Read(...), Close()
|
||||
// See: metricCollector.go
|
||||
|
||||
// Init initializes the sample collector
|
||||
// Called once by the collector manager
|
||||
// All tags, meta data tags and metrics that do not change over the runtime should be set here
|
||||
func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||
m.name = "InternalCollector"
|
||||
// This is for later use, also call it early
|
||||
m.setup()
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Set up everything that the collector requires during the Read() execution
|
||||
// Check files required, test execution of some commands, create data structure
|
||||
// for all topological entities (sockets, NUMA domains, ...)
|
||||
// Return some useful error message in case of any failures
|
||||
|
||||
// Set this flag only if everything is initialized properly, all required files exist, ...
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
// Read collects all metrics belonging to the sample collector
|
||||
// and sends them through the output channel to the collector manager
|
||||
func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
// Create a sample metric
|
||||
timestamp := time.Now()
|
||||
|
||||
value := 1.0
|
||||
// If you want to measure something for a specific amount of time, use interval
|
||||
// start := readState()
|
||||
// time.Sleep(interval)
|
||||
// stop := readState()
|
||||
// value = (stop - start) / interval.Seconds()
|
||||
|
||||
y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp)
|
||||
if err == nil {
|
||||
// Send it to output channel
|
||||
output <- y
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Close metric collector: close network connection, close files, close libraries, ...
|
||||
// Called once by the collector manager
|
||||
func (m *SampleCollector) Close() {
|
||||
// Unset flag
|
||||
m.init = false
|
||||
}
|
122
collectors/sampleTimerMetric.go
Normal file
122
collectors/sampleTimerMetric.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
// These are the fields we read from the JSON configuration
|
||||
type SampleTimerCollectorConfig struct {
|
||||
Interval string `json:"interval"`
|
||||
}
|
||||
|
||||
// This contains all variables we need during execution and the variables
|
||||
// defined by metricCollector (name, init, ...)
|
||||
type SampleTimerCollector struct {
|
||||
metricCollector
|
||||
wg sync.WaitGroup // sync group for management
|
||||
done chan bool // channel for management
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
config SampleTimerCollectorConfig // the configuration structure
|
||||
interval time.Duration // the interval parsed from configuration
|
||||
ticker *time.Ticker // own timer
|
||||
output chan lp.CCMetric // own internal output channel
|
||||
}
|
||||
|
||||
func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
|
||||
var err error = nil
|
||||
// Always set the name early in Init() to use it in cclog.Component* functions
|
||||
m.name = "SampleTimerCollector"
|
||||
// This is for later use, also call it early
|
||||
m.setup()
|
||||
// Define meta information sent with each metric
|
||||
// (Can also be dynamic or this is the basic set with extension through AddMeta())
|
||||
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
|
||||
// Define tags sent with each metric
|
||||
// The 'type' tag is always needed, it defines the granulatity of the metric
|
||||
// node -> whole system
|
||||
// socket -> CPU socket (requires socket ID as 'type-id' tag)
|
||||
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
// Read in the JSON configuration
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Parse the read interval duration
|
||||
m.interval, err = time.ParseDuration(m.config.Interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// Storage for output channel
|
||||
m.output = nil
|
||||
// Mangement channel for the timer function.
|
||||
m.done = make(chan bool)
|
||||
// Create the own ticker
|
||||
m.ticker = time.NewTicker(m.interval)
|
||||
|
||||
// Start the timer loop with return functionality by sending 'true' to the done channel
|
||||
m.wg.Add(1)
|
||||
go func() {
|
||||
select {
|
||||
case <-m.done:
|
||||
// Exit the timer loop
|
||||
cclog.ComponentDebug(m.name, "Closing...")
|
||||
m.wg.Done()
|
||||
return
|
||||
case timestamp := <-m.ticker.C:
|
||||
// This is executed every timer tick but we have to wait until the first
|
||||
// Read() to get the output channel
|
||||
if m.output != nil {
|
||||
m.ReadMetrics(timestamp)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Set this flag only if everything is initialized properly, all required files exist, ...
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
|
||||
// This function is called at each interval timer tick
|
||||
func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
|
||||
// Create a sample metric
|
||||
|
||||
value := 1.0
|
||||
|
||||
// If you want to measure something for a specific amout of time, use interval
|
||||
// start := readState()
|
||||
// time.Sleep(interval)
|
||||
// stop := readState()
|
||||
// value = (stop - start) / interval.Seconds()
|
||||
|
||||
y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp)
|
||||
if err == nil && m.output != nil {
|
||||
// Send it to output channel if we have a valid channel
|
||||
m.output <- y
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SampleTimerCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
// Capture output channel
|
||||
m.output = output
|
||||
}
|
||||
|
||||
func (m *SampleTimerCollector) Close() {
|
||||
// Send signal to the timer loop to stop it
|
||||
m.done <- true
|
||||
// Wait until the timer loop is done
|
||||
m.wg.Wait()
|
||||
// Unset flag
|
||||
m.init = false
|
||||
}
|
@@ -6,39 +6,23 @@ This folder contains the ReceiveManager and receiver implementations for the cc-
|
||||
|
||||
The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize.
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"type": "nats",
|
||||
"address": "nats://my-url",
|
||||
"port" : "4222",
|
||||
"database": "testcluster"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
|
||||
## Type `nats`
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "nats",
|
||||
"address": "<nats-URI or hostname>",
|
||||
"port" : "<portnumber>",
|
||||
"database": "<subscribe topic>"
|
||||
"myreceivername" : {
|
||||
"type": "receiver-type",
|
||||
<receiver-specific configuration>
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol.
|
||||
This allows to specify
|
||||
|
||||
## Available receivers
|
||||
|
||||
- [`nats`](./natsReceiver.md): Receive metrics from the NATS network
|
||||
- [`prometheus`](./prometheusReceiver.md): Scrape data from a Prometheus client
|
||||
|
||||
# Contributing own receivers
|
||||
A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`):
|
||||
* `Init(config ReceiverConfig) error`
|
||||
* `Start() error`
|
||||
* `Close()`
|
||||
* `Name() string`
|
||||
* `SetSink(sink chan ccMetric.CCMetric)`
|
||||
A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`):
|
||||
|
||||
The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`.
|
||||
|
||||
Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver.
|
||||
For an example, check the [sample receiver](./sampleReceiver.go)
|
||||
|
@@ -1,9 +1,6 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
// "time"
|
||||
"encoding/json"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
@@ -11,6 +8,7 @@ type defaultReceiverConfig struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// Receiver configuration: Listen address, port
|
||||
type ReceiverConfig struct {
|
||||
Addr string `json:"address"`
|
||||
Port string `json:"port"`
|
||||
@@ -20,23 +18,23 @@ type ReceiverConfig struct {
|
||||
}
|
||||
|
||||
type receiver struct {
|
||||
typename string
|
||||
name string
|
||||
sink chan lp.CCMetric
|
||||
name string
|
||||
sink chan lp.CCMetric
|
||||
}
|
||||
|
||||
type Receiver interface {
|
||||
Init(name string, config json.RawMessage) error
|
||||
Start()
|
||||
Close()
|
||||
Name() string
|
||||
SetSink(sink chan lp.CCMetric)
|
||||
Close() // Close / finish metric receiver
|
||||
Name() string // Name of the metric receiver
|
||||
SetSink(sink chan lp.CCMetric) // Set sink channel
|
||||
}
|
||||
|
||||
// Name returns the name of the metric receiver
|
||||
func (r *receiver) Name() string {
|
||||
return r.name
|
||||
}
|
||||
|
||||
// SetSink set the sink channel
|
||||
func (r *receiver) SetSink(sink chan lp.CCMetric) {
|
||||
r.sink = sink
|
||||
}
|
||||
|
@@ -32,39 +32,6 @@ var DefaultTime = func() time.Time {
|
||||
return time.Unix(42, 0)
|
||||
}
|
||||
|
||||
func (r *NatsReceiver) Init(name string, config json.RawMessage) error {
|
||||
r.typename = "NatsReceiver"
|
||||
r.name = name
|
||||
r.config.Addr = nats.DefaultURL
|
||||
r.config.Port = "4222"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(r.config.Addr) == 0 ||
|
||||
len(r.config.Port) == 0 ||
|
||||
len(r.config.Subject) == 0 {
|
||||
return errors.New("not all configuration variables set required by NatsReceiver")
|
||||
}
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
|
||||
cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject)
|
||||
nc, err := nats.Connect(uri)
|
||||
if err == nil {
|
||||
r.nc = nc
|
||||
} else {
|
||||
r.nc = nil
|
||||
return err
|
||||
}
|
||||
r.handler = influx.NewMetricHandler()
|
||||
r.parser = influx.NewParser(r.handler)
|
||||
r.parser.SetTimeFunc(DefaultTime)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *NatsReceiver) Start() {
|
||||
cclog.ComponentDebug(r.name, "START")
|
||||
r.nc.Subscribe(r.config.Subject, r._NatsReceive)
|
||||
@@ -91,3 +58,35 @@ func (r *NatsReceiver) Close() {
|
||||
r.nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(NatsReceiver)
|
||||
r.name = fmt.Sprintf("NatsReceiver(%s)", name)
|
||||
r.config.Addr = nats.DefaultURL
|
||||
r.config.Port = "4222"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(r.config.Addr) == 0 ||
|
||||
len(r.config.Port) == 0 ||
|
||||
len(r.config.Subject) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by NatsReceiver")
|
||||
}
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
|
||||
cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject)
|
||||
if nc, err := nats.Connect(uri); err == nil {
|
||||
r.nc = nc
|
||||
} else {
|
||||
r.nc = nil
|
||||
return nil, err
|
||||
}
|
||||
r.handler = influx.NewMetricHandler()
|
||||
r.parser = influx.NewParser(r.handler)
|
||||
r.parser.SetTimeFunc(DefaultTime)
|
||||
return r, nil
|
||||
}
|
||||
|
21
receivers/natsReceiver.md
Normal file
21
receivers/natsReceiver.md
Normal file
@@ -0,0 +1,21 @@
|
||||
## `nats` receiver
|
||||
|
||||
The `nats` receiver can be used receive metrics from the NATS network. The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol.
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<name>": {
|
||||
"type": "nats",
|
||||
"address" : "nats-server.example.org",
|
||||
"port" : "4222",
|
||||
"subject" : "subject"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `type`: makes the receiver a `nats` receiver
|
||||
- `address`: Address of the NATS control server
|
||||
- `port`: Port of the NATS control server
|
||||
- `subject`: Subscribes to this subject and receive metrics
|
122
receivers/prometheusReceiver.go
Normal file
122
receivers/prometheusReceiver.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
type PrometheusReceiverConfig struct {
|
||||
defaultReceiverConfig
|
||||
Addr string `json:"address"`
|
||||
Port string `json:"port"`
|
||||
Path string `json:"path"`
|
||||
Interval string `json:"interval"`
|
||||
SSL bool `json:"ssl"`
|
||||
}
|
||||
|
||||
type PrometheusReceiver struct {
|
||||
receiver
|
||||
meta map[string]string
|
||||
config PrometheusReceiverConfig
|
||||
interval time.Duration
|
||||
done chan bool
|
||||
wg sync.WaitGroup
|
||||
ticker *time.Ticker
|
||||
uri string
|
||||
}
|
||||
|
||||
func (r *PrometheusReceiver) Start() {
|
||||
cclog.ComponentDebug(r.name, "START", r.uri)
|
||||
r.wg.Add(1)
|
||||
|
||||
r.ticker = time.NewTicker(r.interval)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-r.done:
|
||||
r.wg.Done()
|
||||
return
|
||||
case t := <-r.ticker.C:
|
||||
resp, err := http.Get(r.uri)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if strings.HasPrefix(line, "#") {
|
||||
continue
|
||||
}
|
||||
lineSplit := strings.Fields(line)
|
||||
// separate metric name from tags (labels in Prometheus)
|
||||
tags := map[string]string{}
|
||||
name := lineSplit[0]
|
||||
if sindex := strings.Index(name, "{"); sindex >= 0 {
|
||||
eindex := strings.Index(name, "}")
|
||||
for _, kv := range strings.Split(name[sindex+1:eindex], ",") {
|
||||
eq := strings.Index(kv, "=")
|
||||
tags[kv[0:eq]] = strings.Trim(kv[eq+1:], "\"")
|
||||
}
|
||||
name = lineSplit[0][0:sindex]
|
||||
}
|
||||
value, err := strconv.ParseFloat(lineSplit[1], 64)
|
||||
if err == nil {
|
||||
y, err := lp.New(name, tags, r.meta, map[string]interface{}{"value": value}, t)
|
||||
if err == nil {
|
||||
r.sink <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *PrometheusReceiver) Close() {
|
||||
cclog.ComponentDebug(r.name, "CLOSE")
|
||||
r.done <- true
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func NewPrometheusReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(PrometheusReceiver)
|
||||
r.name = fmt.Sprintf("PrometheusReceiver(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(r.config.Addr) == 0 ||
|
||||
len(r.config.Port) == 0 ||
|
||||
len(r.config.Interval) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by PrometheusReceiver (address and port)")
|
||||
}
|
||||
if len(r.config.Interval) > 0 {
|
||||
t, err := time.ParseDuration(r.config.Interval)
|
||||
if err == nil {
|
||||
r.interval = t
|
||||
}
|
||||
}
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
proto := "http"
|
||||
if r.config.SSL {
|
||||
proto = "https"
|
||||
}
|
||||
r.uri = fmt.Sprintf("%s://%s:%s/%s", proto, r.config.Addr, r.config.Port, r.config.Path)
|
||||
return r, nil
|
||||
}
|
27
receivers/prometheusReceiver.md
Normal file
27
receivers/prometheusReceiver.md
Normal file
@@ -0,0 +1,27 @@
|
||||
## `prometheus` receiver
|
||||
|
||||
The `prometheus` receiver can be used to scrape the metrics of a single `prometheus` client. It does **not** use any official Golang library but making simple HTTP get requests and parse the response.
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<name>": {
|
||||
"type": "prometheus",
|
||||
"address" : "testpromhost",
|
||||
"port" : "12345",
|
||||
"path" : "/prometheus",
|
||||
"interval": "5s",
|
||||
"ssl" : true,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `type`: makes the receiver a `prometheus` receiver
|
||||
- `address`: Hostname or IP of the Prometheus agent
|
||||
- `port`: Port of Prometheus agent
|
||||
- `path`: Path to the Prometheus endpoint
|
||||
- `interval`: Scrape the Prometheus endpoint in this interval (default '5s')
|
||||
- `ssl`: Use SSL or not
|
||||
|
||||
The receiver requests data from `http(s)://<address>:<port>/<path>`.
|
@@ -9,8 +9,8 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
var AvailableReceivers = map[string]Receiver{
|
||||
"nats": &NatsReceiver{},
|
||||
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
|
||||
"nats": NewNatsReceiver,
|
||||
}
|
||||
|
||||
type receiveManager struct {
|
||||
@@ -30,11 +30,13 @@ type ReceiveManager interface {
|
||||
}
|
||||
|
||||
func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error {
|
||||
// Initialize struct fields
|
||||
rm.inputs = make([]Receiver, 0)
|
||||
rm.output = nil
|
||||
rm.done = make(chan bool)
|
||||
rm.wg = wg
|
||||
rm.config = make([]json.RawMessage, 0)
|
||||
|
||||
configFile, err := os.Open(receiverConfigFile)
|
||||
if err != nil {
|
||||
cclog.ComponentError("ReceiveManager", err.Error())
|
||||
@@ -51,6 +53,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
|
||||
for name, raw := range rawConfigs {
|
||||
rm.AddInput(name, raw)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -75,10 +78,9 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error())
|
||||
return err
|
||||
}
|
||||
r := AvailableReceivers[config.Type]
|
||||
err = r.Init(name, rawConfig)
|
||||
r, err := AvailableReceivers[config.Type](name, rawConfig)
|
||||
if err != nil {
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error())
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", name, "initialization failed:", err.Error())
|
||||
return err
|
||||
}
|
||||
rm.inputs = append(rm.inputs, r)
|
||||
|
91
receivers/sampleReceiver.go
Normal file
91
receivers/sampleReceiver.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
)
|
||||
|
||||
// SampleReceiver configuration: receiver type, listen address, port
|
||||
type SampleReceiverConfig struct {
|
||||
Type string `json:"type"`
|
||||
Addr string `json:"address"`
|
||||
Port string `json:"port"`
|
||||
}
|
||||
|
||||
type SampleReceiver struct {
|
||||
receiver
|
||||
config SampleReceiverConfig
|
||||
|
||||
// Storage for static information
|
||||
meta map[string]string
|
||||
// Use in case of own go routine
|
||||
// done chan bool
|
||||
// wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// Implement functions required for Receiver interface
|
||||
// Start(), Close()
|
||||
// See: metricReceiver.go
|
||||
|
||||
func (r *SampleReceiver) Start() {
|
||||
cclog.ComponentDebug(r.name, "START")
|
||||
|
||||
// Start server process like http.ListenAndServe()
|
||||
|
||||
// or use own go routine but always make sure it exits
|
||||
// as soon as it gets the signal of the r.done channel
|
||||
// r.wg.Add(1)
|
||||
// go func() {
|
||||
// for {
|
||||
// select {
|
||||
// case <-r.done:
|
||||
// r.wg.Done()
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// r.wg.Done()
|
||||
// }()
|
||||
}
|
||||
|
||||
// Close receiver: close network connection, close files, close libraries, ...
|
||||
func (r *SampleReceiver) Close() {
|
||||
cclog.ComponentDebug(r.name, "CLOSE")
|
||||
|
||||
// Close server like http.Shutdown()
|
||||
|
||||
// in case of own go routine, send the signal and wait
|
||||
// r.done <- true
|
||||
// r.wg.Wait()
|
||||
}
|
||||
|
||||
// New function to create a new instance of the receiver
|
||||
// Initialize the receiver by giving it a name and reading in the config JSON
|
||||
func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(SampleReceiver)
|
||||
|
||||
// Set name of SampleReceiver
|
||||
// The name should be chosen in such a way that different instances of SampleReceiver can be distinguished
|
||||
r.name = fmt.Sprintf("SampleReceiver(%s)", name)
|
||||
|
||||
// Set static information
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
|
||||
// Set defaults in r.config
|
||||
// Allow overwriting these defaults by reading config JSON
|
||||
|
||||
// Read the sample receiver specific JSON config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Check that all required fields in the configuration are set
|
||||
// Use 'if len(r.config.Option) > 0' for strings
|
||||
|
||||
return r, nil
|
||||
}
|
@@ -1,5 +1,5 @@
|
||||
Name: cc-metric-collector
|
||||
Version: 0.2
|
||||
Version: %{VERS}
|
||||
Release: 1%{?dist}
|
||||
Summary: Metric collection daemon from the ClusterCockpit suite
|
||||
|
||||
@@ -7,6 +7,7 @@ License: MIT
|
||||
Source0: %{name}-%{version}.tar.gz
|
||||
|
||||
BuildRequires: go-toolset
|
||||
BuildRequires: systemd-rpm-macros
|
||||
# for internal LIKWID installation
|
||||
BuildRequires: wget perl-Data-Dumper
|
||||
|
||||
@@ -34,11 +35,15 @@ install -Dpm 0600 receivers.json %{buildroot}%{_sysconfdir}/%{name}/receivers.js
|
||||
install -Dpm 0600 router.json %{buildroot}%{_sysconfdir}/%{name}/router.json
|
||||
install -Dpm 0644 scripts/%{name}.service %{buildroot}%{_unitdir}/%{name}.service
|
||||
install -Dpm 0600 scripts/%{name}.config %{buildroot}%{_sysconfdir}/default/%{name}
|
||||
install -Dpm 0644 scripts/%{name}.sysusers %{buildroot}%{_sysusersdir}/%{name}.conf
|
||||
|
||||
|
||||
%check
|
||||
# go test should be here... :)
|
||||
|
||||
%pre
|
||||
%sysusers_create_package scripts/%{name}.sysusers
|
||||
|
||||
%post
|
||||
%systemd_post %{name}.service
|
||||
|
||||
@@ -46,17 +51,23 @@ install -Dpm 0600 scripts/%{name}.config %{buildroot}%{_sysconfdir}/default/%{na
|
||||
%systemd_preun %{name}.service
|
||||
|
||||
%files
|
||||
# Binary
|
||||
%attr(-,clustercockpit,clustercockpit) %{_sbindir}/%{name}
|
||||
# Configuration
|
||||
%dir %{_sysconfdir}/%{name}
|
||||
%{_sbindir}/%{name}
|
||||
%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/%{name}.json
|
||||
%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/collectors.json
|
||||
%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/sinks.json
|
||||
%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/receivers.json
|
||||
%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/router.json
|
||||
# Systemd
|
||||
%{_sysusersdir}/%{name}.conf
|
||||
%{_unitdir}/%{name}.service
|
||||
%{_sysconfdir}/default/%{name}
|
||||
%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/%{name}.json
|
||||
%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/collectors.json
|
||||
%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/sinks.json
|
||||
%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/receivers.json
|
||||
%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/router.json
|
||||
|
||||
%changelog
|
||||
* Thu Mar 03 2022 Thomas Gruber - 0.3
|
||||
- Add clustercockpit user installation
|
||||
* Mon Feb 14 2022 Thomas Gruber - 0.2
|
||||
- Add component specific configuration files
|
||||
- Add %attr to config files
|
||||
|
2
scripts/cc-metric-collector.sysusers
Normal file
2
scripts/cc-metric-collector.sysusers
Normal file
@@ -0,0 +1,2 @@
|
||||
#Type Name ID GECOS Home directory Shell
|
||||
u clustercockpit - "User for ClusterCockpit" /run/cc-metric-collector /sbin/nologin
|
@@ -39,7 +39,7 @@ def group_to_json(groupfile):
|
||||
llist = re.split("\s+", line)
|
||||
calc = llist[-1]
|
||||
metric = " ".join(llist[:-1])
|
||||
scope = "hwthread"
|
||||
scope = "cpu"
|
||||
if "BOX" in calc:
|
||||
scope = "socket"
|
||||
if "PWR" in calc:
|
||||
|
@@ -1,12 +0,0 @@
|
||||
|
||||
all: libganglia.so
|
||||
|
||||
libganglia.so:
|
||||
@find /usr ! -readable -prune -o -type d ! -executable -prune -o -name "$@*" -print0 | \
|
||||
xargs --null --no-run-if-empty --replace \
|
||||
ln --symbolic --verbose --force '{}' "$@"
|
||||
|
||||
clean:
|
||||
rm -f libganglia.so
|
||||
|
||||
.PHONY: clean
|
@@ -6,9 +6,11 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
|
||||
- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
|
||||
- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
|
||||
- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
|
||||
- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API
|
||||
- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
|
||||
- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool
|
||||
- [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so`
|
||||
- [`prometeus`](./prometheusSink.md): Publish metrics for the [Prometheus Monitoring System](https://prometheus.io/)
|
||||
|
||||
# Configuration
|
||||
|
||||
@@ -34,11 +36,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie
|
||||
|
||||
|
||||
# Contributing own sinks
|
||||
A sink contains four functions and is derived from the type `sink`:
|
||||
* `Init(config json.RawMessage) error`
|
||||
A sink contains five functions and is derived from the type `sink`:
|
||||
* `Init(name string, config json.RawMessage) error`
|
||||
* `Write(point CCMetric) error`
|
||||
* `Flush() error`
|
||||
* `Close()`
|
||||
* `New<Typename>(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function)
|
||||
|
||||
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
|
||||
|
||||
@@ -65,8 +68,8 @@ type SampleSink struct {
|
||||
}
|
||||
|
||||
// Initialize the sink by giving it a name and reading in the config JSON
|
||||
func (s *SampleSink) Init(config json.RawMessage) error {
|
||||
s.name = "SampleSink" // Always specify a name here
|
||||
func (s *SampleSink) Init(name string, config json.RawMessage) error {
|
||||
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
|
||||
// Read in the config JSON
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
@@ -91,4 +94,13 @@ func (s *SampleSink) Flush() error {
|
||||
|
||||
// Close sink: close network connection, close files, close libraries, ...
|
||||
func (s *SampleSink) Close() {}
|
||||
|
||||
|
||||
// New function to create a new instance of the sink
|
||||
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(SampleSink)
|
||||
err := s.Init(name, config)
|
||||
return s, err
|
||||
}
|
||||
|
||||
```
|
@@ -1,6 +1,7 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
@@ -23,11 +24,8 @@ func GangliaMetricName(point lp.CCMetric) string {
|
||||
return name
|
||||
}
|
||||
|
||||
func GangliaMetricRename(point lp.CCMetric) string {
|
||||
name := point.Name()
|
||||
if name == "mem_total" || name == "swap_total" {
|
||||
return name
|
||||
} else if name == "net_bytes_in" {
|
||||
func GangliaMetricRename(name string) string {
|
||||
if name == "net_bytes_in" {
|
||||
return "bytes_in"
|
||||
} else if name == "net_bytes_out" {
|
||||
return "bytes_out"
|
||||
@@ -48,3 +46,213 @@ func GangliaSlopeType(point lp.CCMetric) uint {
|
||||
}
|
||||
return 3
|
||||
}
|
||||
|
||||
const DEFAULT_GANGLIA_METRIC_TMAX = 300
|
||||
const DEFAULT_GANGLIA_METRIC_SLOPE = "both"
|
||||
|
||||
type GangliaMetric struct {
|
||||
Name string
|
||||
Type string
|
||||
Slope string
|
||||
Tmax int
|
||||
Unit string
|
||||
}
|
||||
|
||||
type GangliaMetricGroup struct {
|
||||
Name string
|
||||
Metrics []GangliaMetric
|
||||
}
|
||||
|
||||
var CommonGangliaMetrics = []GangliaMetricGroup{
|
||||
{
|
||||
Name: "memory",
|
||||
Metrics: []GangliaMetric{
|
||||
{"mem_total", "float", "zero", 1200, "KB"},
|
||||
{"swap_total", "float", "zero", 1200, "KB"},
|
||||
{"mem_free", "float", "both", 180, "KB"},
|
||||
{"mem_shared", "float", "both", 180, "KB"},
|
||||
{"mem_buffers", "float", "both", 180, "KB"},
|
||||
{"mem_cached", "float", "both", 180, "KB"},
|
||||
{"swap_free", "float", "both", 180, "KB"},
|
||||
{"mem_sreclaimable", "float", "both", 180, "KB"},
|
||||
{"mem_slab", "float", "both", 180, "KB"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "cpu",
|
||||
Metrics: []GangliaMetric{
|
||||
{"cpu_num", "uint32", "zero", 1200, "CPUs"},
|
||||
{"cpu_speed", "uint32", "zero", 1200, "MHz"},
|
||||
{"cpu_user", "float", "both", 90, "%"},
|
||||
{"cpu_nice", "float", "both", 90, "%"},
|
||||
{"cpu_system", "float", "both", 90, "%"},
|
||||
{"cpu_idle", "float", "both", 3800, "%"},
|
||||
{"cpu_aidle", "float", "both", 90, "%"},
|
||||
{"cpu_wio", "float", "both", 90, "%"},
|
||||
{"cpu_intr", "float", "both", 90, "%"},
|
||||
{"cpu_sintr", "float", "both", 90, "%"},
|
||||
{"cpu_steal", "float", "both", 90, "%"},
|
||||
{"cpu_guest", "float", "both", 90, "%"},
|
||||
{"cpu_gnice", "float", "both", 90, "%"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "load",
|
||||
Metrics: []GangliaMetric{
|
||||
{"load_one", "float", "both", 70, ""},
|
||||
{"load_five", "float", "both", 325, ""},
|
||||
{"load_fifteen", "float", "both", 950, ""},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "disk",
|
||||
Metrics: []GangliaMetric{
|
||||
{"disk_total", "double", "both", 1200, "GB"},
|
||||
{"disk_free", "double", "both", 180, "GB"},
|
||||
{"part_max_used", "float", "both", 180, "%"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "network",
|
||||
Metrics: []GangliaMetric{
|
||||
{"bytes_out", "float", "both", 300, "bytes/sec"},
|
||||
{"bytes_in", "float", "both", 300, "bytes/sec"},
|
||||
{"pkts_in", "float", "both", 300, "packets/sec"},
|
||||
{"pkts_out", "float", "both", 300, "packets/sec"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "process",
|
||||
Metrics: []GangliaMetric{
|
||||
{"proc_run", "uint32", "both", 950, ""},
|
||||
{"proc_total", "uint32", "both", 950, ""},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "system",
|
||||
Metrics: []GangliaMetric{
|
||||
{"boottime", "uint32", "zero", 1200, "s"},
|
||||
{"sys_clock", "uint32", "zero", 1200, "s"},
|
||||
{"machine_type", "string", "zero", 1200, ""},
|
||||
{"os_name", "string", "zero", 1200, ""},
|
||||
{"os_release", "string", "zero", 1200, ""},
|
||||
{"mtu", "uint32", "both", 1200, ""},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type GangliaMetricConfig struct {
|
||||
Type string
|
||||
Slope string
|
||||
Tmax int
|
||||
Unit string
|
||||
Group string
|
||||
Value string
|
||||
}
|
||||
|
||||
func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
mname := GangliaMetricRename(point.Name())
|
||||
for _, group := range CommonGangliaMetrics {
|
||||
for _, metric := range group.Metrics {
|
||||
if metric.Name == mname {
|
||||
valueStr := ""
|
||||
value, ok := point.GetField("value")
|
||||
if ok {
|
||||
switch real := value.(type) {
|
||||
case float64:
|
||||
valueStr = fmt.Sprintf("%f", real)
|
||||
case float32:
|
||||
valueStr = fmt.Sprintf("%f", real)
|
||||
case int64:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case int32:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case int:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case uint64:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case uint32:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case uint:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
case string:
|
||||
valueStr = real
|
||||
default:
|
||||
}
|
||||
}
|
||||
return GangliaMetricConfig{
|
||||
Group: group.Name,
|
||||
Type: metric.Type,
|
||||
Slope: metric.Slope,
|
||||
Tmax: metric.Tmax,
|
||||
Unit: metric.Unit,
|
||||
Value: valueStr,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return GangliaMetricConfig{
|
||||
Group: "",
|
||||
Type: "",
|
||||
Slope: "",
|
||||
Tmax: 0,
|
||||
Unit: "",
|
||||
Value: "",
|
||||
}
|
||||
}
|
||||
|
||||
func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
|
||||
group := ""
|
||||
if g, ok := point.GetMeta("group"); ok {
|
||||
group = g
|
||||
}
|
||||
unit := ""
|
||||
if u, ok := point.GetMeta("unit"); ok {
|
||||
unit = u
|
||||
}
|
||||
valueType := "double"
|
||||
valueStr := ""
|
||||
value, ok := point.GetField("value")
|
||||
if ok {
|
||||
switch real := value.(type) {
|
||||
case float64:
|
||||
valueStr = fmt.Sprintf("%f", real)
|
||||
valueType = "double"
|
||||
case float32:
|
||||
valueStr = fmt.Sprintf("%f", real)
|
||||
valueType = "float"
|
||||
case int64:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "int32"
|
||||
case int32:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "int32"
|
||||
case int:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "int32"
|
||||
case uint64:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "uint32"
|
||||
case uint32:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "uint32"
|
||||
case uint:
|
||||
valueStr = fmt.Sprintf("%d", real)
|
||||
valueType = "uint32"
|
||||
case string:
|
||||
valueStr = real
|
||||
valueType = "string"
|
||||
default:
|
||||
valueType = "invalid"
|
||||
}
|
||||
}
|
||||
|
||||
return GangliaMetricConfig{
|
||||
Group: group,
|
||||
Type: valueType,
|
||||
Slope: DEFAULT_GANGLIA_METRIC_SLOPE,
|
||||
Tmax: DEFAULT_GANGLIA_METRIC_TMAX,
|
||||
Unit: unit,
|
||||
Value: valueStr,
|
||||
}
|
||||
}
|
||||
|
@@ -24,6 +24,7 @@ type GangliaSinkConfig struct {
|
||||
AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"`
|
||||
ClusterName string `json:"cluster_name,omitempty"`
|
||||
AddTypeToName bool `json:"add_type_to_name,omitempty"`
|
||||
AddUnits bool `json:"add_units,omitempty"`
|
||||
}
|
||||
|
||||
type GangliaSink struct {
|
||||
@@ -33,16 +34,73 @@ type GangliaSink struct {
|
||||
config GangliaSinkConfig
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Init(config json.RawMessage) error {
|
||||
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
var err error = nil
|
||||
s.name = "GangliaSink"
|
||||
//var tagsstr []string
|
||||
var argstr []string
|
||||
|
||||
// Get metric name
|
||||
metricname := GangliaMetricRename(point.Name())
|
||||
|
||||
// Get metric config (type, value, ... in suitable format)
|
||||
conf := GetCommonGangliaConfig(point)
|
||||
if len(conf.Type) == 0 {
|
||||
conf = GetGangliaConfig(point)
|
||||
}
|
||||
if len(conf.Type) == 0 {
|
||||
return fmt.Errorf("metric %s has no 'value' field", metricname)
|
||||
}
|
||||
|
||||
if s.config.AddGangliaGroup {
|
||||
argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group))
|
||||
}
|
||||
if s.config.AddUnits && len(conf.Unit) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit))
|
||||
}
|
||||
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
|
||||
}
|
||||
// if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
|
||||
// argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
|
||||
// }
|
||||
if len(s.gmetric_config) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
|
||||
}
|
||||
if s.config.AddTypeToName {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
|
||||
} else {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", metricname))
|
||||
}
|
||||
argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
|
||||
argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
|
||||
argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type))
|
||||
argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax))
|
||||
|
||||
cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " "))
|
||||
command := exec.Command(s.gmetric_path, argstr...)
|
||||
command.Wait()
|
||||
_, err = command.Output()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Close() {
|
||||
}
|
||||
|
||||
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(GangliaSink)
|
||||
s.name = fmt.Sprintf("GangliaSink(%s)", name)
|
||||
s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.gmetric_path = ""
|
||||
@@ -60,98 +118,10 @@ func (s *GangliaSink) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
if len(s.gmetric_path) == 0 {
|
||||
err = errors.New("cannot find executable 'gmetric'")
|
||||
return nil, errors.New("cannot find executable 'gmetric'")
|
||||
}
|
||||
if len(s.config.GmetricConfig) > 0 {
|
||||
s.gmetric_config = s.config.GmetricConfig
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
var err error = nil
|
||||
var tagsstr []string
|
||||
var argstr []string
|
||||
if s.config.AddGangliaGroup {
|
||||
if point.HasTag("group") {
|
||||
g, _ := point.GetTag("group")
|
||||
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
|
||||
} else if point.HasMeta("group") {
|
||||
g, _ := point.GetMeta("group")
|
||||
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
|
||||
}
|
||||
}
|
||||
|
||||
for key, value := range point.Tags() {
|
||||
switch key {
|
||||
case "unit":
|
||||
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
|
||||
default:
|
||||
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
|
||||
}
|
||||
}
|
||||
if s.config.MetaAsTags {
|
||||
for key, value := range point.Meta() {
|
||||
switch key {
|
||||
case "unit":
|
||||
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
|
||||
default:
|
||||
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
|
||||
}
|
||||
if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
|
||||
}
|
||||
if len(s.gmetric_config) > 0 {
|
||||
argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
|
||||
}
|
||||
name := GangliaMetricRename(point)
|
||||
if s.config.AddTypeToName {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
|
||||
} else {
|
||||
argstr = append(argstr, fmt.Sprintf("--name=%s", name))
|
||||
}
|
||||
slope := GangliaSlopeType(point)
|
||||
slopeStr := "both"
|
||||
if slope == 0 {
|
||||
slopeStr = "zero"
|
||||
}
|
||||
argstr = append(argstr, fmt.Sprintf("--slope=%s", slopeStr))
|
||||
|
||||
for k, v := range point.Fields() {
|
||||
if k == "value" {
|
||||
switch value := v.(type) {
|
||||
case float64:
|
||||
argstr = append(argstr,
|
||||
fmt.Sprintf("--value=%v", value), "--type=double")
|
||||
case float32:
|
||||
argstr = append(argstr,
|
||||
fmt.Sprintf("--value=%v", value), "--type=float")
|
||||
case int:
|
||||
argstr = append(argstr,
|
||||
fmt.Sprintf("--value=%d", value), "--type=int32")
|
||||
case int64:
|
||||
argstr = append(argstr,
|
||||
fmt.Sprintf("--value=%d", value), "--type=int32")
|
||||
case string:
|
||||
argstr = append(argstr,
|
||||
fmt.Sprintf("--value=%q", value), "--type=string")
|
||||
}
|
||||
}
|
||||
}
|
||||
command := exec.Command(s.gmetric_path, argstr...)
|
||||
command.Wait()
|
||||
_, err = command.Output()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Close() {
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -38,57 +38,6 @@ type HttpSink struct {
|
||||
flushDelay time.Duration
|
||||
}
|
||||
|
||||
func (s *HttpSink) Init(config json.RawMessage) error {
|
||||
// Set default values
|
||||
s.name = "HttpSink"
|
||||
s.config.MaxIdleConns = 10
|
||||
s.config.IdleConnTimeout = "5s"
|
||||
s.config.Timeout = "5s"
|
||||
s.config.FlushDelay = "1s"
|
||||
|
||||
// Read config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.URL) == 0 {
|
||||
return errors.New("`url` config option is required for HTTP sink")
|
||||
}
|
||||
if s.config.MaxIdleConns > 0 {
|
||||
s.maxIdleConns = s.config.MaxIdleConns
|
||||
}
|
||||
if len(s.config.IdleConnTimeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
||||
if err == nil {
|
||||
s.idleConnTimeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.Timeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.Timeout)
|
||||
if err == nil {
|
||||
s.timeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.FlushDelay) > 0 {
|
||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||
if err == nil {
|
||||
s.flushDelay = t
|
||||
}
|
||||
}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: s.maxIdleConns,
|
||||
IdleConnTimeout: s.idleConnTimeout,
|
||||
}
|
||||
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||
if s.buffer.Len() == 0 && s.flushDelay != 0 {
|
||||
// This is the first write since the last flush, start the flushTimer!
|
||||
@@ -169,3 +118,54 @@ func (s *HttpSink) Close() {
|
||||
}
|
||||
s.client.CloseIdleConnections()
|
||||
}
|
||||
|
||||
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(HttpSink)
|
||||
// Set default values
|
||||
s.name = fmt.Sprintf("HttpSink(%s)", name)
|
||||
s.config.MaxIdleConns = 10
|
||||
s.config.IdleConnTimeout = "5s"
|
||||
s.config.Timeout = "5s"
|
||||
s.config.FlushDelay = "1s"
|
||||
|
||||
// Read config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.URL) == 0 {
|
||||
return nil, errors.New("`url` config option is required for HTTP sink")
|
||||
}
|
||||
if s.config.MaxIdleConns > 0 {
|
||||
s.maxIdleConns = s.config.MaxIdleConns
|
||||
}
|
||||
if len(s.config.IdleConnTimeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
||||
if err == nil {
|
||||
s.idleConnTimeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.Timeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.Timeout)
|
||||
if err == nil {
|
||||
s.timeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.FlushDelay) > 0 {
|
||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||
if err == nil {
|
||||
s.flushDelay = t
|
||||
}
|
||||
}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: s.maxIdleConns,
|
||||
IdleConnTimeout: s.idleConnTimeout,
|
||||
}
|
||||
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -30,11 +31,10 @@ type InfluxAsyncSinkConfig struct {
|
||||
|
||||
type InfluxAsyncSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
retPolicy string
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) connect() error {
|
||||
@@ -65,42 +65,16 @@ func (s *InfluxAsyncSink) connect() error {
|
||||
)
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return fmt.Errorf("connection to %s not healthy", uri)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
|
||||
s.name = "InfluxSink"
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
err := s.connect()
|
||||
|
||||
// Start background: Read from error channel
|
||||
s.errors = s.writeApi.Errors()
|
||||
go func() {
|
||||
for err := range s.errors {
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
}()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
||||
s.writeApi.WritePoint(
|
||||
m.ToPoint(s.config.MetaAsTags),
|
||||
@@ -118,3 +92,40 @@ func (s *InfluxAsyncSink) Close() {
|
||||
s.writeApi.Flush()
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(InfluxAsyncSink)
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
}
|
||||
|
||||
// Start background: Read from error channel
|
||||
s.errors = s.writeApi.Errors()
|
||||
go func() {
|
||||
for err := range s.errors {
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -54,29 +54,16 @@ func (s *InfluxSink) connect() error {
|
||||
)
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return fmt.Errorf("connection to %s not healthy", uri)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Init(config json.RawMessage) error {
|
||||
s.name = "InfluxSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
return s.connect()
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
||||
err :=
|
||||
s.writeApi.WritePoint(
|
||||
@@ -94,3 +81,27 @@ func (s *InfluxSink) Close() {
|
||||
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(InfluxSink)
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -82,21 +82,21 @@ const (
|
||||
GMOND_CONFIG_FILE = `/etc/ganglia/gmond.conf`
|
||||
)
|
||||
|
||||
type LibgangliaSinkSpecialMetric struct {
|
||||
MetricName string `json:"metric_name,omitempty"`
|
||||
NewName string `json:"new_name,omitempty"`
|
||||
Slope string `json:"slope,omitempty"`
|
||||
}
|
||||
// type LibgangliaSinkSpecialMetric struct {
|
||||
// MetricName string `json:"metric_name,omitempty"`
|
||||
// NewName string `json:"new_name,omitempty"`
|
||||
// Slope string `json:"slope,omitempty"`
|
||||
// }
|
||||
|
||||
type LibgangliaSinkConfig struct {
|
||||
defaultSinkConfig
|
||||
GangliaLib string `json:"libganglia_path,omitempty"`
|
||||
GmondConfig string `json:"gmond_config,omitempty"`
|
||||
AddGangliaGroup bool `json:"add_ganglia_group,omitempty"`
|
||||
AddTypeToName bool `json:"add_type_to_name,omitempty"`
|
||||
AddUnits bool `json:"add_units,omitempty"`
|
||||
ClusterName string `json:"cluster_name,omitempty"`
|
||||
SpecialMetrics map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
|
||||
GangliaLib string `json:"libganglia_path,omitempty"`
|
||||
GmondConfig string `json:"gmond_config,omitempty"`
|
||||
AddGangliaGroup bool `json:"add_ganglia_group,omitempty"`
|
||||
AddTypeToName bool `json:"add_type_to_name,omitempty"`
|
||||
AddUnits bool `json:"add_units,omitempty"`
|
||||
ClusterName string `json:"cluster_name,omitempty"`
|
||||
//SpecialMetrics map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
|
||||
//AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"`
|
||||
}
|
||||
|
||||
@@ -109,65 +109,6 @@ type LibgangliaSink struct {
|
||||
cstrCache map[string]*C.char
|
||||
}
|
||||
|
||||
func (s *LibgangliaSink) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
s.name = "LibgangliaSink"
|
||||
//s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
s.config.AddTypeToName = false
|
||||
s.config.AddUnits = true
|
||||
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
||||
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
||||
}
|
||||
err = lib.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||
}
|
||||
|
||||
// Set up cache for the C strings
|
||||
s.cstrCache = make(map[string]*C.char)
|
||||
// s.cstrCache["globals"] = C.CString("globals")
|
||||
|
||||
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
||||
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
||||
|
||||
// Add some constant strings
|
||||
s.cstrCache["GROUP"] = C.CString("GROUP")
|
||||
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
||||
s.cstrCache[""] = C.CString("")
|
||||
|
||||
// Add cluster name for lookup in Write()
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
||||
}
|
||||
// Add supported types for later lookup in Write()
|
||||
s.cstrCache["double"] = C.CString("double")
|
||||
s.cstrCache["int32"] = C.CString("int32")
|
||||
s.cstrCache["string"] = C.CString("string")
|
||||
|
||||
// Create Ganglia pool
|
||||
s.global_context = C.Ganglia_pool_create(nil)
|
||||
// Load Ganglia configuration
|
||||
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
||||
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
||||
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
||||
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
||||
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
||||
|
||||
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
var err error = nil
|
||||
var c_name *C.char
|
||||
@@ -184,72 +125,48 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
}
|
||||
|
||||
// Get metric name
|
||||
metricname := GangliaMetricRename(point)
|
||||
if s.config.AddTypeToName {
|
||||
c_name = lookup(GangliaMetricName(point))
|
||||
} else {
|
||||
c_name = lookup(metricname)
|
||||
}
|
||||
metricname := GangliaMetricRename(point.Name())
|
||||
|
||||
// Get the value C string and lookup the type string in the cache
|
||||
value, ok := point.GetField("value")
|
||||
if !ok {
|
||||
conf := GetCommonGangliaConfig(point)
|
||||
if len(conf.Type) == 0 {
|
||||
conf = GetGangliaConfig(point)
|
||||
}
|
||||
if len(conf.Type) == 0 {
|
||||
return fmt.Errorf("metric %s has no 'value' field", metricname)
|
||||
}
|
||||
switch real := value.(type) {
|
||||
case float64:
|
||||
c_value = C.CString(fmt.Sprintf("%f", real))
|
||||
c_type = lookup("double")
|
||||
case float32:
|
||||
c_value = C.CString(fmt.Sprintf("%f", real))
|
||||
c_type = lookup("float")
|
||||
case int64:
|
||||
c_value = C.CString(fmt.Sprintf("%d", real))
|
||||
c_type = lookup("int32")
|
||||
case int32:
|
||||
c_value = C.CString(fmt.Sprintf("%d", real))
|
||||
c_type = lookup("int32")
|
||||
case int:
|
||||
c_value = C.CString(fmt.Sprintf("%d", real))
|
||||
c_type = lookup("int32")
|
||||
case string:
|
||||
c_value = C.CString(real)
|
||||
c_type = lookup("string")
|
||||
default:
|
||||
return fmt.Errorf("metric %s has invalid 'value' type for %s", point.Name(), s.name)
|
||||
|
||||
if s.config.AddTypeToName {
|
||||
metricname = GangliaMetricName(point)
|
||||
}
|
||||
|
||||
c_value = C.CString(conf.Value)
|
||||
c_type = lookup(conf.Type)
|
||||
c_name = lookup(metricname)
|
||||
|
||||
// Add unit
|
||||
unit := ""
|
||||
if s.config.AddUnits {
|
||||
if tagunit, tagok := point.GetTag("unit"); tagok {
|
||||
c_unit = lookup(tagunit)
|
||||
} else if metaunit, metaok := point.GetMeta("unit"); metaok {
|
||||
c_unit = lookup(metaunit)
|
||||
} else {
|
||||
c_unit = lookup("")
|
||||
}
|
||||
} else {
|
||||
c_unit = lookup("")
|
||||
unit = conf.Unit
|
||||
}
|
||||
c_unit = lookup(unit)
|
||||
|
||||
// Determine the slope of the metric. Ganglia's own collector mostly use
|
||||
// 'both' but the mem and swap total uses 'zero'.
|
||||
slope := GangliaSlopeType(point)
|
||||
slope_type := C.GANGLIA_SLOPE_BOTH
|
||||
switch slope {
|
||||
case 0:
|
||||
switch conf.Slope {
|
||||
case "zero":
|
||||
slope_type = C.GANGLIA_SLOPE_ZERO
|
||||
case "both":
|
||||
slope_type = C.GANGLIA_SLOPE_BOTH
|
||||
}
|
||||
|
||||
// Create a new Ganglia metric
|
||||
gmetric := C.Ganglia_metric_create(s.global_context)
|
||||
// Set name, value, type and unit in the Ganglia metric
|
||||
// Since we don't have this information from the collectors,
|
||||
// we assume that the metric value can go up and down (slope),
|
||||
// and there is no maximum for 'dmax' and 'tmax'.
|
||||
// Ganglia's collectors set 'tmax' but not 'dmax'
|
||||
// The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant.
|
||||
// The 'tmax' value is by default 300.
|
||||
rval := C.int(0)
|
||||
rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), 0, 0)
|
||||
rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0)
|
||||
switch rval {
|
||||
case 1:
|
||||
C.free(unsafe.Pointer(c_value))
|
||||
@@ -259,10 +176,10 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
return errors.New("one of your parameters has an invalid character '\"'")
|
||||
case 3:
|
||||
C.free(unsafe.Pointer(c_value))
|
||||
return fmt.Errorf("the type parameter \"%s\" is not a valid type", C.GoString(c_type))
|
||||
return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type)
|
||||
case 4:
|
||||
C.free(unsafe.Pointer(c_value))
|
||||
return fmt.Errorf("the value parameter \"%s\" does not represent a number", C.GoString(c_value))
|
||||
return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value)
|
||||
default:
|
||||
}
|
||||
|
||||
@@ -271,8 +188,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName))
|
||||
}
|
||||
// Set the group metadata in the Ganglia metric if configured
|
||||
if group, ok := point.GetMeta("group"); ok && s.config.AddGangliaGroup {
|
||||
c_group := lookup(group)
|
||||
if s.config.AddGangliaGroup {
|
||||
c_group := lookup(conf.Group)
|
||||
C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group)
|
||||
}
|
||||
|
||||
@@ -307,3 +224,63 @@ func (s *LibgangliaSink) Close() {
|
||||
C.free(unsafe.Pointer(cstr))
|
||||
}
|
||||
}
|
||||
|
||||
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(LibgangliaSink)
|
||||
var err error = nil
|
||||
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
|
||||
//s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
s.config.AddTypeToName = false
|
||||
s.config.AddUnits = true
|
||||
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
||||
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
||||
}
|
||||
err = lib.Open()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||
}
|
||||
|
||||
// Set up cache for the C strings
|
||||
s.cstrCache = make(map[string]*C.char)
|
||||
// s.cstrCache["globals"] = C.CString("globals")
|
||||
|
||||
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
||||
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
||||
|
||||
// Add some constant strings
|
||||
s.cstrCache["GROUP"] = C.CString("GROUP")
|
||||
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
||||
s.cstrCache[""] = C.CString("")
|
||||
|
||||
// Add cluster name for lookup in Write()
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
||||
}
|
||||
// Add supported types for later lookup in Write()
|
||||
s.cstrCache["double"] = C.CString("double")
|
||||
s.cstrCache["int32"] = C.CString("int32")
|
||||
s.cstrCache["string"] = C.CString("string")
|
||||
|
||||
// Create Ganglia pool
|
||||
s.global_context = C.Ganglia_pool_create(nil)
|
||||
// Load Ganglia configuration
|
||||
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
||||
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
||||
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
||||
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
||||
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
||||
|
||||
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -1,8 +1,6 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
@@ -12,18 +10,18 @@ type defaultSinkConfig struct {
|
||||
}
|
||||
|
||||
type sink struct {
|
||||
meta_as_tags bool
|
||||
name string
|
||||
meta_as_tags bool // Use meta data tags as tags
|
||||
name string // Name of the sink
|
||||
}
|
||||
|
||||
type Sink interface {
|
||||
Init(config json.RawMessage) error
|
||||
Write(point lp.CCMetric) error
|
||||
Flush() error
|
||||
Close()
|
||||
Name() string
|
||||
Write(point lp.CCMetric) error // Write metric to the sink
|
||||
Flush() error // Flush buffered metrics
|
||||
Close() // Close / finish metric sink
|
||||
Name() string // Name of the metric sink
|
||||
}
|
||||
|
||||
// Name returns the name of the metric sink
|
||||
func (s *sink) Name() string {
|
||||
return s.name
|
||||
}
|
||||
|
@@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *NatsSink) Init(config json.RawMessage) error {
|
||||
s.name = "NatsSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 {
|
||||
return errors.New("not all configuration variables set required by NatsSink")
|
||||
}
|
||||
// Setup Influx line protocol
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.buffer.Grow(1025)
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
s.encoder.SetMaxLineBytes(1024)
|
||||
// Setup infos for connection
|
||||
return s.connect()
|
||||
}
|
||||
|
||||
func (s *NatsSink) Write(m lp.CCMetric) error {
|
||||
if s.client != nil {
|
||||
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
|
||||
@@ -105,3 +81,31 @@ func (s *NatsSink) Close() {
|
||||
s.client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(NatsSink)
|
||||
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by NatsSink")
|
||||
}
|
||||
// Setup Influx line protocol
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.buffer.Grow(1025)
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
s.encoder.SetMaxLineBytes(1024)
|
||||
// Setup infos for connection
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
199
sinks/prometheusSink.go
Normal file
199
sinks/prometheusSink.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
type PrometheusSinkConfig struct {
|
||||
defaultSinkConfig
|
||||
Host string `json:"host,omitempty"`
|
||||
Port string `json:"port"`
|
||||
Path string `json:"path,omitempty"`
|
||||
GroupAsNameSpace bool `json:"group_as_namespace,omitempty"`
|
||||
// User string `json:"user,omitempty"`
|
||||
// Password string `json:"password,omitempty"`
|
||||
// FlushDelay string `json:"flush_delay,omitempty"`
|
||||
}
|
||||
|
||||
type PrometheusSink struct {
|
||||
sink
|
||||
config PrometheusSinkConfig
|
||||
labelMetrics map[string]*prometheus.GaugeVec
|
||||
nodeMetrics map[string]prometheus.Gauge
|
||||
promWg sync.WaitGroup
|
||||
promServer *http.Server
|
||||
}
|
||||
|
||||
func intToFloat64(input interface{}) (float64, error) {
|
||||
switch value := input.(type) {
|
||||
case float64:
|
||||
return value, nil
|
||||
case float32:
|
||||
return float64(value), nil
|
||||
case int:
|
||||
return float64(value), nil
|
||||
case int32:
|
||||
return float64(value), nil
|
||||
case int64:
|
||||
return float64(value), nil
|
||||
}
|
||||
return 0, errors.New("cannot cast value to float64")
|
||||
}
|
||||
|
||||
func getLabelValue(metric lp.CCMetric) []string {
|
||||
labelValues := []string{}
|
||||
if tid, tidok := metric.GetTag("type-id"); tidok && metric.HasTag("type") {
|
||||
labelValues = append(labelValues, tid)
|
||||
}
|
||||
if d, ok := metric.GetTag("device"); ok {
|
||||
labelValues = append(labelValues, d)
|
||||
} else if d, ok := metric.GetMeta("device"); ok {
|
||||
labelValues = append(labelValues, d)
|
||||
}
|
||||
return labelValues
|
||||
}
|
||||
|
||||
func getLabelNames(metric lp.CCMetric) []string {
|
||||
labelNames := []string{}
|
||||
if t, tok := metric.GetTag("type"); tok && metric.HasTag("type-id") {
|
||||
labelNames = append(labelNames, t)
|
||||
}
|
||||
if _, ok := metric.GetTag("device"); ok {
|
||||
labelNames = append(labelNames, "device")
|
||||
} else if _, ok := metric.GetMeta("device"); ok {
|
||||
labelNames = append(labelNames, "device")
|
||||
}
|
||||
return labelNames
|
||||
}
|
||||
|
||||
func (s *PrometheusSink) newMetric(metric lp.CCMetric) error {
|
||||
var value float64 = 0
|
||||
name := metric.Name()
|
||||
opts := prometheus.GaugeOpts{
|
||||
Name: name,
|
||||
}
|
||||
labels := getLabelNames(metric)
|
||||
labelValues := getLabelValue(metric)
|
||||
if len(labels) > 0 && len(labels) != len(labelValues) {
|
||||
return fmt.Errorf("cannot detect metric labels for metric %s", name)
|
||||
}
|
||||
|
||||
if metricValue, ok := metric.GetField("value"); ok {
|
||||
if floatValue, err := intToFloat64(metricValue); err == nil {
|
||||
value = floatValue
|
||||
} else {
|
||||
return fmt.Errorf("metric %s with value '%v' cannot be casted to float64", name, metricValue)
|
||||
}
|
||||
}
|
||||
if s.config.GroupAsNameSpace && metric.HasMeta("group") {
|
||||
g, _ := metric.GetMeta("group")
|
||||
opts.Namespace = strings.ToLower(g)
|
||||
}
|
||||
|
||||
if len(labels) > 0 {
|
||||
new := prometheus.NewGaugeVec(opts, labels)
|
||||
new.WithLabelValues(labelValues...).Set(value)
|
||||
s.labelMetrics[name] = new
|
||||
prometheus.Register(new)
|
||||
} else {
|
||||
new := prometheus.NewGauge(opts)
|
||||
new.Set(value)
|
||||
s.nodeMetrics[name] = new
|
||||
prometheus.Register(new)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error {
|
||||
var value float64 = 0.0
|
||||
name := metric.Name()
|
||||
labelValues := getLabelValue(metric)
|
||||
|
||||
if metricValue, ok := metric.GetField("value"); ok {
|
||||
if floatValue, err := intToFloat64(metricValue); err == nil {
|
||||
value = floatValue
|
||||
} else {
|
||||
return fmt.Errorf("metric %s with value '%v' cannot be casted to float64", name, metricValue)
|
||||
}
|
||||
}
|
||||
|
||||
if len(labelValues) > 0 {
|
||||
if _, ok := s.labelMetrics[name]; !ok {
|
||||
err := s.newMetric(metric)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
s.labelMetrics[name].WithLabelValues(labelValues...).Set(value)
|
||||
} else {
|
||||
if _, ok := s.labelMetrics[name]; !ok {
|
||||
err := s.newMetric(metric)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
s.nodeMetrics[name].Set(value)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PrometheusSink) Write(m lp.CCMetric) error {
|
||||
return s.updateMetric(m)
|
||||
}
|
||||
|
||||
func (s *PrometheusSink) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PrometheusSink) Close() {
|
||||
cclog.ComponentDebug(s.name, "CLOSE")
|
||||
s.promServer.Shutdown(context.Background())
|
||||
s.promWg.Wait()
|
||||
}
|
||||
|
||||
func NewPrometheusSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(PrometheusSink)
|
||||
s.name = "PrometheusSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Port) == 0 {
|
||||
err := errors.New("not all configuration variables set required by PrometheusSink")
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
s.labelMetrics = make(map[string]*prometheus.GaugeVec)
|
||||
s.nodeMetrics = make(map[string]prometheus.Gauge)
|
||||
s.promWg.Add(1)
|
||||
go func() {
|
||||
router := mux.NewRouter()
|
||||
// Prometheus endpoint
|
||||
router.Path("/" + s.config.Path).Handler(promhttp.Handler())
|
||||
|
||||
url := fmt.Sprintf("%s:%s", s.config.Host, s.config.Port)
|
||||
cclog.ComponentDebug(s.name, "Serving Prometheus metrics at", fmt.Sprintf("%s:%s/%s", s.config.Host, s.config.Port, s.config.Path))
|
||||
s.promServer = &http.Server{Addr: url, Handler: router}
|
||||
err := s.promServer.ListenAndServe()
|
||||
if err != nil && err.Error() != "http: Server closed" {
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
s.promWg.Done()
|
||||
}()
|
||||
return s, nil
|
||||
}
|
23
sinks/prometheusSink.md
Normal file
23
sinks/prometheusSink.md
Normal file
@@ -0,0 +1,23 @@
|
||||
## `prometheus` sink
|
||||
|
||||
The `prometheus` sink publishes all metrics via an HTTP server ready to be scraped by a [Prometheus](https://prometheus.io) server. It creates gauge metrics for all node metrics and gauge vectors for all metrics with a subtype like 'device', 'cpu' or 'socket'.
|
||||
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<name>": {
|
||||
"type": "prometheus",
|
||||
"host": "localhost",
|
||||
"port": "8080",
|
||||
"path": "metrics"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `type`: makes the sink an `prometheus` sink
|
||||
- `host`: The HTTP server gets bound to that IP/hostname
|
||||
- `port`: Portnumber (as string) for the HTTP server
|
||||
- `path`: Path where the metrics should be servered. The metrics will be published at `host`:`port`/`path`
|
||||
- `group_as_namespace`: Most metrics contain a group as meta information like 'memory', 'load'. With this the metric names are extended to `group`_`name` if possible.
|
73
sinks/sampleSink.go
Normal file
73
sinks/sampleSink.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
type SampleSinkConfig struct {
|
||||
// defines JSON tags for 'type' and 'meta_as_tags'
|
||||
// See: metricSink.go
|
||||
defaultSinkConfig
|
||||
// Additional config options, for SampleSink
|
||||
}
|
||||
|
||||
type SampleSink struct {
|
||||
// declares elements 'name' and 'meta_as_tags'
|
||||
sink
|
||||
config SampleSinkConfig // entry point to the SampleSinkConfig
|
||||
}
|
||||
|
||||
// Implement functions required for Sink interface
|
||||
// Write(...), Flush(), Close()
|
||||
// See: metricSink.go
|
||||
|
||||
// Code to submit a single CCMetric to the sink
|
||||
func (s *SampleSink) Write(point lp.CCMetric) error {
|
||||
log.Print(point)
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the sink uses batched sends internally, you can tell to flush its buffers
|
||||
func (s *SampleSink) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close sink: close network connection, close files, close libraries, ...
|
||||
func (s *SampleSink) Close() {
|
||||
cclog.ComponentDebug(s.name, "CLOSE")
|
||||
}
|
||||
|
||||
// New function to create a new instance of the sink
|
||||
// Initialize the sink by giving it a name and reading in the config JSON
|
||||
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(SampleSink)
|
||||
|
||||
// Set name of sampleSink
|
||||
// The name should be chosen in such a way that different instances of SampleSink can be distinguished
|
||||
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
|
||||
|
||||
// Set defaults in s.config
|
||||
// Allow overwriting these defaults by reading config JSON
|
||||
|
||||
// Read in the config JSON
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Check if all required fields in the config are set
|
||||
// E.g. use 'len(s.config.Option) > 0' for string settings
|
||||
|
||||
// Establish connection to the server, library, ...
|
||||
// Check required files exist and lookup path(s) of executable(s)
|
||||
|
||||
// Return (nil, meaningful error message) in case of errors
|
||||
return s, nil
|
||||
}
|
@@ -13,14 +13,14 @@ import (
|
||||
const SINK_MAX_FORWARD = 50
|
||||
|
||||
// Map of all available sinks
|
||||
var AvailableSinks = map[string]Sink{
|
||||
"influxdb": new(InfluxSink),
|
||||
"stdout": new(StdoutSink),
|
||||
"nats": new(NatsSink),
|
||||
"http": new(HttpSink),
|
||||
"ganglia": new(GangliaSink),
|
||||
"influxasync": new(InfluxAsyncSink),
|
||||
"libganglia": new(LibgangliaSink),
|
||||
var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
|
||||
"ganglia": NewGangliaSink,
|
||||
"libganglia": NewLibgangliaSink,
|
||||
"stdout": NewStdoutSink,
|
||||
"nats": NewNatsSink,
|
||||
"influxdb": NewInfluxSink,
|
||||
"influxasync": NewInfluxAsyncSink,
|
||||
"http": NewHttpSink,
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
@@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
|
||||
cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
|
||||
return err
|
||||
}
|
||||
s := AvailableSinks[sinkConfig.Type]
|
||||
err = s.Init(rawConfig)
|
||||
s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
|
||||
if err != nil {
|
||||
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
|
||||
return err
|
||||
|
@@ -19,34 +19,6 @@ type StdoutSink struct {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Init(config json.RawMessage) error {
|
||||
s.name = "StdoutSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.output = os.Stdout
|
||||
if len(s.config.Output) > 0 {
|
||||
switch strings.ToLower(s.config.Output) {
|
||||
case "stdout":
|
||||
s.output = os.Stdout
|
||||
case "stderr":
|
||||
s.output = os.Stderr
|
||||
default:
|
||||
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.output = f
|
||||
}
|
||||
}
|
||||
s.meta_as_tags = s.config.MetaAsTags
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Write(m lp.CCMetric) error {
|
||||
fmt.Fprint(
|
||||
s.output,
|
||||
@@ -65,3 +37,33 @@ func (s *StdoutSink) Close() {
|
||||
s.output.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(StdoutSink)
|
||||
s.name = fmt.Sprintf("StdoutSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.output = os.Stdout
|
||||
if len(s.config.Output) > 0 {
|
||||
switch strings.ToLower(s.config.Output) {
|
||||
case "stdout":
|
||||
s.output = os.Stdout
|
||||
case "stderr":
|
||||
s.output = os.Stderr
|
||||
default:
|
||||
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.output = f
|
||||
}
|
||||
}
|
||||
s.meta_as_tags = s.config.MetaAsTags
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user