mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-22 05:45:07 +02:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			v0.6.0
			...
			httpsink_w
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | feb8f16af8 | 
							
								
								
									
										4
									
								
								.github/ci-config.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/ci-config.json
									
									
									
									
										vendored
									
									
								
							| @@ -3,6 +3,6 @@ | ||||
|   "collectors" : ".github/ci-collectors.json", | ||||
|   "receivers" : ".github/ci-receivers.json", | ||||
|   "router" : ".github/ci-router.json", | ||||
|   "interval": "5s", | ||||
|   "duration": "1s" | ||||
|   "interval": 5, | ||||
|   "duration": 1 | ||||
| } | ||||
|   | ||||
							
								
								
									
										4
									
								
								.github/ci-sinks.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/ci-sinks.json
									
									
									
									
										vendored
									
									
								
							| @@ -1,8 +1,6 @@ | ||||
| { | ||||
|   "testoutput" : { | ||||
|     "type" : "stdout", | ||||
|     "meta_as_tags" : [ | ||||
|       "unit" | ||||
|     ] | ||||
|     "meta_as_tags" : true | ||||
|   } | ||||
| } | ||||
|   | ||||
							
								
								
									
										5
									
								
								.github/workflows/Release.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.github/workflows/Release.yml
									
									
									
									
										vendored
									
									
								
							| @@ -40,10 +40,7 @@ jobs: | ||||
|  | ||||
|     # Use dnf to install build dependencies | ||||
|     - name: Install build dependencies | ||||
|       run: | | ||||
|         dnf --assumeyes install 'dnf-command(builddep)' | ||||
|         dnf --assumeyes install which | ||||
|         dnf --assumeyes builddep scripts/cc-metric-collector.spec | ||||
|       run: dnf --assumeyes builddep scripts/cc-metric-collector.spec | ||||
|  | ||||
|     - name: RPM build MetricCollector | ||||
|       id: rpmbuild | ||||
|   | ||||
							
								
								
									
										42
									
								
								.github/workflows/runonce.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										42
									
								
								.github/workflows/runonce.yml
									
									
									
									
										vendored
									
									
								
							| @@ -7,32 +7,6 @@ name: Run Test | ||||
| on: push | ||||
|  | ||||
| jobs: | ||||
|   # | ||||
|   # Job build-1-18 | ||||
|   # Build on latest Ubuntu using golang version 1.18 | ||||
|   # | ||||
|   build-1-18: | ||||
|     runs-on: ubuntu-latest | ||||
|     steps: | ||||
|     # See: https://github.com/marketplace/actions/checkout | ||||
|     # Checkout git repository and submodules | ||||
|     - name: Checkout | ||||
|       uses: actions/checkout@v2 | ||||
|       with: | ||||
|         submodules: recursive | ||||
|  | ||||
|     # See: https://github.com/marketplace/actions/setup-go-environment | ||||
|     - name: Setup Golang | ||||
|       uses: actions/setup-go@v3 | ||||
|       with: | ||||
|         go-version: '1.18.2' | ||||
|  | ||||
|     - name: Build MetricCollector | ||||
|       run: make | ||||
|  | ||||
|     - name: Run MetricCollector once | ||||
|       run: ./cc-metric-collector --once --config .github/ci-config.json | ||||
|  | ||||
|   # | ||||
|   # Job build-1-17 | ||||
|   # Build on latest Ubuntu using golang version 1.17  | ||||
| @@ -49,9 +23,13 @@ jobs: | ||||
|  | ||||
|     # See: https://github.com/marketplace/actions/setup-go-environment | ||||
|     - name: Setup Golang | ||||
|       uses: actions/setup-go@v3 | ||||
|       uses: actions/setup-go@v2 | ||||
|       with: | ||||
|         go-version: '1.17.7' | ||||
|         go-version: '^1.17.7' | ||||
|  | ||||
|     # Install libganglia | ||||
|     - name: Setup Ganglia | ||||
|       run: sudo apt install ganglia-monitor libganglia1 | ||||
|  | ||||
|     - name: Build MetricCollector | ||||
|       run: make | ||||
| @@ -75,9 +53,13 @@ jobs: | ||||
|  | ||||
|     # See: https://github.com/marketplace/actions/setup-go-environment | ||||
|     - name: Setup Golang | ||||
|       uses: actions/setup-go@v3 | ||||
|       uses: actions/setup-go@v2 | ||||
|       with: | ||||
|         go-version: '1.16.7' # The version AlmaLinux 8.5 uses | ||||
|         go-version: '^1.16.7' # The version AlmaLinux 8.5 uses | ||||
|  | ||||
|     # Install libganglia | ||||
|     - name: Setup Ganglia | ||||
|       run: sudo apt install ganglia-monitor libganglia1 | ||||
|  | ||||
|     - name: Build MetricCollector | ||||
|       run: make | ||||
|   | ||||
							
								
								
									
										28
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								Makefile
									
									
									
									
									
								
							| @@ -16,23 +16,15 @@ COMPONENT_DIRS   := collectors \ | ||||
