From 8d85bd53f15731a5f4c36722aa3f47d23886522d Mon Sep 17 00:00:00 2001
From: Thomas Gruber
Date: Wed, 8 Jun 2022 15:25:40 +0200
Subject: [PATCH] Merge latest development changes to main branch (#79)

* Cleanup: Remove unused code
* Use Golang duration parser for 'interval' and 'duration' in main config
* Update handling of LIKWID headers. Download only if not already present in the system. Fixes #73
* Units with cc-units (#64)
* Add option to normalize units with cc-unit
* Add unit conversion to router
* Add option to change unit prefix in the router
* Add to MetricRouter README
* Add order of operations in router to README
* Use second add_tags/del_tags only if metric gets renamed
* Skip disks in DiskstatCollector that have size=0
* Check readability of sensor files in TempCollector
* Fix for --once option
* Rename `cpu` type to `hwthread` (#69)
* Rename 'cpu' type to 'hwthread' to avoid naming clashes with MetricStore and CC-Webfrontend
* Collectors in parallel (#74)
* Provide info to CollectorManager whether the collector can be executed in parallel with others
* Split serial and parallel collectors. Read in parallel first
* Update NvidiaCollector with new metrics, MIG and NvLink support (#75)
* CC topology module update (#76)
* Rename CPU to hardware thread, write some comments
* Do renaming in other parts
* Remove CpuList and SocketList function from metricCollector. Available in ccTopology
* Option to use MIG UUID as subtype-id in NvidiaCollector
* Option to use MIG slice name as subtype-id in NvidiaCollector
* MetricRouter: Fix JSON in README
* Fix for Github Action to really use the selected version
* Remove Ganglia installation in runonce Action and add Go 1.18
* Fix daemon options in init script
* Add separate go.mod files to use it with deprecated 1.16
* Minor updates for Makefiles
* fix string comparison
* AMD ROCm SMI collector (#77)
* Add collector for AMD ROCm SMI metrics
* Fix import path
* Fix imports
* Remove Board Number
* store GPU index explicitly
* Remove board number from description
* Use http instead of ftp to download likwid
* Fix serial number in rocmCollector
* Improved http sink (#78)
* automatic flush in NatsSink
* tweak default options of HttpSink
* shorter crit. section and retries for HttpSink
* fix error handling
* Remove file added by mistake.
* Use http instead of ftp to download likwid * Fix serial number in rocmCollector Co-authored-by: Thomas Roehl Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Co-authored-by: Lou --- .github/ci-config.json | 4 +- .github/ci-sinks.json | 4 +- .github/workflows/runonce.yml | 42 +- Makefile | 28 +- cc-metric-collector.go | 42 +- collectors/Makefile | 36 +- collectors/README.md | 1 + collectors/beegfsmetaMetric.go | 1 + collectors/beegfsstorageMetric.go | 1 + collectors/collectorManager.go | 92 +- collectors/cpufreqCpuinfoMetric.go | 3 +- collectors/cpufreqCpuinfoMetric.md | 2 +- collectors/cpufreqMetric.go | 3 +- collectors/cpufreqMetric.md | 2 +- collectors/cpustatMetric.go | 3 +- collectors/customCmdMetric.go | 1 + collectors/diskstatMetric.go | 18 +- collectors/gpfsMetric.go | 1 + collectors/infinibandMetric.go | 1 + collectors/iostatMetric.go | 1 + collectors/ipmiMetric.go | 1 + collectors/likwidMetric.go | 3 +- collectors/likwidMetric.md | 50 +- collectors/loadavgMetric.go | 1 + collectors/lustreMetric.go | 1 + collectors/memstatMetric.go | 2 + collectors/metricCollector.go | 75 +- collectors/netstatMetric.go | 1 + collectors/nfsMetric.go | 1 + collectors/numastatsMetric.go | 1 + collectors/nvidiaMetric.go | 1360 +++++++++++++---- collectors/nvidiaMetric.md | 68 +- collectors/rocmsmiMetric.go | 319 ++++ collectors/rocmsmiMetric.md | 47 + collectors/sampleMetric.go | 11 +- collectors/tempMetric.go | 5 + collectors/topprocsMetric.go | 1 + config.json | 12 +- go.mod | 23 +- go.mod.1.16 | 16 + go.mod.1.17+ | 16 + internal/ccTopology/ccTopology.go | 108 +- .../metricAggregatorFunctions.go | 14 +- internal/metricRouter/README.md | 32 + internal/metricRouter/metricRouter.go | 41 +- scripts/cc-metric-collector.init | 2 +- scripts/likwid_perfgroup_to_cc_config.py | 2 +- sinks/httpSink.go | 107 +- sinks/influxAsyncSink.go | 18 +- sinks/influxSink.go | 102 +- sinks/natsSink.go | 76 +- 51 files changed, 2097 insertions(+), 705 deletions(-) create mode 100644 collectors/rocmsmiMetric.go create mode 100644 collectors/rocmsmiMetric.md create mode 100644 go.mod.1.16 create mode 100644 go.mod.1.17+ diff --git a/.github/ci-config.json b/.github/ci-config.json index 15b2e6f..1c4ba97 100644 --- a/.github/ci-config.json +++ b/.github/ci-config.json @@ -3,6 +3,6 @@ "collectors" : ".github/ci-collectors.json", "receivers" : ".github/ci-receivers.json", "router" : ".github/ci-router.json", - "interval": 5, - "duration": 1 + "interval": "5s", + "duration": "1s" } diff --git a/.github/ci-sinks.json b/.github/ci-sinks.json index aa8ae80..2b78305 100644 --- a/.github/ci-sinks.json +++ b/.github/ci-sinks.json @@ -1,6 +1,8 @@ { "testoutput" : { "type" : "stdout", - "meta_as_tags" : true + "meta_as_tags" : [ + "unit" + ] } } diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index be161ea..a12e002 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -7,6 +7,32 @@ name: Run Test on: push jobs: + # + # Job build-1-18 + # Build on latest Ubuntu using golang version 1.18 + # + build-1-18: + runs-on: ubuntu-latest + steps: + # See: https://github.com/marketplace/actions/checkout + # Checkout git repository and submodules + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: recursive + + # See: https://github.com/marketplace/actions/setup-go-environment + - name: Setup Golang + uses: actions/setup-go@v3 + with: + go-version: '1.18.2' + + - name: Build MetricCollector + run: make + + - name: Run MetricCollector once + run: 
./cc-metric-collector --once --config .github/ci-config.json + # # Job build-1-17 # Build on latest Ubuntu using golang version 1.17 @@ -23,13 +49,9 @@ jobs: # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: - go-version: '^1.17.7' - - # Install libganglia - - name: Setup Ganglia - run: sudo apt install ganglia-monitor libganglia1 + go-version: '1.17.7' - name: Build MetricCollector run: make @@ -53,13 +75,9 @@ jobs: # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: - go-version: '^1.16.7' # The version AlmaLinux 8.5 uses - - # Install libganglia - - name: Setup Ganglia - run: sudo apt install ganglia-monitor libganglia1 + go-version: '1.16.7' # The version AlmaLinux 8.5 uses - name: Build MetricCollector run: make diff --git a/Makefile b/Makefile index 0a7ad04..8a6748a 100644 --- a/Makefile +++ b/Makefile @@ -16,15 +16,23 @@ COMPONENT_DIRS := collectors \ internal/multiChanTicker BINDIR = bin +GOBIN = $(shell which go) .PHONY: all all: $(APP) $(APP): $(GOSRC) + if [ "$(shell $(GOBIN) version | cut -d' ' -f 3 | cut -d '.' -f1-2)" = "go1.16" ]; then \ + echo "1.16"; \ + cp go.mod.1.16 go.mod; \ + else \ + echo "1.17+"; \ + cp go.mod.1.17+ go.mod; \ + fi make -C collectors - go get - go build -o $(APP) $(GOSRC_APP) + $(GOBIN) get + $(GOBIN) build -o $(APP) $(GOSRC_APP) install: $(APP) @WORKSPACE=$(PREFIX) @@ -51,25 +59,25 @@ clean: .PHONY: fmt fmt: - go fmt $(GOSRC_COLLECTORS) - go fmt $(GOSRC_SINKS) - go fmt $(GOSRC_RECEIVERS) - go fmt $(GOSRC_APP) - @for F in $(GOSRC_INTERNAL); do go fmt $$F; done + $(GOBIN) fmt $(GOSRC_COLLECTORS) + $(GOBIN) fmt $(GOSRC_SINKS) + $(GOBIN) fmt $(GOSRC_RECEIVERS) + $(GOBIN) fmt $(GOSRC_APP) + @for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done # Examine Go source code and reports suspicious constructs .PHONY: vet vet: - go vet ./... + $(GOBIN) vet ./... # Run linter for the Go programming language. # Using static analysis, it finds bugs and performance issues, offers simplifications, and enforces style rules .PHONY: staticcheck staticcheck: - go install honnef.co/go/tools/cmd/staticcheck@latest - $$(go env GOPATH)/bin/staticcheck ./... + $(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest + $$($(GOBIN) env GOPATH)/bin/staticcheck ./... 
.ONESHELL: .PHONY: RPM diff --git a/cc-metric-collector.go b/cc-metric-collector.go index e6388df..42f7843 100644 --- a/cc-metric-collector.go +++ b/cc-metric-collector.go @@ -22,8 +22,8 @@ import ( ) type CentralConfigFile struct { - Interval int `json:"interval"` - Duration int `json:"duration"` + Interval string `json:"interval"` + Duration string `json:"duration"` CollectorConfigFile string `json:"collectors"` RouterConfigFile string `json:"router"` SinkConfigFile string `json:"sinks"` @@ -173,16 +173,36 @@ func mainFunc() int { cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error()) return 1 } - if rcfg.ConfigFile.Interval <= 0 || time.Duration(rcfg.ConfigFile.Interval)*time.Second <= 0 { - cclog.Error("Configuration value 'interval' must be greater than zero") + + // Properly use duration parser with inputs like '60s', '5m' or similar + if len(rcfg.ConfigFile.Interval) > 0 { + t, err := time.ParseDuration(rcfg.ConfigFile.Interval) + if err != nil { + cclog.Error("Configuration value 'interval' no valid duration") + } + rcfg.Interval = t + if rcfg.Interval == 0 { + cclog.Error("Configuration value 'interval' must be greater than zero") + return 1 + } + } + + // Properly use duration parser with inputs like '60s', '5m' or similar + if len(rcfg.ConfigFile.Duration) > 0 { + t, err := time.ParseDuration(rcfg.ConfigFile.Duration) + if err != nil { + cclog.Error("Configuration value 'duration' no valid duration") + } + rcfg.Duration = t + if rcfg.Duration == 0 { + cclog.Error("Configuration value 'duration' must be greater than zero") + return 1 + } + } + if rcfg.Duration > rcfg.Interval { + cclog.Error("The interval should be greater than duration") return 1 } - rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval) * time.Second - if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 { - cclog.Error("Configuration value 'duration' must be greater than zero") - return 1 - } - rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second if len(rcfg.ConfigFile.RouterConfigFile) == 0 { cclog.Error("Metric router configuration file must be set") @@ -271,7 +291,7 @@ func mainFunc() int { // Wait until one tick has passed. 
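Editor's note: the hunk above switches `interval` and `duration` from integer seconds to Go duration strings that are validated with `time.ParseDuration`. A minimal, self-contained sketch of that validation logic follows; the helper name `parseIntervalAndDuration` is illustrative only and not part of the collector's code.

```go
package main

import (
	"fmt"
	"time"
)

// parseIntervalAndDuration mirrors the checks shown in mainFunc():
// both values must parse as Go durations, be greater than zero,
// and the measurement duration must not exceed the tick interval.
func parseIntervalAndDuration(interval, duration string) (time.Duration, time.Duration, error) {
	i, err := time.ParseDuration(interval) // accepts "5s", "1m", "500ms", ...
	if err != nil {
		return 0, 0, fmt.Errorf("'interval' is not a valid duration: %w", err)
	}
	d, err := time.ParseDuration(duration)
	if err != nil {
		return 0, 0, fmt.Errorf("'duration' is not a valid duration: %w", err)
	}
	if i <= 0 || d <= 0 {
		return 0, 0, fmt.Errorf("'interval' and 'duration' must be greater than zero")
	}
	if d > i {
		return 0, 0, fmt.Errorf("'duration' (%v) must not exceed 'interval' (%v)", d, i)
	}
	return i, d, nil
}

func main() {
	// values as used in .github/ci-config.json
	i, d, err := parseIntervalAndDuration("5s", "1s")
	fmt.Println(i, d, err)
}
```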
This is a workaround if rcfg.CliArgs["once"] == "true" { - x := 1.2 * float64(rcfg.ConfigFile.Interval) + x := 1.2 * float64(rcfg.Interval.Seconds()) time.Sleep(time.Duration(int(x)) * time.Second) shutdownSignal <- os.Interrupt } diff --git a/collectors/Makefile b/collectors/Makefile index b07bccd..37e8e67 100644 --- a/collectors/Makefile +++ b/collectors/Makefile @@ -1,22 +1,28 @@ - -all: likwid - - # LIKWID version LIKWID_VERSION = 5.2.1 +LIKWID_INSTALLED_FOLDER=$(shell dirname $(shell which likwid-topology 2>/dev/null) 2>/dev/null) + +LIKWID_FOLDER="$(shell pwd)/likwid" + +all: $(LIKWID_FOLDER)/likwid.h .ONESHELL: -.PHONY: likwid -likwid: - INSTALL_FOLDER="$${PWD}/likwid" - BUILD_FOLDER="$${PWD}/likwidbuild" - if [ -d $${INSTALL_FOLDER} ]; then rm -r $${INSTALL_FOLDER}; fi - mkdir --parents --verbose $${INSTALL_FOLDER} $${BUILD_FOLDER} - wget -P "$${BUILD_FOLDER}" ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz - tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz - install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $${INSTALL_FOLDER}/ - install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $${INSTALL_FOLDER}/ - rm -r $${BUILD_FOLDER} +.PHONY: $(LIKWID_FOLDER)/likwid.h +$(LIKWID_FOLDER)/likwid.h: + if [ "$(LIKWID_INSTALLED_FOLDER)" != "" ]; then \ + BASE="$(LIKWID_INSTALLED_FOLDER)/../include"; \ + mkdir -p $(LIKWID_FOLDER); \ + cp $$BASE/*.h $(LIKWID_FOLDER); \ + else \ + BUILD_FOLDER="$${PWD}/likwidbuild"; \ + if [ -d $(LIKWID_FOLDER) ]; then rm -r $(LIKWID_FOLDER); fi; \ + mkdir --parents --verbose $(LIKWID_FOLDER) $${BUILD_FOLDER}; \ + wget -P "$${BUILD_FOLDER}" http://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz; \ + tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz; \ + install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $(LIKWID_FOLDER)/; \ + install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $(LIKWID_FOLDER)/; \ + rm -r $${BUILD_FOLDER}; \ + fi clean: diff --git a/collectors/README.md b/collectors/README.md index 3fcdd49..10e5105 100644 --- a/collectors/README.md +++ b/collectors/README.md @@ -39,6 +39,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c * [`gpfs`](./gpfsMetric.md) * [`beegfs_meta`](./beegfsmetaMetric.md) * [`beegfs_storage`](./beegfsstorageMetric.md) +* [`rocm_smi`](./rocmsmiMetric.md) ## Todos diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index 57b1e39..a27faf2 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -55,6 +55,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { m.name = "BeegfsMetaCollector" m.setup() + m.parallel = true // Set default beegfs-ctl binary m.config.Beegfs = DEFAULT_BEEGFS_CMD diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index cbc8314..1160664 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -48,6 +48,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { m.name = "BeegfsStorageCollector" m.setup() + m.parallel = true // Set default beegfs-ctl binary m.config.Beegfs = DEFAULT_BEEGFS_CMD diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index e9ccfe7..49a9db8 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -14,39 +14,43 @@ import ( // 
Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": new(LikwidCollector), - "loadavg": new(LoadavgCollector), - "memstat": new(MemstatCollector), - "netstat": new(NetstatCollector), - "ibstat": new(InfinibandCollector), - "lustrestat": new(LustreCollector), - "cpustat": new(CpustatCollector), - "topprocs": new(TopProcsCollector), - "nvidia": new(NvidiaCollector), - "customcmd": new(CustomCmdCollector), - "iostat": new(IOstatCollector), - "diskstat": new(DiskstatCollector), - "tempstat": new(TempCollector), - "ipmistat": new(IpmiCollector), - "gpfs": new(GpfsCollector), - "cpufreq": new(CPUFreqCollector), - "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), - "nfs3stat": new(Nfs3Collector), - "nfs4stat": new(Nfs4Collector), - "numastats": new(NUMAStatsCollector), - "beegfs_meta": new(BeegfsMetaCollector), - "beegfs_storage": new(BeegfsStorageCollector), + "likwid": new(LikwidCollector), + "loadavg": new(LoadavgCollector), + "memstat": new(MemstatCollector), + "netstat": new(NetstatCollector), + "ibstat": new(InfinibandCollector), + "lustrestat": new(LustreCollector), + "cpustat": new(CpustatCollector), + "topprocs": new(TopProcsCollector), + "nvidia": new(NvidiaCollector), + "customcmd": new(CustomCmdCollector), + "iostat": new(IOstatCollector), + "diskstat": new(DiskstatCollector), + "tempstat": new(TempCollector), + "ipmistat": new(IpmiCollector), + "gpfs": new(GpfsCollector), + "cpufreq": new(CPUFreqCollector), + "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), + "nfs3stat": new(Nfs3Collector), + "nfs4stat": new(Nfs4Collector), + "numastats": new(NUMAStatsCollector), + "beegfs_meta": new(BeegfsMetaCollector), + "beegfs_storage": new(BeegfsStorageCollector), + "rocm_smi": new(RocmSmiCollector), } // Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector // List of metric collectors to use - output chan lp.CCMetric // Output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker // periodically ticking once each interval - duration time.Duration // duration (for metrics that measure over a given duration) - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config map[string]json.RawMessage // json encoded config for collector manager + collectors []MetricCollector // List of metric collectors to read in parallel + serial []MetricCollector // List of metric collectors to read serially + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager + collector_wg sync.WaitGroup // internally used wait group for the parallel reading of collector + parallel_run bool // Flag whether the collectors are currently read in parallel } // Metric collector manager access functions @@ -66,6 +70,7 @@ type CollectorManager interface { // Initialization is done for all configured collectors func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { cm.collectors = make([]MetricCollector, 0) + cm.serial = make([]MetricCollector, 0) cm.output = nil cm.done = make(chan bool) cm.wg = wg @@ -100,7 
+105,11 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat continue } cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) - cm.collectors = append(cm.collectors, collector) + if collector.Parallel() { + cm.collectors = append(cm.collectors, collector) + } else { + cm.serial = append(cm.serial, collector) + } } return nil } @@ -116,6 +125,10 @@ func (cm *collectorManager) Start() { // Collector manager is done done := func() { // close all metric collectors + if cm.parallel_run { + cm.collector_wg.Wait() + cm.parallel_run = false + } for _, c := range cm.collectors { c.Close() } @@ -130,7 +143,26 @@ func (cm *collectorManager) Start() { done() return case t := <-tick: + cm.parallel_run = true for _, c := range cm.collectors { + // Wait for done signal or execute the collector + select { + case <-cm.done: + done() + return + default: + // Read metrics from collector c via goroutine + cclog.ComponentDebug("CollectorManager", c.Name(), t) + cm.collector_wg.Add(1) + go func(myc MetricCollector) { + myc.Read(cm.duration, cm.output) + cm.collector_wg.Done() + }(c) + } + } + cm.collector_wg.Wait() + cm.parallel_run = false + for _, c := range cm.serial { // Wait for done signal or execute the collector select { case <-cm.done: diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 6c3de7a..80732ff 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -48,6 +48,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { m.setup() m.name = "CPUFreqCpuInfoCollector" + m.parallel = true m.meta = map[string]string{ "source": m.name, "group": "CPU", @@ -150,7 +151,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { t.numNonHT = numNonHT t.numNonHT_int = numNonHT_int t.tagSet = map[string]string{ - "type": "cpu", + "type": "hwthread", "type-id": t.processor, "package_id": t.physicalPackageID, } diff --git a/collectors/cpufreqCpuinfoMetric.md b/collectors/cpufreqCpuinfoMetric.md index 8b0216f..de93176 100644 --- a/collectors/cpufreqCpuinfoMetric.md +++ b/collectors/cpufreqCpuinfoMetric.md @@ -4,7 +4,7 @@ "cpufreq_cpuinfo": {} ``` -The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **cpu** metrics. +The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **hwthread** metrics. Metrics: * `cpufreq` diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 0bf6d4c..3099900 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -53,6 +53,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { m.name = "CPUFreqCollector" m.setup() + m.parallel = true if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -161,7 +162,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { t.numNonHT = numNonHT t.numNonHT_int = numNonHT_int t.tagSet = map[string]string{ - "type": "cpu", + "type": "hwthread", "type-id": t.processor, "package_id": t.physicalPackageID, } diff --git a/collectors/cpufreqMetric.md b/collectors/cpufreqMetric.md index b62d16e..71a6446 100644 --- a/collectors/cpufreqMetric.md +++ b/collectors/cpufreqMetric.md @@ -5,7 +5,7 @@ } ``` -The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **cpu** metrics. 
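Editor's note: the CollectorManager changes above split the configured collectors into a parallel set (read concurrently via goroutines guarded by `collector_wg`) and a serial set read afterwards. Below is a minimal, standalone sketch of that fan-out pattern; `reader` and `dummy` are stand-ins for the real `MetricCollector` interface, not types from the repository.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reader is a stand-in for the MetricCollector interface in this sketch.
type reader interface {
	Name() string
	Parallel() bool
	Read(d time.Duration)
}

type dummy struct {
	name     string
	parallel bool
}

func (c dummy) Name() string         { return c.name }
func (c dummy) Parallel() bool       { return c.parallel }
func (c dummy) Read(d time.Duration) { fmt.Println("read", c.name) }

// readAll mimics the tick handling in collectorManager.Start():
// parallel-capable collectors are read concurrently and awaited,
// then the serial collectors are read one after another.
func readAll(parallel, serial []reader, d time.Duration) {
	var wg sync.WaitGroup
	for _, r := range parallel {
		wg.Add(1)
		go func(r reader) {
			defer wg.Done()
			r.Read(d)
		}(r)
	}
	wg.Wait() // finish the parallel readers before starting the serial ones
	for _, r := range serial {
		r.Read(d)
	}
}

func main() {
	parallel := []reader{dummy{"loadavg", true}, dummy{"memstat", true}}
	serial := []reader{dummy{"likwid", false}}
	readAll(parallel, serial, time.Second)
}
```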
+The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **hwthread** metrics. Metrics: * `cpufreq` \ No newline at end of file diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 556aad4..c0dcf13 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -30,6 +30,7 @@ type CpustatCollector struct { func (m *CpustatCollector) Init(config json.RawMessage) error { m.name = "CpustatCollector" m.setup() + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"} m.nodetags = map[string]string{"type": "node"} if len(config) > 0 { @@ -82,7 +83,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { cpustr := strings.TrimLeft(linefields[0], "cpu") cpu, _ := strconv.Atoi(cpustr) - m.cputags[linefields[0]] = map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", cpu)} + m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} num_cpus++ } } diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index ec2109b..492dd48 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -33,6 +33,7 @@ type CustomCmdCollector struct { func (m *CustomCmdCollector) Init(config json.RawMessage) error { var err error m.name = "CustomCmdCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Custom"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 4910c83..69ffe07 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -29,6 +29,7 @@ type DiskstatCollector struct { func (m *DiskstatCollector) Init(config json.RawMessage) error { m.name = "DiskstatCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} m.setup() if len(config) > 0 { @@ -77,11 +78,18 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric continue } path := strings.Replace(linefields[1], `\040`, " ", -1) - stat := syscall.Statfs_t{} + stat := syscall.Statfs_t{ + Blocks: 0, + Bsize: 0, + Bfree: 0, + } err := syscall.Statfs(path, &stat) if err != nil { continue } + if stat.Blocks == 0 || stat.Bsize == 0 { + continue + } tags := map[string]string{"type": "node", "device": linefields[0]} total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) @@ -95,9 +103,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric y.AddMeta("unit", "GBytes") output <- y } - perc := (100 * (total - free)) / total - if perc > part_max_used { - part_max_used = perc + if total > 0 { + perc := (100 * (total - free)) / total + if perc > part_max_used { + part_max_used = perc + } } } y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index ed63201..ca9affe 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -46,6 +46,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { var err error m.name = "GpfsCollector" m.setup() + m.parallel = true // Set default mmpmon binary m.config.Mmpmon = DEFAULT_GPFS_CMD diff --git 
a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 274e669..92ea911 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -54,6 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { var err error m.name = "InfinibandCollector" m.setup() + m.parallel = true m.meta = map[string]string{ "source": m.name, "group": "Network", diff --git a/collectors/iostatMetric.go b/collectors/iostatMetric.go index ca7f33c..19b4157 100644 --- a/collectors/iostatMetric.go +++ b/collectors/iostatMetric.go @@ -37,6 +37,7 @@ type IOstatCollector struct { func (m *IOstatCollector) Init(config json.RawMessage) error { var err error m.name = "IOstatCollector" + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "Disk"} m.setup() if len(config) > 0 { diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 16b08ef..50605ac 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -34,6 +34,7 @@ type IpmiCollector struct { func (m *IpmiCollector) Init(config json.RawMessage) error { m.name = "IpmiCollector" m.setup() + m.parallel = true m.meta = map[string]string{"source": m.name, "group": "IPMI"} m.config.IpmitoolPath = string(IPMITOOL_PATH) m.config.IpmisensorsPath = string(IPMISENSORS_PATH) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index f2229d1..c036415 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -177,6 +177,7 @@ func getBaseFreq() float64 { func (m *LikwidCollector) Init(config json.RawMessage) error { m.name = "LikwidCollector" + m.parallel = false m.initialized = false m.running = false m.config.AccessMode = LIKWID_DEF_ACCESSMODE @@ -204,7 +205,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.meta = map[string]string{"group": "PerfCounter"} cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") - cpulist := topo.CpuList() + cpulist := topo.HwthreadList() m.cpulist = make([]C.int, len(cpulist)) m.cpu2tid = make(map[int]int) for i, c := range cpulist { diff --git a/collectors/likwidMetric.md b/collectors/likwidMetric.md index 2d622d1..1bb211f 100644 --- a/collectors/likwidMetric.md +++ b/collectors/likwidMetric.md @@ -19,7 +19,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li "calc": "COUNTER0 + COUNTER1", "publish": false, "unit": "myunit", - "type": "cpu" + "type": "hwthread" } ] } @@ -30,7 +30,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li "calc": "sum_01", "publish": true, "unit": "myunit", - "type": "cpu" + "type": "hwthread" } ] } @@ -51,15 +51,15 @@ Additional options: Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric. 
-- `cpu` : One metric per CPU hardware thread with the tags `"type" : "cpu"` and `"type-id" : "$cpu_id"` +- `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"` - `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"` -**Note:** You should not specify the `socket` type for a metric that is measured at `cpu` scope and vice versa, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. +**Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. As a guideline: -- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `cpu` +- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread` - All counters names containing `BOX` have the scope `socket` -- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope (AMD Zen) +- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope - All `DFCx` counters have scope `socket` ### Help with the configuration @@ -90,7 +90,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP "name": "Runtime (RDTSC) [s]", "publish": true, "unit": "seconds" - "scope": "cpu" + "scope": "hwthread" }, { "..." : "..." @@ -147,20 +147,20 @@ One might think this does not happen often but often used metrics in the world o { "name": "ipc", "calc": "PMC0/PMC1", - "type": "cpu", + "type": "hwthread", "publish": true }, { "name": "flops_any", "calc": "0.000001*PMC2/time", "unit": "MFlops/s", - "type": "cpu", + "type": "hwthread", "publish": true }, { "name": "clock", "calc": "0.000001*(FIXC1/FIXC2)/inverseClock", - "type": "cpu", + "type": "hwthread", "unit": "MHz", "publish": true }, @@ -219,3 +219,33 @@ One might think this does not happen often but often used metrics in the world o } ``` +### How to get the eventsets and metrics from LIKWID + +The `likwid` collector reads hardware performance counters at a **hwthread** and **socket** level. The configuration looks quite complicated but it is basically copy&paste from [LIKWID's performance groups](https://github.com/RRZE-HPC/likwid/tree/master/groups). The collector made multiple iterations and tried to use the performance groups but it lacked flexibility. The current way of configuration provides most flexibility. + +The logic is as following: There are multiple eventsets, each consisting of a list of counters+events and a list of metrics. 
If you compare a common performance group with the example setting above, there is not much difference: +``` +EVENTSET -> "events": { +FIXC1 ACTUAL_CPU_CLOCK -> "FIXC1": "ACTUAL_CPU_CLOCK", +FIXC2 MAX_CPU_CLOCK -> "FIXC2": "MAX_CPU_CLOCK", +PMC0 RETIRED_INSTRUCTIONS -> "PMC0" : "RETIRED_INSTRUCTIONS", +PMC1 CPU_CLOCKS_UNHALTED -> "PMC1" : "CPU_CLOCKS_UNHALTED", +PMC2 RETIRED_SSE_AVX_FLOPS_ALL -> "PMC2": "RETIRED_SSE_AVX_FLOPS_ALL", +PMC3 MERGE -> "PMC3": "MERGE", + -> } +``` + +The metrics are following the same procedure: + +``` +METRICS -> "metrics": [ +IPC PMC0/PMC1 -> { + -> "name" : "IPC", + -> "calc" : "PMC0/PMC1", + -> "scope": "hwthread", + -> "publish": true + -> } + -> ] +``` + +The script `scripts/likwid_perfgroup_to_cc_config.py` might help you. diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 3859721..58fb102 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -36,6 +36,7 @@ type LoadavgCollector struct { func (m *LoadavgCollector) Init(config json.RawMessage) error { m.name = "LoadavgCollector" + m.parallel = true m.setup() if len(config) > 0 { err := json.Unmarshal(config, &m.config) diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index d5a96e4..eade2ca 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -288,6 +288,7 @@ var LustreDeriveMetrics = []LustreMetricDefinition{ func (m *LustreCollector) Init(config json.RawMessage) error { var err error m.name = "LustreCollector" + m.parallel = true if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index c6c7f34..9841a01 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -81,6 +81,7 @@ func getStats(filename string) map[string]MemstatStats { func (m *MemstatCollector) Init(config json.RawMessage) error { var err error m.name = "MemstatCollector" + m.parallel = true m.config.NodeStats = true m.config.NumaStats = false if len(config) > 0 { @@ -159,6 +160,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { + cclog.ComponentPrint(m.name, "Here") return } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 7c04e90..4d52571 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -3,27 +3,25 @@ package collectors import ( "encoding/json" "fmt" - "io/ioutil" - "log" - "strconv" - "strings" "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) type MetricCollector interface { - Name() string // Name of the metric collector - Init(config json.RawMessage) error // Initialize metric collector - Initialized() bool // Is metric collector initialized? + Name() string // Name of the metric collector + Init(config json.RawMessage) error // Initialize metric collector + Initialized() bool // Is metric collector initialized? + Parallel() bool Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector Close() // Close / finish metric collector } type metricCollector struct { - name string // name of the metric - init bool // is metric collector initialized? - meta map[string]string // static meta data tags + name string // name of the metric + init bool // is metric collector initialized? 
+ parallel bool // can the metric collector be executed in parallel with others + meta map[string]string // static meta data tags } // Name returns the name of the metric collector @@ -31,6 +29,11 @@ func (c *metricCollector) Name() string { return c.name } +// Name returns the name of the metric collector +func (c *metricCollector) Parallel() bool { + return c.parallel +} + // Setup is for future use func (c *metricCollector) setup() error { return nil @@ -65,58 +68,6 @@ func stringArrayContains(array []string, str string) (int, bool) { return -1, false } -// SocketList returns the list of physical sockets as read from /proc/cpuinfo -func SocketList() []int { - buffer, err := ioutil.ReadFile("/proc/cpuinfo") - if err != nil { - log.Print(err) - return nil - } - ll := strings.Split(string(buffer), "\n") - var packs []int - for _, line := range ll { - if strings.HasPrefix(line, "physical id") { - lv := strings.Fields(line) - id, err := strconv.ParseInt(lv[3], 10, 32) - if err != nil { - log.Print(err) - return packs - } - _, found := intArrayContains(packs, int(id)) - if !found { - packs = append(packs, int(id)) - } - } - } - return packs -} - -// CpuList returns the list of physical CPUs (in contrast to logical CPUs) as read from /proc/cpuinfo -func CpuList() []int { - buffer, err := ioutil.ReadFile("/proc/cpuinfo") - if err != nil { - log.Print(err) - return nil - } - ll := strings.Split(string(buffer), "\n") - var cpulist []int - for _, line := range ll { - if strings.HasPrefix(line, "processor") { - lv := strings.Fields(line) - id, err := strconv.ParseInt(lv[2], 10, 32) - if err != nil { - log.Print(err) - return cpulist - } - _, found := intArrayContains(cpulist, int(id)) - if !found { - cpulist = append(cpulist, int(id)) - } - } - } - return cpulist -} - // RemoveFromStringList removes the string r from the array of strings s // If r is not contained in the array an error is returned func RemoveFromStringList(s []string, r string) ([]string, error) { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index d171d4b..8cfb34e 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -39,6 +39,7 @@ type NetstatCollector struct { func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" + m.parallel = true m.setup() m.lastTimestamp = time.Now() diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index c511b0d..6b15784 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -114,6 +114,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { m.data = make(map[string]NfsCollectorData) m.initStats() m.init = true + m.parallel = true return nil } diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index 52a2638..f65a019 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -54,6 +54,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { } m.name = "NUMAStatsCollector" + m.parallel = true m.setup() m.meta = map[string]string{ "source": m.name, diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 24f0855..458ecd4 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "strings" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" @@ -13,22 +14,30 @@ import ( ) type NvidiaCollectorConfig struct { - ExcludeMetrics []string `json:"exclude_metrics,omitempty"` - ExcludeDevices []string 
`json:"exclude_devices,omitempty"` - AddPciInfoTag bool `json:"add_pci_info_tag,omitempty"` + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeDevices []string `json:"exclude_devices,omitempty"` + AddPciInfoTag bool `json:"add_pci_info_tag,omitempty"` + UsePciInfoAsTypeId bool `json:"use_pci_info_as_type_id,omitempty"` + AddUuidMeta bool `json:"add_uuid_meta,omitempty"` + AddBoardNumberMeta bool `json:"add_board_number_meta,omitempty"` + AddSerialMeta bool `json:"add_serial_meta,omitempty"` + ProcessMigDevices bool `json:"process_mig_devices,omitempty"` + UseUuidForMigDevices bool `json:"use_uuid_for_mig_device,omitempty"` + UseSliceForMigDevices bool `json:"use_slice_for_mig_device,omitempty"` } type NvidiaCollectorDevice struct { device nvml.Device excludeMetrics map[string]bool tags map[string]string + meta map[string]string } type NvidiaCollector struct { metricCollector - num_gpus int config NvidiaCollectorConfig gpus []NvidiaCollectorDevice + num_gpus int } func (m *NvidiaCollector) CatchPanic() { @@ -42,6 +51,10 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { var err error m.name = "NvidiaCollector" m.config.AddPciInfoTag = false + m.config.UsePciInfoAsTypeId = false + m.config.ProcessMigDevices = false + m.config.UseUuidForMigDevices = false + m.config.UseSliceForMigDevices = false m.setup() if len(config) > 0 { err = json.Unmarshal(config, &m.config) @@ -54,7 +67,6 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { "group": "Nvidia", } - m.num_gpus = 0 defer m.CatchPanic() // Initialize NVIDIA Management Library (NVML) @@ -74,13 +86,14 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { } // For all GPUs + idx := 0 m.gpus = make([]NvidiaCollectorDevice, num_gpus) for i := 0; i < num_gpus; i++ { - g := &m.gpus[i] - // Skip excluded devices + // Skip excluded devices by ID str_i := fmt.Sprintf("%d", i) if _, skip := stringArrayContains(m.config.ExcludeDevices, str_i); skip { + cclog.ComponentDebug(m.name, "Skipping excluded device", str_i) continue } @@ -89,14 +102,85 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { if ret != nvml.SUCCESS { err = errors.New(nvml.ErrorString(ret)) cclog.ComponentError(m.name, "Unable to get device at index", i, ":", err.Error()) - return err + continue } + + // Get device's PCI info + pciInfo, ret := nvml.DeviceGetPciInfo(device) + if ret != nvml.SUCCESS { + err = errors.New(nvml.ErrorString(ret)) + cclog.ComponentError(m.name, "Unable to get PCI info for device at index", i, ":", err.Error()) + continue + } + // Create PCI ID in the common format used by the NVML. + pci_id := fmt.Sprintf( + nvml.DEVICE_PCI_BUS_ID_FMT, + pciInfo.Domain, + pciInfo.Bus, + pciInfo.Device) + + // Skip excluded devices specified by PCI ID + if _, skip := stringArrayContains(m.config.ExcludeDevices, pci_id); skip { + cclog.ComponentDebug(m.name, "Skipping excluded device", pci_id) + continue + } + + // Select which value to use as 'type-id'. + // The PCI ID is commonly required in SLURM environments because the + // numberic IDs used by SLURM and the ones used by NVML might differ + // depending on the job type. The PCI ID is more reliable but is commonly + // not recorded for a job, so it must be added manually in prologue or epilogue + // e.g. 
to the comment field + tid := str_i + if m.config.UsePciInfoAsTypeId { + tid = pci_id + } + + // Now we got all infos together, populate the device list + g := &m.gpus[idx] + + // Add device handle g.device = device // Add tags g.tags = map[string]string{ "type": "accelerator", - "type-id": str_i, + "type-id": tid, + } + + // Add PCI info as tag if not already used as 'type-id' + if m.config.AddPciInfoTag && !m.config.UsePciInfoAsTypeId { + g.tags["pci_identifier"] = pci_id + } + + g.meta = map[string]string{ + "source": m.name, + "group": "Nvidia", + } + + if m.config.AddBoardNumberMeta { + board, ret := nvml.DeviceGetBoardPartNumber(device) + if ret != nvml.SUCCESS { + cclog.ComponentError(m.name, "Unable to get boart part number for device at index", i, ":", err.Error()) + } else { + g.meta["board_number"] = board + } + } + if m.config.AddSerialMeta { + serial, ret := nvml.DeviceGetSerial(device) + if ret != nvml.SUCCESS { + cclog.ComponentError(m.name, "Unable to get serial number for device at index", i, ":", err.Error()) + } else { + g.meta["serial"] = serial + } + } + if m.config.AddUuidMeta { + uuid, ret := nvml.DeviceGetUUID(device) + if ret != nvml.SUCCESS { + cclog.ComponentError(m.name, "Unable to get UUID for device at index", i, ":", err.Error()) + } else { + g.meta["uuid"] = uuid + } } // Add excluded metrics @@ -105,363 +189,1003 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { g.excludeMetrics[e] = true } - // Add PCI info as tag - if m.config.AddPciInfoTag { - pciInfo, ret := nvml.DeviceGetPciInfo(g.device) - if ret != nvml.SUCCESS { - err = errors.New(nvml.ErrorString(ret)) - cclog.ComponentError(m.name, "Unable to get PCI info for device at index", i, ":", err.Error()) - return err - } - g.tags["pci_identifier"] = fmt.Sprintf( - "%08X:%02X:%02X.0", - pciInfo.Domain, - pciInfo.Bus, - pciInfo.Device) - } + // Increment the index for the next device + idx++ } + m.num_gpus = idx m.init = true return nil } +func readMemoryInfo(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_fb_mem_total"] || !device.excludeMetrics["nv_fb_mem_used"] || !device.excludeMetrics["nv_fb_mem_reserved"] { + var total uint64 + var used uint64 + var reserved uint64 = 0 + var v2 bool = false + meminfo, ret := nvml.DeviceGetMemoryInfo(device.device) + if ret != nvml.SUCCESS { + err := errors.New(nvml.ErrorString(ret)) + return err + } + total = meminfo.Total + used = meminfo.Used + + if !device.excludeMetrics["nv_fb_mem_total"] { + t := float64(total) / (1024 * 1024) + y, err := lp.New("nv_fb_mem_total", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "MByte") + output <- y + } + } + + if !device.excludeMetrics["nv_fb_mem_used"] { + f := float64(used) / (1024 * 1024) + y, err := lp.New("nv_fb_mem_used", device.tags, device.meta, map[string]interface{}{"value": f}, time.Now()) + if err == nil { + y.AddMeta("unit", "MByte") + output <- y + } + } + + if v2 && !device.excludeMetrics["nv_fb_mem_reserved"] { + r := float64(reserved) / (1024 * 1024) + y, err := lp.New("nv_fb_mem_reserved", device.tags, device.meta, map[string]interface{}{"value": r}, time.Now()) + if err == nil { + y.AddMeta("unit", "MByte") + output <- y + } + } + } + return nil +} + +func readBarMemoryInfo(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_bar1_mem_total"] || !device.excludeMetrics["nv_bar1_mem_used"] { + meminfo, ret := 
nvml.DeviceGetBAR1MemoryInfo(device.device) + if ret != nvml.SUCCESS { + err := errors.New(nvml.ErrorString(ret)) + return err + } + if !device.excludeMetrics["nv_bar1_mem_total"] { + t := float64(meminfo.Bar1Total) / (1024 * 1024) + y, err := lp.New("nv_bar1_mem_total", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "MByte") + output <- y + } + } + if !device.excludeMetrics["nv_bar1_mem_used"] { + t := float64(meminfo.Bar1Used) / (1024 * 1024) + y, err := lp.New("nv_bar1_mem_used", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "MByte") + output <- y + } + } + } + return nil +} + +func readUtilization(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + isMig, ret := nvml.DeviceIsMigDeviceHandle(device.device) + if ret != nvml.SUCCESS { + err := errors.New(nvml.ErrorString(ret)) + return err + } + if !isMig { + return nil + } + + if !device.excludeMetrics["nv_util"] || !device.excludeMetrics["nv_mem_util"] { + // Retrieves the current utilization rates for the device's major subsystems. + // + // Available utilization rates + // * Gpu: Percent of time over the past sample period during which one or more kernels was executing on the GPU. + // * Memory: Percent of time over the past sample period during which global (device) memory was being read or written + // + // Note: + // * During driver initialization when ECC is enabled one can see high GPU and Memory Utilization readings. + // This is caused by ECC Memory Scrubbing mechanism that is performed during driver initialization. + // * On MIG-enabled GPUs, querying device utilization rates is not currently supported. + util, ret := nvml.DeviceGetUtilizationRates(device.device) + if ret == nvml.SUCCESS { + if !device.excludeMetrics["nv_util"] { + y, err := lp.New("nv_util", device.tags, device.meta, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) + if err == nil { + y.AddMeta("unit", "%") + output <- y + } + } + if !device.excludeMetrics["nv_mem_util"] { + y, err := lp.New("nv_mem_util", device.tags, device.meta, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) + if err == nil { + y.AddMeta("unit", "%") + output <- y + } + } + } + } + return nil +} + +func readTemp(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_temp"] { + // Retrieves the current temperature readings for the device, in degrees C. + // + // Available temperature sensors: + // * TEMPERATURE_GPU: Temperature sensor for the GPU die. + // * NVML_TEMPERATURE_COUNT + temp, ret := nvml.DeviceGetTemperature(device.device, nvml.TEMPERATURE_GPU) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_temp", device.tags, device.meta, map[string]interface{}{"value": float64(temp)}, time.Now()) + if err == nil { + y.AddMeta("unit", "degC") + output <- y + } + } + } + return nil +} + +func readFan(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_fan"] { + // Retrieves the intended operating speed of the device's fan. + // + // Note: The reported speed is the intended fan speed. + // If the fan is physically blocked and unable to spin, the output will not match the actual fan speed. + // + // For all discrete products with dedicated fans. + // + // The fan speed is expressed as a percentage of the product's maximum noise tolerance fan speed. + // This value may exceed 100% in certain cases. 
+ fan, ret := nvml.DeviceGetFanSpeed(device.device) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_fan", device.tags, device.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) + if err == nil { + y.AddMeta("unit", "%") + output <- y + } + } + } + return nil +} + +// func readFans(device NvidiaCollectorDevice, output chan lp.CCMetric) error { +// if !device.excludeMetrics["nv_fan"] { +// numFans, ret := nvml.DeviceGetNumFans(device.device) +// if ret == nvml.SUCCESS { +// for i := 0; i < numFans; i++ { +// fan, ret := nvml.DeviceGetFanSpeed_v2(device.device, i) +// if ret == nvml.SUCCESS { +// y, err := lp.New("nv_fan", device.tags, device.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) +// if err == nil { +// y.AddMeta("unit", "%") +// y.AddTag("stype", "fan") +// y.AddTag("stype-id", fmt.Sprintf("%d", i)) +// output <- y +// } +// } +// } +// } +// } +// return nil +// } + +func readEccMode(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_ecc_mode"] { + // Retrieves the current and pending ECC modes for the device. + // + // For Fermi or newer fully supported devices. Only applicable to devices with ECC. + // Requires NVML_INFOROM_ECC version 1.0 or higher. + // + // Changing ECC modes requires a reboot. + // The "pending" ECC mode refers to the target mode following the next reboot. + _, ecc_pend, ret := nvml.DeviceGetEccMode(device.device) + if ret == nvml.SUCCESS { + var y lp.CCMetric + var err error + switch ecc_pend { + case nvml.FEATURE_DISABLED: + y, err = lp.New("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "OFF"}, time.Now()) + case nvml.FEATURE_ENABLED: + y, err = lp.New("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "ON"}, time.Now()) + default: + y, err = lp.New("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "UNKNOWN"}, time.Now()) + } + if err == nil { + output <- y + } + } else if ret == nvml.ERROR_NOT_SUPPORTED { + y, err := lp.New("nv_ecc_mode", device.tags, device.meta, map[string]interface{}{"value": "N/A"}, time.Now()) + if err == nil { + output <- y + } + } + } + return nil +} + +func readPerfState(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_perf_state"] { + // Retrieves the current performance state for the device. + // + // Allowed PStates: + // 0: Maximum Performance. + // .. + // 15: Minimum Performance. + // 32: Unknown performance state. + pState, ret := nvml.DeviceGetPerformanceState(device.device) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_perf_state", device.tags, device.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) + if err == nil { + output <- y + } + } + } + return nil +} + +func readPowerUsage(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_power_usage"] { + // Retrieves power usage for this GPU in milliwatts and its associated circuitry (e.g. memory) + // + // On Fermi and Kepler GPUs the reading is accurate to within +/- 5% of current power draw. 
+ // + // It is only available if power management mode is supported + mode, ret := nvml.DeviceGetPowerManagementMode(device.device) + if ret != nvml.SUCCESS { + return nil + } + if mode == nvml.FEATURE_ENABLED { + power, ret := nvml.DeviceGetPowerUsage(device.device) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_power_usage", device.tags, device.meta, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) + if err == nil { + y.AddMeta("unit", "watts") + output <- y + } + } + } + } + return nil +} + +func readClocks(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + // Retrieves the current clock speeds for the device. + // + // Available clock information: + // * CLOCK_GRAPHICS: Graphics clock domain. + // * CLOCK_SM: Streaming Multiprocessor clock domain. + // * CLOCK_MEM: Memory clock domain. + if !device.excludeMetrics["nv_graphics_clock"] { + graphicsClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_GRAPHICS) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_graphics_clock", device.tags, device.meta, map[string]interface{}{"value": float64(graphicsClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + + if !device.excludeMetrics["nv_sm_clock"] { + smCock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_SM) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_sm_clock", device.tags, device.meta, map[string]interface{}{"value": float64(smCock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + + if !device.excludeMetrics["nv_mem_clock"] { + memClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_MEM) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_mem_clock", device.tags, device.meta, map[string]interface{}{"value": float64(memClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + if !device.excludeMetrics["nv_video_clock"] { + memClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_VIDEO) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_video_clock", device.tags, device.meta, map[string]interface{}{"value": float64(memClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + return nil +} + +func readMaxClocks(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + // Retrieves the maximum clock speeds for the device. + // + // Available clock information: + // * CLOCK_GRAPHICS: Graphics clock domain. + // * CLOCK_SM: Streaming multiprocessor clock domain. + // * CLOCK_MEM: Memory clock domain. + // * CLOCK_VIDEO: Video encoder/decoder clock domain. + // * CLOCK_COUNT: Count of clock types. + // + // Note: + /// On GPUs from Fermi family current P0 clocks (reported by nvmlDeviceGetClockInfo) can differ from max clocks by few MHz. 
+ if !device.excludeMetrics["nv_max_graphics_clock"] { + max_gclk, ret := nvml.DeviceGetMaxClockInfo(device.device, nvml.CLOCK_GRAPHICS) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_max_graphics_clock", device.tags, device.meta, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + + if !device.excludeMetrics["nv_max_sm_clock"] { + maxSmClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_SM) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_max_sm_clock", device.tags, device.meta, map[string]interface{}{"value": float64(maxSmClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + + if !device.excludeMetrics["nv_max_mem_clock"] { + maxMemClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_MEM) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_max_mem_clock", device.tags, device.meta, map[string]interface{}{"value": float64(maxMemClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + + if !device.excludeMetrics["nv_max_video_clock"] { + maxMemClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_VIDEO) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_max_video_clock", device.tags, device.meta, map[string]interface{}{"value": float64(maxMemClock)}, time.Now()) + if err == nil { + y.AddMeta("unit", "MHz") + output <- y + } + } + } + return nil +} + +func readEccErrors(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_ecc_uncorrected_error"] { + // Retrieves the total ECC error counts for the device. + // + // For Fermi or newer fully supported devices. + // Only applicable to devices with ECC. + // Requires NVML_INFOROM_ECC version 1.0 or higher. + // Requires ECC Mode to be enabled. + // + // The total error count is the sum of errors across each of the separate memory systems, + // i.e. the total set of errors across the entire device. + ecc_db, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_UNCORRECTED, nvml.AGGREGATE_ECC) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_ecc_uncorrected_error", device.tags, device.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) + if err == nil { + output <- y + } + } + } + if !device.excludeMetrics["nv_ecc_corrected_error"] { + ecc_sb, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_CORRECTED, nvml.AGGREGATE_ECC) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_ecc_corrected_error", device.tags, device.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) + if err == nil { + output <- y + } + } + } + return nil +} + +func readPowerLimit(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + if !device.excludeMetrics["nv_power_max_limit"] { + // Retrieves the power management limit associated with this device. + // + // For Fermi or newer fully supported devices. + // + // The power limit defines the upper boundary for the card's power draw. + // If the card's total power draw reaches this limit the power management algorithm kicks in. 
+		pwr_limit, ret := nvml.DeviceGetPowerManagementLimit(device.device)
+		if ret == nvml.SUCCESS {
+			y, err := lp.New("nv_power_max_limit", device.tags, device.meta, map[string]interface{}{"value": float64(pwr_limit) / 1000}, time.Now())
+			if err == nil {
+				y.AddMeta("unit", "watts")
+				output <- y
+			}
+		}
+	}
+	return nil
+}
+
+func readEncUtilization(device NvidiaCollectorDevice, output chan lp.CCMetric) error {
+	isMig, ret := nvml.DeviceIsMigDeviceHandle(device.device)
+	if ret != nvml.SUCCESS {
+		err := errors.New(nvml.ErrorString(ret))
+		return err
+	}
+	if isMig {
+		return nil
+	}
+	if !device.excludeMetrics["nv_encoder_util"] {
+		// Retrieves the current utilization and sampling size in microseconds for the Encoder
+		//
+		// For Kepler or newer fully supported devices.
+		//
+		// Note: On MIG-enabled GPUs, querying encoder utilization is not currently supported.
+		enc_util, _, ret := nvml.DeviceGetEncoderUtilization(device.device)
+		if ret == nvml.SUCCESS {
+			y, err := lp.New("nv_encoder_util", device.tags, device.meta, map[string]interface{}{"value": float64(enc_util)}, time.Now())
+			if err == nil {
+				y.AddMeta("unit", "%")
+				output <- y
+			}
+		}
+	}
+	return nil
+}
+
+func readDecUtilization(device NvidiaCollectorDevice, output chan lp.CCMetric) error {
+	isMig, ret := nvml.DeviceIsMigDeviceHandle(device.device)
+	if ret != nvml.SUCCESS {
+		err := errors.New(nvml.ErrorString(ret))
+		return err
+	}
+	if isMig {
+		return nil
+	}
+	if !device.excludeMetrics["nv_decoder_util"] {
+		// Retrieves the current utilization and sampling size in microseconds for the Decoder
+		//
+		// For Kepler or newer fully supported devices.
+		//
+		// Note: On MIG-enabled GPUs, querying decoder utilization is not currently supported.
+		dec_util, _, ret := nvml.DeviceGetDecoderUtilization(device.device)
+		if ret == nvml.SUCCESS {
+			y, err := lp.New("nv_decoder_util", device.tags, device.meta, map[string]interface{}{"value": float64(dec_util)}, time.Now())
+			if err == nil {
+				y.AddMeta("unit", "%")
+				output <- y
+			}
+		}
+	}
+	return nil
+}
+
+func readRemappedRows(device NvidiaCollectorDevice, output chan lp.CCMetric) error {
+	if !device.excludeMetrics["nv_remapped_rows_corrected"] ||
+		!device.excludeMetrics["nv_remapped_rows_uncorrected"] ||
+		!device.excludeMetrics["nv_remapped_rows_pending"] ||
+		!device.excludeMetrics["nv_remapped_rows_failure"] {
+		// Get number of remapped rows. The number of rows reported will be based on the cause of the remapping.
+		// isPending indicates whether or not there are pending remappings.
+		// A reset will be required to actually remap the row.
+		// failureOccurred will be set if a row remapping ever failed in the past.
+		// A pending remapping won't affect future work on the GPU since error-containment and dynamic page blacklisting will take care of that.
+		//
+		// For Ampere or newer fully supported devices.
+		//
+		// Note: On MIG-enabled GPUs with active instances, querying the number of remapped rows is not supported
+		corrected, uncorrected, pending, failure, ret := nvml.DeviceGetRemappedRows(device.device)
+		if ret == nvml.SUCCESS {
+			if !device.excludeMetrics["nv_remapped_rows_corrected"] {
+				y, err := lp.New("nv_remapped_rows_corrected", device.tags, device.meta, map[string]interface{}{"value": float64(corrected)}, time.Now())
+				if err == nil {
+					output <- y
+				}
+			}
+			if !device.excludeMetrics["nv_remapped_rows_uncorrected"] {
+				y, err := lp.New("nv_remapped_rows_uncorrected", device.tags, device.meta, map[string]interface{}{"value": float64(uncorrected)}, time.Now())
+				if err == nil {
+					output <- y
+				}
+			}
+			if !device.excludeMetrics["nv_remapped_rows_pending"] {
+				var p int = 0
+				if pending {
+					p = 1
+				}
+				y, err := lp.New("nv_remapped_rows_pending", device.tags, device.meta, map[string]interface{}{"value": p}, time.Now())
+				if err == nil {
+					output <- y
+				}
+			}
+			if !device.excludeMetrics["nv_remapped_rows_failure"] {
+				var f int = 0
+				if failure {
+					f = 1
+				}
+				y, err := lp.New("nv_remapped_rows_failure", device.tags, device.meta, map[string]interface{}{"value": f}, time.Now())
+				if err == nil {
+					output <- y
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func readProcessCounts(device NvidiaCollectorDevice, output chan lp.CCMetric) error {
+	if !device.excludeMetrics["nv_compute_processes"] {
+		// Get information about processes with a compute context on a device
+		//
+		// For Fermi or newer fully supported devices.
+		//
+		// This function returns information only about compute running processes (e.g. CUDA applications which have
+		// an active context). Any graphics applications (e.g. using OpenGL, DirectX) won't be listed by this function.
+		//
+		// To query the current number of running compute processes, call this function with *infoCount = 0. The
+		// return code will be NVML_ERROR_INSUFFICIENT_SIZE, or NVML_SUCCESS if none are running. For this call
+		// \a infos is allowed to be NULL.
+		//
+		// The usedGpuMemory field returned is all of the memory used by the application.
+		//
+		// Keep in mind that information returned by this call is dynamic and the number of elements might change in
+		// time. Allocate more space for \a infos table in case new compute processes are spawned.
+		//
+		// @note In MIG mode, if device handle is provided, the API returns aggregate information, only if
+		//       the caller has appropriate privileges. Per-instance information can be queried by using
+		//       specific MIG device handles.
+		//       Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode.
+		procList, ret := nvml.DeviceGetComputeRunningProcesses(device.device)
+		if ret == nvml.SUCCESS {
+			y, err := lp.New("nv_compute_processes", device.tags, device.meta, map[string]interface{}{"value": len(procList)}, time.Now())
+			if err == nil {
+				output <- y
+			}
+		}
+	}
+	if !device.excludeMetrics["nv_graphics_processes"] {
+		// Get information about processes with a graphics context on a device
+		//
+		// For Kepler or newer fully supported devices.
+		//
+		// This function returns information only about graphics based processes
+		// (e.g. applications using OpenGL, DirectX)
+		//
+		// To query the current number of running graphics processes, call this function with *infoCount = 0. The
+		// return code will be NVML_ERROR_INSUFFICIENT_SIZE, or NVML_SUCCESS if none are running. For this call
+		// \a infos is allowed to be NULL.
+ // + // The usedGpuMemory field returned is all of the memory used by the application. + // + // Keep in mind that information returned by this call is dynamic and the number of elements might change in + // time. Allocate more space for \a infos table in case new graphics processes are spawned. + // + // @note In MIG mode, if device handle is provided, the API returns aggregate information, only if + // the caller has appropriate privileges. Per-instance information can be queried by using + // specific MIG device handles. + // Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode. + procList, ret := nvml.DeviceGetGraphicsRunningProcesses(device.device) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_graphics_processes", device.tags, device.meta, map[string]interface{}{"value": len(procList)}, time.Now()) + if err == nil { + output <- y + } + } + } + // if !device.excludeMetrics["nv_mps_compute_processes"] { + // // Get information about processes with a MPS compute context on a device + // // + // // For Volta &tm; or newer fully supported devices. + // // + // // This function returns information only about compute running processes (e.g. CUDA application which have + // // active context) utilizing MPS. Any graphics applications (e.g. using OpenGL, DirectX) won't be listed by + // // this function. + // // + // // To query the current number of running compute processes, call this function with *infoCount = 0. The + // // return code will be NVML_ERROR_INSUFFICIENT_SIZE, or NVML_SUCCESS if none are running. For this call + // // \a infos is allowed to be NULL. + // // + // // The usedGpuMemory field returned is all of the memory used by the application. + // // + // // Keep in mind that information returned by this call is dynamic and the number of elements might change in + // // time. Allocate more space for \a infos table in case new compute processes are spawned. + // // + // // @note In MIG mode, if device handle is provided, the API returns aggregate information, only if + // // the caller has appropriate privileges. Per-instance information can be queried by using + // // specific MIG device handles. + // // Querying per-instance information using MIG device handles is not supported if the device is in vGPU Host virtualization mode. + // procList, ret := nvml.DeviceGetMPSComputeRunningProcesses(device.device) + // if ret == nvml.SUCCESS { + // y, err := lp.New("nv_mps_compute_processes", device.tags, device.meta, map[string]interface{}{"value": len(procList)}, time.Now()) + // if err == nil { + // output <- y + // } + // } + // } + return nil +} + +func readViolationStats(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + var violTime nvml.ViolationTime + var ret nvml.Return + + // Gets the duration of time during which the device was throttled (lower than requested clocks) due to power + // or thermal constraints. + // + // The method is important to users who are tying to understand if their GPUs throttle at any point during their applications. The + // difference in violation times at two different reference times gives the indication of GPU throttling event. + // + // Violation for thermal capping is not supported at this time. + // + // For Kepler or newer fully supported devices. 
+ + if !device.excludeMetrics["nv_violation_power"] { + // How long did power violations cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_POWER) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_power", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_thermal"] { + // How long did thermal violations cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_THERMAL) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_thermal", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_sync_boost"] { + // How long did sync boost cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_SYNC_BOOST) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_sync_boost", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_board_limit"] { + // How long did the board limit cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_BOARD_LIMIT) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_board_limit", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_low_util"] { + // How long did low utilization cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_LOW_UTILIZATION) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_low_util", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_reliability"] { + // How long did the board reliability limit cause the GPU to be below application clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_RELIABILITY) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_reliability", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_below_app_clock"] { + // Total time the GPU was held below application clocks by any limiter (all of above) + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_TOTAL_APP_CLOCKS) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_below_app_clock", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + if !device.excludeMetrics["nv_violation_below_base_clock"] { + // Total time the GPU 
was held below base clocks + violTime, ret = nvml.DeviceGetViolationStatus(device.device, nvml.PERF_POLICY_TOTAL_BASE_CLOCKS) + if ret == nvml.SUCCESS { + t := float64(violTime.ViolationTime) * 1e-9 + y, err := lp.New("nv_violation_below_base_clock", device.tags, device.meta, map[string]interface{}{"value": t}, time.Now()) + if err == nil { + y.AddMeta("unit", "sec") + output <- y + } + } + } + + return nil +} + +func readNVLinkStats(device NvidiaCollectorDevice, output chan lp.CCMetric) error { + // Retrieves the specified error counter value + // Please refer to \a nvmlNvLinkErrorCounter_t for error counters that are available + // + // For Pascal &tm; or newer fully supported devices. + + for i := 0; i < nvml.NVLINK_MAX_LINKS; i++ { + state, ret := nvml.DeviceGetNvLinkState(device.device, i) + if ret == nvml.SUCCESS { + if state == nvml.FEATURE_ENABLED { + if !device.excludeMetrics["nv_nvlink_crc_errors"] { + // Data link receive data CRC error counter + count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_CRC_DATA) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_nvlink_crc_errors", device.tags, device.meta, map[string]interface{}{"value": count}, time.Now()) + if err == nil { + y.AddTag("stype", "nvlink") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + if !device.excludeMetrics["nv_nvlink_ecc_errors"] { + // Data link receive data ECC error counter + count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_ECC_DATA) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_nvlink_ecc_errors", device.tags, device.meta, map[string]interface{}{"value": count}, time.Now()) + if err == nil { + y.AddTag("stype", "nvlink") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + if !device.excludeMetrics["nv_nvlink_replay_errors"] { + // Data link transmit replay error counter + count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_REPLAY) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_nvlink_replay_errors", device.tags, device.meta, map[string]interface{}{"value": count}, time.Now()) + if err == nil { + y.AddTag("stype", "nvlink") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + if !device.excludeMetrics["nv_nvlink_recovery_errors"] { + // Data link transmit recovery error counter + count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_RECOVERY) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_nvlink_recovery_errors", device.tags, device.meta, map[string]interface{}{"value": count}, time.Now()) + if err == nil { + y.AddTag("stype", "nvlink") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + if !device.excludeMetrics["nv_nvlink_crc_flit_errors"] { + // Data link receive flow control digit CRC error counter + count, ret := nvml.DeviceGetNvLinkErrorCounter(device.device, i, nvml.NVLINK_ERROR_DL_CRC_FLIT) + if ret == nvml.SUCCESS { + y, err := lp.New("nv_nvlink_crc_flit_errors", device.tags, device.meta, map[string]interface{}{"value": count}, time.Now()) + if err == nil { + y.AddTag("stype", "nvlink") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + } + } + } + return nil +} + func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) { + var err error if !m.init { return } - for i := range m.gpus { - device := &m.gpus[i] - - if !device.excludeMetrics["nv_util"] || !device.excludeMetrics["nv_mem_util"] { - // Retrieves the current utilization 
rates for the device's major subsystems. - // - // Available utilization rates - // * Gpu: Percent of time over the past sample period during which one or more kernels was executing on the GPU. - // * Memory: Percent of time over the past sample period during which global (device) memory was being read or written - // - // Note: - // * During driver initialization when ECC is enabled one can see high GPU and Memory Utilization readings. - // This is caused by ECC Memory Scrubbing mechanism that is performed during driver initialization. - // * On MIG-enabled GPUs, querying device utilization rates is not currently supported. - util, ret := nvml.DeviceGetUtilizationRates(device.device) - if ret == nvml.SUCCESS { - if !device.excludeMetrics["nv_util"] { - y, err := lp.New("nv_util", device.tags, m.meta, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - } - if !device.excludeMetrics["nv_mem_util"] { - y, err := lp.New("nv_mem_util", device.tags, m.meta, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - } - } + readAll := func(device NvidiaCollectorDevice, output chan lp.CCMetric) { + name, ret := nvml.DeviceGetName(device.device) + if ret != nvml.SUCCESS { + name = "NoName" + } + err = readMemoryInfo(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readMemoryInfo for device", name, "failed") } - if !device.excludeMetrics["nv_mem_total"] || !device.excludeMetrics["nv_fb_memory"] { - // Retrieves the amount of used, free and total memory available on the device, in bytes. - // - // Enabling ECC reduces the amount of total available memory, due to the extra required parity bits. - // - // The reported amount of used memory is equal to the sum of memory allocated by all active channels on the device. - // - // Available memory info: - // * Free: Unallocated FB memory (in bytes). - // * Total: Total installed FB memory (in bytes). - // * Used: Allocated FB memory (in bytes). Note that the driver/GPU always sets aside a small amount of memory for bookkeeping. - // - // Note: - // In MIG mode, if device handle is provided, the API returns aggregate information, only if the caller has appropriate privileges. - // Per-instance information can be queried by using specific MIG device handles. - meminfo, ret := nvml.DeviceGetMemoryInfo(device.device) - if ret == nvml.SUCCESS { - if !device.excludeMetrics["nv_mem_total"] { - t := float64(meminfo.Total) / (1024 * 1024) - y, err := lp.New("nv_mem_total", device.tags, m.meta, map[string]interface{}{"value": t}, time.Now()) - if err == nil { - y.AddMeta("unit", "MByte") - output <- y - } - } - - if !device.excludeMetrics["nv_fb_memory"] { - f := float64(meminfo.Used) / (1024 * 1024) - y, err := lp.New("nv_fb_memory", device.tags, m.meta, map[string]interface{}{"value": f}, time.Now()) - if err == nil { - y.AddMeta("unit", "MByte") - output <- y - } - } - } + err = readUtilization(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readUtilization for device", name, "failed") } - if !device.excludeMetrics["nv_temp"] { - // Retrieves the current temperature readings for the device, in degrees C. - // - // Available temperature sensors: - // * TEMPERATURE_GPU: Temperature sensor for the GPU die. 
- // * NVML_TEMPERATURE_COUNT - temp, ret := nvml.DeviceGetTemperature(device.device, nvml.TEMPERATURE_GPU) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_temp", device.tags, m.meta, map[string]interface{}{"value": float64(temp)}, time.Now()) - if err == nil { - y.AddMeta("unit", "degC") - output <- y - } - } + err = readTemp(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readTemp for device", name, "failed") } - if !device.excludeMetrics["nv_fan"] { - // Retrieves the intended operating speed of the device's fan. - // - // Note: The reported speed is the intended fan speed. - // If the fan is physically blocked and unable to spin, the output will not match the actual fan speed. - // - // For all discrete products with dedicated fans. - // - // The fan speed is expressed as a percentage of the product's maximum noise tolerance fan speed. - // This value may exceed 100% in certain cases. - fan, ret := nvml.DeviceGetFanSpeed(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_fan", device.tags, m.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - } + err = readFan(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readFan for device", name, "failed") } - if !device.excludeMetrics["nv_ecc_mode"] { - // Retrieves the current and pending ECC modes for the device. - // - // For Fermi or newer fully supported devices. Only applicable to devices with ECC. - // Requires NVML_INFOROM_ECC version 1.0 or higher. - // - // Changing ECC modes requires a reboot. - // The "pending" ECC mode refers to the target mode following the next reboot. - _, ecc_pend, ret := nvml.DeviceGetEccMode(device.device) - if ret == nvml.SUCCESS { - var y lp.CCMetric - var err error - switch ecc_pend { - case nvml.FEATURE_DISABLED: - y, err = lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "OFF"}, time.Now()) - case nvml.FEATURE_ENABLED: - y, err = lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "ON"}, time.Now()) - default: - y, err = lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "UNKNOWN"}, time.Now()) - } - if err == nil { - output <- y - } - } else if ret == nvml.ERROR_NOT_SUPPORTED { - y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now()) - if err == nil { - output <- y - } - } + err = readEccMode(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readEccMode for device", name, "failed") } - if !device.excludeMetrics["nv_perf_state"] { - // Retrieves the current performance state for the device. - // - // Allowed PStates: - // 0: Maximum Performance. - // .. - // 15: Minimum Performance. - // 32: Unknown performance state. - pState, ret := nvml.DeviceGetPerformanceState(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) - if err == nil { - output <- y - } - } + err = readPerfState(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readPerfState for device", name, "failed") } - if !device.excludeMetrics["nv_power_usage_report"] { - // Retrieves power usage for this GPU in milliwatts and its associated circuitry (e.g. memory) - // - // On Fermi and Kepler GPUs the reading is accurate to within +/- 5% of current power draw. 
- // - // It is only available if power management mode is supported - power, ret := nvml.DeviceGetPowerUsage(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_power_usage_report", device.tags, m.meta, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) - if err == nil { - y.AddMeta("unit", "watts") - output <- y - } - } + err = readPowerUsage(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readPowerUsage for device", name, "failed") } - // Retrieves the current clock speeds for the device. - // - // Available clock information: - // * CLOCK_GRAPHICS: Graphics clock domain. - // * CLOCK_SM: Streaming Multiprocessor clock domain. - // * CLOCK_MEM: Memory clock domain. - if !device.excludeMetrics["nv_graphics_clock_report"] { - graphicsClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_GRAPHICS) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_graphics_clock_report", device.tags, m.meta, map[string]interface{}{"value": float64(graphicsClock)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readClocks(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readClocks for device", name, "failed") } - if !device.excludeMetrics["nv_sm_clock_report"] { - smCock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_SM) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_sm_clock_report", device.tags, m.meta, map[string]interface{}{"value": float64(smCock)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readMaxClocks(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readMaxClocks for device", name, "failed") } - if !device.excludeMetrics["nv_mem_clock_report"] { - memClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_MEM) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_mem_clock_report", device.tags, m.meta, map[string]interface{}{"value": float64(memClock)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readEccErrors(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readEccErrors for device", name, "failed") } - // Retrieves the maximum clock speeds for the device. - // - // Available clock information: - // * CLOCK_GRAPHICS: Graphics clock domain. - // * CLOCK_SM: Streaming multiprocessor clock domain. - // * CLOCK_MEM: Memory clock domain. - // * CLOCK_VIDEO: Video encoder/decoder clock domain. - // * CLOCK_COUNT: Count of clock types. - // - // Note: - /// On GPUs from Fermi family current P0 clocks (reported by nvmlDeviceGetClockInfo) can differ from max clocks by few MHz. 
- if !device.excludeMetrics["nv_max_graphics_clock"] { - max_gclk, ret := nvml.DeviceGetMaxClockInfo(device.device, nvml.CLOCK_GRAPHICS) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_max_graphics_clock", device.tags, m.meta, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readPowerLimit(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readPowerLimit for device", name, "failed") } - if !device.excludeMetrics["nv_max_sm_clock"] { - maxSmClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_SM) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_max_sm_clock", device.tags, m.meta, map[string]interface{}{"value": float64(maxSmClock)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readEncUtilization(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readEncUtilization for device", name, "failed") } - if !device.excludeMetrics["nv_max_mem_clock"] { - maxMemClock, ret := nvml.DeviceGetClockInfo(device.device, nvml.CLOCK_MEM) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_max_mem_clock", device.tags, m.meta, map[string]interface{}{"value": float64(maxMemClock)}, time.Now()) - if err == nil { - y.AddMeta("unit", "MHz") - output <- y - } - } + err = readDecUtilization(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readDecUtilization for device", name, "failed") } - if !device.excludeMetrics["nv_ecc_db_error"] { - // Retrieves the total ECC error counts for the device. - // - // For Fermi or newer fully supported devices. - // Only applicable to devices with ECC. - // Requires NVML_INFOROM_ECC version 1.0 or higher. - // Requires ECC Mode to be enabled. - // - // The total error count is the sum of errors across each of the separate memory systems, - // i.e. the total set of errors across the entire device. - ecc_db, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_UNCORRECTED, nvml.AGGREGATE_ECC) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) - if err == nil { - output <- y - } - } + err = readRemappedRows(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readRemappedRows for device", name, "failed") } - if !device.excludeMetrics["nv_ecc_sb_error"] { - ecc_sb, ret := nvml.DeviceGetTotalEccErrors(device.device, nvml.MEMORY_ERROR_TYPE_CORRECTED, nvml.AGGREGATE_ECC) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) - if err == nil { - output <- y - } - } + err = readBarMemoryInfo(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readBarMemoryInfo for device", name, "failed") } - if !device.excludeMetrics["nv_power_man_limit"] { - // Retrieves the power management limit associated with this device. - // - // For Fermi or newer fully supported devices. - // - // The power limit defines the upper boundary for the card's power draw. - // If the card's total power draw reaches this limit the power management algorithm kicks in. 
- pwr_limit, ret := nvml.DeviceGetPowerManagementLimit(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_power_man_limit", device.tags, m.meta, map[string]interface{}{"value": float64(pwr_limit) / 1000}, time.Now()) - if err == nil { - y.AddMeta("unit", "watts") - output <- y - } - } + err = readProcessCounts(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readProcessCounts for device", name, "failed") } - if !device.excludeMetrics["nv_encoder_util"] { - // Retrieves the current utilization and sampling size in microseconds for the Encoder - // - // For Kepler or newer fully supported devices. - // - // Note: On MIG-enabled GPUs, querying encoder utilization is not currently supported. - enc_util, _, ret := nvml.DeviceGetEncoderUtilization(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_encoder_util", device.tags, m.meta, map[string]interface{}{"value": float64(enc_util)}, time.Now()) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - } + err = readViolationStats(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readViolationStats for device", name, "failed") } - if !device.excludeMetrics["nv_decoder_util"] { - // Retrieves the current utilization and sampling size in microseconds for the Decoder - // - // For Kepler or newer fully supported devices. - // - // Note: On MIG-enabled GPUs, querying decoder utilization is not currently supported. - dec_util, _, ret := nvml.DeviceGetDecoderUtilization(device.device) - if ret == nvml.SUCCESS { - y, err := lp.New("nv_decoder_util", device.tags, m.meta, map[string]interface{}{"value": float64(dec_util)}, time.Now()) - if err == nil { - y.AddMeta("unit", "%") - output <- y - } - } + err = readNVLinkStats(device, output) + if err != nil { + cclog.ComponentDebug(m.name, "readNVLinkStats for device", name, "failed") } } + // Actual read loop over all attached Nvidia GPUs + for i := 0; i < m.num_gpus; i++ { + + readAll(m.gpus[i], output) + + // Iterate over all MIG devices if any + if m.config.ProcessMigDevices { + current, _, ret := nvml.DeviceGetMigMode(m.gpus[i].device) + if ret != nvml.SUCCESS { + continue + } + if current == nvml.DEVICE_MIG_DISABLE { + continue + } + + maxMig, ret := nvml.DeviceGetMaxMigDeviceCount(m.gpus[i].device) + if ret != nvml.SUCCESS { + continue + } + if maxMig == 0 { + continue + } + cclog.ComponentDebug(m.name, "Reading MIG devices for GPU", i) + + for j := 0; j < maxMig; j++ { + mdev, ret := nvml.DeviceGetMigDeviceHandleByIndex(m.gpus[i].device, j) + if ret != nvml.SUCCESS { + continue + } + + excludeMetrics := make(map[string]bool) + for _, metric := range m.config.ExcludeMetrics { + excludeMetrics[metric] = true + } + + migDevice := NvidiaCollectorDevice{ + device: mdev, + tags: map[string]string{}, + meta: map[string]string{}, + excludeMetrics: excludeMetrics, + } + for k, v := range m.gpus[i].tags { + migDevice.tags[k] = v + } + migDevice.tags["stype"] = "mig" + if m.config.UseUuidForMigDevices { + uuid, ret := nvml.DeviceGetUUID(mdev) + if ret != nvml.SUCCESS { + cclog.ComponentError(m.name, "Unable to get UUID for mig device at index", j, ":", err.Error()) + } else { + migDevice.tags["stype-id"] = uuid + } + } else if m.config.UseSliceForMigDevices { + name, ret := nvml.DeviceGetName(m.gpus[i].device) + if ret == nvml.SUCCESS { + mname, ret := nvml.DeviceGetName(mdev) + if ret == nvml.SUCCESS { + x := strings.Replace(mname, name, "", -1) + x = strings.Replace(x, "MIG", "", -1) + x = strings.TrimSpace(x) + migDevice.tags["stype-id"] = x + 
}
+				}
+			}
+			if _, ok := migDevice.tags["stype-id"]; !ok {
+				migDevice.tags["stype-id"] = fmt.Sprintf("%d", j)
+			}
+			for k, v := range m.gpus[i].meta {
+				migDevice.meta[k] = v
+			}
+			if _, ok := migDevice.meta["uuid"]; ok && !m.config.UseUuidForMigDevices {
+				uuid, ret := nvml.DeviceGetUUID(mdev)
+				if ret == nvml.SUCCESS {
+					migDevice.meta["uuid"] = uuid
+				}
+			}
+
+			readAll(migDevice, output)
+			}
+		}
+	}
 }
 
 func (m *NvidiaCollector) Close() {
diff --git a/collectors/nvidiaMetric.md b/collectors/nvidiaMetric.md
index afe8b9e..7f0c416 100644
--- a/collectors/nvidiaMetric.md
+++ b/collectors/nvidiaMetric.md
@@ -3,38 +3,74 @@
 ```json
   "nvidia": {
-    "exclude_devices" : [
-        "0","1"
+    "exclude_devices": [
+        "0","1", "0000000:ff:01.0"
     ],
     "exclude_metrics": [
-        "nv_fb_memory",
+        "nv_fb_mem_used",
         "nv_fan"
-    ]
+    ],
+    "process_mig_devices": false,
+    "use_pci_info_as_type_id": true,
+    "add_pci_info_tag": false,
+    "add_uuid_meta": false,
+    "add_board_number_meta": false,
+    "add_serial_meta": false,
+    "use_uuid_for_mig_device": false,
+    "use_slice_for_mig_device": false
  }
 ```
+
+The `nvidia` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes IDs as supplied to the NVML with `nvmlDeviceGetHandleByIndex()` or the PCI address in NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option. Commonly only the physical GPUs are monitored. If MIG devices should be analyzed as well, set `process_mig_devices` (adds `stype=mig,stype-id=<mig_index>`). With the options `use_uuid_for_mig_device` and `use_slice_for_mig_device`, the `<mig_index>` can be replaced with the UUID (e.g. `MIG-6a9f7cc8-6d5b-5ce0-92de-750edc4d8849`) or the MIG slice name (e.g. `1g.5gb`).
+
+The metrics sent by the `nvidia` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It then uses the device handle index as `type-id` and adds the PCI ID as a separate `pci_identifier` tag.
+
+Optionally, it is possible to add the UUID, the board part number and the serial number to the meta information. Meta information is not sent to the sinks (unless configured otherwise).
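+
+For illustration only (the values are made up), a metric emitted for a MIG slice with `process_mig_devices` and `use_slice_for_mig_device` enabled could look roughly like `nv_fb_mem_used,type=accelerator,type-id=0,stype=mig,stype-id=1g.5gb value=1024 <timestamp>` in InfluxDB line protocol; additional tags such as `hostname` are typically added later in the pipeline.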
+ + Metrics: * `nv_util` * `nv_mem_util` -* `nv_mem_total` -* `nv_fb_memory` +* `nv_fb_mem_total` +* `nv_fb_mem_used` +* `nv_bar1_mem_total` +* `nv_bar1_mem_used` * `nv_temp` * `nv_fan` * `nv_ecc_mode` * `nv_perf_state` -* `nv_power_usage_report` -* `nv_graphics_clock_report` -* `nv_sm_clock_report` -* `nv_mem_clock_report` +* `nv_power_usage` +* `nv_graphics_clock` +* `nv_sm_clock` +* `nv_mem_clock` +* `nv_video_clock` * `nv_max_graphics_clock` * `nv_max_sm_clock` * `nv_max_mem_clock` -* `nv_ecc_db_error` -* `nv_ecc_sb_error` -* `nv_power_man_limit` +* `nv_max_video_clock` +* `nv_ecc_uncorrected_error` +* `nv_ecc_corrected_error` +* `nv_power_max_limit` * `nv_encoder_util` * `nv_decoder_util` +* `nv_remapped_rows_corrected` +* `nv_remapped_rows_uncorrected` +* `nv_remapped_rows_pending` +* `nv_remapped_rows_failure` +* `nv_compute_processes` +* `nv_graphics_processes` +* `nv_violation_power` +* `nv_violation_thermal` +* `nv_violation_sync_boost` +* `nv_violation_board_limit` +* `nv_violation_low_util` +* `nv_violation_reliability` +* `nv_violation_below_app_clock` +* `nv_violation_below_base_clock` +* `nv_nvlink_crc_flit_errors` +* `nv_nvlink_crc_errors` +* `nv_nvlink_ecc_errors` +* `nv_nvlink_replay_errors` +* `nv_nvlink_recovery_errors` -It uses a separate `type` in the metrics. The output metric looks like this: -`,type=accelerator,type-id= value= ` - +Some metrics add the additional sub type tag (`stype`) like the `nv_nvlink_*` metrics set `stype=nvlink,stype-id=`. \ No newline at end of file diff --git a/collectors/rocmsmiMetric.go b/collectors/rocmsmiMetric.go new file mode 100644 index 0000000..c717a5d --- /dev/null +++ b/collectors/rocmsmiMetric.go @@ -0,0 +1,319 @@ +package collectors + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "github.com/ClusterCockpit/go-rocm-smi/pkg/rocm_smi" +) + +type RocmSmiCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeDevices []string `json:"exclude_devices,omitempty"` + AddPciInfoTag bool `json:"add_pci_info_tag,omitempty"` + UsePciInfoAsTypeId bool `json:"use_pci_info_as_type_id,omitempty"` + AddSerialMeta bool `json:"add_serial_meta,omitempty"` +} + +type RocmSmiCollectorDevice struct { + device rocm_smi.DeviceHandle + index int + tags map[string]string // default tags + meta map[string]string // default meta information + excludeMetrics map[string]bool // copy of exclude metrics from config +} + +type RocmSmiCollector struct { + metricCollector + config RocmSmiCollectorConfig // the configuration structure + devices []RocmSmiCollectorDevice +} + +// Functions to implement MetricCollector interface +// Init(...), Read(...), Close() +// See: metricCollector.go + +// Init initializes the sample collector +// Called once by the collector manager +// All tags, meta data tags and metrics that do not change over the runtime should be set here +func (m *RocmSmiCollector) Init(config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "RocmSmiCollector" + // This is for later use, also call it early + m.setup() + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + //m.meta = map[string]string{"source": m.name, "group": "AMD"} + // Define tags sent with each metric + // The 'type' tag 
is always needed, it defines the granulatity of the metric + // node -> whole system + // socket -> CPU socket (requires socket ID as 'type-id' tag) + // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) + //m.tags = map[string]string{"type": "node"} + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + ret := rocm_smi.Init() + if ret != rocm_smi.STATUS_SUCCESS { + err = errors.New("Failed to initialize ROCm SMI library") + cclog.ComponentError(m.name, err.Error()) + return err + } + + numDevs, ret := rocm_smi.NumMonitorDevices() + if ret != rocm_smi.STATUS_SUCCESS { + err = errors.New("Failed to get number of GPUs from ROCm SMI library") + cclog.ComponentError(m.name, err.Error()) + return err + } + + exclDev := func(s string) bool { + skip_device := false + for _, excl := range m.config.ExcludeDevices { + if excl == s { + skip_device = true + break + } + } + return skip_device + } + + m.devices = make([]RocmSmiCollectorDevice, 0) + + for i := 0; i < numDevs; i++ { + str_i := fmt.Sprintf("%d", i) + if exclDev(str_i) { + continue + } + device, ret := rocm_smi.DeviceGetHandleByIndex(i) + if ret != rocm_smi.STATUS_SUCCESS { + err = fmt.Errorf("Failed to get handle for GPU %d", i) + cclog.ComponentError(m.name, err.Error()) + return err + } + + pciInfo, ret := rocm_smi.DeviceGetPciInfo(device) + if ret != rocm_smi.STATUS_SUCCESS { + err = fmt.Errorf("Failed to get PCI information for GPU %d", i) + cclog.ComponentError(m.name, err.Error()) + return err + } + + pciId := fmt.Sprintf( + "%08X:%02X:%02X.%X", + pciInfo.Domain, + pciInfo.Bus, + pciInfo.Device, + pciInfo.Function) + + if exclDev(pciId) { + continue + } + + dev := RocmSmiCollectorDevice{ + device: device, + tags: map[string]string{ + "type": "accelerator", + "type-id": str_i, + }, + meta: map[string]string{ + "source": m.name, + "group": "AMD", + }, + } + if m.config.UsePciInfoAsTypeId { + dev.tags["type-id"] = pciId + } else if m.config.AddPciInfoTag { + dev.tags["pci_identifier"] = pciId + } + + if m.config.AddSerialMeta { + serial, ret := rocm_smi.DeviceGetSerialNumber(device) + if ret != rocm_smi.STATUS_SUCCESS { + cclog.ComponentError(m.name, "Unable to get serial number for device at index", i, ":", rocm_smi.StatusStringNoError(ret)) + } else { + dev.meta["serial"] = serial + } + } + // Add excluded metrics + dev.excludeMetrics = map[string]bool{} + for _, e := range m.config.ExcludeMetrics { + dev.excludeMetrics[e] = true + } + dev.index = i + m.devices = append(m.devices, dev) + } + + // Set this flag only if everything is initialized properly, all required files exist, ... 
+ m.init = true + return err +} + +// Read collects all metrics belonging to the sample collector +// and sends them through the output channel to the collector manager +func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { + // Create a sample metric + timestamp := time.Now() + + for _, dev := range m.devices { + metrics, ret := rocm_smi.DeviceGetMetrics(dev.device) + if ret != rocm_smi.STATUS_SUCCESS { + cclog.ComponentError(m.name, "Unable to get metrics for device at index", dev.index, ":", rocm_smi.StatusStringNoError(ret)) + continue + } + + if !dev.excludeMetrics["rocm_gfx_util"] { + value := metrics.Average_gfx_activity + y, err := lp.New("rocm_gfx_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_umc_util"] { + value := metrics.Average_umc_activity + y, err := lp.New("rocm_umc_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_mm_util"] { + value := metrics.Average_mm_activity + y, err := lp.New("rocm_mm_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_avg_power"] { + value := metrics.Average_socket_power + y, err := lp.New("rocm_avg_power", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_mem"] { + value := metrics.Temperature_mem + y, err := lp.New("rocm_temp_mem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_hotspot"] { + value := metrics.Temperature_hotspot + y, err := lp.New("rocm_temp_hotspot", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_edge"] { + value := metrics.Temperature_edge + y, err := lp.New("rocm_temp_edge", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_vrgfx"] { + value := metrics.Temperature_vrgfx + y, err := lp.New("rocm_temp_vrgfx", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_vrsoc"] { + value := metrics.Temperature_vrsoc + y, err := lp.New("rocm_temp_vrsoc", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_vrmem"] { + value := metrics.Temperature_vrmem + y, err := lp.New("rocm_temp_vrmem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_gfx_clock"] { + value := metrics.Average_gfxclk_frequency + y, err := lp.New("rocm_gfx_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_soc_clock"] { + value := metrics.Average_socclk_frequency + y, err := lp.New("rocm_soc_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_u_clock"] { + value := metrics.Average_uclk_frequency + y, err := lp.New("rocm_u_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, 
timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_v0_clock"] { + value := metrics.Average_vclk0_frequency + y, err := lp.New("rocm_v0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_v1_clock"] { + value := metrics.Average_vclk1_frequency + y, err := lp.New("rocm_v1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_d0_clock"] { + value := metrics.Average_dclk0_frequency + y, err := lp.New("rocm_d0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_d1_clock"] { + value := metrics.Average_dclk1_frequency + y, err := lp.New("rocm_d1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + output <- y + } + } + if !dev.excludeMetrics["rocm_temp_hbm"] { + for i := 0; i < rocm_smi.NUM_HBM_INSTANCES; i++ { + value := metrics.Temperature_hbm[i] + y, err := lp.New("rocm_temp_hbm", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + y.AddTag("stype", "device") + y.AddTag("stype-id", fmt.Sprintf("%d", i)) + output <- y + } + } + } + } + +} + +// Close metric collector: close network connection, close files, close libraries, ... +// Called once by the collector manager +func (m *RocmSmiCollector) Close() { + // Unset flag + ret := rocm_smi.Shutdown() + if ret != rocm_smi.STATUS_SUCCESS { + cclog.ComponentError(m.name, "Failed to shutdown ROCm SMI library") + } + m.init = false +} diff --git a/collectors/rocmsmiMetric.md b/collectors/rocmsmiMetric.md new file mode 100644 index 0000000..9c4da5e --- /dev/null +++ b/collectors/rocmsmiMetric.md @@ -0,0 +1,47 @@ + +## `rocm_smi` collector + +```json + "rocm_smi": { + "exclude_devices": [ + "0","1", "0000000:ff:01.0" + ], + "exclude_metrics": [ + "rocm_mm_util", + "rocm_temp_vrsoc" + ], + "use_pci_info_as_type_id": true, + "add_pci_info_tag": false, + "add_serial_meta": false, + } +``` + +The `rocm_smi` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes logical IDs in the list of available devices or the PCI address similar to NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option. + +The metrics sent by the `rocm_smi` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag. + +Optionally, it is possible to add the serial to the meta informations. They are not sent to the sinks (if not configured otherwise). + + +Metrics: +* `rocm_gfx_util` +* `rocm_umc_util` +* `rocm_mm_util` +* `rocm_avg_power` +* `rocm_temp_mem` +* `rocm_temp_hotspot` +* `rocm_temp_edge` +* `rocm_temp_vrgfx` +* `rocm_temp_vrsoc` +* `rocm_temp_vrmem` +* `rocm_gfx_clock` +* `rocm_soc_clock` +* `rocm_u_clock` +* `rocm_v0_clock` +* `rocm_v1_clock` +* `rocm_d0_clock` +* `rocm_d1_clock` +* `rocm_temp_hbm` + + +Some metrics add the additional sub type tag (`stype`) like the `rocm_temp_hbm` metrics set `stype=device,stype-id=`. 
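+
+For illustration only (the values are made up), an HBM temperature reading could look roughly like `rocm_temp_hbm,type=accelerator,type-id=0,stype=device,stype-id=2 value=41 <timestamp>` in InfluxDB line protocol, with the `stype-id` tag carrying the HBM instance index.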
diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index 47078a6..47ec296 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -35,6 +35,10 @@ func (m *SampleCollector) Init(config json.RawMessage) error { m.name = "InternalCollector" // This is for later use, also call it early m.setup() + // Tell whether the collector should be run in parallel with others (reading files, ...) + // or it should be run serially, mostly for collectors acutally doing measurements + // because they should not measure the execution of the other collectors + m.parallel = true // Define meta information sent with each metric // (Can also be dynamic or this is the basic set with extension through AddMeta()) m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} @@ -42,7 +46,12 @@ func (m *SampleCollector) Init(config json.RawMessage) error { // The 'type' tag is always needed, it defines the granulatity of the metric // node -> whole system // socket -> CPU socket (requires socket ID as 'type-id' tag) - // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) + // die -> CPU die (requires CPU die ID as 'type-id' tag) + // memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag) + // llc -> Last level cache (requires last level cache ID as 'type-id' tag) + // core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag) + // hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag) + // accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag) m.tags = map[string]string{"type": "node"} // Read in the JSON configuration if len(config) > 0 { diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index 7ba8eb1..af9d7fd 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -50,6 +50,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { } m.name = "TempCollector" + m.parallel = true m.setup() if len(config) > 0 { err := json.Unmarshal(config, &m.config) @@ -116,6 +117,10 @@ func (m *TempCollector) Init(config json.RawMessage) error { } // Sensor file + _, err = ioutil.ReadFile(file) + if err != nil { + continue + } sensor.file = file // Sensor tags diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 408c3cc..1f4aaca 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -28,6 +28,7 @@ type TopProcsCollector struct { func (m *TopProcsCollector) Init(config json.RawMessage) error { var err error m.name = "TopProcsCollector" + m.parallel = true m.tags = map[string]string{"type": "node"} m.meta = map[string]string{"source": m.name, "group": "TopProcs"} if len(config) > 0 { diff --git a/config.json b/config.json index 52f9df1..924bec7 100644 --- a/config.json +++ b/config.json @@ -1,8 +1,8 @@ { - "sinks": "sinks.json", - "collectors" : "collectors.json", - "receivers" : "receivers.json", - "router" : "router.json", - "interval": 10, - "duration": 1 + "sinks": "./sinks.json", + "collectors" : "./collectors.json", + "receivers" : "./receivers.json", + "router" : "./router.json", + "interval": "10s", + "duration": "1s" } diff --git a/go.mod b/go.mod index 07d46f6..a4aacb8 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,35 @@ module github.com/ClusterCockpit/cc-metric-collector -go 1.16 +go 1.17 require ( + github.com/ClusterCockpit/cc-units v0.0.0-20220318130935-92a0c6442220 github.com/NVIDIA/go-nvml v0.11.6-0 github.com/PaesslerAG/gval v1.1.2 
github.com/gorilla/mux v1.8.0 github.com/influxdata/influxdb-client-go/v2 v2.8.1 github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf - github.com/nats-io/nats-server/v2 v2.8.0 // indirect github.com/nats-io/nats.go v1.14.0 github.com/prometheus/client_golang v1.12.1 github.com/stmcginnis/gofish v0.13.0 golang.org/x/sys v0.0.0-20220412211240-33da011f77ad ) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/deepmap/oapi-codegen v1.8.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/nats-io/nats-server/v2 v2.8.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.32.1 // indirect + github.com/prometheus/procfs v0.7.3 // indirect + golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.mod.1.16 b/go.mod.1.16 new file mode 100644 index 0000000..2a72cb3 --- /dev/null +++ b/go.mod.1.16 @@ -0,0 +1,16 @@ +module github.com/ClusterCockpit/cc-metric-collector + +go 1.16 + +require ( + github.com/NVIDIA/go-nvml v0.11.6-0 + github.com/PaesslerAG/gval v1.1.2 + github.com/gorilla/mux v1.8.0 + github.com/influxdata/influxdb-client-go/v2 v2.7.0 + github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf + github.com/nats-io/nats-server/v2 v2.8.0 // indirect + github.com/nats-io/nats.go v1.14.0 + github.com/prometheus/client_golang v1.12.1 + github.com/stmcginnis/gofish v0.13.0 + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad +) diff --git a/go.mod.1.17+ b/go.mod.1.17+ new file mode 100644 index 0000000..b3dc6ca --- /dev/null +++ b/go.mod.1.17+ @@ -0,0 +1,16 @@ +module github.com/ClusterCockpit/cc-metric-collector + +go 1.17 + +require ( + github.com/NVIDIA/go-nvml v0.11.6-0 + github.com/PaesslerAG/gval v1.1.2 + github.com/gorilla/mux v1.8.0 + github.com/influxdata/influxdb-client-go/v2 v2.8.1 + github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf + github.com/nats-io/nats-server/v2 v2.8.0 // indirect + github.com/nats-io/nats.go v1.14.0 + github.com/prometheus/client_golang v1.12.1 + github.com/stmcginnis/gofish v0.13.0 + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad +) diff --git a/internal/ccTopology/ccTopology.go b/internal/ccTopology/ccTopology.go index f68c3f4..0ed8883 100644 --- a/internal/ccTopology/ccTopology.go +++ b/internal/ccTopology/ccTopology.go @@ -29,6 +29,7 @@ func intArrayContains(array []int, str int) (int, bool) { return -1, false } +// Used internally for sysfs file reads func fileToInt(path string) int { buffer, err := ioutil.ReadFile(path) if err != nil { @@ -47,6 +48,7 @@ func fileToInt(path string) int { return int(id) } +// Get list of CPU socket IDs func SocketList() []int { buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) if err != nil { @@ -54,7 +56,7 @@ func SocketList() []int { return nil } ll := strings.Split(string(buffer), "\n") - var packs []int + packs := make([]int, 0) for _, line := range ll { if strings.HasPrefix(line, "physical id") { lv := strings.Fields(line) @@ -72,7 +74,8 @@ func SocketList() []int { return packs } -func CpuList() []int { +// Get list of hardware thread IDs 
in the order of listing in /proc/cpuinfo +func HwthreadList() []int { buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) if err != nil { log.Print(err) @@ -97,6 +100,13 @@ func CpuList() []int { return cpulist } +// Get list of hardware thread IDs in the order of listing in /proc/cpuinfo +// Deprecated! Use HwthreadList() +func CpuList() []int { + return HwthreadList() +} + +// Get list of CPU core IDs in the order of listing in /proc/cpuinfo func CoreList() []int { buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) if err != nil { @@ -122,6 +132,7 @@ func CoreList() []int { return corelist } +// Get list of NUMA node IDs func NumaNodeList() []int { numaList := make([]int, 0) globPath := filepath.Join(string(SYSFS_NUMABASE), "node*") @@ -156,8 +167,9 @@ func NumaNodeList() []int { return numaList } +// Get list of CPU die IDs func DieList() []int { - cpulist := CpuList() + cpulist := HwthreadList() dielist := make([]int, 0) for _, c := range cpulist { diepath := filepath.Join(string(SYSFS_CPUBASE), fmt.Sprintf("cpu%d", c), "topology/die_id") @@ -175,7 +187,27 @@ func DieList() []int { return SocketList() } -type CpuEntry struct { +// Get list of specified type using the naming format inside ClusterCockpit +func GetTypeList(topology_type string) []int { + switch topology_type { + case "node": + return []int{0} + case "socket": + return SocketList() + case "die": + return DieList() + case "memoryDomain": + return NumaNodeList() + case "core": + return CoreList() + case "hwthread": + return HwthreadList() + } + return []int{} +} + +// Structure holding all information about a hardware thread +type HwthreadEntry struct { Cpuid int SMT int Core int @@ -184,25 +216,25 @@ type CpuEntry struct { Die int } -func CpuData() []CpuEntry { +func CpuData() []HwthreadEntry { - fileToInt := func(path string) int { - buffer, err := ioutil.ReadFile(path) - if err != nil { - log.Print(err) - //cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) - return -1 - } - sbuffer := strings.Replace(string(buffer), "\n", "", -1) - var id int64 - //_, err = fmt.Scanf("%d", sbuffer, &id) - id, err = strconv.ParseInt(sbuffer, 10, 32) - if err != nil { - cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) - return -1 - } - return int(id) - } + // fileToInt := func(path string) int { + // buffer, err := ioutil.ReadFile(path) + // if err != nil { + // log.Print(err) + // //cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) + // return -1 + // } + // sbuffer := strings.Replace(string(buffer), "\n", "", -1) + // var id int64 + // //_, err = fmt.Scanf("%d", sbuffer, &id) + // id, err = strconv.ParseInt(sbuffer, 10, 32) + // if err != nil { + // cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) + // return -1 + // } + // return int(id) + // } getCore := func(basepath string) int { return fileToInt(fmt.Sprintf("%s/core_id", basepath)) } @@ -260,9 +292,9 @@ func CpuData() []CpuEntry { return 0 } - clist := make([]CpuEntry, 0) - for _, c := range CpuList() { - clist = append(clist, CpuEntry{Cpuid: c}) + clist := make([]HwthreadEntry, 0) + for _, c := range HwthreadList() { + clist = append(clist, HwthreadEntry{Cpuid: c}) } for i, centry := range clist { centry.Socket = -1 @@ -298,6 +330,7 @@ func CpuData() []CpuEntry { return clist } +// Structure holding basic information about a CPU type CpuInformation struct { NumHWthreads int SMTWidth int @@ -307,6 +340,7 @@ type CpuInformation struct { NumNumaDomains int } +// Get 
basic information about the CPU func CpuInfo() CpuInformation { var c CpuInformation @@ -342,7 +376,8 @@ func CpuInfo() CpuInformation { return c } -func GetCpuSocket(cpuid int) int { +// Get the CPU socket ID for a given hardware thread ID +func GetHwthreadSocket(cpuid int) int { cdata := CpuData() for _, d := range cdata { if d.Cpuid == cpuid { @@ -352,7 +387,8 @@ func GetCpuSocket(cpuid int) int { return -1 } -func GetCpuNumaDomain(cpuid int) int { +// Get the NUMA node ID for a given hardware thread ID +func GetHwthreadNumaDomain(cpuid int) int { cdata := CpuData() for _, d := range cdata { if d.Cpuid == cpuid { @@ -362,7 +398,8 @@ func GetCpuNumaDomain(cpuid int) int { return -1 } -func GetCpuDie(cpuid int) int { +// Get the CPU die ID for a given hardware thread ID +func GetHwthreadDie(cpuid int) int { cdata := CpuData() for _, d := range cdata { if d.Cpuid == cpuid { @@ -372,7 +409,8 @@ func GetCpuDie(cpuid int) int { return -1 } -func GetCpuCore(cpuid int) int { +// Get the CPU core ID for a given hardware thread ID +func GetHwthreadCore(cpuid int) int { cdata := CpuData() for _, d := range cdata { if d.Cpuid == cpuid { @@ -382,7 +420,8 @@ func GetCpuCore(cpuid int) int { return -1 } -func GetSocketCpus(socket int) []int { +// Get all hardware thread IDs associated with a CPU socket +func GetSocketHwthreads(socket int) []int { all := CpuData() cpulist := make([]int, 0) for _, d := range all { @@ -393,7 +432,8 @@ func GetSocketCpus(socket int) []int { return cpulist } -func GetNumaDomainCpus(domain int) []int { +// Get all hardware thread IDs associated with a NUMA node +func GetNumaDomainHwthreads(domain int) []int { all := CpuData() cpulist := make([]int, 0) for _, d := range all { @@ -404,7 +444,8 @@ func GetNumaDomainCpus(domain int) []int { return cpulist } -func GetDieCpus(die int) []int { +// Get all hardware thread IDs associated with a CPU die +func GetDieHwthreads(die int) []int { all := CpuData() cpulist := make([]int, 0) for _, d := range all { @@ -415,7 +456,8 @@ func GetDieCpus(die int) []int { return cpulist } -func GetCoreCpus(core int) []int { +// Get all hardware thread IDs associated with a CPU core +func GetCoreHwthreads(core int) []int { all := CpuData() cpulist := make([]int, 0) for _, d := range all { diff --git a/internal/metricAggregator/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go index 1fbef65..945dc6d 100644 --- a/internal/metricAggregator/metricAggregatorFunctions.go +++ b/internal/metricAggregator/metricAggregatorFunctions.go @@ -246,7 +246,7 @@ func matchfunc(args ...interface{}) (interface{}, error) { func getCpuCoreFunc(args ...interface{}) (interface{}, error) { switch cpuid := args[0].(type) { case int: - return topo.GetCpuCore(cpuid), nil + return topo.GetHwthreadCore(cpuid), nil } return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") } @@ -255,7 +255,7 @@ func getCpuCoreFunc(args ...interface{}) (interface{}, error) { func getCpuSocketFunc(args ...interface{}) (interface{}, error) { switch cpuid := args[0].(type) { case int: - return topo.GetCpuSocket(cpuid), nil + return topo.GetHwthreadSocket(cpuid), nil } return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") } @@ -264,7 +264,7 @@ func getCpuSocketFunc(args ...interface{}) (interface{}, error) { func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { switch cpuid := args[0].(type) { case int: - return topo.GetCpuNumaDomain(cpuid), nil + return topo.GetHwthreadNumaDomain(cpuid), nil }
return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid") } @@ -273,7 +273,7 @@ func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { func getCpuDieFunc(args ...interface{}) (interface{}, error) { switch cpuid := args[0].(type) { case int: - return topo.GetCpuDie(cpuid), nil + return topo.GetHwthreadDie(cpuid), nil } return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid") } @@ -336,7 +336,7 @@ func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) { // wrapper function to get a list of all cpuids of the node func getCpuListOfNode(args ...interface{}) (interface{}, error) { - return topo.CpuList(), nil + return topo.HwthreadList(), nil } // helper function to get the cpuid list for a CCMetric type tag set (type and type-id) @@ -348,14 +348,14 @@ func getCpuListOfType(args ...interface{}) (interface{}, error) { case string: switch typ { case "node": - return topo.CpuList(), nil + return topo.HwthreadList(), nil case "socket": return getCpuListOfSocketFunc(args[1]) case "numadomain": return getCpuListOfNumaDomainFunc(args[1]) case "core": return getCpuListOfCoreFunc(args[1]) - case "cpu": + case "hwthread": var cpu int switch id := args[1].(type) { diff --git a/internal/metricRouter/README.md b/internal/metricRouter/README.md index fe2d64f..ed99b51 100644 --- a/internal/metricRouter/README.md +++ b/internal/metricRouter/README.md @@ -52,6 +52,11 @@ The CCMetric router sits in between the collectors and the sinks and can be used ], "rename_metrics" : { "metric_12345" : "mymetric" + }, + "normalize_units" : true, + "change_unit_prefix" : { + "mem_used" : "G", + "mem_total" : "G" } } ``` @@ -192,6 +197,14 @@ This option takes a list of evaluable conditions and performs them one after the ``` The first line is comparable with the example in `drop_metrics`, it drops all metrics starting with `drop_metric_` and ending with a number. The second line drops all metrics of the first hardware thread (**not** recommended) +# Manipulating the metric units + +## The `normalize_units` option +The cc-metric-collector tries to read the data from the system as it is reported. If available, it tries to read the metric unit from the system as well (e.g. from `/proc/meminfo`). The problem is that, depending on the source, the metric units are named differently. Just think of `byte`, `Byte`, `B`, `bytes`, ... +The [cc-units](https://github.com/ClusterCockpit/cc-units) package provides a normalization option so that the same metric unit name is used for all metrics. If this option is set to true, all `unit` meta tags are normalized. + +## The `change_unit_prefix` section +It is often the case that metrics are reported by the system using a rather outdated unit prefix (for example, `/proc/meminfo` still uses kByte even though current memory sizes are in the GByte range). If you want to change the prefix of a unit, you can do that with the help of [cc-units](https://github.com/ClusterCockpit/cc-units). The setting is keyed by the metric name and specifies the new prefix for that metric; the cc-units package determines the scaling factor. # Aggregate metric values of the current interval with the `interval_aggregates` option @@ -239,3 +252,22 @@ Use cases for `interval_aggregates`: } } ``` + +# Order of operations + +The router applies the options described above in a specific order. 
In order to get the logic you want for a specific metric, it is crucial to know the processing order: + +- Add the `hostname` tag (c) +- Manipulate the timestamp to the interval timestamp (c,r) +- Drop metrics based on `drop_metrics` and `drop_metrics_if` (c,r) +- Add tags based on `add_tags` (c,r) +- Delete tags based on `del_tags` (c,r) +- Rename metric based on `rename_metric` (c,r) + - Add tags based on `add_tags` to still work if the configuration uses the new name (c,r) + - Delete tags based on `del_tags` to still work if the configuration uses the new name (c,r) +- Normalize units when `normalize_units` is set (c,r) +- Convert unit prefix based on `change_unit_prefix` (c,r) + +Legend: +- 'c' if metric is coming from a collector +- 'r' if metric is coming from a receiver diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 8875d0e..2614ced 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -12,6 +12,7 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + units "github.com/ClusterCockpit/cc-units" ) const ROUTER_MAX_FORWARD = 50 @@ -35,6 +36,8 @@ type metricRouterConfig struct { IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select + NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units + ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics dropMetrics map[string]bool // Internal map for O(1) lookup } @@ -207,6 +210,38 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool { return false } +func (r *metricRouter) prepareUnit(point lp.CCMetric) bool { + if r.config.NormalizeUnits { + if in_unit, ok := point.GetMeta("unit"); ok { + u := units.NewUnit(in_unit) + if u.Valid() { + point.AddMeta("unit", u.Short()) + } + } + } + if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok { + + newPrefix := units.NewPrefix(newP) + + if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { + u := units.NewUnit(in_unit) + if u.Valid() { + cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name()) + conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) + if conv != nil && out_unit.Valid() { + if val, ok := point.GetField("value"); ok { + point.AddField("value", conv(val)) + point.AddMeta("unit", out_unit.Short()) + } + } + } + + } + } + + return true +} + // Start starts the metric router func (r *metricRouter) Start() { // start timer if configured @@ -232,9 +267,11 @@ func (r *metricRouter) Start() { if new, ok := r.config.RenameMetrics[name]; ok { point.SetName(new) point.AddMeta("oldname", name) + r.DoAddTags(point) + r.DoDelTags(point) } - r.DoAddTags(point) - r.DoDelTags(point) + + r.prepareUnit(point) for _, o := range r.outputs { o <- point diff --git a/scripts/cc-metric-collector.init b/scripts/cc-metric-collector.init index 35fa965..acb82eb 100755 --- a/scripts/cc-metric-collector.init +++ b/scripts/cc-metric-collector.init @@ -75,7 +75,7 @@ case "$1" in fi # Start Daemon - start-stop-daemon --start -b 
--chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $DAEMON_OPTS + start-stop-daemon --start -b --chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $CC_OPTS return=$? if [ $return -eq 0 ] then diff --git a/scripts/likwid_perfgroup_to_cc_config.py b/scripts/likwid_perfgroup_to_cc_config.py index f1c3451..52959ed 100755 --- a/scripts/likwid_perfgroup_to_cc_config.py +++ b/scripts/likwid_perfgroup_to_cc_config.py @@ -39,7 +39,7 @@ def group_to_json(groupfile): llist = re.split("\s+", line) calc = llist[-1] metric = " ".join(llist[:-1]) - scope = "cpu" + scope = "hwthread" if "BOX" in calc: scope = "socket" if "PWR" in calc: diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 7713638..466915d 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -19,9 +19,9 @@ type HttpSinkConfig struct { URL string `json:"url,omitempty"` JWT string `json:"jwt,omitempty"` Timeout string `json:"timeout,omitempty"` - MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` + MaxRetries int `json:"max_retries,omitempty"` } type HttpSink struct { @@ -32,83 +32,85 @@ type HttpSink struct { buffer *bytes.Buffer flushTimer *time.Timer config HttpSinkConfig - maxIdleConns int idleConnTimeout time.Duration timeout time.Duration flushDelay time.Duration } func (s *HttpSink) Write(m lp.CCMetric) error { - if s.buffer.Len() == 0 && s.flushDelay != 0 { - // This is the first write since the last flush, start the flushTimer! - if s.flushTimer != nil && s.flushTimer.Stop() { - cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") - } - - // Run a batched flush for all lines that have arrived in the last second - s.flushTimer = time.AfterFunc(s.flushDelay, func() { - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) - } - }) - } - p := m.ToPoint(s.meta_as_tags) - s.lock.Lock() + firstWriteOfBatch := s.buffer.Len() == 0 _, err := s.encoder.Encode(p) - s.lock.Unlock() // defer does not work here as Flush() takes the lock as well - + s.lock.Unlock() if err != nil { cclog.ComponentError(s.name, "encoding failed:", err.Error()) return err } - // Flush synchronously if "flush_delay" is zero if s.flushDelay == 0 { return s.Flush() } - return err + if firstWriteOfBatch { + if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + } + + return nil } func (s *HttpSink) Flush() error { - // buffer is read by client.Do, prevent concurrent modifications + // Own lock for as short as possible: the time it takes to copy the buffer. 
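+ // The buffered lines are copied and the buffer is reset while the lock is held;
+ // the HTTP request and the retries below then work on the copy, so concurrent
+ // Write() calls are not blocked while data is on the wire.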
s.lock.Lock() - defer s.lock.Unlock() - - // Do not flush empty buffer - if s.buffer.Len() == 0 { + buf := make([]byte, s.buffer.Len()) + copy(buf, s.buffer.Bytes()) + s.buffer.Reset() + s.lock.Unlock() + if len(buf) == 0 { return nil } - // Create new request to send buffer - req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) - if err != nil { - cclog.ComponentError(s.name, "failed to create request:", err.Error()) - return err + var res *http.Response + for i := 0; i < s.config.MaxRetries; i++ { + // Create new request to send buffer + req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf)) + if err != nil { + cclog.ComponentError(s.name, "failed to create request:", err.Error()) + return err + } + + // Set authorization header + if len(s.config.JWT) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) + } + + // Do request + res, err = s.client.Do(req) + if err != nil { + cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) + // Wait between retries + time.Sleep(time.Duration(i+1) * (time.Second / 2)) + continue + } + + break } - // Set authorization header - if len(s.config.JWT) != 0 { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) - } - - // Send - res, err := s.client.Do(req) - - // Clear buffer - s.buffer.Reset() - - // Handle transport/tcp errors - if err != nil { - cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) - return err + if res == nil { + return errors.New("flush failed due to repeated errors") } // Handle application errors if res.StatusCode != http.StatusOK { - err = errors.New(res.Status) + err := errors.New(res.Status) cclog.ComponentError(s.name, "application error:", err.Error()) return err } @@ -128,10 +130,10 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s := new(HttpSink) // Set default values s.name = fmt.Sprintf("HttpSink(%s)", name) - s.config.MaxIdleConns = 10 - s.config.IdleConnTimeout = "5s" + s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval. s.config.Timeout = "5s" - s.config.FlushDelay = "1s" + s.config.FlushDelay = "5s" + s.config.MaxRetries = 3 // Read config if len(config) > 0 { @@ -143,9 +145,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.URL) == 0 { return nil, errors.New("`url` config option is required for HTTP sink") } - if s.config.MaxIdleConns > 0 { - s.maxIdleConns = s.config.MaxIdleConns - } if len(s.config.IdleConnTimeout) > 0 { t, err := time.ParseDuration(s.config.IdleConnTimeout) if err == nil { @@ -170,7 +169,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.meta_as_tags[k] = true } tr := &http.Transport{ - MaxIdleConns: s.maxIdleConns, + MaxIdleConns: 1, // We will only ever talk to one host. IdleConnTimeout: s.idleConnTimeout, } s.client = &http.Client{Transport: tr, Timeout: s.timeout} diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 31d127d..bf88079 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -25,7 +25,6 @@ type InfluxAsyncSinkConfig struct { Password string `json:"password,omitempty"` Organization string `json:"organization,omitempty"` SSL bool `json:"ssl,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` // Maximum number of points sent to server in single request. 
Default 5000 BatchSize uint `json:"batch_size,omitempty"` // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms @@ -186,12 +185,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") + if len(s.config.Port) == 0 { + return nil, errors.New("Missing port configuration required by InfluxSink") + } + if len(s.config.Database) == 0 { + return nil, errors.New("Missing database configuration required by InfluxSink") + } + if len(s.config.Organization) == 0 { + return nil, errors.New("Missing organization configuration required by InfluxSink") + } + if len(s.config.Password) == 0 { + return nil, errors.New("Missing password configuration required by InfluxSink") } // Create lookup map to use meta infos as tags in the output metric s.meta_as_tags = make(map[string]bool) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index b382c38..212647d 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -16,37 +16,28 @@ import ( "github.com/influxdata/influxdb-client-go/v2/api/write" ) -type InfluxSinkConfig struct { - defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - FlushDelay string `json:"flush_delay,omitempty"` - BatchSize int `json:"batch_size,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` - // InfluxRetryInterval string `json:"retry_interval"` - // InfluxExponentialBase uint `json:"retry_exponential_base"` - // InfluxMaxRetries uint `json:"max_retries"` - // InfluxMaxRetryTime string `json:"max_retry_time"` - //InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it -} - type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - config InfluxSinkConfig - influxRetryInterval uint - influxMaxRetryTime uint - batch []*write.Point - flushTimer *time.Timer - flushDelay time.Duration - lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer - //influxMaxRetryDelay uint + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + // Maximum number of points sent to server in single request. Default 100 + BatchSize int `json:"batch_size,omitempty"` + // Interval, in which is buffer flushed if it has not been already written (by reaching batch size). 
Default 1s + FlushInterval string `json:"flush_delay,omitempty"` + } + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer } // connect connects to the InfluxDB server @@ -76,23 +67,6 @@ func (s *InfluxSink) connect() error { // Set influxDB client options clientOptions := influxdb2.DefaultOptions() - // if s.influxRetryInterval != 0 { - // cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval) - // clientOptions.SetMaxRetryInterval(s.influxRetryInterval) - // } - // if s.influxMaxRetryTime != 0 { - // cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime) - // clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) - // } - // if s.config.InfluxExponentialBase != 0 { - // cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase) - // clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) - // } - // if s.config.InfluxMaxRetries != 0 { - // cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries) - // clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) - // } - // Do not check InfluxDB certificate clientOptions.SetTLSConfig( &tls.Config{ @@ -126,11 +100,13 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { } // Run a batched flush for all lines that have arrived in the last flush delay interval - s.flushTimer = time.AfterFunc(s.flushDelay, func() { - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) - } - }) + s.flushTimer = time.AfterFunc( + s.flushDelay, + func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) } // Append metric to batch slice @@ -194,7 +170,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // Set config default values s.config.BatchSize = 100 - s.config.FlushDelay = "1s" + s.config.FlushInterval = "1s" // Read config if len(config) > 0 { @@ -203,12 +179,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } - s.influxRetryInterval = 0 - s.influxMaxRetryTime = 0 - // s.config.InfluxRetryInterval = "" - // s.config.InfluxMaxRetryTime = "" - // s.config.InfluxMaxRetries = 0 - // s.config.InfluxExponentialBase = 0 if len(s.config.Host) == 0 { return nil, errors.New("Missing host configuration required by InfluxSink") @@ -232,21 +202,9 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s.meta_as_tags[k] = true } - // toUint := func(duration string, def uint) uint { - // if len(duration) > 0 { - // t, err := time.ParseDuration(duration) - // if err == nil { - // return uint(t.Milliseconds()) - // } - // } - // return def - // } - // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) - // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) - // Configure flush delay duration - if len(s.config.FlushDelay) > 0 { - t, err := time.ParseDuration(s.config.FlushDelay) + if len(s.config.FlushInterval) > 0 { + t, err := time.ParseDuration(s.config.FlushInterval) if err == nil { s.flushDelay = t } diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 0597e9b..4d43454 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" @@ -15,11 +16,12 @@ import ( type NatsSinkConfig struct { defaultSinkConfig - 
Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Subject string `json:"subject,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` } type NatsSink struct { @@ -28,6 +30,10 @@ type NatsSink struct { encoder *influx.Encoder buffer *bytes.Buffer config NatsSinkConfig + + lock sync.Mutex + flushDelay time.Duration + flushTimer *time.Timer } func (s *NatsSink) connect() error { @@ -54,37 +60,53 @@ func (s *NatsSink) connect() error { } func (s *NatsSink) Write(m lp.CCMetric) error { - if s.client != nil { - _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) - if err != nil { - cclog.ComponentError(s.name, "Write:", err.Error()) - return err - } + s.lock.Lock() + _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) + s.lock.Unlock() + if err != nil { + cclog.ComponentError(s.name, "Write:", err.Error()) + return err } + + if s.flushDelay == 0 { + s.Flush() + } else if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + s.Flush() + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + return nil } func (s *NatsSink) Flush() error { - if s.client != nil { - if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { - cclog.ComponentError(s.name, "Flush:", err.Error()) - return err - } - s.buffer.Reset() + s.lock.Lock() + buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes + s.buffer.Reset() + s.lock.Unlock() + + if len(buf) == 0 { + return nil + } + + if err := s.client.Publish(s.config.Subject, buf); err != nil { + cclog.ComponentError(s.name, "Flush:", err.Error()) + return err } return nil } func (s *NatsSink) Close() { - if s.client != nil { - cclog.ComponentDebug(s.name, "Close") - s.client.Close() - } + cclog.ComponentDebug(s.name, "Close") + s.client.Close() } func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s := new(NatsSink) s.name = fmt.Sprintf("NatsSink(%s)", name) + s.flushDelay = 10 * time.Second if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,7 +116,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { } if len(s.config.Host) == 0 || len(s.config.Port) == 0 || - len(s.config.Database) == 0 { + len(s.config.Subject) == 0 { return nil, errors.New("not all configuration variables set required by NatsSink") } // Create lookup map to use meta infos as tags in the output metric @@ -112,5 +134,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) } + + s.flushTimer = nil + if len(s.config.FlushDelay) != 0 { + var err error + s.flushDelay, err = time.ParseDuration(s.config.FlushDelay) + if err != nil { + return nil, err + } + } + return s, nil }
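For reference, the unit handling added to the metric router's new `prepareUnit` step boils down to the cc-units calls used in this patch. The standalone sketch below is illustrative only; the example unit `kByte`, the target prefix `G`, and the sample value are assumptions, not values taken from the patch.

```go
package main

import (
	"fmt"

	units "github.com/ClusterCockpit/cc-units"
)

func main() {
	// Normalize a unit name, as done for the "unit" meta tag when normalize_units is set
	u := units.NewUnit("kByte")
	if u.Valid() {
		fmt.Println("normalized unit:", u.Short())
	}

	// Convert to a new prefix, as done for the change_unit_prefix setting
	newPrefix := units.NewPrefix("G")
	if newPrefix != units.InvalidPrefix {
		conv, outUnit := units.GetUnitPrefixFactor(u, newPrefix)
		if conv != nil && outUnit.Valid() {
			// conv rescales the metric value to the new prefix
			fmt.Println(conv(1048576.0), outUnit.Short())
		}
	}
}
```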
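The reworked HttpSink buffers metrics and retries failed flushes. A sink configuration exercising the new options might look like the sketch below; the sink name `myhttpsink`, the `type` value, and the URL are assumptions for illustration, and omitted options keep the defaults set in NewHttpSink (`timeout` "5s", `idle_connection_timeout` "120s", `flush_delay` "5s", `max_retries` 3).

```json
{
  "myhttpsink": {
    "type": "http",
    "url": "http://localhost:8080/api/write",
    "jwt": "<JWT token>",
    "flush_delay": "2s",
    "max_retries": 5
  }
}
```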
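Similarly, the NatsSink now publishes to a configurable `subject` instead of the former `database` option and flushes on a timer controlled by `flush_delay` (default 10s). A matching configuration sketch, again with an assumed sink name and `type` value:

```json
{
  "mynatssink": {
    "type": "nats",
    "host": "localhost",
    "port": "4222",
    "subject": "ccmetrics",
    "user": "natsuser",
    "password": "<password>",
    "flush_delay": "10s"
  }
}
```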