Merge latest development changes to main branch (#79)

* Cleanup: Remove unused code

* Use Golang duration parser for 'interval' and 'duration'
 in main config

* Update handling of LIKWID headers. Download only if not already present in the system. Fixes #73

* Units with cc-units (#64)

* Add option to normalize units with cc-unit

* Add unit conversion to router

* Add option to change unit prefix in the router

* Add to MetricRouter README

* Add order of operations in router to README

* Use second add_tags/del_tags only if metric gets renamed

* Skip disks in DiskstatCollector that have size=0

* Check readability of sensor files in TempCollector

* Fix for --once option

* Rename `cpu` type to `hwthread` (#69)

* Rename 'cpu' type to 'hwthread' to avoid naming clashes with MetricStore and CC-Webfrontend

* Collectors in parallel (#74)

* Provide info to CollectorManager whether the collector can be executed in parallel with others

* Split serial and parallel collectors. Read in parallel first

* Update NvidiaCollector with new metrics, MIG and NvLink support (#75)

* CC topology module update (#76)

* Rename CPU to hardware thread, write some comments

* Do renaming in other parts

* Remove CpuList and SocketList function from metricCollector. Available in ccTopology

* Option to use MIG UUID as subtype-id in NvidiaCollector

* Option to use MIG slice name as subtype-id in NvidiaCollector

* MetricRouter: Fix JSON in README

* Fix for Github Action to really use the selected version

* Remove Ganglia installation in runonce Action and add Go 1.18

* Fix daemon options in init script

* Add separate go.mod files to use it with deprecated 1.16

* Minor updates for Makefiles

* fix string comparison

* AMD ROCm SMI collector (#77)

* Add collector for AMD ROCm SMI metrics

* Fix import path

* Fix imports

* Remove Board Number

* store GPU index explicitly

* Remove board number from description

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

* Improved http sink (#78)

* automatic flush in NatsSink

* tweak default options of HttpSink

* shorter cirt. section and retries for HttpSink

* fix error handling

* Remove file added by mistake.

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

Co-authored-by: Thomas Roehl <thomas.roehl@fau.de>

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Co-authored-by: Lou <lou.knauer@gmx.de>
This commit is contained in:
Thomas Gruber 2022-06-08 15:25:40 +02:00 committed by GitHub
parent 186a62a86b
commit 8d85bd53f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 2097 additions and 705 deletions

View File

@ -3,6 +3,6 @@
"collectors" : ".github/ci-collectors.json", "collectors" : ".github/ci-collectors.json",
"receivers" : ".github/ci-receivers.json", "receivers" : ".github/ci-receivers.json",
"router" : ".github/ci-router.json", "router" : ".github/ci-router.json",
"interval": 5, "interval": "5s",
"duration": 1 "duration": "1s"
} }

View File

@ -1,6 +1,8 @@
{ {
"testoutput" : { "testoutput" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : true "meta_as_tags" : [
"unit"
]
} }
} }

View File

@ -7,6 +7,32 @@ name: Run Test
on: push on: push
jobs: jobs:
#
# Job build-1-18
# Build on latest Ubuntu using golang version 1.18
#
build-1-18:
runs-on: ubuntu-latest
steps:
# See: https://github.com/marketplace/actions/checkout
# Checkout git repository and submodules
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v3
with:
go-version: '1.18.2'
- name: Build MetricCollector
run: make
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json
# #
# Job build-1-17 # Job build-1-17
# Build on latest Ubuntu using golang version 1.17 # Build on latest Ubuntu using golang version 1.17
@ -23,13 +49,9 @@ jobs:
# See: https://github.com/marketplace/actions/setup-go-environment # See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang - name: Setup Golang
uses: actions/setup-go@v2 uses: actions/setup-go@v3
with: with:
go-version: '^1.17.7' go-version: '1.17.7'
# Install libganglia
- name: Setup Ganglia
run: sudo apt install ganglia-monitor libganglia1
- name: Build MetricCollector - name: Build MetricCollector
run: make run: make
@ -53,13 +75,9 @@ jobs:
# See: https://github.com/marketplace/actions/setup-go-environment # See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang - name: Setup Golang
uses: actions/setup-go@v2 uses: actions/setup-go@v3
with: with:
go-version: '^1.16.7' # The version AlmaLinux 8.5 uses go-version: '1.16.7' # The version AlmaLinux 8.5 uses
# Install libganglia
- name: Setup Ganglia
run: sudo apt install ganglia-monitor libganglia1
- name: Build MetricCollector - name: Build MetricCollector
run: make run: make

View File

@ -16,15 +16,23 @@ COMPONENT_DIRS := collectors \
internal/multiChanTicker internal/multiChanTicker
BINDIR = bin BINDIR = bin
GOBIN = $(shell which go)
.PHONY: all .PHONY: all
all: $(APP) all: $(APP)
$(APP): $(GOSRC) $(APP): $(GOSRC)
if [ "$(shell $(GOBIN) version | cut -d' ' -f 3 | cut -d '.' -f1-2)" = "go1.16" ]; then \
echo "1.16"; \
cp go.mod.1.16 go.mod; \
else \
echo "1.17+"; \
cp go.mod.1.17+ go.mod; \
fi
make -C collectors make -C collectors
go get $(GOBIN) get
go build -o $(APP) $(GOSRC_APP) $(GOBIN) build -o $(APP) $(GOSRC_APP)
install: $(APP) install: $(APP)
@WORKSPACE=$(PREFIX) @WORKSPACE=$(PREFIX)
@ -51,25 +59,25 @@ clean:
.PHONY: fmt .PHONY: fmt
fmt: fmt:
go fmt $(GOSRC_COLLECTORS) $(GOBIN) fmt $(GOSRC_COLLECTORS)
go fmt $(GOSRC_SINKS) $(GOBIN) fmt $(GOSRC_SINKS)
go fmt $(GOSRC_RECEIVERS) $(GOBIN) fmt $(GOSRC_RECEIVERS)
go fmt $(GOSRC_APP) $(GOBIN) fmt $(GOSRC_APP)
@for F in $(GOSRC_INTERNAL); do go fmt $$F; done @for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done
# Examine Go source code and reports suspicious constructs # Examine Go source code and reports suspicious constructs
.PHONY: vet .PHONY: vet
vet: vet:
go vet ./... $(GOBIN) vet ./...
# Run linter for the Go programming language. # Run linter for the Go programming language.
# Using static analysis, it finds bugs and performance issues, offers simplifications, and enforces style rules # Using static analysis, it finds bugs and performance issues, offers simplifications, and enforces style rules
.PHONY: staticcheck .PHONY: staticcheck
staticcheck: staticcheck:
go install honnef.co/go/tools/cmd/staticcheck@latest $(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest
$$(go env GOPATH)/bin/staticcheck ./... $$($(GOBIN) env GOPATH)/bin/staticcheck ./...
.ONESHELL: .ONESHELL:
.PHONY: RPM .PHONY: RPM

View File

@ -22,8 +22,8 @@ import (
) )
type CentralConfigFile struct { type CentralConfigFile struct {
Interval int `json:"interval"` Interval string `json:"interval"`
Duration int `json:"duration"` Duration string `json:"duration"`
CollectorConfigFile string `json:"collectors"` CollectorConfigFile string `json:"collectors"`
RouterConfigFile string `json:"router"` RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"` SinkConfigFile string `json:"sinks"`
@ -173,16 +173,36 @@ func mainFunc() int {
cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error()) cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error())
return 1 return 1
} }
if rcfg.ConfigFile.Interval <= 0 || time.Duration(rcfg.ConfigFile.Interval)*time.Second <= 0 {
// Properly use duration parser with inputs like '60s', '5m' or similar
if len(rcfg.ConfigFile.Interval) > 0 {
t, err := time.ParseDuration(rcfg.ConfigFile.Interval)
if err != nil {
cclog.Error("Configuration value 'interval' no valid duration")
}
rcfg.Interval = t
if rcfg.Interval == 0 {
cclog.Error("Configuration value 'interval' must be greater than zero") cclog.Error("Configuration value 'interval' must be greater than zero")
return 1 return 1
} }
rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval) * time.Second }
if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 {
// Properly use duration parser with inputs like '60s', '5m' or similar
if len(rcfg.ConfigFile.Duration) > 0 {
t, err := time.ParseDuration(rcfg.ConfigFile.Duration)
if err != nil {
cclog.Error("Configuration value 'duration' no valid duration")
}
rcfg.Duration = t
if rcfg.Duration == 0 {
cclog.Error("Configuration value 'duration' must be greater than zero") cclog.Error("Configuration value 'duration' must be greater than zero")
return 1 return 1
} }
rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second }
if rcfg.Duration > rcfg.Interval {
cclog.Error("The interval should be greater than duration")
return 1
}
if len(rcfg.ConfigFile.RouterConfigFile) == 0 { if len(rcfg.ConfigFile.RouterConfigFile) == 0 {
cclog.Error("Metric router configuration file must be set") cclog.Error("Metric router configuration file must be set")
@ -271,7 +291,7 @@ func mainFunc() int {
// Wait until one tick has passed. This is a workaround // Wait until one tick has passed. This is a workaround
if rcfg.CliArgs["once"] == "true" { if rcfg.CliArgs["once"] == "true" {
x := 1.2 * float64(rcfg.ConfigFile.Interval) x := 1.2 * float64(rcfg.Interval.Seconds())
time.Sleep(time.Duration(int(x)) * time.Second) time.Sleep(time.Duration(int(x)) * time.Second)
shutdownSignal <- os.Interrupt shutdownSignal <- os.Interrupt
} }

View File

@ -1,22 +1,28 @@
all: likwid
# LIKWID version # LIKWID version
LIKWID_VERSION = 5.2.1 LIKWID_VERSION = 5.2.1
LIKWID_INSTALLED_FOLDER=$(shell dirname $(shell which likwid-topology 2>/dev/null) 2>/dev/null)
LIKWID_FOLDER="$(shell pwd)/likwid"
all: $(LIKWID_FOLDER)/likwid.h
.ONESHELL: .ONESHELL:
.PHONY: likwid .PHONY: $(LIKWID_FOLDER)/likwid.h
likwid: $(LIKWID_FOLDER)/likwid.h:
INSTALL_FOLDER="$${PWD}/likwid" if [ "$(LIKWID_INSTALLED_FOLDER)" != "" ]; then \
BUILD_FOLDER="$${PWD}/likwidbuild" BASE="$(LIKWID_INSTALLED_FOLDER)/../include"; \
if [ -d $${INSTALL_FOLDER} ]; then rm -r $${INSTALL_FOLDER}; fi mkdir -p $(LIKWID_FOLDER); \
mkdir --parents --verbose $${INSTALL_FOLDER} $${BUILD_FOLDER} cp $$BASE/*.h $(LIKWID_FOLDER); \
wget -P "$${BUILD_FOLDER}" ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz else \
tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz BUILD_FOLDER="$${PWD}/likwidbuild"; \
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $${INSTALL_FOLDER}/ if [ -d $(LIKWID_FOLDER) ]; then rm -r $(LIKWID_FOLDER); fi; \
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $${INSTALL_FOLDER}/ mkdir --parents --verbose $(LIKWID_FOLDER) $${BUILD_FOLDER}; \
rm -r $${BUILD_FOLDER} wget -P "$${BUILD_FOLDER}" http://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz; \
tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz; \
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $(LIKWID_FOLDER)/; \
install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $(LIKWID_FOLDER)/; \
rm -r $${BUILD_FOLDER}; \
fi
clean: clean:

View File

@ -39,6 +39,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [`gpfs`](./gpfsMetric.md) * [`gpfs`](./gpfsMetric.md)
* [`beegfs_meta`](./beegfsmetaMetric.md) * [`beegfs_meta`](./beegfsmetaMetric.md)
* [`beegfs_storage`](./beegfsstorageMetric.md) * [`beegfs_storage`](./beegfsstorageMetric.md)
* [`rocm_smi`](./rocmsmiMetric.md)
## Todos ## Todos

View File

@ -55,6 +55,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
m.name = "BeegfsMetaCollector" m.name = "BeegfsMetaCollector"
m.setup() m.setup()
m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
m.config.Beegfs = DEFAULT_BEEGFS_CMD m.config.Beegfs = DEFAULT_BEEGFS_CMD

View File

@ -48,6 +48,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
m.name = "BeegfsStorageCollector" m.name = "BeegfsStorageCollector"
m.setup() m.setup()
m.parallel = true
// Set default beegfs-ctl binary // Set default beegfs-ctl binary
m.config.Beegfs = DEFAULT_BEEGFS_CMD m.config.Beegfs = DEFAULT_BEEGFS_CMD

View File

@ -36,17 +36,21 @@ var AvailableCollectors = map[string]MetricCollector{
"numastats": new(NUMAStatsCollector), "numastats": new(NUMAStatsCollector),
"beegfs_meta": new(BeegfsMetaCollector), "beegfs_meta": new(BeegfsMetaCollector),
"beegfs_storage": new(BeegfsStorageCollector), "beegfs_storage": new(BeegfsStorageCollector),
"rocm_smi": new(RocmSmiCollector),
} }
// Metric collector manager data structure // Metric collector manager data structure
type collectorManager struct { type collectorManager struct {
collectors []MetricCollector // List of metric collectors to use collectors []MetricCollector // List of metric collectors to read in parallel
serial []MetricCollector // List of metric collectors to read serially
output chan lp.CCMetric // Output channels output chan lp.CCMetric // Output channels
done chan bool // channel to finish / stop metric collector manager done chan bool // channel to finish / stop metric collector manager
ticker mct.MultiChanTicker // periodically ticking once each interval ticker mct.MultiChanTicker // periodically ticking once each interval
duration time.Duration // duration (for metrics that measure over a given duration) duration time.Duration // duration (for metrics that measure over a given duration)
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config map[string]json.RawMessage // json encoded config for collector manager config map[string]json.RawMessage // json encoded config for collector manager
collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector
parallel_run bool // Flag whether the collectors are currently read in parallel
} }
// Metric collector manager access functions // Metric collector manager access functions
@ -66,6 +70,7 @@ type CollectorManager interface {
// Initialization is done for all configured collectors // Initialization is done for all configured collectors
func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
cm.collectors = make([]MetricCollector, 0) cm.collectors = make([]MetricCollector, 0)
cm.serial = make([]MetricCollector, 0)
cm.output = nil cm.output = nil
cm.done = make(chan bool) cm.done = make(chan bool)
cm.wg = wg cm.wg = wg
@ -100,7 +105,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
continue continue
} }
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
if collector.Parallel() {
cm.collectors = append(cm.collectors, collector) cm.collectors = append(cm.collectors, collector)
} else {
cm.serial = append(cm.serial, collector)
}
} }
return nil return nil
} }
@ -116,6 +125,10 @@ func (cm *collectorManager) Start() {
// Collector manager is done // Collector manager is done
done := func() { done := func() {
// close all metric collectors // close all metric collectors
if cm.parallel_run {
cm.collector_wg.Wait()
cm.parallel_run = false
}
for _, c := range cm.collectors { for _, c := range cm.collectors {
c.Close() c.Close()
} }
@ -130,7 +143,26 @@ func (cm *collectorManager) Start() {
done() done()
return return
case t := <-tick: case t := <-tick:
cm.parallel_run = true
for _, c := range cm.collectors { for _, c := range cm.collectors {
// Wait for done signal or execute the collector
select {
case <-cm.done:
done()
return
default:
// Read metrics from collector c via goroutine
cclog.ComponentDebug("CollectorManager", c.Name(), t)
cm.collector_wg.Add(1)
go func(myc MetricCollector) {
myc.Read(cm.duration, cm.output)
cm.collector_wg.Done()
}(c)
}
}
cm.collector_wg.Wait()
cm.parallel_run = false
for _, c := range cm.serial {
// Wait for done signal or execute the collector // Wait for done signal or execute the collector
select { select {
case <-cm.done: case <-cm.done:

View File

@ -48,6 +48,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
m.setup() m.setup()
m.name = "CPUFreqCpuInfoCollector" m.name = "CPUFreqCpuInfoCollector"
m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "CPU", "group": "CPU",
@ -150,7 +151,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
t.numNonHT = numNonHT t.numNonHT = numNonHT
t.numNonHT_int = numNonHT_int t.numNonHT_int = numNonHT_int
t.tagSet = map[string]string{ t.tagSet = map[string]string{
"type": "cpu", "type": "hwthread",
"type-id": t.processor, "type-id": t.processor,
"package_id": t.physicalPackageID, "package_id": t.physicalPackageID,
} }

View File

@ -4,7 +4,7 @@
"cpufreq_cpuinfo": {} "cpufreq_cpuinfo": {}
``` ```
The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **cpu** metrics. The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **hwthread** metrics.
Metrics: Metrics:
* `cpufreq` * `cpufreq`

View File

@ -53,6 +53,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
m.name = "CPUFreqCollector" m.name = "CPUFreqCollector"
m.setup() m.setup()
m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
if err != nil { if err != nil {
@ -161,7 +162,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
t.numNonHT = numNonHT t.numNonHT = numNonHT
t.numNonHT_int = numNonHT_int t.numNonHT_int = numNonHT_int
t.tagSet = map[string]string{ t.tagSet = map[string]string{
"type": "cpu", "type": "hwthread",
"type-id": t.processor, "type-id": t.processor,
"package_id": t.physicalPackageID, "package_id": t.physicalPackageID,
} }

View File

@ -5,7 +5,7 @@
} }
``` ```
The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **cpu** metrics. The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **hwthread** metrics.
Metrics: Metrics:
* `cpufreq` * `cpufreq`

View File

@ -30,6 +30,7 @@ type CpustatCollector struct {
func (m *CpustatCollector) Init(config json.RawMessage) error { func (m *CpustatCollector) Init(config json.RawMessage) error {
m.name = "CpustatCollector" m.name = "CpustatCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"} m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"}
m.nodetags = map[string]string{"type": "node"} m.nodetags = map[string]string{"type": "node"}
if len(config) > 0 { if len(config) > 0 {
@ -82,7 +83,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
cpustr := strings.TrimLeft(linefields[0], "cpu") cpustr := strings.TrimLeft(linefields[0], "cpu")
cpu, _ := strconv.Atoi(cpustr) cpu, _ := strconv.Atoi(cpustr)
m.cputags[linefields[0]] = map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", cpu)} m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
num_cpus++ num_cpus++
} }
} }

View File

@ -33,6 +33,7 @@ type CustomCmdCollector struct {
func (m *CustomCmdCollector) Init(config json.RawMessage) error { func (m *CustomCmdCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "CustomCmdCollector" m.name = "CustomCmdCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Custom"} m.meta = map[string]string{"source": m.name, "group": "Custom"}
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)

View File

@ -29,6 +29,7 @@ type DiskstatCollector struct {
func (m *DiskstatCollector) Init(config json.RawMessage) error { func (m *DiskstatCollector) Init(config json.RawMessage) error {
m.name = "DiskstatCollector" m.name = "DiskstatCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
@ -77,11 +78,18 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
continue continue
} }
path := strings.Replace(linefields[1], `\040`, " ", -1) path := strings.Replace(linefields[1], `\040`, " ", -1)
stat := syscall.Statfs_t{} stat := syscall.Statfs_t{
Blocks: 0,
Bsize: 0,
Bfree: 0,
}
err := syscall.Statfs(path, &stat) err := syscall.Statfs(path, &stat)
if err != nil { if err != nil {
continue continue
} }
if stat.Blocks == 0 || stat.Bsize == 0 {
continue
}
tags := map[string]string{"type": "node", "device": linefields[0]} tags := map[string]string{"type": "node", "device": linefields[0]}
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
@ -95,11 +103,13 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y.AddMeta("unit", "GBytes") y.AddMeta("unit", "GBytes")
output <- y output <- y
} }
if total > 0 {
perc := (100 * (total - free)) / total perc := (100 * (total - free)) / total
if perc > part_max_used { if perc > part_max_used {
part_max_used = perc part_max_used = perc
} }
} }
}
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
if err == nil { if err == nil {
y.AddMeta("unit", "percent") y.AddMeta("unit", "percent")

View File

@ -46,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "GpfsCollector" m.name = "GpfsCollector"
m.setup() m.setup()
m.parallel = true
// Set default mmpmon binary // Set default mmpmon binary
m.config.Mmpmon = DEFAULT_GPFS_CMD m.config.Mmpmon = DEFAULT_GPFS_CMD

View File

@ -54,6 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "InfinibandCollector" m.name = "InfinibandCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,
"group": "Network", "group": "Network",

View File

@ -37,6 +37,7 @@ type IOstatCollector struct {
func (m *IOstatCollector) Init(config json.RawMessage) error { func (m *IOstatCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "IOstatCollector" m.name = "IOstatCollector"
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "Disk"} m.meta = map[string]string{"source": m.name, "group": "Disk"}
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {

View File

@ -34,6 +34,7 @@ type IpmiCollector struct {
func (m *IpmiCollector) Init(config json.RawMessage) error { func (m *IpmiCollector) Init(config json.RawMessage) error {
m.name = "IpmiCollector" m.name = "IpmiCollector"
m.setup() m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "IPMI"} m.meta = map[string]string{"source": m.name, "group": "IPMI"}
m.config.IpmitoolPath = string(IPMITOOL_PATH) m.config.IpmitoolPath = string(IPMITOOL_PATH)
m.config.IpmisensorsPath = string(IPMISENSORS_PATH) m.config.IpmisensorsPath = string(IPMISENSORS_PATH)

View File

@ -177,6 +177,7 @@ func getBaseFreq() float64 {
func (m *LikwidCollector) Init(config json.RawMessage) error { func (m *LikwidCollector) Init(config json.RawMessage) error {
m.name = "LikwidCollector" m.name = "LikwidCollector"
m.parallel = false
m.initialized = false m.initialized = false
m.running = false m.running = false
m.config.AccessMode = LIKWID_DEF_ACCESSMODE m.config.AccessMode = LIKWID_DEF_ACCESSMODE
@ -204,7 +205,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
m.meta = map[string]string{"group": "PerfCounter"} m.meta = map[string]string{"group": "PerfCounter"}
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
cpulist := topo.CpuList() cpulist := topo.HwthreadList()
m.cpulist = make([]C.int, len(cpulist)) m.cpulist = make([]C.int, len(cpulist))
m.cpu2tid = make(map[int]int) m.cpu2tid = make(map[int]int)
for i, c := range cpulist { for i, c := range cpulist {

View File

@ -19,7 +19,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
"calc": "COUNTER0 + COUNTER1", "calc": "COUNTER0 + COUNTER1",
"publish": false, "publish": false,
"unit": "myunit", "unit": "myunit",
"type": "cpu" "type": "hwthread"
} }
] ]
} }
@ -30,7 +30,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
"calc": "sum_01", "calc": "sum_01",
"publish": true, "publish": true,
"unit": "myunit", "unit": "myunit",
"type": "cpu" "type": "hwthread"
} }
] ]
} }
@ -51,15 +51,15 @@ Additional options:
Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric. Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric.
- `cpu` : One metric per CPU hardware thread with the tags `"type" : "cpu"` and `"type-id" : "$cpu_id"` - `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"`
- `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"` - `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"`
**Note:** You should not specify the `socket` type for a metric that is measured at `cpu` scope and vice versa, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. **Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
As a guideline: As a guideline:
- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `cpu` - All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread`
- All counters names containing `BOX` have the scope `socket` - All counters names containing `BOX` have the scope `socket`
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope (AMD Zen) - All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope
- All `DFCx` counters have scope `socket` - All `DFCx` counters have scope `socket`
### Help with the configuration ### Help with the configuration
@ -90,7 +90,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
"name": "Runtime (RDTSC) [s]", "name": "Runtime (RDTSC) [s]",
"publish": true, "publish": true,
"unit": "seconds" "unit": "seconds"
"scope": "cpu" "scope": "hwthread"
}, },
{ {
"..." : "..." "..." : "..."
@ -147,20 +147,20 @@ One might think this does not happen often but often used metrics in the world o
{ {
"name": "ipc", "name": "ipc",
"calc": "PMC0/PMC1", "calc": "PMC0/PMC1",
"type": "cpu", "type": "hwthread",
"publish": true "publish": true
}, },
{ {
"name": "flops_any", "name": "flops_any",
"calc": "0.000001*PMC2/time", "calc": "0.000001*PMC2/time",
"unit": "MFlops/s", "unit": "MFlops/s",
"type": "cpu", "type": "hwthread",
"publish": true "publish": true
}, },
{ {
"name": "clock", "name": "clock",
"calc": "0.000001*(FIXC1/FIXC2)/inverseClock", "calc": "0.000001*(FIXC1/FIXC2)/inverseClock",
"type": "cpu", "type": "hwthread",
"unit": "MHz", "unit": "MHz",
"publish": true "publish": true
}, },
@ -219,3 +219,33 @@ One might think this does not happen often but often used metrics in the world o
} }
``` ```
### How to get the eventsets and metrics from LIKWID
The `likwid` collector reads hardware performance counters at a **hwthread** and **socket** level. The configuration looks quite complicated but it is basically copy&paste from [LIKWID's performance groups](https://github.com/RRZE-HPC/likwid/tree/master/groups). The collector made multiple iterations and tried to use the performance groups but it lacked flexibility. The current way of configuration provides most flexibility.
The logic is as following: There are multiple eventsets, each consisting of a list of counters+events and a list of metrics. If you compare a common performance group with the example setting above, there is not much difference:
```
EVENTSET -> "events": {
FIXC1 ACTUAL_CPU_CLOCK -> "FIXC1": "ACTUAL_CPU_CLOCK",
FIXC2 MAX_CPU_CLOCK -> "FIXC2": "MAX_CPU_CLOCK",
PMC0 RETIRED_INSTRUCTIONS -> "PMC0" : "RETIRED_INSTRUCTIONS",
PMC1 CPU_CLOCKS_UNHALTED -> "PMC1" : "CPU_CLOCKS_UNHALTED",
PMC2 RETIRED_SSE_AVX_FLOPS_ALL -> "PMC2": "RETIRED_SSE_AVX_FLOPS_ALL",
PMC3 MERGE -> "PMC3": "MERGE",
-> }
```
The metrics are following the same procedure:
```
METRICS -> "metrics": [
IPC PMC0/PMC1 -> {
-> "name" : "IPC",
-> "calc" : "PMC0/PMC1",
-> "scope": "hwthread",
-> "publish": true
-> }
-> ]
```
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.

View File

@ -36,6 +36,7 @@ type LoadavgCollector struct {
func (m *LoadavgCollector) Init(config json.RawMessage) error { func (m *LoadavgCollector) Init(config json.RawMessage) error {
m.name = "LoadavgCollector" m.name = "LoadavgCollector"
m.parallel = true
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)

View File

@ -288,6 +288,7 @@ var LustreDeriveMetrics = []LustreMetricDefinition{
func (m *LustreCollector) Init(config json.RawMessage) error { func (m *LustreCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "LustreCollector" m.name = "LustreCollector"
m.parallel = true
if len(config) > 0 { if len(config) > 0 {
err = json.Unmarshal(config, &m.config) err = json.Unmarshal(config, &m.config)
if err != nil { if err != nil {

View File

@ -81,6 +81,7 @@ func getStats(filename string) map[string]MemstatStats {
func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "MemstatCollector" m.name = "MemstatCollector"
m.parallel = true
m.config.NodeStats = true m.config.NodeStats = true
m.config.NumaStats = false m.config.NumaStats = false
if len(config) > 0 { if len(config) > 0 {
@ -159,6 +160,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init { if !m.init {
cclog.ComponentPrint(m.name, "Here")
return return
} }

View File

@ -3,10 +3,6 @@ package collectors
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"log"
"strconv"
"strings"
"time" "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
@ -16,6 +12,7 @@ type MetricCollector interface {
Name() string // Name of the metric collector Name() string // Name of the metric collector
Init(config json.RawMessage) error // Initialize metric collector Init(config json.RawMessage) error // Initialize metric collector
Initialized() bool // Is metric collector initialized? Initialized() bool // Is metric collector initialized?
Parallel() bool
Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector
Close() // Close / finish metric collector Close() // Close / finish metric collector
} }
@ -23,6 +20,7 @@ type MetricCollector interface {
type metricCollector struct { type metricCollector struct {
name string // name of the metric name string // name of the metric
init bool // is metric collector initialized? init bool // is metric collector initialized?
parallel bool // can the metric collector be executed in parallel with others
meta map[string]string // static meta data tags meta map[string]string // static meta data tags
} }
@ -31,6 +29,11 @@ func (c *metricCollector) Name() string {
return c.name return c.name
} }
// Name returns the name of the metric collector
func (c *metricCollector) Parallel() bool {
return c.parallel
}
// Setup is for future use // Setup is for future use
func (c *metricCollector) setup() error { func (c *metricCollector) setup() error {
return nil return nil
@ -65,58 +68,6 @@ func stringArrayContains(array []string, str string) (int, bool) {
return -1, false return -1, false
} }
// SocketList returns the list of physical sockets as read from /proc/cpuinfo
func SocketList() []int {
buffer, err := ioutil.ReadFile("/proc/cpuinfo")
if err != nil {
log.Print(err)
return nil
}
ll := strings.Split(string(buffer), "\n")
var packs []int
for _, line := range ll {
if strings.HasPrefix(line, "physical id") {
lv := strings.Fields(line)
id, err := strconv.ParseInt(lv[3], 10, 32)
if err != nil {
log.Print(err)
return packs
}
_, found := intArrayContains(packs, int(id))
if !found {
packs = append(packs, int(id))
}
}
}
return packs
}
// CpuList returns the list of physical CPUs (in contrast to logical CPUs) as read from /proc/cpuinfo
func CpuList() []int {
buffer, err := ioutil.ReadFile("/proc/cpuinfo")
if err != nil {
log.Print(err)
return nil
}
ll := strings.Split(string(buffer), "\n")
var cpulist []int
for _, line := range ll {
if strings.HasPrefix(line, "processor") {
lv := strings.Fields(line)
id, err := strconv.ParseInt(lv[2], 10, 32)
if err != nil {
log.Print(err)
return cpulist
}
_, found := intArrayContains(cpulist, int(id))
if !found {
cpulist = append(cpulist, int(id))
}
}
}
return cpulist
}
// RemoveFromStringList removes the string r from the array of strings s // RemoveFromStringList removes the string r from the array of strings s
// If r is not contained in the array an error is returned // If r is not contained in the array an error is returned
func RemoveFromStringList(s []string, r string) ([]string, error) { func RemoveFromStringList(s []string, r string) ([]string, error) {

View File

@ -39,6 +39,7 @@ type NetstatCollector struct {
func (m *NetstatCollector) Init(config json.RawMessage) error { func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector" m.name = "NetstatCollector"
m.parallel = true
m.setup() m.setup()
m.lastTimestamp = time.Now() m.lastTimestamp = time.Now()

View File

@ -114,6 +114,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
m.data = make(map[string]NfsCollectorData) m.data = make(map[string]NfsCollectorData)
m.initStats() m.initStats()
m.init = true m.init = true
m.parallel = true
return nil return nil
} }

View File

@ -54,6 +54,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
} }
m.name = "NUMAStatsCollector" m.name = "NUMAStatsCollector"
m.parallel = true
m.setup() m.setup()
m.meta = map[string]string{ m.meta = map[string]string{
"source": m.name, "source": m.name,

File diff suppressed because it is too large Load Diff

View File

@ -3,38 +3,74 @@
```json ```json
"nvidia": { "nvidia": {
"exclude_devices" : [ "exclude_devices": [
"0","1" "0","1", "0000000:ff:01.0"
], ],
"exclude_metrics": [ "exclude_metrics": [
"nv_fb_memory", "nv_fb_mem_used",
"nv_fan" "nv_fan"
] ],
"process_mig_devices": false,
"use_pci_info_as_type_id": true,
"add_pci_info_tag": false,
"add_uuid_meta": false,
"add_board_number_meta": false,
"add_serial_meta": false,
"use_uuid_for_mig_device": false,
"use_slice_for_mig_device": false
} }
``` ```
The `nvidia` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes IDs as supplied to the NVML with `nvmlDeviceGetHandleByIndex()` or the PCI address in NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option. Commonly only the physical GPUs are monitored. If MIG devices should be analyzed as well, set `process_mig_devices` (adds `stype=mig,stype-id=<mig_index>`). With the options `use_uuid_for_mig_device` and `use_slice_for_mig_device`, the `<mig_index>` can be replaced with the UUID (e.g. `MIG-6a9f7cc8-6d5b-5ce0-92de-750edc4d8849`) or the MIG slice name (e.g. `1g.5gb`).
The metrics sent by the `nvidia` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag.
Optionally, it is possible to add the UUID, the board part number and the serial to the meta informations. They are not sent to the sinks (if not configured otherwise).
Metrics: Metrics:
* `nv_util` * `nv_util`
* `nv_mem_util` * `nv_mem_util`
* `nv_mem_total` * `nv_fb_mem_total`
* `nv_fb_memory` * `nv_fb_mem_used`
* `nv_bar1_mem_total`
* `nv_bar1_mem_used`
* `nv_temp` * `nv_temp`
* `nv_fan` * `nv_fan`
* `nv_ecc_mode` * `nv_ecc_mode`
* `nv_perf_state` * `nv_perf_state`
* `nv_power_usage_report` * `nv_power_usage`
* `nv_graphics_clock_report` * `nv_graphics_clock`
* `nv_sm_clock_report` * `nv_sm_clock`
* `nv_mem_clock_report` * `nv_mem_clock`
* `nv_video_clock`
* `nv_max_graphics_clock` * `nv_max_graphics_clock`
* `nv_max_sm_clock` * `nv_max_sm_clock`
* `nv_max_mem_clock` * `nv_max_mem_clock`
* `nv_ecc_db_error` * `nv_max_video_clock`
* `nv_ecc_sb_error` * `nv_ecc_uncorrected_error`
* `nv_power_man_limit` * `nv_ecc_corrected_error`
* `nv_power_max_limit`
* `nv_encoder_util` * `nv_encoder_util`
* `nv_decoder_util` * `nv_decoder_util`
* `nv_remapped_rows_corrected`
* `nv_remapped_rows_uncorrected`
* `nv_remapped_rows_pending`
* `nv_remapped_rows_failure`
* `nv_compute_processes`
* `nv_graphics_processes`
* `nv_violation_power`
* `nv_violation_thermal`
* `nv_violation_sync_boost`
* `nv_violation_board_limit`
* `nv_violation_low_util`
* `nv_violation_reliability`
* `nv_violation_below_app_clock`
* `nv_violation_below_base_clock`
* `nv_nvlink_crc_flit_errors`
* `nv_nvlink_crc_errors`
* `nv_nvlink_ecc_errors`
* `nv_nvlink_replay_errors`
* `nv_nvlink_recovery_errors`
It uses a separate `type` in the metrics. The output metric looks like this: Some metrics add the additional sub type tag (`stype`) like the `nv_nvlink_*` metrics set `stype=nvlink,stype-id=<link_number>`.
`<name>,type=accelerator,type-id=<nvidia-gpu-id> value=<metric value> <timestamp>`

319
collectors/rocmsmiMetric.go Normal file
View File

@ -0,0 +1,319 @@
package collectors
import (
"encoding/json"
"errors"
"fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"github.com/ClusterCockpit/go-rocm-smi/pkg/rocm_smi"
)
type RocmSmiCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ExcludeDevices []string `json:"exclude_devices,omitempty"`
AddPciInfoTag bool `json:"add_pci_info_tag,omitempty"`
UsePciInfoAsTypeId bool `json:"use_pci_info_as_type_id,omitempty"`
AddSerialMeta bool `json:"add_serial_meta,omitempty"`
}
type RocmSmiCollectorDevice struct {
device rocm_smi.DeviceHandle
index int
tags map[string]string // default tags
meta map[string]string // default meta information
excludeMetrics map[string]bool // copy of exclude metrics from config
}
type RocmSmiCollector struct {
metricCollector
config RocmSmiCollectorConfig // the configuration structure
devices []RocmSmiCollectorDevice
}
// Functions to implement MetricCollector interface
// Init(...), Read(...), Close()
// See: metricCollector.go
// Init initializes the sample collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *RocmSmiCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "RocmSmiCollector"
// This is for later use, also call it early
m.setup()
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
//m.meta = map[string]string{"source": m.name, "group": "AMD"}
// Define tags sent with each metric
// The 'type' tag is always needed, it defines the granulatity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
//m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
ret := rocm_smi.Init()
if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("Failed to initialize ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
}
numDevs, ret := rocm_smi.NumMonitorDevices()
if ret != rocm_smi.STATUS_SUCCESS {
err = errors.New("Failed to get number of GPUs from ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
}
exclDev := func(s string) bool {
skip_device := false
for _, excl := range m.config.ExcludeDevices {
if excl == s {
skip_device = true
break
}
}
return skip_device
}
m.devices = make([]RocmSmiCollectorDevice, 0)
for i := 0; i < numDevs; i++ {
str_i := fmt.Sprintf("%d", i)
if exclDev(str_i) {
continue
}
device, ret := rocm_smi.DeviceGetHandleByIndex(i)
if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("Failed to get handle for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
}
pciInfo, ret := rocm_smi.DeviceGetPciInfo(device)
if ret != rocm_smi.STATUS_SUCCESS {
err = fmt.Errorf("Failed to get PCI information for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
}
pciId := fmt.Sprintf(
"%08X:%02X:%02X.%X",
pciInfo.Domain,
pciInfo.Bus,
pciInfo.Device,
pciInfo.Function)
if exclDev(pciId) {
continue
}
dev := RocmSmiCollectorDevice{
device: device,
tags: map[string]string{
"type": "accelerator",
"type-id": str_i,
},
meta: map[string]string{
"source": m.name,
"group": "AMD",
},
}
if m.config.UsePciInfoAsTypeId {
dev.tags["type-id"] = pciId
} else if m.config.AddPciInfoTag {
dev.tags["pci_identifier"] = pciId
}
if m.config.AddSerialMeta {
serial, ret := rocm_smi.DeviceGetSerialNumber(device)
if ret != rocm_smi.STATUS_SUCCESS {
cclog.ComponentError(m.name, "Unable to get serial number for device at index", i, ":", rocm_smi.StatusStringNoError(ret))
} else {
dev.meta["serial"] = serial
}
}
// Add excluded metrics
dev.excludeMetrics = map[string]bool{}
for _, e := range m.config.ExcludeMetrics {
dev.excludeMetrics[e] = true
}
dev.index = i
m.devices = append(m.devices, dev)
}
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
// Read collects all metrics belonging to the sample collector
// and sends them through the output channel to the collector manager
func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
// Create a sample metric
timestamp := time.Now()
for _, dev := range m.devices {
metrics, ret := rocm_smi.DeviceGetMetrics(dev.device)
if ret != rocm_smi.STATUS_SUCCESS {
cclog.ComponentError(m.name, "Unable to get metrics for device at index", dev.index, ":", rocm_smi.StatusStringNoError(ret))
continue
}
if !dev.excludeMetrics["rocm_gfx_util"] {
value := metrics.Average_gfx_activity
y, err := lp.New("rocm_gfx_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_umc_util"] {
value := metrics.Average_umc_activity
y, err := lp.New("rocm_umc_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_mm_util"] {
value := metrics.Average_mm_activity
y, err := lp.New("rocm_mm_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_avg_power"] {
value := metrics.Average_socket_power
y, err := lp.New("rocm_avg_power", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_mem"] {
value := metrics.Temperature_mem
y, err := lp.New("rocm_temp_mem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_hotspot"] {
value := metrics.Temperature_hotspot
y, err := lp.New("rocm_temp_hotspot", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_edge"] {
value := metrics.Temperature_edge
y, err := lp.New("rocm_temp_edge", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_vrgfx"] {
value := metrics.Temperature_vrgfx
y, err := lp.New("rocm_temp_vrgfx", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_vrsoc"] {
value := metrics.Temperature_vrsoc
y, err := lp.New("rocm_temp_vrsoc", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_vrmem"] {
value := metrics.Temperature_vrmem
y, err := lp.New("rocm_temp_vrmem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_gfx_clock"] {
value := metrics.Average_gfxclk_frequency
y, err := lp.New("rocm_gfx_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_soc_clock"] {
value := metrics.Average_socclk_frequency
y, err := lp.New("rocm_soc_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_u_clock"] {
value := metrics.Average_uclk_frequency
y, err := lp.New("rocm_u_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_v0_clock"] {
value := metrics.Average_vclk0_frequency
y, err := lp.New("rocm_v0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_v1_clock"] {
value := metrics.Average_vclk1_frequency
y, err := lp.New("rocm_v1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_d0_clock"] {
value := metrics.Average_dclk0_frequency
y, err := lp.New("rocm_d0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_d1_clock"] {
value := metrics.Average_dclk1_frequency
y, err := lp.New("rocm_d1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
output <- y
}
}
if !dev.excludeMetrics["rocm_temp_hbm"] {
for i := 0; i < rocm_smi.NUM_HBM_INSTANCES; i++ {
value := metrics.Temperature_hbm[i]
y, err := lp.New("rocm_temp_hbm", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
y.AddTag("stype", "device")
y.AddTag("stype-id", fmt.Sprintf("%d", i))
output <- y
}
}
}
}
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *RocmSmiCollector) Close() {
// Unset flag
ret := rocm_smi.Shutdown()
if ret != rocm_smi.STATUS_SUCCESS {
cclog.ComponentError(m.name, "Failed to shutdown ROCm SMI library")
}
m.init = false
}

View File

@ -0,0 +1,47 @@
## `rocm_smi` collector
```json
"rocm_smi": {
"exclude_devices": [
"0","1", "0000000:ff:01.0"
],
"exclude_metrics": [
"rocm_mm_util",
"rocm_temp_vrsoc"
],
"use_pci_info_as_type_id": true,
"add_pci_info_tag": false,
"add_serial_meta": false,
}
```
The `rocm_smi` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes logical IDs in the list of available devices or the PCI address similar to NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option.
The metrics sent by the `rocm_smi` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag.
Optionally, it is possible to add the serial to the meta informations. They are not sent to the sinks (if not configured otherwise).
Metrics:
* `rocm_gfx_util`
* `rocm_umc_util`
* `rocm_mm_util`
* `rocm_avg_power`
* `rocm_temp_mem`
* `rocm_temp_hotspot`
* `rocm_temp_edge`
* `rocm_temp_vrgfx`
* `rocm_temp_vrsoc`
* `rocm_temp_vrmem`
* `rocm_gfx_clock`
* `rocm_soc_clock`
* `rocm_u_clock`
* `rocm_v0_clock`
* `rocm_v1_clock`
* `rocm_d0_clock`
* `rocm_d1_clock`
* `rocm_temp_hbm`
Some metrics add the additional sub type tag (`stype`) like the `rocm_temp_hbm` metrics set `stype=device,stype-id=<HBM_slice_number>`.

View File

@ -35,6 +35,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
m.name = "InternalCollector" m.name = "InternalCollector"
// This is for later use, also call it early // This is for later use, also call it early
m.setup() m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors acutally doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric // Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta()) // (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
@ -42,7 +46,12 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// The 'type' tag is always needed, it defines the granulatity of the metric // The 'type' tag is always needed, it defines the granulatity of the metric
// node -> whole system // node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag) // socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) // die -> CPU die (requires CPU die ID as 'type-id' tag)
// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag)
// llc -> Last level cache (requires last level cache ID as 'type-id' tag)
// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag)
// hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag)
// accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration // Read in the JSON configuration
if len(config) > 0 { if len(config) > 0 {

View File

@ -50,6 +50,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
} }
m.name = "TempCollector" m.name = "TempCollector"
m.parallel = true
m.setup() m.setup()
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &m.config) err := json.Unmarshal(config, &m.config)
@ -116,6 +117,10 @@ func (m *TempCollector) Init(config json.RawMessage) error {
} }
// Sensor file // Sensor file
_, err = ioutil.ReadFile(file)
if err != nil {
continue
}
sensor.file = file sensor.file = file
// Sensor tags // Sensor tags

View File

@ -28,6 +28,7 @@ type TopProcsCollector struct {
func (m *TopProcsCollector) Init(config json.RawMessage) error { func (m *TopProcsCollector) Init(config json.RawMessage) error {
var err error var err error
m.name = "TopProcsCollector" m.name = "TopProcsCollector"
m.parallel = true
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source": m.name, "group": "TopProcs"} m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
if len(config) > 0 { if len(config) > 0 {

View File

@ -1,8 +1,8 @@
{ {
"sinks": "sinks.json", "sinks": "./sinks.json",
"collectors" : "collectors.json", "collectors" : "./collectors.json",
"receivers" : "receivers.json", "receivers" : "./receivers.json",
"router" : "router.json", "router" : "./router.json",
"interval": 10, "interval": "10s",
"duration": 1 "duration": "1s"
} }

23
go.mod
View File

@ -1,16 +1,35 @@
module github.com/ClusterCockpit/cc-metric-collector module github.com/ClusterCockpit/cc-metric-collector
go 1.16 go 1.17
require ( require (
github.com/ClusterCockpit/cc-units v0.0.0-20220318130935-92a0c6442220
github.com/NVIDIA/go-nvml v0.11.6-0 github.com/NVIDIA/go-nvml v0.11.6-0
github.com/PaesslerAG/gval v1.1.2 github.com/PaesslerAG/gval v1.1.2
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.8.1 github.com/influxdata/influxdb-client-go/v2 v2.8.1
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/nats-io/nats-server/v2 v2.8.0 // indirect
github.com/nats-io/nats.go v1.14.0 github.com/nats-io/nats.go v1.14.0
github.com/prometheus/client_golang v1.12.1 github.com/prometheus/client_golang v1.12.1
github.com/stmcginnis/gofish v0.13.0 github.com/stmcginnis/gofish v0.13.0
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
) )
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/nats-io/nats-server/v2 v2.8.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

16
go.mod.1.16 Normal file
View File

@ -0,0 +1,16 @@
module github.com/ClusterCockpit/cc-metric-collector
go 1.16
require (
github.com/NVIDIA/go-nvml v0.11.6-0
github.com/PaesslerAG/gval v1.1.2
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.7.0
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/nats-io/nats-server/v2 v2.8.0 // indirect
github.com/nats-io/nats.go v1.14.0
github.com/prometheus/client_golang v1.12.1
github.com/stmcginnis/gofish v0.13.0
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
)

16
go.mod.1.17+ Normal file
View File

@ -0,0 +1,16 @@
module github.com/ClusterCockpit/cc-metric-collector
go 1.17
require (
github.com/NVIDIA/go-nvml v0.11.6-0
github.com/PaesslerAG/gval v1.1.2
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.8.1
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/nats-io/nats-server/v2 v2.8.0 // indirect
github.com/nats-io/nats.go v1.14.0
github.com/prometheus/client_golang v1.12.1
github.com/stmcginnis/gofish v0.13.0
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
)

View File

@ -29,6 +29,7 @@ func intArrayContains(array []int, str int) (int, bool) {
return -1, false return -1, false
} }
// Used internally for sysfs file reads
func fileToInt(path string) int { func fileToInt(path string) int {
buffer, err := ioutil.ReadFile(path) buffer, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
@ -47,6 +48,7 @@ func fileToInt(path string) int {
return int(id) return int(id)
} }
// Get list of CPU socket IDs
func SocketList() []int { func SocketList() []int {
buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO))
if err != nil { if err != nil {
@ -54,7 +56,7 @@ func SocketList() []int {
return nil return nil
} }
ll := strings.Split(string(buffer), "\n") ll := strings.Split(string(buffer), "\n")
var packs []int packs := make([]int, 0)
for _, line := range ll { for _, line := range ll {
if strings.HasPrefix(line, "physical id") { if strings.HasPrefix(line, "physical id") {
lv := strings.Fields(line) lv := strings.Fields(line)
@ -72,7 +74,8 @@ func SocketList() []int {
return packs return packs
} }
func CpuList() []int { // Get list of hardware thread IDs in the order of listing in /proc/cpuinfo
func HwthreadList() []int {
buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO))
if err != nil { if err != nil {
log.Print(err) log.Print(err)
@ -97,6 +100,13 @@ func CpuList() []int {
return cpulist return cpulist
} }
// Get list of hardware thread IDs in the order of listing in /proc/cpuinfo
// Deprecated! Use HwthreadList()
func CpuList() []int {
return HwthreadList()
}
// Get list of CPU core IDs in the order of listing in /proc/cpuinfo
func CoreList() []int { func CoreList() []int {
buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO))
if err != nil { if err != nil {
@ -122,6 +132,7 @@ func CoreList() []int {
return corelist return corelist
} }
// Get list of NUMA node IDs
func NumaNodeList() []int { func NumaNodeList() []int {
numaList := make([]int, 0) numaList := make([]int, 0)
globPath := filepath.Join(string(SYSFS_NUMABASE), "node*") globPath := filepath.Join(string(SYSFS_NUMABASE), "node*")
@ -156,8 +167,9 @@ func NumaNodeList() []int {
return numaList return numaList
} }
// Get list of CPU die IDs
func DieList() []int { func DieList() []int {
cpulist := CpuList() cpulist := HwthreadList()
dielist := make([]int, 0) dielist := make([]int, 0)
for _, c := range cpulist { for _, c := range cpulist {
diepath := filepath.Join(string(SYSFS_CPUBASE), fmt.Sprintf("cpu%d", c), "topology/die_id") diepath := filepath.Join(string(SYSFS_CPUBASE), fmt.Sprintf("cpu%d", c), "topology/die_id")
@ -175,7 +187,27 @@ func DieList() []int {
return SocketList() return SocketList()
} }
type CpuEntry struct { // Get list of specified type using the naming format inside ClusterCockpit
func GetTypeList(topology_type string) []int {
switch topology_type {
case "node":
return []int{0}
case "socket":
return SocketList()
case "die":
return DieList()
case "memoryDomain":
return NumaNodeList()
case "core":
return CoreList()
case "hwthread":
return HwthreadList()
}
return []int{}
}
// Structure holding all information about a hardware thread
type HwthreadEntry struct {
Cpuid int Cpuid int
SMT int SMT int
Core int Core int
@ -184,25 +216,25 @@ type CpuEntry struct {
Die int Die int
} }
func CpuData() []CpuEntry { func CpuData() []HwthreadEntry {
fileToInt := func(path string) int { // fileToInt := func(path string) int {
buffer, err := ioutil.ReadFile(path) // buffer, err := ioutil.ReadFile(path)
if err != nil { // if err != nil {
log.Print(err) // log.Print(err)
//cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) // //cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error())
return -1 // return -1
} // }
sbuffer := strings.Replace(string(buffer), "\n", "", -1) // sbuffer := strings.Replace(string(buffer), "\n", "", -1)
var id int64 // var id int64
//_, err = fmt.Scanf("%d", sbuffer, &id) // //_, err = fmt.Scanf("%d", sbuffer, &id)
id, err = strconv.ParseInt(sbuffer, 10, 32) // id, err = strconv.ParseInt(sbuffer, 10, 32)
if err != nil { // if err != nil {
cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) // cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error())
return -1 // return -1
} // }
return int(id) // return int(id)
} // }
getCore := func(basepath string) int { getCore := func(basepath string) int {
return fileToInt(fmt.Sprintf("%s/core_id", basepath)) return fileToInt(fmt.Sprintf("%s/core_id", basepath))
} }
@ -260,9 +292,9 @@ func CpuData() []CpuEntry {
return 0 return 0
} }
clist := make([]CpuEntry, 0) clist := make([]HwthreadEntry, 0)
for _, c := range CpuList() { for _, c := range HwthreadList() {
clist = append(clist, CpuEntry{Cpuid: c}) clist = append(clist, HwthreadEntry{Cpuid: c})
} }
for i, centry := range clist { for i, centry := range clist {
centry.Socket = -1 centry.Socket = -1
@ -298,6 +330,7 @@ func CpuData() []CpuEntry {
return clist return clist
} }
// Structure holding basic information about a CPU
type CpuInformation struct { type CpuInformation struct {
NumHWthreads int NumHWthreads int
SMTWidth int SMTWidth int
@ -307,6 +340,7 @@ type CpuInformation struct {
NumNumaDomains int NumNumaDomains int
} }
// Get basic information about the CPU
func CpuInfo() CpuInformation { func CpuInfo() CpuInformation {
var c CpuInformation var c CpuInformation
@ -342,7 +376,8 @@ func CpuInfo() CpuInformation {
return c return c
} }
func GetCpuSocket(cpuid int) int { // Get the CPU socket ID for a given hardware thread ID
func GetHwthreadSocket(cpuid int) int {
cdata := CpuData() cdata := CpuData()
for _, d := range cdata { for _, d := range cdata {
if d.Cpuid == cpuid { if d.Cpuid == cpuid {
@ -352,7 +387,8 @@ func GetCpuSocket(cpuid int) int {
return -1 return -1
} }
func GetCpuNumaDomain(cpuid int) int { // Get the NUMA node ID for a given hardware thread ID
func GetHwthreadNumaDomain(cpuid int) int {
cdata := CpuData() cdata := CpuData()
for _, d := range cdata { for _, d := range cdata {
if d.Cpuid == cpuid { if d.Cpuid == cpuid {
@ -362,7 +398,8 @@ func GetCpuNumaDomain(cpuid int) int {
return -1 return -1
} }
func GetCpuDie(cpuid int) int { // Get the CPU die ID for a given hardware thread ID
func GetHwthreadDie(cpuid int) int {
cdata := CpuData() cdata := CpuData()
for _, d := range cdata { for _, d := range cdata {
if d.Cpuid == cpuid { if d.Cpuid == cpuid {
@ -372,7 +409,8 @@ func GetCpuDie(cpuid int) int {
return -1 return -1
} }
func GetCpuCore(cpuid int) int { // Get the CPU core ID for a given hardware thread ID
func GetHwthreadCore(cpuid int) int {
cdata := CpuData() cdata := CpuData()
for _, d := range cdata { for _, d := range cdata {
if d.Cpuid == cpuid { if d.Cpuid == cpuid {
@ -382,7 +420,8 @@ func GetCpuCore(cpuid int) int {
return -1 return -1
} }
func GetSocketCpus(socket int) []int { // Get the all hardware thread ID associated with a CPU socket
func GetSocketHwthreads(socket int) []int {
all := CpuData() all := CpuData()
cpulist := make([]int, 0) cpulist := make([]int, 0)
for _, d := range all { for _, d := range all {
@ -393,7 +432,8 @@ func GetSocketCpus(socket int) []int {
return cpulist return cpulist
} }
func GetNumaDomainCpus(domain int) []int { // Get the all hardware thread ID associated with a NUMA node
func GetNumaDomainHwthreads(domain int) []int {
all := CpuData() all := CpuData()
cpulist := make([]int, 0) cpulist := make([]int, 0)
for _, d := range all { for _, d := range all {
@ -404,7 +444,8 @@ func GetNumaDomainCpus(domain int) []int {
return cpulist return cpulist
} }
func GetDieCpus(die int) []int { // Get the all hardware thread ID associated with a CPU die
func GetDieHwthreads(die int) []int {
all := CpuData() all := CpuData()
cpulist := make([]int, 0) cpulist := make([]int, 0)
for _, d := range all { for _, d := range all {
@ -415,7 +456,8 @@ func GetDieCpus(die int) []int {
return cpulist return cpulist
} }
func GetCoreCpus(core int) []int { // Get the all hardware thread ID associated with a CPU core
func GetCoreHwthreads(core int) []int {
all := CpuData() all := CpuData()
cpulist := make([]int, 0) cpulist := make([]int, 0)
for _, d := range all { for _, d := range all {

View File

@ -246,7 +246,7 @@ func matchfunc(args ...interface{}) (interface{}, error) {
func getCpuCoreFunc(args ...interface{}) (interface{}, error) { func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) { switch cpuid := args[0].(type) {
case int: case int:
return topo.GetCpuCore(cpuid), nil return topo.GetHwthreadCore(cpuid), nil
} }
return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid")
} }
@ -255,7 +255,7 @@ func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
func getCpuSocketFunc(args ...interface{}) (interface{}, error) { func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) { switch cpuid := args[0].(type) {
case int: case int:
return topo.GetCpuSocket(cpuid), nil return topo.GetHwthreadSocket(cpuid), nil
} }
return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid")
} }
@ -264,7 +264,7 @@ func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) { switch cpuid := args[0].(type) {
case int: case int:
return topo.GetCpuNumaDomain(cpuid), nil return topo.GetHwthreadNumaDomain(cpuid), nil
} }
return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid") return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid")
} }
@ -273,7 +273,7 @@ func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
func getCpuDieFunc(args ...interface{}) (interface{}, error) { func getCpuDieFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) { switch cpuid := args[0].(type) {
case int: case int:
return topo.GetCpuDie(cpuid), nil return topo.GetHwthreadDie(cpuid), nil
} }
return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid") return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid")
} }
@ -336,7 +336,7 @@ func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
// wrapper function to get a list of all cpuids of the node // wrapper function to get a list of all cpuids of the node
func getCpuListOfNode(args ...interface{}) (interface{}, error) { func getCpuListOfNode(args ...interface{}) (interface{}, error) {
return topo.CpuList(), nil return topo.HwthreadList(), nil
} }
// helper function to get the cpuid list for a CCMetric type tag set (type and type-id) // helper function to get the cpuid list for a CCMetric type tag set (type and type-id)
@ -348,14 +348,14 @@ func getCpuListOfType(args ...interface{}) (interface{}, error) {
case string: case string:
switch typ { switch typ {
case "node": case "node":
return topo.CpuList(), nil return topo.HwthreadList(), nil
case "socket": case "socket":
return getCpuListOfSocketFunc(args[1]) return getCpuListOfSocketFunc(args[1])
case "numadomain": case "numadomain":
return getCpuListOfNumaDomainFunc(args[1]) return getCpuListOfNumaDomainFunc(args[1])
case "core": case "core":
return getCpuListOfCoreFunc(args[1]) return getCpuListOfCoreFunc(args[1])
case "cpu": case "hwthread":
var cpu int var cpu int
switch id := args[1].(type) { switch id := args[1].(type) {

View File

@ -52,6 +52,11 @@ The CCMetric router sits in between the collectors and the sinks and can be used
], ],
"rename_metrics" : { "rename_metrics" : {
"metric_12345" : "mymetric" "metric_12345" : "mymetric"
},
"normalize_units" : true,
"change_unit_prefix" : {
"mem_used" : "G",
"mem_total" : "G"
} }
} }
``` ```
@ -192,6 +197,14 @@ This option takes a list of evaluable conditions and performs them one after the
``` ```
The first line is comparable with the example in `drop_metrics`, it drops all metrics starting with `drop_metric_` and ending with a number. The second line drops all metrics of the first hardware thread (**not** recommended) The first line is comparable with the example in `drop_metrics`, it drops all metrics starting with `drop_metric_` and ending with a number. The second line drops all metrics of the first hardware thread (**not** recommended)
# Manipulating the metric units
## The `normalize_units` option
The cc-metric-collector tries to read the data from the system as it is reported. If available, it tries to read the metric unit from the system as well (e.g. from `/proc/meminfo`). The problem is that, depending on the source, the metric units are named differently. Just think about `byte`, `Byte`, `B`, `bytes`, ...
The [cc-units](https://github.com/ClusterCockpit/cc-units) package provides us a normalization option to use the same metric unit name for all metrics. It this option is set to true, all `unit` meta tags are normalized.
## The `change_unit_prefix` section
It is often the case that metrics are reported by the system using a rather outdated unit prefix (like `/proc/meminfo` still uses kByte despite current memory sizes are in the GByte range). If you want to change the prefix of a unit, you can do that with the help of [cc-units](https://github.com/ClusterCockpit/cc-units). The setting works on the metric name and requires the new prefix for the metric. The cc-units package determines the scaling factor.
# Aggregate metric values of the current interval with the `interval_aggregates` option # Aggregate metric values of the current interval with the `interval_aggregates` option
@ -239,3 +252,22 @@ Use cases for `interval_aggregates`:
} }
} }
``` ```
# Order of operations
The router performs the above mentioned options in a specific order. In order to get the logic you want for a specific metric, it is crucial to know the processing order:
- Add the `hostname` tag (c)
- Manipulate the timestamp to the interval timestamp (c,r)
- Drop metrics based on `drop_metrics` and `drop_metrics_if` (c,r)
- Add tags based on `add_tags` (c,r)
- Delete tags based on `del_tags` (c,r)
- Rename metric based on `rename_metric` (c,r)
- Add tags based on `add_tags` to still work if the configuration uses the new name (c,r)
- Delete tags based on `del_tags` to still work if the configuration uses the new name (c,r)
- Normalize units when `normalize_units` is set (c,r)
- Convert unit prefix based on `change_unit_prefix` (c,r)
Legend:
- 'c' if metric is coming from a collector
- 'r' if metric is coming from a receiver

View File

@ -12,6 +12,7 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
units "github.com/ClusterCockpit/cc-units"
) )
const ROUTER_MAX_FORWARD = 50 const ROUTER_MAX_FORWARD = 50
@ -35,6 +36,8 @@ type metricRouterConfig struct {
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
dropMetrics map[string]bool // Internal map for O(1) lookup dropMetrics map[string]bool // Internal map for O(1) lookup
} }
@ -207,6 +210,38 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
return false return false
} }
func (r *metricRouter) prepareUnit(point lp.CCMetric) bool {
if r.config.NormalizeUnits {
if in_unit, ok := point.GetMeta("unit"); ok {
u := units.NewUnit(in_unit)
if u.Valid() {
point.AddMeta("unit", u.Short())
}
}
}
if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok {
newPrefix := units.NewPrefix(newP)
if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name())
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
if conv != nil && out_unit.Valid() {
if val, ok := point.GetField("value"); ok {
point.AddField("value", conv(val))
point.AddMeta("unit", out_unit.Short())
}
}
}
}
}
return true
}
// Start starts the metric router // Start starts the metric router
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
// start timer if configured // start timer if configured
@ -232,9 +267,11 @@ func (r *metricRouter) Start() {
if new, ok := r.config.RenameMetrics[name]; ok { if new, ok := r.config.RenameMetrics[name]; ok {
point.SetName(new) point.SetName(new)
point.AddMeta("oldname", name) point.AddMeta("oldname", name)
}
r.DoAddTags(point) r.DoAddTags(point)
r.DoDelTags(point) r.DoDelTags(point)
}
r.prepareUnit(point)
for _, o := range r.outputs { for _, o := range r.outputs {
o <- point o <- point

View File

@ -75,7 +75,7 @@ case "$1" in
fi fi
# Start Daemon # Start Daemon
start-stop-daemon --start -b --chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $DAEMON_OPTS start-stop-daemon --start -b --chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $CC_OPTS
return=$? return=$?
if [ $return -eq 0 ] if [ $return -eq 0 ]
then then

View File

@ -39,7 +39,7 @@ def group_to_json(groupfile):
llist = re.split("\s+", line) llist = re.split("\s+", line)
calc = llist[-1] calc = llist[-1]
metric = " ".join(llist[:-1]) metric = " ".join(llist[:-1])
scope = "cpu" scope = "hwthread"
if "BOX" in calc: if "BOX" in calc:
scope = "socket" scope = "socket"
if "PWR" in calc: if "PWR" in calc:

View File

@ -19,9 +19,9 @@ type HttpSinkConfig struct {
URL string `json:"url,omitempty"` URL string `json:"url,omitempty"`
JWT string `json:"jwt,omitempty"` JWT string `json:"jwt,omitempty"`
Timeout string `json:"timeout,omitempty"` Timeout string `json:"timeout,omitempty"`
MaxIdleConns int `json:"max_idle_connections,omitempty"`
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"` FlushDelay string `json:"flush_delay,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
} }
type HttpSink struct { type HttpSink struct {
@ -32,58 +32,56 @@ type HttpSink struct {
buffer *bytes.Buffer buffer *bytes.Buffer
flushTimer *time.Timer flushTimer *time.Timer
config HttpSinkConfig config HttpSinkConfig
maxIdleConns int
idleConnTimeout time.Duration idleConnTimeout time.Duration
timeout time.Duration timeout time.Duration
flushDelay time.Duration flushDelay time.Duration
} }
func (s *HttpSink) Write(m lp.CCMetric) error { func (s *HttpSink) Write(m lp.CCMetric) error {
if s.buffer.Len() == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last second
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
p := m.ToPoint(s.meta_as_tags) p := m.ToPoint(s.meta_as_tags)
s.lock.Lock() s.lock.Lock()
firstWriteOfBatch := s.buffer.Len() == 0
_, err := s.encoder.Encode(p) _, err := s.encoder.Encode(p)
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well s.lock.Unlock()
if err != nil { if err != nil {
cclog.ComponentError(s.name, "encoding failed:", err.Error()) cclog.ComponentError(s.name, "encoding failed:", err.Error())
return err return err
} }
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 { if s.flushDelay == 0 {
return s.Flush() return s.Flush()
} }
return err if firstWriteOfBatch {
if s.flushTimer == nil {
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
} else {
s.flushTimer.Reset(s.flushDelay)
}
}
return nil
} }
func (s *HttpSink) Flush() error { func (s *HttpSink) Flush() error {
// buffer is read by client.Do, prevent concurrent modifications // Own lock for as short as possible: the time it takes to copy the buffer.
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() buf := make([]byte, s.buffer.Len())
copy(buf, s.buffer.Bytes())
// Do not flush empty buffer s.buffer.Reset()
if s.buffer.Len() == 0 { s.lock.Unlock()
if len(buf) == 0 {
return nil return nil
} }
var res *http.Response
for i := 0; i < s.config.MaxRetries; i++ {
// Create new request to send buffer // Create new request to send buffer
req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf))
if err != nil { if err != nil {
cclog.ComponentError(s.name, "failed to create request:", err.Error()) cclog.ComponentError(s.name, "failed to create request:", err.Error())
return err return err
@ -94,21 +92,25 @@ func (s *HttpSink) Flush() error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT))
} }
// Send // Do request
res, err := s.client.Do(req) res, err = s.client.Do(req)
// Clear buffer
s.buffer.Reset()
// Handle transport/tcp errors
if err != nil { if err != nil {
cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
return err // Wait between retries
time.Sleep(time.Duration(i+1) * (time.Second / 2))
continue
}
break
}
if res == nil {
return errors.New("flush failed due to repeated errors")
} }
// Handle application errors // Handle application errors
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
err = errors.New(res.Status) err := errors.New(res.Status)
cclog.ComponentError(s.name, "application error:", err.Error()) cclog.ComponentError(s.name, "application error:", err.Error())
return err return err
} }
@ -128,10 +130,10 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s := new(HttpSink) s := new(HttpSink)
// Set default values // Set default values
s.name = fmt.Sprintf("HttpSink(%s)", name) s.name = fmt.Sprintf("HttpSink(%s)", name)
s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval.
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s" s.config.Timeout = "5s"
s.config.FlushDelay = "1s" s.config.FlushDelay = "5s"
s.config.MaxRetries = 3
// Read config // Read config
if len(config) > 0 { if len(config) > 0 {
@ -143,9 +145,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
if len(s.config.URL) == 0 { if len(s.config.URL) == 0 {
return nil, errors.New("`url` config option is required for HTTP sink") return nil, errors.New("`url` config option is required for HTTP sink")
} }
if s.config.MaxIdleConns > 0 {
s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 { if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout) t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil { if err == nil {
@ -170,7 +169,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.meta_as_tags[k] = true s.meta_as_tags[k] = true
} }
tr := &http.Transport{ tr := &http.Transport{
MaxIdleConns: s.maxIdleConns, MaxIdleConns: 1, // We will only ever talk to one host.
IdleConnTimeout: s.idleConnTimeout, IdleConnTimeout: s.idleConnTimeout,
} }
s.client = &http.Client{Transport: tr, Timeout: s.timeout} s.client = &http.Client{Transport: tr, Timeout: s.timeout}

View File

@ -25,7 +25,6 @@ type InfluxAsyncSinkConfig struct {
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
// Maximum number of points sent to server in single request. Default 5000 // Maximum number of points sent to server in single request. Default 5000
BatchSize uint `json:"batch_size,omitempty"` BatchSize uint `json:"batch_size,omitempty"`
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
@ -186,12 +185,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
if len(s.config.Host) == 0 || if len(s.config.Port) == 0 {
len(s.config.Port) == 0 || return nil, errors.New("Missing port configuration required by InfluxSink")
len(s.config.Database) == 0 || }
len(s.config.Organization) == 0 || if len(s.config.Database) == 0 {
len(s.config.Password) == 0 { return nil, errors.New("Missing database configuration required by InfluxSink")
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") }
if len(s.config.Organization) == 0 {
return nil, errors.New("Missing organization configuration required by InfluxSink")
}
if len(s.config.Password) == 0 {
return nil, errors.New("Missing password configuration required by InfluxSink")
} }
// Create lookup map to use meta infos as tags in the output metric // Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool) s.meta_as_tags = make(map[string]bool)

View File

@ -16,7 +16,11 @@ import (
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
) )
type InfluxSinkConfig struct { type InfluxSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config struct {
defaultSinkConfig defaultSinkConfig
Host string `json:"host,omitempty"` Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"` Port string `json:"port,omitempty"`
@ -25,28 +29,15 @@ type InfluxSinkConfig struct {
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"` // Maximum number of points sent to server in single request. Default 100
BatchSize int `json:"batch_size,omitempty"` BatchSize int `json:"batch_size,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"` // Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s
// InfluxRetryInterval string `json:"retry_interval"` FlushInterval string `json:"flush_delay,omitempty"`
// InfluxExponentialBase uint `json:"retry_exponential_base"` }
// InfluxMaxRetries uint `json:"max_retries"`
// InfluxMaxRetryTime string `json:"max_retry_time"`
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
}
type InfluxSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
batch []*write.Point batch []*write.Point
flushTimer *time.Timer flushTimer *time.Timer
flushDelay time.Duration flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
//influxMaxRetryDelay uint
} }
// connect connects to the InfluxDB server // connect connects to the InfluxDB server
@ -76,23 +67,6 @@ func (s *InfluxSink) connect() error {
// Set influxDB client options // Set influxDB client options
clientOptions := influxdb2.DefaultOptions() clientOptions := influxdb2.DefaultOptions()
// if s.influxRetryInterval != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
// clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
// }
// if s.influxMaxRetryTime != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
// clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
// }
// if s.config.InfluxExponentialBase != 0 {
// cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
// clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
// }
// if s.config.InfluxMaxRetries != 0 {
// cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
// }
// Do not check InfluxDB certificate // Do not check InfluxDB certificate
clientOptions.SetTLSConfig( clientOptions.SetTLSConfig(
&tls.Config{ &tls.Config{
@ -126,7 +100,9 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
} }
// Run a batched flush for all lines that have arrived in the last flush delay interval // Run a batched flush for all lines that have arrived in the last flush delay interval
s.flushTimer = time.AfterFunc(s.flushDelay, func() { s.flushTimer = time.AfterFunc(
s.flushDelay,
func() {
if err := s.Flush(); err != nil { if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error()) cclog.ComponentError(s.name, "flush failed:", err.Error())
} }
@ -194,7 +170,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// Set config default values // Set config default values
s.config.BatchSize = 100 s.config.BatchSize = 100
s.config.FlushDelay = "1s" s.config.FlushInterval = "1s"
// Read config // Read config
if len(config) > 0 { if len(config) > 0 {
@ -203,12 +179,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
s.influxRetryInterval = 0
s.influxMaxRetryTime = 0
// s.config.InfluxRetryInterval = ""
// s.config.InfluxMaxRetryTime = ""
// s.config.InfluxMaxRetries = 0
// s.config.InfluxExponentialBase = 0
if len(s.config.Host) == 0 { if len(s.config.Host) == 0 {
return nil, errors.New("Missing host configuration required by InfluxSink") return nil, errors.New("Missing host configuration required by InfluxSink")
@ -232,21 +202,9 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s.meta_as_tags[k] = true s.meta_as_tags[k] = true
} }
// toUint := func(duration string, def uint) uint {
// if len(duration) > 0 {
// t, err := time.ParseDuration(duration)
// if err == nil {
// return uint(t.Milliseconds())
// }
// }
// return def
// }
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Configure flush delay duration // Configure flush delay duration
if len(s.config.FlushDelay) > 0 { if len(s.config.FlushInterval) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay) t, err := time.ParseDuration(s.config.FlushInterval)
if err == nil { if err == nil {
s.flushDelay = t s.flushDelay = t
} }

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
@ -17,9 +18,10 @@ type NatsSinkConfig struct {
defaultSinkConfig defaultSinkConfig
Host string `json:"host,omitempty"` Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"` Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"` Subject string `json:"subject,omitempty"`
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
} }
type NatsSink struct { type NatsSink struct {
@ -28,6 +30,10 @@ type NatsSink struct {
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config NatsSinkConfig config NatsSinkConfig
lock sync.Mutex
flushDelay time.Duration
flushTimer *time.Timer
} }
func (s *NatsSink) connect() error { func (s *NatsSink) connect() error {
@ -54,37 +60,53 @@ func (s *NatsSink) connect() error {
} }
func (s *NatsSink) Write(m lp.CCMetric) error { func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil { s.lock.Lock()
_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
s.lock.Unlock()
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Write:", err.Error()) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
} }
if s.flushDelay == 0 {
s.Flush()
} else if s.flushTimer == nil {
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
s.Flush()
})
} else {
s.flushTimer.Reset(s.flushDelay)
} }
return nil return nil
} }
func (s *NatsSink) Flush() error { func (s *NatsSink) Flush() error {
if s.client != nil { s.lock.Lock()
if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes
s.buffer.Reset()
s.lock.Unlock()
if len(buf) == 0 {
return nil
}
if err := s.client.Publish(s.config.Subject, buf); err != nil {
cclog.ComponentError(s.name, "Flush:", err.Error()) cclog.ComponentError(s.name, "Flush:", err.Error())
return err return err
} }
s.buffer.Reset()
}
return nil return nil
} }
func (s *NatsSink) Close() { func (s *NatsSink) Close() {
if s.client != nil {
cclog.ComponentDebug(s.name, "Close") cclog.ComponentDebug(s.name, "Close")
s.client.Close() s.client.Close()
}
} }
func NewNatsSink(name string, config json.RawMessage) (Sink, error) { func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s := new(NatsSink) s := new(NatsSink)
s.name = fmt.Sprintf("NatsSink(%s)", name) s.name = fmt.Sprintf("NatsSink(%s)", name)
s.flushDelay = 10 * time.Second
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -94,7 +116,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
} }
if len(s.config.Host) == 0 || if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 || len(s.config.Port) == 0 ||
len(s.config.Database) == 0 { len(s.config.Subject) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink") return nil, errors.New("not all configuration variables set required by NatsSink")
} }
// Create lookup map to use meta infos as tags in the output metric // Create lookup map to use meta infos as tags in the output metric
@ -112,5 +134,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)
} }
s.flushTimer = nil
if len(s.config.FlushDelay) != 0 {
var err error
s.flushDelay, err = time.ParseDuration(s.config.FlushDelay)
if err != nil {
return nil, err
}
}
return s, nil return s, nil
} }