| 			internal/multiChanTicker | ||||
|  | ||||
| BINDIR = bin | ||||
| GOBIN = $(shell which go) | ||||
|  | ||||
|  | ||||
| .PHONY: all | ||||
| all: $(APP) | ||||
|  | ||||
| $(APP): $(GOSRC) | ||||
| 	if [ "$(shell $(GOBIN) version | cut -d' ' -f 3 | cut -d '.' -f1-2)" = "go1.16" ]; then \ | ||||
| 		echo "1.16"; \ | ||||
| 		cp go.mod.1.16 go.mod; \ | ||||
| 	else \ | ||||
| 		echo "1.17+"; \ | ||||
| 		cp go.mod.1.17+ go.mod; \ | ||||
| 	fi | ||||
| 	make -C collectors | ||||
| 	$(GOBIN) get | ||||
| 	$(GOBIN) build -o $(APP) $(GOSRC_APP) | ||||
| 	go get | ||||
| 	go build -o $(APP) $(GOSRC_APP) | ||||
|  | ||||
| install: $(APP) | ||||
| 	@WORKSPACE=$(PREFIX) | ||||
| @@ -59,25 +51,25 @@ clean: | ||||
|  | ||||
| .PHONY: fmt | ||||
| fmt: | ||||
| 	$(GOBIN) fmt $(GOSRC_COLLECTORS) | ||||
| 	$(GOBIN) fmt $(GOSRC_SINKS) | ||||
| 	$(GOBIN) fmt $(GOSRC_RECEIVERS) | ||||
| 	$(GOBIN) fmt $(GOSRC_APP) | ||||
| 	@for F in $(GOSRC_INTERNAL); do $(GOBIN) fmt $$F; done | ||||
| 	go fmt $(GOSRC_COLLECTORS) | ||||
| 	go fmt $(GOSRC_SINKS) | ||||
| 	go fmt $(GOSRC_RECEIVERS) | ||||
| 	go fmt $(GOSRC_APP) | ||||
| 	@for F in $(GOSRC_INTERNAL); do go fmt $$F; done | ||||
|  | ||||
|  | ||||
| # Examine Go source code and reports suspicious constructs | ||||
| .PHONY: vet | ||||
| vet: | ||||
| 	$(GOBIN) vet ./... | ||||
| 	go vet ./... | ||||
|  | ||||
|  | ||||
| # Run linter for the Go programming language. | ||||
| # Using static analysis, it finds bugs and performance issues, offers simplifications, and enforces style rules | ||||
| .PHONY: staticcheck | ||||
| staticcheck: | ||||
| 	$(GOBIN) install honnef.co/go/tools/cmd/staticcheck@latest | ||||
| 	$$($(GOBIN) env GOPATH)/bin/staticcheck ./... | ||||
| 	go install honnef.co/go/tools/cmd/staticcheck@latest | ||||
| 	$$(go env GOPATH)/bin/staticcheck ./... | ||||
|  | ||||
| .ONESHELL: | ||||
| .PHONY: RPM | ||||
|   | ||||
| @@ -22,8 +22,8 @@ import ( | ||||
| ) | ||||
|  | ||||
| type CentralConfigFile struct { | ||||
| 	Interval            string `json:"interval"` | ||||
| 	Duration            string `json:"duration"` | ||||
| 	Interval            int    `json:"interval"` | ||||
| 	Duration            int    `json:"duration"` | ||||
| 	CollectorConfigFile string `json:"collectors"` | ||||
| 	RouterConfigFile    string `json:"router"` | ||||
| 	SinkConfigFile      string `json:"sinks"` | ||||
| @@ -173,36 +173,16 @@ func mainFunc() int { | ||||
| 		cclog.Error("Error reading configuration file ", rcfg.CliArgs["configfile"], ": ", err.Error()) | ||||
| 		return 1 | ||||
| 	} | ||||
|  | ||||
| 	// Properly use duration parser with inputs like '60s', '5m' or similar | ||||
| 	if len(rcfg.ConfigFile.Interval) > 0 { | ||||
| 		t, err := time.ParseDuration(rcfg.ConfigFile.Interval) | ||||
| 		if err != nil { | ||||
| 			cclog.Error("Configuration value 'interval' no valid duration") | ||||
| 		} | ||||
| 		rcfg.Interval = t | ||||
| 		if rcfg.Interval == 0 { | ||||
| 	if rcfg.ConfigFile.Interval <= 0 || time.Duration(rcfg.ConfigFile.Interval)*time.Second <= 0 { | ||||
| 		cclog.Error("Configuration value 'interval' must be greater than zero") | ||||
| 		return 1 | ||||
| 	} | ||||
| 	} | ||||
|  | ||||
| 	// Properly use duration parser with inputs like '60s', '5m' or similar | ||||
| 	if len(rcfg.ConfigFile.Duration) > 0 { | ||||
| 		t, err := time.ParseDuration(rcfg.ConfigFile.Duration) | ||||
| 		if err != nil { | ||||
| 			cclog.Error("Configuration value 'duration' no valid duration") | ||||
| 		} | ||||
| 		rcfg.Duration = t | ||||
| 		if rcfg.Duration == 0 { | ||||
| 	rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval) * time.Second | ||||
| 	if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 { | ||||
| 		cclog.Error("Configuration value 'duration' must be greater than zero") | ||||
| 		return 1 | ||||
| 	} | ||||
| 	} | ||||
| 	if rcfg.Duration > rcfg.Interval { | ||||
| 		cclog.Error("The interval should be greater than duration") | ||||
| 		return 1 | ||||
| 	} | ||||
| 	rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second | ||||
|  | ||||
| 	if len(rcfg.ConfigFile.RouterConfigFile) == 0 { | ||||
| 		cclog.Error("Metric router configuration file must be set") | ||||
| @@ -291,7 +271,7 @@ func mainFunc() int { | ||||
|  | ||||
| 	// Wait until one tick has passed. This is a workaround | ||||
| 	if rcfg.CliArgs["once"] == "true" { | ||||
| 		x := 1.2 * float64(rcfg.Interval.Seconds()) | ||||
| 		x := 1.2 * float64(rcfg.ConfigFile.Interval) | ||||
| 		time.Sleep(time.Duration(int(x)) * time.Second) | ||||
| 		shutdownSignal <- os.Interrupt | ||||
| 	} | ||||
|   | ||||
| @@ -1,28 +1,22 @@ | ||||
|  | ||||
| all: likwid | ||||
|  | ||||
|  | ||||
| # LIKWID version | ||||
| LIKWID_VERSION = 5.2.1 | ||||
| LIKWID_INSTALLED_FOLDER=$(shell dirname $(shell which likwid-topology 2>/dev/null) 2>/dev/null) | ||||
|  | ||||
| LIKWID_FOLDER="$(shell pwd)/likwid" | ||||
|  | ||||
| all: $(LIKWID_FOLDER)/likwid.h | ||||
|  | ||||
| .ONESHELL: | ||||
| .PHONY: $(LIKWID_FOLDER)/likwid.h | ||||
| $(LIKWID_FOLDER)/likwid.h: | ||||
| 	if [ "$(LIKWID_INSTALLED_FOLDER)" != "" ]; then \ | ||||
| 		BASE="$(LIKWID_INSTALLED_FOLDER)/../include"; \ | ||||
| 		mkdir -p $(LIKWID_FOLDER); \ | ||||
| 		cp $$BASE/*.h $(LIKWID_FOLDER); \ | ||||
| 	else \ | ||||
| 		BUILD_FOLDER="$${PWD}/likwidbuild"; \ | ||||
| 		if [ -d $(LIKWID_FOLDER) ]; then rm -r $(LIKWID_FOLDER); fi; \ | ||||
| 		mkdir --parents --verbose  $(LIKWID_FOLDER) $${BUILD_FOLDER}; \ | ||||
| 		wget -P "$${BUILD_FOLDER}" http://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz; \ | ||||
| 		tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz; \ | ||||
| 		install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $(LIKWID_FOLDER)/; \ | ||||
| 		install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $(LIKWID_FOLDER)/; \ | ||||
| 		rm -r $${BUILD_FOLDER}; \ | ||||
| 	fi | ||||
| .PHONY: likwid | ||||
| likwid: | ||||
| 	INSTALL_FOLDER="$${PWD}/likwid" | ||||
| 	BUILD_FOLDER="$${PWD}/likwidbuild" | ||||
| 	if [ -d $${INSTALL_FOLDER} ]; then rm -r $${INSTALL_FOLDER}; fi | ||||
| 	mkdir --parents --verbose  $${INSTALL_FOLDER} $${BUILD_FOLDER} | ||||
| 	wget -P "$${BUILD_FOLDER}" ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz | ||||
| 	tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz | ||||
| 	install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $${INSTALL_FOLDER}/ | ||||
| 	install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $${INSTALL_FOLDER}/ | ||||
| 	rm -r $${BUILD_FOLDER} | ||||
|  | ||||
|  | ||||
| clean: | ||||
|   | ||||
| @@ -39,7 +39,6 @@ In contrast to the configuration files for sinks and receivers, the collectors c | ||||
| * [`gpfs`](./gpfsMetric.md) | ||||
| * [`beegfs_meta`](./beegfsmetaMetric.md) | ||||
| * [`beegfs_storage`](./beegfsstorageMetric.md) | ||||
| * [`rocm_smi`](./rocmsmiMetric.md) | ||||
|  | ||||
| ## Todos | ||||
|  | ||||
|   | ||||
| @@ -55,7 +55,6 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | ||||
|  | ||||
| 	m.name = "BeegfsMetaCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	// Set default beegfs-ctl binary | ||||
|  | ||||
| 	m.config.Beegfs = DEFAULT_BEEGFS_CMD | ||||
|   | ||||
| @@ -48,7 +48,6 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | ||||
|  | ||||
| 	m.name = "BeegfsStorageCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	// Set default beegfs-ctl binary | ||||
|  | ||||
| 	m.config.Beegfs = DEFAULT_BEEGFS_CMD | ||||
|   | ||||
| @@ -36,21 +36,17 @@ var AvailableCollectors = map[string]MetricCollector{ | ||||
| 	"numastats":        new(NUMAStatsCollector), | ||||
| 	"beegfs_meta":      new(BeegfsMetaCollector), | ||||
| 	"beegfs_storage":   new(BeegfsStorageCollector), | ||||
| 	"rocm_smi":        new(RocmSmiCollector), | ||||
| } | ||||
|  | ||||
| // Metric collector manager data structure | ||||
| type collectorManager struct { | ||||
| 	collectors   []MetricCollector          // List of metric collectors to read in parallel | ||||
| 	serial       []MetricCollector          // List of metric collectors to read serially | ||||
| 	collectors []MetricCollector          // List of metric collectors to use | ||||
| 	output     chan lp.CCMetric           // Output channels | ||||
| 	done       chan bool                  // channel to finish / stop metric collector manager | ||||
| 	ticker     mct.MultiChanTicker        // periodically ticking once each interval | ||||
| 	duration   time.Duration              // duration (for metrics that measure over a given duration) | ||||
| 	wg         *sync.WaitGroup            // wait group for all goroutines in cc-metric-collector | ||||
| 	config     map[string]json.RawMessage // json encoded config for collector manager | ||||
| 	collector_wg sync.WaitGroup             // internally used wait group for the parallel reading of collector | ||||
| 	parallel_run bool                       // Flag whether the collectors are currently read in parallel | ||||
| } | ||||
|  | ||||
| // Metric collector manager access functions | ||||
| @@ -70,7 +66,6 @@ type CollectorManager interface { | ||||
| // Initialization is done for all configured collectors | ||||
| func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { | ||||
| 	cm.collectors = make([]MetricCollector, 0) | ||||
| 	cm.serial = make([]MetricCollector, 0) | ||||
| 	cm.output = nil | ||||
| 	cm.done = make(chan bool) | ||||
| 	cm.wg = wg | ||||
| @@ -105,11 +100,7 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat | ||||
| 			continue | ||||
| 		} | ||||
| 		cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) | ||||
| 		if collector.Parallel() { | ||||
| 		cm.collectors = append(cm.collectors, collector) | ||||
| 		} else { | ||||
| 			cm.serial = append(cm.serial, collector) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -125,10 +116,6 @@ func (cm *collectorManager) Start() { | ||||
| 		// Collector manager is done | ||||
| 		done := func() { | ||||
| 			// close all metric collectors | ||||
| 			if cm.parallel_run { | ||||
| 				cm.collector_wg.Wait() | ||||
| 				cm.parallel_run = false | ||||
| 			} | ||||
| 			for _, c := range cm.collectors { | ||||
| 				c.Close() | ||||
| 			} | ||||
| @@ -143,26 +130,7 @@ func (cm *collectorManager) Start() { | ||||
| 				done() | ||||
| 				return | ||||
| 			case t := <-tick: | ||||
| 				cm.parallel_run = true | ||||
| 				for _, c := range cm.collectors { | ||||
| 					// Wait for done signal or execute the collector | ||||
| 					select { | ||||
| 					case <-cm.done: | ||||
| 						done() | ||||
| 						return | ||||
| 					default: | ||||
| 						// Read metrics from collector c via goroutine | ||||
| 						cclog.ComponentDebug("CollectorManager", c.Name(), t) | ||||
| 						cm.collector_wg.Add(1) | ||||
| 						go func(myc MetricCollector) { | ||||
| 							myc.Read(cm.duration, cm.output) | ||||
| 							cm.collector_wg.Done() | ||||
| 						}(c) | ||||
| 					} | ||||
| 				} | ||||
| 				cm.collector_wg.Wait() | ||||
| 				cm.parallel_run = false | ||||
| 				for _, c := range cm.serial { | ||||
| 					// Wait for done signal or execute the collector | ||||
| 					select { | ||||
| 					case <-cm.done: | ||||
|   | ||||
| @@ -48,7 +48,6 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | ||||
| 	m.setup() | ||||
|  | ||||
| 	m.name = "CPUFreqCpuInfoCollector" | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{ | ||||
| 		"source": m.name, | ||||
| 		"group":  "CPU", | ||||
| @@ -151,7 +150,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | ||||
| 		t.numNonHT = numNonHT | ||||
| 		t.numNonHT_int = numNonHT_int | ||||
| 		t.tagSet = map[string]string{ | ||||
| 			"type":       "hwthread", | ||||
| 			"type":       "cpu", | ||||
| 			"type-id":    t.processor, | ||||
| 			"package_id": t.physicalPackageID, | ||||
| 		} | ||||
|   | ||||
| @@ -4,7 +4,7 @@ | ||||
|   "cpufreq_cpuinfo": {} | ||||
| ``` | ||||
|  | ||||
| The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **hwthread** metrics. | ||||
| The `cpufreq_cpuinfo` collector reads the clock frequency from `/proc/cpuinfo` and outputs a handful **cpu** metrics. | ||||
|  | ||||
| Metrics: | ||||
| * `cpufreq` | ||||
|   | ||||
| @@ -53,7 +53,6 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { | ||||
|  | ||||
| 	m.name = "CPUFreqCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	if len(config) > 0 { | ||||
| 		err := json.Unmarshal(config, &m.config) | ||||
| 		if err != nil { | ||||
| @@ -162,7 +161,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { | ||||
| 		t.numNonHT = numNonHT | ||||
| 		t.numNonHT_int = numNonHT_int | ||||
| 		t.tagSet = map[string]string{ | ||||
| 			"type":       "hwthread", | ||||
| 			"type":       "cpu", | ||||
| 			"type-id":    t.processor, | ||||
| 			"package_id": t.physicalPackageID, | ||||
| 		} | ||||
|   | ||||
| @@ -5,7 +5,7 @@ | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **hwthread** metrics. | ||||
| The `cpufreq` collector reads the clock frequency from `/sys/devices/system/cpu/cpu*/cpufreq` and outputs a handful **cpu** metrics. | ||||
|  | ||||
| Metrics: | ||||
| * `cpufreq` | ||||
| @@ -30,7 +30,6 @@ type CpustatCollector struct { | ||||
| func (m *CpustatCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "CpustatCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"} | ||||
| 	m.nodetags = map[string]string{"type": "node"} | ||||
| 	if len(config) > 0 { | ||||
| @@ -83,7 +82,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { | ||||
| 		if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { | ||||
| 			cpustr := strings.TrimLeft(linefields[0], "cpu") | ||||
| 			cpu, _ := strconv.Atoi(cpustr) | ||||
| 			m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} | ||||
| 			m.cputags[linefields[0]] = map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", cpu)} | ||||
| 			num_cpus++ | ||||
| 		} | ||||
| 	} | ||||
|   | ||||
| @@ -33,7 +33,6 @@ type CustomCmdCollector struct { | ||||
| func (m *CustomCmdCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "CustomCmdCollector" | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "Custom"} | ||||
| 	if len(config) > 0 { | ||||
| 		err = json.Unmarshal(config, &m.config) | ||||
|   | ||||
| @@ -29,7 +29,6 @@ type DiskstatCollector struct { | ||||
|  | ||||
| func (m *DiskstatCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "DiskstatCollector" | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "Disk"} | ||||
| 	m.setup() | ||||
| 	if len(config) > 0 { | ||||
| @@ -78,18 +77,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | ||||
| 			continue | ||||
| 		} | ||||
| 		path := strings.Replace(linefields[1], `\040`, " ", -1) | ||||
| 		stat := syscall.Statfs_t{ | ||||
| 			Blocks: 0, | ||||
| 			Bsize:  0, | ||||
| 			Bfree:  0, | ||||
| 		} | ||||
| 		stat := syscall.Statfs_t{} | ||||
| 		err := syscall.Statfs(path, &stat) | ||||
| 		if err != nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		if stat.Blocks == 0 || stat.Bsize == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 		tags := map[string]string{"type": "node", "device": linefields[0]} | ||||
| 		total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000) | ||||
| 		y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) | ||||
| @@ -103,13 +95,11 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | ||||
| 			y.AddMeta("unit", "GBytes") | ||||
| 			output <- y | ||||
| 		} | ||||
| 		if total > 0 { | ||||
| 		perc := (100 * (total - free)) / total | ||||
| 		if perc > part_max_used { | ||||
| 			part_max_used = perc | ||||
| 		} | ||||
| 	} | ||||
| 	} | ||||
| 	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) | ||||
| 	if err == nil { | ||||
| 		y.AddMeta("unit", "percent") | ||||
|   | ||||
| @@ -46,7 +46,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "GpfsCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
|  | ||||
| 	// Set default mmpmon binary | ||||
| 	m.config.Mmpmon = DEFAULT_GPFS_CMD | ||||
|   | ||||
| @@ -54,7 +54,6 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "InfinibandCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{ | ||||
| 		"source": m.name, | ||||
| 		"group":  "Network", | ||||
|   | ||||
| @@ -37,7 +37,6 @@ type IOstatCollector struct { | ||||
| func (m *IOstatCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "IOstatCollector" | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "Disk"} | ||||
| 	m.setup() | ||||
| 	if len(config) > 0 { | ||||
|   | ||||
| @@ -34,7 +34,6 @@ type IpmiCollector struct { | ||||
| func (m *IpmiCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "IpmiCollector" | ||||
| 	m.setup() | ||||
| 	m.parallel = true | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "IPMI"} | ||||
| 	m.config.IpmitoolPath = string(IPMITOOL_PATH) | ||||
| 	m.config.IpmisensorsPath = string(IPMISENSORS_PATH) | ||||
|   | ||||
| @@ -177,7 +177,6 @@ func getBaseFreq() float64 { | ||||
|  | ||||
| func (m *LikwidCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "LikwidCollector" | ||||
| 	m.parallel = false | ||||
| 	m.initialized = false | ||||
| 	m.running = false | ||||
| 	m.config.AccessMode = LIKWID_DEF_ACCESSMODE | ||||
| @@ -205,7 +204,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { | ||||
|  | ||||
| 	m.meta = map[string]string{"group": "PerfCounter"} | ||||
| 	cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") | ||||
| 	cpulist := topo.HwthreadList() | ||||
| 	cpulist := topo.CpuList() | ||||
| 	m.cpulist = make([]C.int, len(cpulist)) | ||||
| 	m.cpu2tid = make(map[int]int) | ||||
| 	for i, c := range cpulist { | ||||
|   | ||||
| @@ -19,7 +19,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li | ||||
|             "calc": "COUNTER0 + COUNTER1", | ||||
|             "publish": false, | ||||
|             "unit": "myunit", | ||||
|             "type": "hwthread" | ||||
|             "type": "cpu" | ||||
|           } | ||||
|         ] | ||||
|       } | ||||
| @@ -30,7 +30,7 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li | ||||
|         "calc": "sum_01", | ||||
|         "publish": true, | ||||
|         "unit": "myunit", | ||||
|         "type": "hwthread" | ||||
|         "type": "cpu" | ||||
|       } | ||||
|     ] | ||||
|   } | ||||
| @@ -51,15 +51,15 @@ Additional options: | ||||
|  | ||||
| Hardware performance counters are scattered all over the system nowadays. A counter coveres a specific part of the system. While there are hardware thread specific counter for CPU cycles, instructions and so on, some others are specific for a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric. | ||||
|  | ||||
| - `hwthread` : One metric per CPU hardware thread with the tags `"type" : "hwthread"` and `"type-id" : "$hwthread_id"` | ||||
| - `cpu` : One metric per CPU hardware thread with the tags `"type" : "cpu"` and `"type-id" : "$cpu_id"` | ||||
| - `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"` | ||||
|  | ||||
| **Note:** You cannot specify `socket` scope for a metric that is measured at `hwthread` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. | ||||
| **Note:** You should not specify the `socket` type for a metric that is measured at `cpu` scope and vice versa, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific. | ||||
|  | ||||
| As a guideline: | ||||
| - All counters `FIXCx`, `PMCy` and `TMAz` have the scope `hwthread` | ||||
| - All counters `FIXCx`, `PMCy` and `TMAz` have the scope `cpu` | ||||
| - All counters names containing `BOX` have the scope `socket` | ||||
| - All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `hwthread` scope | ||||
| - All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope (AMD Zen) | ||||
| - All `DFCx` counters have scope `socket` | ||||
|  | ||||
| ### Help with the configuration | ||||
| @@ -90,7 +90,7 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP | ||||
|       "name": "Runtime (RDTSC) [s]", | ||||
|       "publish": true, | ||||
|       "unit": "seconds" | ||||
|       "scope": "hwthread" | ||||
|       "scope": "cpu" | ||||
|     }, | ||||
|     { | ||||
|       "..." : "..." | ||||
| @@ -147,20 +147,20 @@ One might think this does not happen often but often used metrics in the world o | ||||
|           { | ||||
|             "name": "ipc", | ||||
|             "calc": "PMC0/PMC1", | ||||
|             "type": "hwthread", | ||||
|             "type": "cpu", | ||||
|             "publish": true | ||||
|           }, | ||||
|           { | ||||
|             "name": "flops_any", | ||||
|             "calc": "0.000001*PMC2/time", | ||||
|             "unit": "MFlops/s", | ||||
|             "type": "hwthread", | ||||
|             "type": "cpu", | ||||
|             "publish": true | ||||
|           }, | ||||
|           { | ||||
|             "name": "clock", | ||||
|             "calc": "0.000001*(FIXC1/FIXC2)/inverseClock", | ||||
|             "type": "hwthread", | ||||
|             "type": "cpu", | ||||
|             "unit": "MHz", | ||||
|             "publish": true | ||||
|           }, | ||||
| @@ -219,33 +219,3 @@ One might think this does not happen often but often used metrics in the world o | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| ### How to get the eventsets and metrics from LIKWID | ||||
|  | ||||
| The `likwid` collector reads hardware performance counters at a **hwthread** and **socket** level. The configuration looks quite complicated but it is basically copy&paste from [LIKWID's performance groups](https://github.com/RRZE-HPC/likwid/tree/master/groups). The collector made multiple iterations and tried to use the performance groups but it lacked flexibility. The current way of configuration provides most flexibility. | ||||
|  | ||||
| The logic is as following: There are multiple eventsets, each consisting of a list of counters+events and a list of metrics. If you compare a common performance group with the example setting above, there is not much difference: | ||||
| ``` | ||||
| EVENTSET                         ->   "events": { | ||||
| FIXC1 ACTUAL_CPU_CLOCK           ->     "FIXC1": "ACTUAL_CPU_CLOCK", | ||||
| FIXC2 MAX_CPU_CLOCK              ->     "FIXC2": "MAX_CPU_CLOCK", | ||||
| PMC0  RETIRED_INSTRUCTIONS       ->     "PMC0" : "RETIRED_INSTRUCTIONS", | ||||
| PMC1  CPU_CLOCKS_UNHALTED        ->     "PMC1" : "CPU_CLOCKS_UNHALTED", | ||||
| PMC2  RETIRED_SSE_AVX_FLOPS_ALL  ->     "PMC2": "RETIRED_SSE_AVX_FLOPS_ALL", | ||||
| PMC3  MERGE                      ->     "PMC3": "MERGE", | ||||
|                                  ->   } | ||||
| ``` | ||||
|  | ||||
| The metrics are following the same procedure: | ||||
|  | ||||
| ``` | ||||
| METRICS                          ->   "metrics": [ | ||||
| IPC   PMC0/PMC1                  ->     { | ||||
|                                  ->       "name" : "IPC", | ||||
|                                  ->       "calc" : "PMC0/PMC1", | ||||
|                                  ->       "scope": "hwthread", | ||||
|                                  ->       "publish": true | ||||
|                                  ->     } | ||||
|                                  ->   ] | ||||
| ``` | ||||
|  | ||||
| The script `scripts/likwid_perfgroup_to_cc_config.py` might help you. | ||||
|   | ||||
| @@ -36,7 +36,6 @@ type LoadavgCollector struct { | ||||
|  | ||||
| func (m *LoadavgCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "LoadavgCollector" | ||||
| 	m.parallel = true | ||||
| 	m.setup() | ||||
| 	if len(config) > 0 { | ||||
| 		err := json.Unmarshal(config, &m.config) | ||||
|   | ||||
| @@ -288,7 +288,6 @@ var LustreDeriveMetrics = []LustreMetricDefinition{ | ||||
| func (m *LustreCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "LustreCollector" | ||||
| 	m.parallel = true | ||||
| 	if len(config) > 0 { | ||||
| 		err = json.Unmarshal(config, &m.config) | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -81,7 +81,6 @@ func getStats(filename string) map[string]MemstatStats { | ||||
| func (m *MemstatCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "MemstatCollector" | ||||
| 	m.parallel = true | ||||
| 	m.config.NodeStats = true | ||||
| 	m.config.NumaStats = false | ||||
| 	if len(config) > 0 { | ||||
| @@ -160,7 +159,6 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { | ||||
|  | ||||
| func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 	if !m.init { | ||||
| 		cclog.ComponentPrint(m.name, "Here") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -3,6 +3,10 @@ package collectors | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"log" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| @@ -12,7 +16,6 @@ type MetricCollector interface { | ||||
| 	Name() string                                         // Name of the metric collector | ||||
| 	Init(config json.RawMessage) error                    // Initialize metric collector | ||||
| 	Initialized() bool                                    // Is metric collector initialized? | ||||
| 	Parallel() bool | ||||
| 	Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector | ||||
| 	Close()                                               // Close / finish metric collector | ||||
| } | ||||
| @@ -20,7 +23,6 @@ type MetricCollector interface { | ||||
| type metricCollector struct { | ||||
| 	name string            // name of the metric | ||||
| 	init bool              // is metric collector initialized? | ||||
| 	parallel bool              // can the metric collector be executed in parallel with others | ||||
| 	meta map[string]string // static meta data tags | ||||
| } | ||||
|  | ||||
| @@ -29,11 +31,6 @@ func (c *metricCollector) Name() string { | ||||
| 	return c.name | ||||
| } | ||||
|  | ||||
| // Name returns the name of the metric collector | ||||
| func (c *metricCollector) Parallel() bool { | ||||
| 	return c.parallel | ||||
| } | ||||
|  | ||||
| // Setup is for future use | ||||
| func (c *metricCollector) setup() error { | ||||
| 	return nil | ||||
| @@ -68,6 +65,58 @@ func stringArrayContains(array []string, str string) (int, bool) { | ||||
| 	return -1, false | ||||
| } | ||||
|  | ||||
| // SocketList returns the list of physical sockets as read from /proc/cpuinfo | ||||
| func SocketList() []int { | ||||
| 	buffer, err := ioutil.ReadFile("/proc/cpuinfo") | ||||
| 	if err != nil { | ||||
| 		log.Print(err) | ||||
| 		return nil | ||||
| 	} | ||||
| 	ll := strings.Split(string(buffer), "\n") | ||||
| 	var packs []int | ||||
| 	for _, line := range ll { | ||||
| 		if strings.HasPrefix(line, "physical id") { | ||||
| 			lv := strings.Fields(line) | ||||
| 			id, err := strconv.ParseInt(lv[3], 10, 32) | ||||
| 			if err != nil { | ||||
| 				log.Print(err) | ||||
| 				return packs | ||||
| 			} | ||||
| 			_, found := intArrayContains(packs, int(id)) | ||||
| 			if !found { | ||||
| 				packs = append(packs, int(id)) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return packs | ||||
| } | ||||
|  | ||||
| // CpuList returns the list of physical CPUs (in contrast to logical CPUs) as read from /proc/cpuinfo | ||||
| func CpuList() []int { | ||||
| 	buffer, err := ioutil.ReadFile("/proc/cpuinfo") | ||||
| 	if err != nil { | ||||
| 		log.Print(err) | ||||
| 		return nil | ||||
| 	} | ||||
| 	ll := strings.Split(string(buffer), "\n") | ||||
| 	var cpulist []int | ||||
| 	for _, line := range ll { | ||||
| 		if strings.HasPrefix(line, "processor") { | ||||
| 			lv := strings.Fields(line) | ||||
| 			id, err := strconv.ParseInt(lv[2], 10, 32) | ||||
| 			if err != nil { | ||||
| 				log.Print(err) | ||||
| 				return cpulist | ||||
| 			} | ||||
| 			_, found := intArrayContains(cpulist, int(id)) | ||||
| 			if !found { | ||||
| 				cpulist = append(cpulist, int(id)) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return cpulist | ||||
| } | ||||
|  | ||||
| // RemoveFromStringList removes the string r from the array of strings s | ||||
| // If r is not contained in the array an error is returned | ||||
| func RemoveFromStringList(s []string, r string) ([]string, error) { | ||||
|   | ||||
| @@ -39,7 +39,6 @@ type NetstatCollector struct { | ||||
|  | ||||
| func (m *NetstatCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "NetstatCollector" | ||||
| 	m.parallel = true | ||||
| 	m.setup() | ||||
| 	m.lastTimestamp = time.Now() | ||||
|  | ||||
|   | ||||
| @@ -114,7 +114,6 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { | ||||
| 	m.data = make(map[string]NfsCollectorData) | ||||
| 	m.initStats() | ||||
| 	m.init = true | ||||
| 	m.parallel = true | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -54,7 +54,6 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | ||||
| 	} | ||||
|  | ||||
| 	m.name = "NUMAStatsCollector" | ||||
| 	m.parallel = true | ||||
| 	m.setup() | ||||
| 	m.meta = map[string]string{ | ||||
| 		"source": m.name, | ||||
|   | ||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -4,73 +4,37 @@ | ||||
| ```json | ||||
|   "nvidia": { | ||||
|     "exclude_devices" : [ | ||||
|       "0","1", "0000000:ff:01.0" | ||||
|       "0","1" | ||||
|     ], | ||||
|     "exclude_metrics": [ | ||||
|       "nv_fb_mem_used", | ||||
|       "nv_fb_memory", | ||||
|       "nv_fan" | ||||
|     ], | ||||
|     "process_mig_devices": false, | ||||
|     "use_pci_info_as_type_id": true, | ||||
|     "add_pci_info_tag": false, | ||||
|     "add_uuid_meta": false, | ||||
|     "add_board_number_meta": false, | ||||
|     "add_serial_meta": false, | ||||
|     "use_uuid_for_mig_device": false, | ||||
|     "use_slice_for_mig_device": false | ||||
|     ] | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| The `nvidia` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes IDs as supplied to the NVML with `nvmlDeviceGetHandleByIndex()` or the PCI address in NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option. Commonly only the physical GPUs are monitored. If MIG devices should be analyzed as well, set `process_mig_devices` (adds `stype=mig,stype-id=<mig_index>`). With the options `use_uuid_for_mig_device` and `use_slice_for_mig_device`, the `<mig_index>` can be replaced with the UUID (e.g. `MIG-6a9f7cc8-6d5b-5ce0-92de-750edc4d8849`) or the MIG slice name (e.g. `1g.5gb`). | ||||
|  | ||||
| The metrics sent by the `nvidia` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag. | ||||
|  | ||||
| Optionally, it is possible to add the UUID, the board part number and the serial to the meta informations. They are not sent to the sinks (if not configured otherwise). | ||||
|  | ||||
|  | ||||
| Metrics: | ||||
| * `nv_util` | ||||
| * `nv_mem_util` | ||||
| * `nv_fb_mem_total` | ||||
| * `nv_fb_mem_used` | ||||
| * `nv_bar1_mem_total` | ||||
| * `nv_bar1_mem_used` | ||||
| * `nv_mem_total` | ||||
| * `nv_fb_memory` | ||||
| * `nv_temp` | ||||
| * `nv_fan` | ||||
| * `nv_ecc_mode` | ||||
| * `nv_perf_state` | ||||
| * `nv_power_usage` | ||||
| * `nv_graphics_clock` | ||||
| * `nv_sm_clock` | ||||
| * `nv_mem_clock` | ||||
| * `nv_video_clock` | ||||
| * `nv_power_usage_report` | ||||
| * `nv_graphics_clock_report` | ||||
| * `nv_sm_clock_report` | ||||
| * `nv_mem_clock_report` | ||||
| * `nv_max_graphics_clock` | ||||
| * `nv_max_sm_clock` | ||||
| * `nv_max_mem_clock` | ||||
| * `nv_max_video_clock` | ||||
| * `nv_ecc_uncorrected_error` | ||||
| * `nv_ecc_corrected_error` | ||||
| * `nv_power_max_limit` | ||||
| * `nv_ecc_db_error` | ||||
| * `nv_ecc_sb_error` | ||||
| * `nv_power_man_limit` | ||||
| * `nv_encoder_util` | ||||
| * `nv_decoder_util` | ||||
| * `nv_remapped_rows_corrected` | ||||
| * `nv_remapped_rows_uncorrected` | ||||
| * `nv_remapped_rows_pending` | ||||
| * `nv_remapped_rows_failure` | ||||
| * `nv_compute_processes` | ||||
| * `nv_graphics_processes` | ||||
| * `nv_violation_power` | ||||
| * `nv_violation_thermal` | ||||
| * `nv_violation_sync_boost` | ||||
| * `nv_violation_board_limit` | ||||
| * `nv_violation_low_util` | ||||
| * `nv_violation_reliability` | ||||
| * `nv_violation_below_app_clock` | ||||
| * `nv_violation_below_base_clock` | ||||
| * `nv_nvlink_crc_flit_errors` | ||||
| * `nv_nvlink_crc_errors` | ||||
| * `nv_nvlink_ecc_errors` | ||||
| * `nv_nvlink_replay_errors` | ||||
| * `nv_nvlink_recovery_errors` | ||||
|  | ||||
| Some metrics add the additional sub type tag (`stype`) like the `nv_nvlink_*` metrics set `stype=nvlink,stype-id=<link_number>`.  | ||||
| It uses a separate `type` in the metrics. The output metric looks like this: | ||||
| `<name>,type=accelerator,type-id=<nvidia-gpu-id> value=<metric value> <timestamp>` | ||||
|  | ||||
|   | ||||
| @@ -1,319 +0,0 @@ | ||||
| package collectors | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	"github.com/ClusterCockpit/go-rocm-smi/pkg/rocm_smi" | ||||
| ) | ||||
|  | ||||
| type RocmSmiCollectorConfig struct { | ||||
| 	ExcludeMetrics     []string `json:"exclude_metrics,omitempty"` | ||||
| 	ExcludeDevices     []string `json:"exclude_devices,omitempty"` | ||||
| 	AddPciInfoTag      bool     `json:"add_pci_info_tag,omitempty"` | ||||
| 	UsePciInfoAsTypeId bool     `json:"use_pci_info_as_type_id,omitempty"` | ||||
| 	AddSerialMeta      bool     `json:"add_serial_meta,omitempty"` | ||||
| } | ||||
|  | ||||
| type RocmSmiCollectorDevice struct { | ||||
| 	device         rocm_smi.DeviceHandle | ||||
| 	index          int | ||||
| 	tags           map[string]string // default tags | ||||
| 	meta           map[string]string // default meta information | ||||
| 	excludeMetrics map[string]bool   // copy of exclude metrics from config | ||||
| } | ||||
|  | ||||
| type RocmSmiCollector struct { | ||||
| 	metricCollector | ||||
| 	config  RocmSmiCollectorConfig // the configuration structure | ||||
| 	devices []RocmSmiCollectorDevice | ||||
| } | ||||
|  | ||||
| // Functions to implement MetricCollector interface | ||||
| // Init(...), Read(...), Close() | ||||
| // See: metricCollector.go | ||||
|  | ||||
| // Init initializes the sample collector | ||||
| // Called once by the collector manager | ||||
| // All tags, meta data tags and metrics that do not change over the runtime should be set here | ||||
| func (m *RocmSmiCollector) Init(config json.RawMessage) error { | ||||
| 	var err error = nil | ||||
| 	// Always set the name early in Init() to use it in cclog.Component* functions | ||||
| 	m.name = "RocmSmiCollector" | ||||
| 	// This is for later use, also call it early | ||||
| 	m.setup() | ||||
| 	// Define meta information sent with each metric | ||||
| 	// (Can also be dynamic or this is the basic set with extension through AddMeta()) | ||||
| 	//m.meta = map[string]string{"source": m.name, "group": "AMD"} | ||||
| 	// Define tags sent with each metric | ||||
| 	// The 'type' tag is always needed, it defines the granulatity of the metric | ||||
| 	// node -> whole system | ||||
| 	// socket -> CPU socket (requires socket ID as 'type-id' tag) | ||||
| 	// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) | ||||
| 	//m.tags = map[string]string{"type": "node"} | ||||
| 	// Read in the JSON configuration | ||||
| 	if len(config) > 0 { | ||||
| 		err = json.Unmarshal(config, &m.config) | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(m.name, "Error reading config:", err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	ret := rocm_smi.Init() | ||||
| 	if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 		err = errors.New("Failed to initialize ROCm SMI library") | ||||
| 		cclog.ComponentError(m.name, err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	numDevs, ret := rocm_smi.NumMonitorDevices() | ||||
| 	if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 		err = errors.New("Failed to get number of GPUs from ROCm SMI library") | ||||
| 		cclog.ComponentError(m.name, err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	exclDev := func(s string) bool { | ||||
| 		skip_device := false | ||||
| 		for _, excl := range m.config.ExcludeDevices { | ||||
| 			if excl == s { | ||||
| 				skip_device = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 		return skip_device | ||||
| 	} | ||||
|  | ||||
| 	m.devices = make([]RocmSmiCollectorDevice, 0) | ||||
|  | ||||
| 	for i := 0; i < numDevs; i++ { | ||||
| 		str_i := fmt.Sprintf("%d", i) | ||||
| 		if exclDev(str_i) { | ||||
| 			continue | ||||
| 		} | ||||
| 		device, ret := rocm_smi.DeviceGetHandleByIndex(i) | ||||
| 		if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 			err = fmt.Errorf("Failed to get handle for GPU %d", i) | ||||
| 			cclog.ComponentError(m.name, err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		pciInfo, ret := rocm_smi.DeviceGetPciInfo(device) | ||||
| 		if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 			err = fmt.Errorf("Failed to get PCI information for GPU %d", i) | ||||
| 			cclog.ComponentError(m.name, err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		pciId := fmt.Sprintf( | ||||
| 			"%08X:%02X:%02X.%X", | ||||
| 			pciInfo.Domain, | ||||
| 			pciInfo.Bus, | ||||
| 			pciInfo.Device, | ||||
| 			pciInfo.Function) | ||||
|  | ||||
| 		if exclDev(pciId) { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		dev := RocmSmiCollectorDevice{ | ||||
| 			device: device, | ||||
| 			tags: map[string]string{ | ||||
| 				"type":    "accelerator", | ||||
| 				"type-id": str_i, | ||||
| 			}, | ||||
| 			meta: map[string]string{ | ||||
| 				"source": m.name, | ||||
| 				"group":  "AMD", | ||||
| 			}, | ||||
| 		} | ||||
| 		if m.config.UsePciInfoAsTypeId { | ||||
| 			dev.tags["type-id"] = pciId | ||||
| 		} else if m.config.AddPciInfoTag { | ||||
| 			dev.tags["pci_identifier"] = pciId | ||||
| 		} | ||||
|  | ||||
| 		if m.config.AddSerialMeta { | ||||
| 			serial, ret := rocm_smi.DeviceGetSerialNumber(device) | ||||
| 			if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 				cclog.ComponentError(m.name, "Unable to get serial number for device at index", i, ":", rocm_smi.StatusStringNoError(ret)) | ||||
| 			} else { | ||||
| 				dev.meta["serial"] = serial | ||||
| 			} | ||||
| 		} | ||||
| 		// Add excluded metrics | ||||
| 		dev.excludeMetrics = map[string]bool{} | ||||
| 		for _, e := range m.config.ExcludeMetrics { | ||||
| 			dev.excludeMetrics[e] = true | ||||
| 		} | ||||
| 		dev.index = i | ||||
| 		m.devices = append(m.devices, dev) | ||||
| 	} | ||||
|  | ||||
| 	// Set this flag only if everything is initialized properly, all required files exist, ... | ||||
| 	m.init = true | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // Read collects all metrics belonging to the sample collector | ||||
| // and sends them through the output channel to the collector manager | ||||
| func (m *RocmSmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 	// Create a sample metric | ||||
| 	timestamp := time.Now() | ||||
|  | ||||
| 	for _, dev := range m.devices { | ||||
| 		metrics, ret := rocm_smi.DeviceGetMetrics(dev.device) | ||||
| 		if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 			cclog.ComponentError(m.name, "Unable to get metrics for device at index", dev.index, ":", rocm_smi.StatusStringNoError(ret)) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if !dev.excludeMetrics["rocm_gfx_util"] { | ||||
| 			value := metrics.Average_gfx_activity | ||||
| 			y, err := lp.New("rocm_gfx_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_umc_util"] { | ||||
| 			value := metrics.Average_umc_activity | ||||
| 			y, err := lp.New("rocm_umc_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_mm_util"] { | ||||
| 			value := metrics.Average_mm_activity | ||||
| 			y, err := lp.New("rocm_mm_util", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_avg_power"] { | ||||
| 			value := metrics.Average_socket_power | ||||
| 			y, err := lp.New("rocm_avg_power", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_mem"] { | ||||
| 			value := metrics.Temperature_mem | ||||
| 			y, err := lp.New("rocm_temp_mem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_hotspot"] { | ||||
| 			value := metrics.Temperature_hotspot | ||||
| 			y, err := lp.New("rocm_temp_hotspot", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_edge"] { | ||||
| 			value := metrics.Temperature_edge | ||||
| 			y, err := lp.New("rocm_temp_edge", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_vrgfx"] { | ||||
| 			value := metrics.Temperature_vrgfx | ||||
| 			y, err := lp.New("rocm_temp_vrgfx", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_vrsoc"] { | ||||
| 			value := metrics.Temperature_vrsoc | ||||
| 			y, err := lp.New("rocm_temp_vrsoc", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_vrmem"] { | ||||
| 			value := metrics.Temperature_vrmem | ||||
| 			y, err := lp.New("rocm_temp_vrmem", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_gfx_clock"] { | ||||
| 			value := metrics.Average_gfxclk_frequency | ||||
| 			y, err := lp.New("rocm_gfx_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_soc_clock"] { | ||||
| 			value := metrics.Average_socclk_frequency | ||||
| 			y, err := lp.New("rocm_soc_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_u_clock"] { | ||||
| 			value := metrics.Average_uclk_frequency | ||||
| 			y, err := lp.New("rocm_u_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_v0_clock"] { | ||||
| 			value := metrics.Average_vclk0_frequency | ||||
| 			y, err := lp.New("rocm_v0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_v1_clock"] { | ||||
| 			value := metrics.Average_vclk1_frequency | ||||
| 			y, err := lp.New("rocm_v1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_d0_clock"] { | ||||
| 			value := metrics.Average_dclk0_frequency | ||||
| 			y, err := lp.New("rocm_d0_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_d1_clock"] { | ||||
| 			value := metrics.Average_dclk1_frequency | ||||
| 			y, err := lp.New("rocm_d1_clock", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		if !dev.excludeMetrics["rocm_temp_hbm"] { | ||||
| 			for i := 0; i < rocm_smi.NUM_HBM_INSTANCES; i++ { | ||||
| 				value := metrics.Temperature_hbm[i] | ||||
| 				y, err := lp.New("rocm_temp_hbm", dev.tags, dev.meta, map[string]interface{}{"value": value}, timestamp) | ||||
| 				if err == nil { | ||||
| 					y.AddTag("stype", "device") | ||||
| 					y.AddTag("stype-id", fmt.Sprintf("%d", i)) | ||||
| 					output <- y | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| // Close metric collector: close network connection, close files, close libraries, ... | ||||
| // Called once by the collector manager | ||||
| func (m *RocmSmiCollector) Close() { | ||||
| 	// Unset flag | ||||
| 	ret := rocm_smi.Shutdown() | ||||
| 	if ret != rocm_smi.STATUS_SUCCESS { | ||||
| 		cclog.ComponentError(m.name, "Failed to shutdown ROCm SMI library") | ||||
| 	} | ||||
| 	m.init = false | ||||
| } | ||||
| @@ -1,47 +0,0 @@ | ||||
|  | ||||
| ## `rocm_smi` collector | ||||
|  | ||||
| ```json | ||||
|   "rocm_smi": { | ||||
|     "exclude_devices": [ | ||||
|       "0","1", "0000000:ff:01.0" | ||||
|     ], | ||||
|     "exclude_metrics": [ | ||||
|       "rocm_mm_util", | ||||
|       "rocm_temp_vrsoc" | ||||
|     ], | ||||
|     "use_pci_info_as_type_id": true, | ||||
|     "add_pci_info_tag": false, | ||||
|     "add_serial_meta": false, | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| The `rocm_smi` collector can be configured to leave out specific devices with the `exclude_devices` option. It takes logical IDs in the list of available devices or the PCI address similar to NVML format (`%08X:%02X:%02X.0`). Metrics (listed below) that should not be sent to the MetricRouter can be excluded with the `exclude_metrics` option.  | ||||
|  | ||||
| The metrics sent by the `rocm_smi` collector use `accelerator` as `type` tag. For the `type-id`, it uses the device handle index by default. With the `use_pci_info_as_type_id` option, the PCI ID is used instead. If both values should be added as tags, activate the `add_pci_info_tag` option. It uses the device handle index as `type-id` and adds the PCI ID as separate `pci_identifier` tag. | ||||
|  | ||||
| Optionally, it is possible to add the serial to the meta informations. They are not sent to the sinks (if not configured otherwise). | ||||
|  | ||||
|  | ||||
| Metrics: | ||||
| * `rocm_gfx_util` | ||||
| * `rocm_umc_util` | ||||
| * `rocm_mm_util` | ||||
| * `rocm_avg_power` | ||||
| * `rocm_temp_mem` | ||||
| * `rocm_temp_hotspot` | ||||
| * `rocm_temp_edge` | ||||
| * `rocm_temp_vrgfx` | ||||
| * `rocm_temp_vrsoc` | ||||
| * `rocm_temp_vrmem` | ||||
| * `rocm_gfx_clock` | ||||
| * `rocm_soc_clock` | ||||
| * `rocm_u_clock` | ||||
| * `rocm_v0_clock` | ||||
| * `rocm_v1_clock` | ||||
| * `rocm_d0_clock` | ||||
| * `rocm_d1_clock` | ||||
| * `rocm_temp_hbm` | ||||
|  | ||||
|  | ||||
| Some metrics add the additional sub type tag (`stype`) like the `rocm_temp_hbm` metrics set `stype=device,stype-id=<HBM_slice_number>`.  | ||||
| @@ -35,10 +35,6 @@ func (m *SampleCollector) Init(config json.RawMessage) error { | ||||
| 	m.name = "InternalCollector" | ||||
| 	// This is for later use, also call it early | ||||
| 	m.setup() | ||||
| 	// Tell whether the collector should be run in parallel with others (reading files, ...) | ||||
| 	// or it should be run serially, mostly for collectors acutally doing measurements | ||||
| 	// because they should not measure the execution of the other collectors | ||||
| 	m.parallel = true | ||||
| 	// Define meta information sent with each metric | ||||
| 	// (Can also be dynamic or this is the basic set with extension through AddMeta()) | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} | ||||
| @@ -46,12 +42,7 @@ func (m *SampleCollector) Init(config json.RawMessage) error { | ||||
| 	// The 'type' tag is always needed, it defines the granulatity of the metric | ||||
| 	// node -> whole system | ||||
| 	// socket -> CPU socket (requires socket ID as 'type-id' tag) | ||||
| 	// die -> CPU die (requires CPU die ID as 'type-id' tag) | ||||
| 	// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag) | ||||
| 	// llc -> Last level cache (requires last level cache ID as 'type-id' tag) | ||||
| 	// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag) | ||||
| 	// hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag) | ||||
| 	// accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag) | ||||
| 	// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) | ||||
| 	m.tags = map[string]string{"type": "node"} | ||||
| 	// Read in the JSON configuration | ||||
| 	if len(config) > 0 { | ||||
|   | ||||
| @@ -50,7 +50,6 @@ func (m *TempCollector) Init(config json.RawMessage) error { | ||||
| 	} | ||||
|  | ||||
| 	m.name = "TempCollector" | ||||
| 	m.parallel = true | ||||
| 	m.setup() | ||||
| 	if len(config) > 0 { | ||||
| 		err := json.Unmarshal(config, &m.config) | ||||
| @@ -117,10 +116,6 @@ func (m *TempCollector) Init(config json.RawMessage) error { | ||||
| 		} | ||||
|  | ||||
| 		// Sensor file | ||||
| 		_, err = ioutil.ReadFile(file) | ||||
| 		if err != nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		sensor.file = file | ||||
|  | ||||
| 		// Sensor tags | ||||
|   | ||||
| @@ -28,7 +28,6 @@ type TopProcsCollector struct { | ||||
| func (m *TopProcsCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "TopProcsCollector" | ||||
| 	m.parallel = true | ||||
| 	m.tags = map[string]string{"type": "node"} | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "TopProcs"} | ||||
| 	if len(config) > 0 { | ||||
|   | ||||
							
								
								
									
										12
									
								
								config.json
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								config.json
									
									
									
									
									
								
							| @@ -1,8 +1,8 @@ | ||||
| { | ||||
|   "sinks": "./sinks.json", | ||||
|   "collectors" : "./collectors.json", | ||||
|   "receivers" : "./receivers.json", | ||||
|   "router" : "./router.json", | ||||
|   "interval": "10s", | ||||
|   "duration": "1s" | ||||
|   "sinks": "sinks.json", | ||||
|   "collectors" : "collectors.json", | ||||
|   "receivers" : "receivers.json", | ||||
|   "router" : "router.json", | ||||
|   "interval": 10, | ||||
|   "duration": 1 | ||||
| } | ||||
|   | ||||
							
								
								
									
										23
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,35 +1,16 @@ | ||||
| module github.com/ClusterCockpit/cc-metric-collector | ||||
|  | ||||
| go 1.17 | ||||
| go 1.16 | ||||
|  | ||||
| require ( | ||||
| 	github.com/ClusterCockpit/cc-units v0.0.0-20220318130935-92a0c6442220 | ||||
| 	github.com/NVIDIA/go-nvml v0.11.6-0 | ||||
| 	github.com/PaesslerAG/gval v1.1.2 | ||||
| 	github.com/gorilla/mux v1.8.0 | ||||
| 	github.com/influxdata/influxdb-client-go/v2 v2.8.1 | ||||
| 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf | ||||
| 	github.com/nats-io/nats-server/v2 v2.8.0 // indirect | ||||
| 	github.com/nats-io/nats.go v1.14.0 | ||||
| 	github.com/prometheus/client_golang v1.12.1 | ||||
| 	github.com/stmcginnis/gofish v0.13.0 | ||||
| 	golang.org/x/sys v0.0.0-20220412211240-33da011f77ad | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| 	github.com/beorn7/perks v1.0.1 // indirect | ||||
| 	github.com/cespare/xxhash/v2 v2.1.2 // indirect | ||||
| 	github.com/deepmap/oapi-codegen v1.8.2 // indirect | ||||
| 	github.com/golang/protobuf v1.5.2 // indirect | ||||
| 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect | ||||
| 	github.com/nats-io/nats-server/v2 v2.8.0 // indirect | ||||
| 	github.com/nats-io/nkeys v0.3.0 // indirect | ||||
| 	github.com/nats-io/nuid v1.0.1 // indirect | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_model v0.2.0 // indirect | ||||
| 	github.com/prometheus/common v0.32.1 // indirect | ||||
| 	github.com/prometheus/procfs v0.7.3 // indirect | ||||
| 	golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect | ||||
| 	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect | ||||
| 	google.golang.org/protobuf v1.26.0 // indirect | ||||
| 	gopkg.in/yaml.v2 v2.4.0 // indirect | ||||
| ) | ||||
|   | ||||
							
								
								
									
										16
									
								
								go.mod.1.16
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								go.mod.1.16
									
									
									
									
									
								
							| @@ -1,16 +0,0 @@ | ||||
| module github.com/ClusterCockpit/cc-metric-collector | ||||
|  | ||||
| go 1.16 | ||||
|  | ||||
| require ( | ||||
| 	github.com/NVIDIA/go-nvml v0.11.6-0 | ||||
| 	github.com/PaesslerAG/gval v1.1.2 | ||||
| 	github.com/gorilla/mux v1.8.0 | ||||
| 	github.com/influxdata/influxdb-client-go/v2 v2.7.0 | ||||
| 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf | ||||
| 	github.com/nats-io/nats-server/v2 v2.8.0 // indirect | ||||
| 	github.com/nats-io/nats.go v1.14.0 | ||||
| 	github.com/prometheus/client_golang v1.12.1 | ||||
| 	github.com/stmcginnis/gofish v0.13.0 | ||||
| 	golang.org/x/sys v0.0.0-20220412211240-33da011f77ad | ||||
| ) | ||||
							
								
								
									
										16
									
								
								go.mod.1.17+
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								go.mod.1.17+
									
									
									
									
									
								
							| @@ -1,16 +0,0 @@ | ||||
| module github.com/ClusterCockpit/cc-metric-collector | ||||
|  | ||||
| go 1.17 | ||||
|  | ||||
| require ( | ||||
| 	github.com/NVIDIA/go-nvml v0.11.6-0 | ||||
| 	github.com/PaesslerAG/gval v1.1.2 | ||||
| 	github.com/gorilla/mux v1.8.0 | ||||
| 	github.com/influxdata/influxdb-client-go/v2 v2.8.1 | ||||
| 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf | ||||
| 	github.com/nats-io/nats-server/v2 v2.8.0 // indirect | ||||
| 	github.com/nats-io/nats.go v1.14.0 | ||||
| 	github.com/prometheus/client_golang v1.12.1 | ||||
| 	github.com/stmcginnis/gofish v0.13.0 | ||||
| 	golang.org/x/sys v0.0.0-20220412211240-33da011f77ad | ||||
| ) | ||||
| @@ -29,7 +29,6 @@ func intArrayContains(array []int, str int) (int, bool) { | ||||
| 	return -1, false | ||||
| } | ||||
|  | ||||
| // Used internally for sysfs file reads | ||||
| func fileToInt(path string) int { | ||||
| 	buffer, err := ioutil.ReadFile(path) | ||||
| 	if err != nil { | ||||
| @@ -48,7 +47,6 @@ func fileToInt(path string) int { | ||||
| 	return int(id) | ||||
| } | ||||
|  | ||||
| // Get list of CPU socket IDs | ||||
| func SocketList() []int { | ||||
| 	buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) | ||||
| 	if err != nil { | ||||
| @@ -56,7 +54,7 @@ func SocketList() []int { | ||||
| 		return nil | ||||
| 	} | ||||
| 	ll := strings.Split(string(buffer), "\n") | ||||
| 	packs := make([]int, 0) | ||||
| 	var packs []int | ||||
| 	for _, line := range ll { | ||||
| 		if strings.HasPrefix(line, "physical id") { | ||||
| 			lv := strings.Fields(line) | ||||
| @@ -74,8 +72,7 @@ func SocketList() []int { | ||||
| 	return packs | ||||
| } | ||||
|  | ||||
| // Get list of hardware thread IDs in the order of listing in /proc/cpuinfo | ||||
| func HwthreadList() []int { | ||||
| func CpuList() []int { | ||||
| 	buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) | ||||
| 	if err != nil { | ||||
| 		log.Print(err) | ||||
| @@ -100,13 +97,6 @@ func HwthreadList() []int { | ||||
| 	return cpulist | ||||
| } | ||||
|  | ||||
| // Get list of hardware thread IDs in the order of listing in /proc/cpuinfo | ||||
| // Deprecated! Use HwthreadList() | ||||
| func CpuList() []int { | ||||
| 	return HwthreadList() | ||||
| } | ||||
|  | ||||
| // Get list of CPU core IDs in the order of listing in /proc/cpuinfo | ||||
| func CoreList() []int { | ||||
| 	buffer, err := ioutil.ReadFile(string(PROCFS_CPUINFO)) | ||||
| 	if err != nil { | ||||
| @@ -132,7 +122,6 @@ func CoreList() []int { | ||||
| 	return corelist | ||||
| } | ||||
|  | ||||
| // Get list of NUMA node IDs | ||||
| func NumaNodeList() []int { | ||||
| 	numaList := make([]int, 0) | ||||
| 	globPath := filepath.Join(string(SYSFS_NUMABASE), "node*") | ||||
| @@ -167,9 +156,8 @@ func NumaNodeList() []int { | ||||
| 	return numaList | ||||
| } | ||||
|  | ||||
| // Get list of CPU die IDs | ||||
| func DieList() []int { | ||||
| 	cpulist := HwthreadList() | ||||
| 	cpulist := CpuList() | ||||
| 	dielist := make([]int, 0) | ||||
| 	for _, c := range cpulist { | ||||
| 		diepath := filepath.Join(string(SYSFS_CPUBASE), fmt.Sprintf("cpu%d", c), "topology/die_id") | ||||
| @@ -187,27 +175,7 @@ func DieList() []int { | ||||
| 	return SocketList() | ||||
| } | ||||
|  | ||||
| // Get list of specified type using the naming format inside ClusterCockpit | ||||
| func GetTypeList(topology_type string) []int { | ||||
| 	switch topology_type { | ||||
| 	case "node": | ||||
| 		return []int{0} | ||||
| 	case "socket": | ||||
| 		return SocketList() | ||||
| 	case "die": | ||||
| 		return DieList() | ||||
| 	case "memoryDomain": | ||||
| 		return NumaNodeList() | ||||
| 	case "core": | ||||
| 		return CoreList() | ||||
| 	case "hwthread": | ||||
| 		return HwthreadList() | ||||
| 	} | ||||
| 	return []int{} | ||||
| } | ||||
|  | ||||
| // Structure holding all information about a hardware thread | ||||
| type HwthreadEntry struct { | ||||
| type CpuEntry struct { | ||||
| 	Cpuid      int | ||||
| 	SMT        int | ||||
| 	Core       int | ||||
| @@ -216,25 +184,25 @@ type HwthreadEntry struct { | ||||
| 	Die        int | ||||
| } | ||||
|  | ||||
| func CpuData() []HwthreadEntry { | ||||
| func CpuData() []CpuEntry { | ||||
|  | ||||
| 	// fileToInt := func(path string) int { | ||||
| 	// 	buffer, err := ioutil.ReadFile(path) | ||||
| 	// 	if err != nil { | ||||
| 	// 		log.Print(err) | ||||
| 	// 		//cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) | ||||
| 	// 		return -1 | ||||
| 	// 	} | ||||
| 	// 	sbuffer := strings.Replace(string(buffer), "\n", "", -1) | ||||
| 	// 	var id int64 | ||||
| 	// 	//_, err = fmt.Scanf("%d", sbuffer, &id) | ||||
| 	// 	id, err = strconv.ParseInt(sbuffer, 10, 32) | ||||
| 	// 	if err != nil { | ||||
| 	// 		cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) | ||||
| 	// 		return -1 | ||||
| 	// 	} | ||||
| 	// 	return int(id) | ||||
| 	// } | ||||
| 	fileToInt := func(path string) int { | ||||
| 		buffer, err := ioutil.ReadFile(path) | ||||
| 		if err != nil { | ||||
| 			log.Print(err) | ||||
| 			//cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) | ||||
| 			return -1 | ||||
| 		} | ||||
| 		sbuffer := strings.Replace(string(buffer), "\n", "", -1) | ||||
| 		var id int64 | ||||
| 		//_, err = fmt.Scanf("%d", sbuffer, &id) | ||||
| 		id, err = strconv.ParseInt(sbuffer, 10, 32) | ||||
| 		if err != nil { | ||||
| 			cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) | ||||
| 			return -1 | ||||
| 		} | ||||
| 		return int(id) | ||||
| 	} | ||||
| 	getCore := func(basepath string) int { | ||||
| 		return fileToInt(fmt.Sprintf("%s/core_id", basepath)) | ||||
| 	} | ||||
| @@ -292,9 +260,9 @@ func CpuData() []HwthreadEntry { | ||||
| 		return 0 | ||||
| 	} | ||||
|  | ||||
| 	clist := make([]HwthreadEntry, 0) | ||||
| 	for _, c := range HwthreadList() { | ||||
| 		clist = append(clist, HwthreadEntry{Cpuid: c}) | ||||
| 	clist := make([]CpuEntry, 0) | ||||
| 	for _, c := range CpuList() { | ||||
| 		clist = append(clist, CpuEntry{Cpuid: c}) | ||||
| 	} | ||||
| 	for i, centry := range clist { | ||||
| 		centry.Socket = -1 | ||||
| @@ -330,7 +298,6 @@ func CpuData() []HwthreadEntry { | ||||
| 	return clist | ||||
| } | ||||
|  | ||||
| // Structure holding basic information about a CPU | ||||
| type CpuInformation struct { | ||||
| 	NumHWthreads   int | ||||
| 	SMTWidth       int | ||||
| @@ -340,7 +307,6 @@ type CpuInformation struct { | ||||
| 	NumNumaDomains int | ||||
| } | ||||
|  | ||||
| // Get basic information about the CPU | ||||
| func CpuInfo() CpuInformation { | ||||
| 	var c CpuInformation | ||||
|  | ||||
| @@ -376,8 +342,7 @@ func CpuInfo() CpuInformation { | ||||
| 	return c | ||||
| } | ||||
|  | ||||
| // Get the CPU socket ID for a given hardware thread ID | ||||
| func GetHwthreadSocket(cpuid int) int { | ||||
| func GetCpuSocket(cpuid int) int { | ||||
| 	cdata := CpuData() | ||||
| 	for _, d := range cdata { | ||||
| 		if d.Cpuid == cpuid { | ||||
| @@ -387,8 +352,7 @@ func GetHwthreadSocket(cpuid int) int { | ||||
| 	return -1 | ||||
| } | ||||
|  | ||||
| // Get the NUMA node ID for a given hardware thread ID | ||||
| func GetHwthreadNumaDomain(cpuid int) int { | ||||
| func GetCpuNumaDomain(cpuid int) int { | ||||
| 	cdata := CpuData() | ||||
| 	for _, d := range cdata { | ||||
| 		if d.Cpuid == cpuid { | ||||
| @@ -398,8 +362,7 @@ func GetHwthreadNumaDomain(cpuid int) int { | ||||
| 	return -1 | ||||
| } | ||||
|  | ||||
| // Get the CPU die ID for a given hardware thread ID | ||||
| func GetHwthreadDie(cpuid int) int { | ||||
| func GetCpuDie(cpuid int) int { | ||||
| 	cdata := CpuData() | ||||
| 	for _, d := range cdata { | ||||
| 		if d.Cpuid == cpuid { | ||||
| @@ -409,8 +372,7 @@ func GetHwthreadDie(cpuid int) int { | ||||
| 	return -1 | ||||
| } | ||||
|  | ||||
| // Get the CPU core ID for a given hardware thread ID | ||||
| func GetHwthreadCore(cpuid int) int { | ||||
| func GetCpuCore(cpuid int) int { | ||||
| 	cdata := CpuData() | ||||
| 	for _, d := range cdata { | ||||
| 		if d.Cpuid == cpuid { | ||||
| @@ -420,8 +382,7 @@ func GetHwthreadCore(cpuid int) int { | ||||
| 	return -1 | ||||
| } | ||||
|  | ||||
| // Get the all hardware thread ID associated with a CPU socket | ||||
| func GetSocketHwthreads(socket int) []int { | ||||
| func GetSocketCpus(socket int) []int { | ||||
| 	all := CpuData() | ||||
| 	cpulist := make([]int, 0) | ||||
| 	for _, d := range all { | ||||
| @@ -432,8 +393,7 @@ func GetSocketHwthreads(socket int) []int { | ||||
| 	return cpulist | ||||
| } | ||||
|  | ||||
| // Get the all hardware thread ID associated with a NUMA node | ||||
| func GetNumaDomainHwthreads(domain int) []int { | ||||
| func GetNumaDomainCpus(domain int) []int { | ||||
| 	all := CpuData() | ||||
| 	cpulist := make([]int, 0) | ||||
| 	for _, d := range all { | ||||
| @@ -444,8 +404,7 @@ func GetNumaDomainHwthreads(domain int) []int { | ||||
| 	return cpulist | ||||
| } | ||||
|  | ||||
| // Get the all hardware thread ID associated with a CPU die | ||||
| func GetDieHwthreads(die int) []int { | ||||
| func GetDieCpus(die int) []int { | ||||
| 	all := CpuData() | ||||
| 	cpulist := make([]int, 0) | ||||
| 	for _, d := range all { | ||||
| @@ -456,8 +415,7 @@ func GetDieHwthreads(die int) []int { | ||||
| 	return cpulist | ||||
| } | ||||
|  | ||||
| // Get the all hardware thread ID associated with a CPU core | ||||
| func GetCoreHwthreads(core int) []int { | ||||
| func GetCoreCpus(core int) []int { | ||||
| 	all := CpuData() | ||||
| 	cpulist := make([]int, 0) | ||||
| 	for _, d := range all { | ||||
|   | ||||
| @@ -246,7 +246,7 @@ func matchfunc(args ...interface{}) (interface{}, error) { | ||||
| func getCpuCoreFunc(args ...interface{}) (interface{}, error) { | ||||
| 	switch cpuid := args[0].(type) { | ||||
| 	case int: | ||||
| 		return topo.GetHwthreadCore(cpuid), nil | ||||
| 		return topo.GetCpuCore(cpuid), nil | ||||
| 	} | ||||
| 	return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") | ||||
| } | ||||
| @@ -255,7 +255,7 @@ func getCpuCoreFunc(args ...interface{}) (interface{}, error) { | ||||
| func getCpuSocketFunc(args ...interface{}) (interface{}, error) { | ||||
| 	switch cpuid := args[0].(type) { | ||||
| 	case int: | ||||
| 		return topo.GetHwthreadSocket(cpuid), nil | ||||
| 		return topo.GetCpuSocket(cpuid), nil | ||||
| 	} | ||||
| 	return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") | ||||
| } | ||||
| @@ -264,7 +264,7 @@ func getCpuSocketFunc(args ...interface{}) (interface{}, error) { | ||||
| func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { | ||||
| 	switch cpuid := args[0].(type) { | ||||
| 	case int: | ||||
| 		return topo.GetHwthreadNumaDomain(cpuid), nil | ||||
| 		return topo.GetCpuNumaDomain(cpuid), nil | ||||
| 	} | ||||
| 	return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid") | ||||
| } | ||||
| @@ -273,7 +273,7 @@ func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { | ||||
| func getCpuDieFunc(args ...interface{}) (interface{}, error) { | ||||
| 	switch cpuid := args[0].(type) { | ||||
| 	case int: | ||||
| 		return topo.GetHwthreadDie(cpuid), nil | ||||
| 		return topo.GetCpuDie(cpuid), nil | ||||
| 	} | ||||
| 	return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid") | ||||
| } | ||||
| @@ -336,7 +336,7 @@ func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) { | ||||
|  | ||||
| // wrapper function to get a list of all cpuids of the node | ||||
| func getCpuListOfNode(args ...interface{}) (interface{}, error) { | ||||
| 	return topo.HwthreadList(), nil | ||||
| 	return topo.CpuList(), nil | ||||
| } | ||||
|  | ||||
| // helper function to get the cpuid list for a CCMetric type tag set (type and type-id) | ||||
| @@ -348,14 +348,14 @@ func getCpuListOfType(args ...interface{}) (interface{}, error) { | ||||
| 	case string: | ||||
| 		switch typ { | ||||
| 		case "node": | ||||
| 			return topo.HwthreadList(), nil | ||||
| 			return topo.CpuList(), nil | ||||
| 		case "socket": | ||||
| 			return getCpuListOfSocketFunc(args[1]) | ||||
| 		case "numadomain": | ||||
| 			return getCpuListOfNumaDomainFunc(args[1]) | ||||
| 		case "core": | ||||
| 			return getCpuListOfCoreFunc(args[1]) | ||||
| 		case "hwthread": | ||||
| 		case "cpu": | ||||
| 			var cpu int | ||||
|  | ||||
| 			switch id := args[1].(type) { | ||||
|   | ||||
| @@ -52,11 +52,6 @@ The CCMetric router sits in between the collectors and the sinks and can be used | ||||
|     ], | ||||
|     "rename_metrics" : { | ||||
|         "metric_12345" : "mymetric" | ||||
|     }, | ||||
|     "normalize_units" : true, | ||||
|     "change_unit_prefix" : { | ||||
|       "mem_used" : "G", | ||||
|       "mem_total" : "G" | ||||
|     } | ||||
| } | ||||
| ``` | ||||
| @@ -197,14 +192,6 @@ This option takes a list of evaluable conditions and performs them one after the | ||||
| ``` | ||||
| The first line is comparable with the example in `drop_metrics`, it drops all metrics starting with `drop_metric_` and ending with a number. The second line drops all metrics of the first hardware thread (**not** recommended) | ||||
|  | ||||
| # Manipulating the metric units | ||||
|  | ||||
| ## The `normalize_units` option | ||||
| The cc-metric-collector tries to read the data from the system as it is reported. If available, it tries to read the metric unit from the system as well (e.g. from `/proc/meminfo`). The problem is that, depending on the source, the metric units are named differently. Just think about `byte`, `Byte`, `B`, `bytes`, ... | ||||
| The [cc-units](https://github.com/ClusterCockpit/cc-units) package provides us a normalization option to use the same metric unit name for all metrics. It this option is set to true, all `unit` meta tags are normalized. | ||||
|  | ||||
| ## The `change_unit_prefix` section | ||||
| It is often the case that metrics are reported by the system using a rather outdated unit prefix (like `/proc/meminfo` still uses kByte despite current memory sizes are in the GByte range). If you want to change the prefix of a unit, you can do that with the help of [cc-units](https://github.com/ClusterCockpit/cc-units). The setting works on the metric name and requires the new prefix for the metric. The cc-units package determines the scaling factor. | ||||
|  | ||||
| # Aggregate metric values of the current interval with the `interval_aggregates` option | ||||
|  | ||||
| @@ -252,22 +239,3 @@ Use cases for `interval_aggregates`: | ||||
|     } | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| # Order of operations | ||||
|  | ||||
| The router performs the above mentioned options in a specific order. In order to get the logic you want for a specific metric, it is crucial to know the processing order: | ||||
|  | ||||
| - Add the `hostname` tag (c) | ||||
| - Manipulate the timestamp to the interval timestamp (c,r) | ||||
| - Drop metrics based on `drop_metrics` and `drop_metrics_if` (c,r) | ||||
| - Add tags based on `add_tags` (c,r) | ||||
| - Delete tags based on `del_tags` (c,r) | ||||
| - Rename metric based on `rename_metric` (c,r) | ||||
|   - Add tags based on `add_tags` to still work if the configuration uses the new name (c,r)  | ||||
|   - Delete tags based on `del_tags` to still work if the configuration uses the new name (c,r) | ||||
| - Normalize units when `normalize_units` is set (c,r) | ||||
| - Convert unit prefix based on `change_unit_prefix` (c,r) | ||||
|  | ||||
| Legend: | ||||
| - 'c' if metric is coming from a collector | ||||
| - 'r' if metric is coming from a receiver | ||||
|   | ||||
| @@ -12,7 +12,6 @@ import ( | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" | ||||
| 	mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" | ||||
| 	units "github.com/ClusterCockpit/cc-units" | ||||
| ) | ||||
|  | ||||
| const ROUTER_MAX_FORWARD = 50 | ||||
| @@ -36,8 +35,6 @@ type metricRouterConfig struct { | ||||
| 	IntervalStamp     bool                                 `json:"interval_timestamp"`  // Update timestamp periodically by ticker each interval? | ||||
| 	NumCacheIntervals int                                  `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation | ||||
| 	MaxForward        int                                  `json:"max_forward"`         // Number of maximal forwarded metrics at one select | ||||
| 	NormalizeUnits    bool                                 `json:"normalize_units"`     // Check unit meta flag and normalize it using cc-units | ||||
| 	ChangeUnitPrefix  map[string]string                    `json:"change_unit_prefix"`  // Add prefix that should be applied to the metrics | ||||
| 	dropMetrics       map[string]bool                      // Internal map for O(1) lookup | ||||
| } | ||||
|  | ||||
| @@ -210,38 +207,6 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool { | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func (r *metricRouter) prepareUnit(point lp.CCMetric) bool { | ||||
| 	if r.config.NormalizeUnits { | ||||
| 		if in_unit, ok := point.GetMeta("unit"); ok { | ||||
| 			u := units.NewUnit(in_unit) | ||||
| 			if u.Valid() { | ||||
| 				point.AddMeta("unit", u.Short()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok { | ||||
|  | ||||
| 		newPrefix := units.NewPrefix(newP) | ||||
|  | ||||
| 		if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { | ||||
| 			u := units.NewUnit(in_unit) | ||||
| 			if u.Valid() { | ||||
| 				cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name()) | ||||
| 				conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) | ||||
| 				if conv != nil && out_unit.Valid() { | ||||
| 					if val, ok := point.GetField("value"); ok { | ||||
| 						point.AddField("value", conv(val)) | ||||
| 						point.AddMeta("unit", out_unit.Short()) | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| // Start starts the metric router | ||||
| func (r *metricRouter) Start() { | ||||
| 	// start timer if configured | ||||
| @@ -267,11 +232,9 @@ func (r *metricRouter) Start() { | ||||
| 		if new, ok := r.config.RenameMetrics[name]; ok { | ||||
| 			point.SetName(new) | ||||
| 			point.AddMeta("oldname", name) | ||||
| 		} | ||||
| 		r.DoAddTags(point) | ||||
| 		r.DoDelTags(point) | ||||
| 		} | ||||
|  | ||||
| 		r.prepareUnit(point) | ||||
|  | ||||
| 		for _, o := range r.outputs { | ||||
| 			o <- point | ||||
|   | ||||
| @@ -15,9 +15,3 @@ CONF_DIR=/etc/cc-metric-collector | ||||
| CONF_FILE=/etc/cc-metric-collector/cc-metric-collector.json | ||||
|  | ||||
| RESTART_ON_UPGRADE=true | ||||
|  | ||||
| # Golang runtime debugging. (see: https://pkg.go.dev/runtime) | ||||
| # GODEBUG=gctrace=1 | ||||
|  | ||||
| # Golang garbage collection target percentage | ||||
| # GOGC=100 | ||||
|   | ||||
| @@ -75,7 +75,7 @@ case "$1" in | ||||
| 	fi | ||||
|  | ||||
| 	# Start Daemon | ||||
| 	start-stop-daemon --start -b --chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $CC_OPTS | ||||
| 	start-stop-daemon --start -b --chdir "$WORK_DIR" --user "$CC_USER" -c "$CC_USER" --pidfile "$PID_FILE" --exec $DAEMON -- $DAEMON_OPTS | ||||
| 	return=$? | ||||
| 	if [ $return -eq 0 ] | ||||
| 	then | ||||
|   | ||||
| @@ -39,7 +39,7 @@ def group_to_json(groupfile): | ||||
|             llist = re.split("\s+", line) | ||||
|             calc = llist[-1] | ||||
|             metric = " ".join(llist[:-1]) | ||||
|             scope = "hwthread" | ||||
|             scope = "cpu" | ||||
|             if "BOX" in calc: | ||||
|                 scope = "socket" | ||||
|             if "PWR" in calc: | ||||
|   | ||||
| @@ -19,9 +19,10 @@ type HttpSinkConfig struct { | ||||
| 	URL             string `json:"url,omitempty"` | ||||
| 	JWT             string `json:"jwt,omitempty"` | ||||
| 	Timeout         string `json:"timeout,omitempty"` | ||||
| 	MaxIdleConns    int    `json:"max_idle_connections,omitempty"` | ||||
| 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` | ||||
| 	FlushDelay      string `json:"flush_delay,omitempty"` | ||||
| 	MaxRetries      int    `json:"max_retries,omitempty"` | ||||
| 	BatchSize       int    `json:"batch_size,omitempty"` | ||||
| } | ||||
|  | ||||
| type HttpSink struct { | ||||
| @@ -32,56 +33,63 @@ type HttpSink struct { | ||||
| 	buffer          *bytes.Buffer | ||||
| 	flushTimer      *time.Timer | ||||
| 	config          HttpSinkConfig | ||||
| 	maxIdleConns    int | ||||
| 	idleConnTimeout time.Duration | ||||
| 	timeout         time.Duration | ||||
| 	flushDelay      time.Duration | ||||
| 	batchSize       int | ||||
| } | ||||
|  | ||||
| func (s *HttpSink) Write(m lp.CCMetric) error { | ||||
| 	p := m.ToPoint(s.meta_as_tags) | ||||
| 	s.lock.Lock() | ||||
| 	firstWriteOfBatch := s.buffer.Len() == 0 | ||||
| 	_, err := s.encoder.Encode(p) | ||||
| 	s.lock.Unlock() | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) | ||||
| 		return err | ||||
| 	if s.buffer.Len() == 0 && s.flushDelay != 0 { | ||||
| 		// This is the first write since the last flush, start the flushTimer! | ||||
| 		if s.flushTimer != nil && s.flushTimer.Stop() { | ||||
| 			cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") | ||||
| 		} | ||||
|  | ||||
| 	if s.flushDelay == 0 { | ||||
| 		return s.Flush() | ||||
| 	} | ||||
|  | ||||
| 	if firstWriteOfBatch { | ||||
| 		if s.flushTimer == nil { | ||||
| 		// Run a batched flush for all lines that have arrived in the last second | ||||
| 		s.flushTimer = time.AfterFunc(s.flushDelay, func() { | ||||
| 			if err := s.Flush(); err != nil { | ||||
| 				cclog.ComponentError(s.name, "flush failed:", err.Error()) | ||||
| 			} | ||||
| 		}) | ||||
| 		} else { | ||||
| 			s.flushTimer.Reset(s.flushDelay) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| 	p := m.ToPoint(s.meta_as_tags) | ||||
|  | ||||
| 	s.lock.Lock() | ||||
| 	_, err := s.encoder.Encode(p) | ||||
| 	s.batchSize++ | ||||
| 	s.lock.Unlock() // defer does not work here as Flush() takes the lock as well | ||||
|  | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// Flush synchronously if "flush_delay" is zero | ||||
| 	if s.flushDelay == 0 { | ||||
| 		return s.Flush() | ||||
| 	} | ||||
| 	if s.batchSize == s.config.BatchSize { | ||||
| 		return s.Flush() | ||||
| 	} | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *HttpSink) Flush() error { | ||||
| 	// Own lock for as short as possible: the time it takes to copy the buffer. | ||||
| 	// buffer is read by client.Do, prevent concurrent modifications | ||||
| 	s.lock.Lock() | ||||
| 	buf := make([]byte, s.buffer.Len()) | ||||
| 	copy(buf, s.buffer.Bytes()) | ||||
| 	s.buffer.Reset() | ||||
| 	s.lock.Unlock() | ||||
| 	if len(buf) == 0 { | ||||
| 	defer s.lock.Unlock() | ||||
|  | ||||
| 	// Do not flush empty buffer | ||||
| 	if s.buffer.Len() == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var res *http.Response | ||||
| 	for i := 0; i < s.config.MaxRetries; i++ { | ||||
| 	// Create new request to send buffer | ||||
| 		req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf)) | ||||
| 	req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(s.name, "failed to create request:", err.Error()) | ||||
| 		return err | ||||
| @@ -92,25 +100,22 @@ func (s *HttpSink) Flush() error { | ||||
| 		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) | ||||
| 	} | ||||
|  | ||||
| 		// Do request | ||||
| 		res, err = s.client.Do(req) | ||||
| 	// Send | ||||
| 	res, err := s.client.Do(req) | ||||
|  | ||||
| 	// Clear buffer | ||||
| 	s.buffer.Reset() | ||||
| 	s.batchSize = 0 | ||||
|  | ||||
| 	// Handle transport/tcp errors | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) | ||||
| 			// Wait between retries | ||||
| 			time.Sleep(time.Duration(i+1) * (time.Second / 2)) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		break | ||||
| 	} | ||||
|  | ||||
| 	if res == nil { | ||||
| 		return errors.New("flush failed due to repeated errors") | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// Handle application errors | ||||
| 	if res.StatusCode != http.StatusOK { | ||||
| 		err := errors.New(res.Status) | ||||
| 		err = errors.New(res.Status) | ||||
| 		cclog.ComponentError(s.name, "application error:", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| @@ -130,10 +135,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	s := new(HttpSink) | ||||
| 	// Set default values | ||||
| 	s.name = fmt.Sprintf("HttpSink(%s)", name) | ||||
| 	s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval. | ||||
| 	s.config.MaxIdleConns = 10 | ||||
| 	s.config.IdleConnTimeout = "5s" | ||||
| 	s.config.Timeout = "5s" | ||||
| 	s.config.FlushDelay = "5s" | ||||
| 	s.config.MaxRetries = 3 | ||||
| 	s.config.FlushDelay = "1s" | ||||
| 	s.config.BatchSize = 100 | ||||
|  | ||||
| 	// Read config | ||||
| 	if len(config) > 0 { | ||||
| @@ -145,6 +151,9 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	if len(s.config.URL) == 0 { | ||||
| 		return nil, errors.New("`url` config option is required for HTTP sink") | ||||
| 	} | ||||
| 	if s.config.MaxIdleConns > 0 { | ||||
| 		s.maxIdleConns = s.config.MaxIdleConns | ||||
| 	} | ||||
| 	if len(s.config.IdleConnTimeout) > 0 { | ||||
| 		t, err := time.ParseDuration(s.config.IdleConnTimeout) | ||||
| 		if err == nil { | ||||
| @@ -169,7 +178,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 		s.meta_as_tags[k] = true | ||||
| 	} | ||||
| 	tr := &http.Transport{ | ||||
| 		MaxIdleConns:    1, // We will only ever talk to one host. | ||||
| 		MaxIdleConns:    s.maxIdleConns, | ||||
| 		IdleConnTimeout: s.idleConnTimeout, | ||||
| 	} | ||||
| 	s.client = &http.Client{Transport: tr, Timeout: s.timeout} | ||||
|   | ||||
| @@ -15,6 +15,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the | ||||
|     "max_idle_connections" : 10, | ||||
|     "idle_connection_timeout" : "5s", | ||||
|     "flush_delay": "2s", | ||||
|     "batch_size" : 100 | ||||
|   } | ||||
| } | ||||
| ``` | ||||
| @@ -27,3 +28,4 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the | ||||
| - `max_idle_connections`: Maximally idle connections (default 10) | ||||
| - `idle_connection_timeout`: Timeout for idle connections (default '5s') | ||||
| - `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0) | ||||
| - `batch_size`: Maximal number of batched metrics. Either it is flushed because batch size or the `flush_delay` is reached | ||||
|   | ||||
| @@ -25,6 +25,7 @@ type InfluxAsyncSinkConfig struct { | ||||
| 	Password     string `json:"password,omitempty"` | ||||
| 	Organization string `json:"organization,omitempty"` | ||||
| 	SSL          bool   `json:"ssl,omitempty"` | ||||
| 	RetentionPol string `json:"retention_policy,omitempty"` | ||||
| 	// Maximum number of points sent to server in single request. Default 5000 | ||||
| 	BatchSize uint `json:"batch_size,omitempty"` | ||||
| 	// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms | ||||
| @@ -185,17 +186,12 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	if len(s.config.Port) == 0 { | ||||
| 		return nil, errors.New("Missing port configuration required by InfluxSink") | ||||
| 	} | ||||
| 	if len(s.config.Database) == 0 { | ||||
| 		return nil, errors.New("Missing database configuration required by InfluxSink") | ||||
| 	} | ||||
| 	if len(s.config.Organization) == 0 { | ||||
| 		return nil, errors.New("Missing organization configuration required by InfluxSink") | ||||
| 	} | ||||
| 	if len(s.config.Password) == 0 { | ||||
| 		return nil, errors.New("Missing password configuration required by InfluxSink") | ||||
| 	if len(s.config.Host) == 0 || | ||||
| 		len(s.config.Port) == 0 || | ||||
| 		len(s.config.Database) == 0 || | ||||
| 		len(s.config.Organization) == 0 || | ||||
| 		len(s.config.Password) == 0 { | ||||
| 		return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") | ||||
| 	} | ||||
| 	// Create lookup map to use meta infos as tags in the output metric | ||||
| 	s.meta_as_tags = make(map[string]bool) | ||||
|   | ||||
| @@ -16,11 +16,7 @@ import ( | ||||
| 	"github.com/influxdata/influxdb-client-go/v2/api/write" | ||||
| ) | ||||
|  | ||||
| type InfluxSink struct { | ||||
| 	sink | ||||
| 	client   influxdb2.Client | ||||
| 	writeApi influxdb2Api.WriteAPIBlocking | ||||
| 	config   struct { | ||||
| type InfluxSinkConfig struct { | ||||
| 	defaultSinkConfig | ||||
| 	Host         string `json:"host,omitempty"` | ||||
| 	Port         string `json:"port,omitempty"` | ||||
| @@ -29,15 +25,28 @@ type InfluxSink struct { | ||||
| 	Password     string `json:"password,omitempty"` | ||||
| 	Organization string `json:"organization,omitempty"` | ||||
| 	SSL          bool   `json:"ssl,omitempty"` | ||||
| 		// Maximum number of points sent to server in single request. Default 100 | ||||
| 	FlushDelay   string `json:"flush_delay,omitempty"` | ||||
| 	BatchSize    int    `json:"batch_size,omitempty"` | ||||
| 		// Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s | ||||
| 		FlushInterval string `json:"flush_delay,omitempty"` | ||||
| 	RetentionPol string `json:"retention_policy,omitempty"` | ||||
| 	// InfluxRetryInterval   string `json:"retry_interval"` | ||||
| 	// InfluxExponentialBase uint   `json:"retry_exponential_base"` | ||||
| 	// InfluxMaxRetries      uint   `json:"max_retries"` | ||||
| 	// InfluxMaxRetryTime    string `json:"max_retry_time"` | ||||
| 	//InfluxMaxRetryDelay  string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it | ||||
| } | ||||
|  | ||||
| type InfluxSink struct { | ||||
| 	sink | ||||
| 	client              influxdb2.Client | ||||
| 	writeApi            influxdb2Api.WriteAPIBlocking | ||||
| 	config              InfluxSinkConfig | ||||
| 	influxRetryInterval uint | ||||
| 	influxMaxRetryTime  uint | ||||
| 	batch               []*write.Point | ||||
| 	flushTimer          *time.Timer | ||||
| 	flushDelay          time.Duration | ||||
| 	lock                sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | ||||
| 	//influxMaxRetryDelay uint | ||||
| } | ||||
|  | ||||
| // connect connects to the InfluxDB server | ||||
| @@ -67,6 +76,23 @@ func (s *InfluxSink) connect() error { | ||||
| 	// Set influxDB client options | ||||
| 	clientOptions := influxdb2.DefaultOptions() | ||||
|  | ||||
| 	// if s.influxRetryInterval != 0 { | ||||
| 	// 	cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval) | ||||
| 	// 	clientOptions.SetMaxRetryInterval(s.influxRetryInterval) | ||||
| 	// } | ||||
| 	// if s.influxMaxRetryTime != 0 { | ||||
| 	// 	cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime) | ||||
| 	// 	clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) | ||||
| 	// } | ||||
| 	// if s.config.InfluxExponentialBase != 0 { | ||||
| 	// 	cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase) | ||||
| 	// 	clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) | ||||
| 	// } | ||||
| 	// if s.config.InfluxMaxRetries != 0 { | ||||
| 	// 	cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries) | ||||
| 	// 	clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) | ||||
| 	// } | ||||
|  | ||||
| 	// Do not check InfluxDB certificate | ||||
| 	clientOptions.SetTLSConfig( | ||||
| 		&tls.Config{ | ||||
| @@ -100,9 +126,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { | ||||
| 		} | ||||
|  | ||||
| 		// Run a batched flush for all lines that have arrived in the last flush delay interval | ||||
| 		s.flushTimer = time.AfterFunc( | ||||
| 			s.flushDelay, | ||||
| 			func() { | ||||
| 		s.flushTimer = time.AfterFunc(s.flushDelay, func() { | ||||
| 			if err := s.Flush(); err != nil { | ||||
| 				cclog.ComponentError(s.name, "flush failed:", err.Error()) | ||||
| 			} | ||||
| @@ -170,7 +194,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | ||||
|  | ||||
| 	// Set config default values | ||||
| 	s.config.BatchSize = 100 | ||||
| 	s.config.FlushInterval = "1s" | ||||
| 	s.config.FlushDelay = "1s" | ||||
|  | ||||
| 	// Read config | ||||
| 	if len(config) > 0 { | ||||
| @@ -179,6 +203,12 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	s.influxRetryInterval = 0 | ||||
| 	s.influxMaxRetryTime = 0 | ||||
| 	// s.config.InfluxRetryInterval = "" | ||||
| 	// s.config.InfluxMaxRetryTime = "" | ||||
| 	// s.config.InfluxMaxRetries = 0 | ||||
| 	// s.config.InfluxExponentialBase = 0 | ||||
|  | ||||
| 	if len(s.config.Host) == 0 { | ||||
| 		return nil, errors.New("Missing host configuration required by InfluxSink") | ||||
| @@ -202,9 +232,21 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 		s.meta_as_tags[k] = true | ||||
| 	} | ||||
|  | ||||
| 	// toUint := func(duration string, def uint) uint { | ||||
| 	// 	if len(duration) > 0 { | ||||
| 	// 		t, err := time.ParseDuration(duration) | ||||
| 	// 		if err == nil { | ||||
| 	// 			return uint(t.Milliseconds()) | ||||
| 	// 		} | ||||
| 	// 	} | ||||
| 	// 	return def | ||||
| 	// } | ||||
| 	// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) | ||||
| 	// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) | ||||
|  | ||||
| 	// Configure flush delay duration | ||||
| 	if len(s.config.FlushInterval) > 0 { | ||||
| 		t, err := time.ParseDuration(s.config.FlushInterval) | ||||
| 	if len(s.config.FlushDelay) > 0 { | ||||
| 		t, err := time.ParseDuration(s.config.FlushDelay) | ||||
| 		if err == nil { | ||||
| 			s.flushDelay = t | ||||
| 		} | ||||
|   | ||||
| @@ -5,7 +5,6 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| @@ -18,10 +17,9 @@ type NatsSinkConfig struct { | ||||
| 	defaultSinkConfig | ||||
| 	Host     string `json:"host,omitempty"` | ||||
| 	Port     string `json:"port,omitempty"` | ||||
| 	Subject    string `json:"subject,omitempty"` | ||||
| 	Database string `json:"database,omitempty"` | ||||
| 	User     string `json:"user,omitempty"` | ||||
| 	Password string `json:"password,omitempty"` | ||||
| 	FlushDelay string `json:"flush_delay,omitempty"` | ||||
| } | ||||
|  | ||||
| type NatsSink struct { | ||||
| @@ -30,10 +28,6 @@ type NatsSink struct { | ||||
| 	encoder *influx.Encoder | ||||
| 	buffer  *bytes.Buffer | ||||
| 	config  NatsSinkConfig | ||||
|  | ||||
| 	lock       sync.Mutex | ||||
| 	flushDelay time.Duration | ||||
| 	flushTimer *time.Timer | ||||
| } | ||||
|  | ||||
| func (s *NatsSink) connect() error { | ||||
| @@ -60,53 +54,37 @@ func (s *NatsSink) connect() error { | ||||
| } | ||||
|  | ||||
| func (s *NatsSink) Write(m lp.CCMetric) error { | ||||
| 	s.lock.Lock() | ||||
| 	if s.client != nil { | ||||
| 		_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) | ||||
| 	s.lock.Unlock() | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(s.name, "Write:", err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 	if s.flushDelay == 0 { | ||||
| 		s.Flush() | ||||
| 	} else if s.flushTimer == nil { | ||||
| 		s.flushTimer = time.AfterFunc(s.flushDelay, func() { | ||||
| 			s.Flush() | ||||
| 		}) | ||||
| 	} else { | ||||
| 		s.flushTimer.Reset(s.flushDelay) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *NatsSink) Flush() error { | ||||
| 	s.lock.Lock() | ||||
| 	buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes | ||||
| 	s.buffer.Reset() | ||||
| 	s.lock.Unlock() | ||||
|  | ||||
| 	if len(buf) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if err := s.client.Publish(s.config.Subject, buf); err != nil { | ||||
| 	if s.client != nil { | ||||
| 		if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { | ||||
| 			cclog.ComponentError(s.name, "Flush:", err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
| 		s.buffer.Reset() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *NatsSink) Close() { | ||||
| 	if s.client != nil { | ||||
| 		cclog.ComponentDebug(s.name, "Close") | ||||
| 		s.client.Close() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func NewNatsSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	s := new(NatsSink) | ||||
| 	s.name = fmt.Sprintf("NatsSink(%s)", name) | ||||
| 	s.flushDelay = 10 * time.Second | ||||
| 	if len(config) > 0 { | ||||
| 		err := json.Unmarshal(config, &s.config) | ||||
| 		if err != nil { | ||||
| @@ -116,7 +94,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	} | ||||
| 	if len(s.config.Host) == 0 || | ||||
| 		len(s.config.Port) == 0 || | ||||
| 		len(s.config.Subject) == 0 { | ||||
| 		len(s.config.Database) == 0 { | ||||
| 		return nil, errors.New("not all configuration variables set required by NatsSink") | ||||
| 	} | ||||
| 	// Create lookup map to use meta infos as tags in the output metric | ||||
| @@ -134,15 +112,5 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	if err := s.connect(); err != nil { | ||||
| 		return nil, fmt.Errorf("unable to connect: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	s.flushTimer = nil | ||||
| 	if len(s.config.FlushDelay) != 0 { | ||||
| 		var err error | ||||
| 		s.flushDelay, err = time.ParseDuration(s.config.FlushDelay) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user