diff --git a/.github/workflows/AlmaLinux.yml b/.github/workflows/AlmaLinux.yml new file mode 100644 index 0000000..dd06dd2 --- /dev/null +++ b/.github/workflows/AlmaLinux.yml @@ -0,0 +1,64 @@ +# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions + +# Workflow name +name: AlmaLinux 8.5 RPM build + +# Run on tag push +on: + push: + tags: + - '**' + +jobs: + + # + # Build on AlmaLinux 8.5 using go-toolset + # + AlmaLinux-RPM-build: + runs-on: ubuntu-latest + # See: https://hub.docker.com/_/almalinux + container: almalinux:8.5 + steps: + + # Use dnf to install development packages + - name: Install development packages + run: dnf --assumeyes group install "Development Tools" "RPM Development Tools" + + # Checkout git repository and submodules + # fetch-depth must be 0 to use git describe + # See: https://github.com/marketplace/actions/checkout + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: recursive + fetch-depth: 0 + + # Use dnf to install build dependencies + - name: Install build dependencies + run: dnf --assumeyes builddep scripts/cc-metric-collector.spec + + - name: RPM build MetricCollector + id: rpmbuild + run: make RPM + + # See: https://github.com/actions/upload-artifact + - name: Save RPM as artifact + uses: actions/upload-artifact@v2 + with: + name: cc-metric-collector RPM for AlmaLinux 8.5 + path: ${{ steps.rpmbuild.outputs.RPM }} + - name: Save SRPM as artifact + uses: actions/upload-artifact@v2 + with: + name: cc-metric-collector SRPM for AlmaLinux 8.5 + path: ${{ steps.rpmbuild.outputs.SRPM }} + + # See: https://github.com/softprops/action-gh-release + - name: Release + uses: softprops/action-gh-release@v1 + if: startsWith(github.ref, 'refs/tags/') + with: + name: cc-metric-collector-${{github.ref_name}} + files: | + ${{ steps.rpmbuild.outputs.RPM }} + ${{ steps.rpmbuild.outputs.SRPM }} diff --git a/.github/workflows/RedHatUniversalBaseImage.yml b/.github/workflows/RedHatUniversalBaseImage.yml 
new file mode 100644 index 0000000..205a133 --- /dev/null +++ b/.github/workflows/RedHatUniversalBaseImage.yml @@ -0,0 +1,64 @@ +# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions + +# Workflow name +name: Red Hat Universal Base Image 8 RPM build + +# Run on tag push +on: + push: + tags: + - '**' + +jobs: + + # + # Build on UBI 8 using go-toolset + # + UBI-8-RPM-build: + runs-on: ubuntu-latest + # See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti + container: registry.access.redhat.com/ubi8/ubi:8.5-226.1645809065 + steps: + + # Use dnf to install development packages + - name: Install development packages + run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git + + # Checkout git repository and submodules + # fetch-depth must be 0 to use git describe + # See: https://github.com/marketplace/actions/checkout + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: recursive + fetch-depth: 0 + + # Use dnf to install build dependencies + - name: Install build dependencies + run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec + + - name: RPM build MetricCollector + id: rpmbuild + run: make RPM + + # See: https://github.com/actions/upload-artifact + - name: Save RPM as artifact + uses: actions/upload-artifact@v2 + with: + name: cc-metric-collector RPM for UBI 8 + path: ${{ steps.rpmbuild.outputs.RPM }} + - name: Save SRPM as artifact + uses: actions/upload-artifact@v2 + with: + name: cc-metric-collector SRPM for UBI 8 + path: ${{ steps.rpmbuild.outputs.SRPM }} + + # See: https://github.com/softprops/action-gh-release + - name: Release + uses: softprops/action-gh-release@v1 + if: startsWith(github.ref, 'refs/tags/') + with: + name: cc-metric-collector-${{github.ref_name}} + files: | + ${{ steps.rpmbuild.outputs.RPM }} + ${{ 
steps.rpmbuild.outputs.SRPM }} diff --git a/.github/workflows/rpmbuild.yml b/.github/workflows/rpmbuild.yml deleted file mode 100644 index 9c6ae13..0000000 --- a/.github/workflows/rpmbuild.yml +++ /dev/null @@ -1,58 +0,0 @@ -name: Run RPM Build -on: - push: - tags: - - '**' - -jobs: - build-alma-8_5: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: TomTheBear/rpmbuild@alma8.5 - id: rpm - name: Build RPM package on AlmaLinux 8.5 - with: - spec_file: "./scripts/cc-metric-collector.spec" - - name: Save RPM as artifact - uses: actions/upload-artifact@v1.0.0 - with: - name: cc-metric-collector RPM AlmaLinux 8.5 - path: ${{ steps.rpm.outputs.rpm_path }} - - name: Save SRPM as artifact - uses: actions/upload-artifact@v1.0.0 - with: - name: cc-metric-collector SRPM AlmaLinux 8.5 - path: ${{ steps.rpm.outputs.source_rpm_path }} - - name: Release - uses: softprops/action-gh-release@v1 - with: - name: cc-metric-collector-${{github.ref_name}} - files: | - ${{ steps.rpm.outputs.source_rpm_path }} - ${{ steps.rpm.outputs.rpm_path }} - build-rhel-ubi8: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: TomTheBear/rpmbuild@rh-ubi8 - id: rpm - name: Build RPM package on Red Hat Universal Base Image 8 - with: - spec_file: "./scripts/cc-metric-collector.spec" - - name: Save RPM as artifact - uses: actions/upload-artifact@v1.0.0 - with: - name: cc-metric-collector RPM Red Hat Universal Base Image 8 - path: ${{ steps.rpm.outputs.rpm_path }} - - name: Save SRPM as artifact - uses: actions/upload-artifact@v1.0.0 - with: - name: cc-metric-collector SRPM Red Hat Universal Base Image 8 - path: ${{ steps.rpm.outputs.source_rpm_path }} - - name: Release - uses: softprops/action-gh-release@v1 - with: - files: | - ${{ steps.rpm.outputs.source_rpm_path }} - ${{ steps.rpm.outputs.rpm_path }} diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index da5b86c..be161ea 100644 --- a/.github/workflows/runonce.yml +++ 
b/.github/workflows/runonce.yml @@ -1,46 +1,68 @@ +# See: https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions + +# Workflow name name: Run Test + +# Run on event push on: push jobs: + # + # Job build-1-17 + # Build on latest Ubuntu using golang version 1.17 + # build-1-17: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + # See: https://github.com/marketplace/actions/checkout + # Checkout git repository and submodules + - name: Checkout + uses: actions/checkout@v2 with: submodules: recursive # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang - uses: actions/setup-go@v2.1.5 + uses: actions/setup-go@v2 with: - go-version: '^1.17.6' + go-version: '^1.17.7' + # Install libganglia - name: Setup Ganglia run: sudo apt install ganglia-monitor libganglia1 - name: Build MetricCollector run: make - - name: Run MetricCollector + - name: Run MetricCollector once run: ./cc-metric-collector --once --config .github/ci-config.json + + # + # Job build-1-16 + # Build on latest Ubuntu using golang version 1.16 + # build-1-16: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + # See: https://github.com/marketplace/actions/checkout + # Checkout git repository and submodules + - name: Checkout + uses: actions/checkout@v2 with: submodules: recursive # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang - uses: actions/setup-go@v2.1.5 + uses: actions/setup-go@v2 with: go-version: '^1.16.7' # The version AlmaLinux 8.5 uses + # Install libganglia - name: Setup Ganglia run: sudo apt install ganglia-monitor libganglia1 - name: Build MetricCollector run: make - - name: Run MetricCollectorlibganglia1 + - name: Run MetricCollector once run: ./cc-metric-collector --once --config .github/ci-config.json diff --git a/Makefile b/Makefile index b32fb6b..d747899 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ APP = cc-metric-collector -GOSRC_APP := metric-collector.go 
+GOSRC_APP := cc-metric-collector.go GOSRC_COLLECTORS := $(wildcard collectors/*.go) GOSRC_SINKS := $(wildcard sinks/*.go) GOSRC_RECEIVERS := $(wildcard receivers/*.go) @@ -56,3 +56,33 @@ vet: staticcheck: go install honnef.co/go/tools/cmd/staticcheck@latest $$(go env GOPATH)/bin/staticcheck ./... + +.ONESHELL: +.PHONY: RPM +RPM: scripts/cc-metric-collector.spec + @WORKSPACE="$${PWD}" + @SPECFILE="$${WORKSPACE}/scripts/cc-metric-collector.spec" + # Setup RPM build tree + @eval $$(rpm --eval "ARCH='%{_arch}' RPMDIR='%{_rpmdir}' SOURCEDIR='%{_sourcedir}' SPECDIR='%{_specdir}' SRPMDIR='%{_srcrpmdir}' BUILDDIR='%{_builddir}'") + @mkdir --parents --verbose "$${RPMDIR}" "$${SOURCEDIR}" "$${SPECDIR}" "$${SRPMDIR}" "$${BUILDDIR}" + # Create source tarball + @COMMITISH="HEAD" + @VERS=$$(git describe --tags $${COMMITISH}) + @VERS=$${VERS#v} + @VERS=$${VERS//-/_} + @eval $$(rpmspec --query --queryformat "NAME='%{name}' VERSION='%{version}' RELEASE='%{release}' NVR='%{NVR}' NVRA='%{NVRA}'" --define="VERS $${VERS}" "$${SPECFILE}") + @PREFIX="$${NAME}-$${VERSION}" + @FORMAT="tar.gz" + @SRCFILE="$${SOURCEDIR}/$${PREFIX}.$${FORMAT}" + @git archive --verbose --format "$${FORMAT}" --prefix="$${PREFIX}/" --output="$${SRCFILE}" $${COMMITISH} + # Build RPM and SRPM + @rpmbuild -ba --define="VERS $${VERS}" --rmsource --clean "$${SPECFILE}" + # Report RPMs and SRPMs when in GitHub Workflow + @if [[ "$${GITHUB_ACTIONS}" == true ]]; then + @ RPMFILE="$${RPMDIR}/$${ARCH}/$${NVRA}.rpm" + @ SRPMFILE="$${SRPMDIR}/$${NVR}.src.rpm" + @ echo "RPM: $${RPMFILE}" + @ echo "SRPM: $${SRPMFILE}" + @ echo "::set-output name=SRPM::$${SRPMFILE}" + @ echo "::set-output name=RPM::$${RPMFILE}" + @fi diff --git a/metric-collector.go b/cc-metric-collector.go similarity index 100% rename from metric-collector.go rename to cc-metric-collector.go diff --git a/collectors/Makefile b/collectors/Makefile index 0c637b5..b07bccd 100644 --- a/collectors/Makefile +++ b/collectors/Makefile @@ -1,82 +1,25 @@ -# Use central 
installation -CENTRAL_INSTALL = false -# How to access hardware performance counters through LIKWID. -# Recommended is 'direct' mode -ACCESSMODE = direct -####################################################################### -# if CENTRAL_INSTALL == true -####################################################################### -# Path to central installation (if CENTRAL_INSTALL=true) -LIKWID_BASE=/apps/likwid/5.2.1 -# LIKWID version (should be same major version as central installation, 5.2.x) +all: likwid + + +# LIKWID version LIKWID_VERSION = 5.2.1 -####################################################################### -# if CENTRAL_INSTALL == false and ACCESSMODE == accessdaemon -####################################################################### -# Where to install the accessdaemon -DAEMON_INSTALLDIR = /usr/local -# Which user to use for the accessdaemon -DAEMON_USER = root -# Which group to use for the accessdaemon -DAEMON_GROUP = root +.ONESHELL: +.PHONY: likwid +likwid: + INSTALL_FOLDER="$${PWD}/likwid" + BUILD_FOLDER="$${PWD}/likwidbuild" + if [ -d $${INSTALL_FOLDER} ]; then rm -r $${INSTALL_FOLDER}; fi + mkdir --parents --verbose $${INSTALL_FOLDER} $${BUILD_FOLDER} + wget -P "$${BUILD_FOLDER}" ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz + tar -C $${BUILD_FOLDER} -xf $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION).tar.gz + install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $${INSTALL_FOLDER}/ + install -Dpm 0644 $${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $${INSTALL_FOLDER}/ + rm -r $${BUILD_FOLDER} - -################################################# -# No need to change anything below this line -################################################# -INSTALL_FOLDER = ./likwid -BUILD_FOLDER = ./likwid/build - -ifneq ($(strip $(CENTRAL_INSTALL)),true) -LIKWID_BASE := $(shell pwd)/$(INSTALL_FOLDER) -DAEMON_BASE := $(LIKWID_BASE) -GROUPS_BASE := $(LIKWID_BASE)/groups -all: 
$(INSTALL_FOLDER)/liblikwid.a cleanup -else -DAEMON_BASE= $(LIKWID_BASE)/sbin -all: $(INSTALL_FOLDER)/liblikwid.a cleanup -endif - - - -$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz: $(BUILD_FOLDER) - wget -P $(BUILD_FOLDER) ftp://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz - -$(BUILD_FOLDER): - mkdir -p $(BUILD_FOLDER) - -$(INSTALL_FOLDER): - mkdir -p $(INSTALL_FOLDER) - -$(BUILD_FOLDER)/likwid-$(LIKWID_VERSION): $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz - tar -C $(BUILD_FOLDER) -xf $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION).tar.gz - -$(INSTALL_FOLDER)/liblikwid.a: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION) $(INSTALL_FOLDER) - sed -i -e s+"PREFIX ?= .*"+"PREFIX = $(LIKWID_BASE)"+g \ - -e s+"SHARED_LIBRARY = .*"+"SHARED_LIBRARY = false"+g \ - -e s+"ACCESSMODE = .*"+"ACCESSMODE = $(ACCESSMODE)"+g \ - -e s+"INSTALLED_ACCESSDAEMON = .*"+"INSTALLED_ACCESSDAEMON = $(DAEMON_INSTALLDIR)/likwid-accessD"+g \ - $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/config.mk - cd $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION) && make - cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/liblikwid.a $(INSTALL_FOLDER) - cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/ext/hwloc/liblikwid-hwloc.a $(INSTALL_FOLDER) - cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/likwid*.h $(INSTALL_FOLDER) - cp $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/src/includes/bstrlib.h $(INSTALL_FOLDER) - -$(DAEMON_INSTALLDIR)/likwid-accessD: $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD - sudo -u $(DAEMON_USER) -g $(DAEMON_GROUP) install -m 4775 $(BUILD_FOLDER)/likwid-$(LIKWID_VERSION)/likwid-accessD $(DAEMON_INSTALLDIR)/likwid-accessD - -prepare_collector: likwidMetric.go - cp likwidMetric.go likwidMetric.go.orig - sed -i -e s+"const GROUPPATH =.*"+"const GROUPPATH = \`$(GROUPS_BASE)\`"+g likwidMetric.go - -cleanup: - rm -rf $(BUILD_FOLDER) - -clean: cleanup +clean: rm -rf likwid .PHONY: clean diff --git a/collectors/README.md b/collectors/README.md index 00e0da7..3fcdd49 100644 --- 
a/collectors/README.md +++ b/collectors/README.md @@ -37,6 +37,8 @@ In contrast to the configuration files for sinks and receivers, the collectors c * [`cpufreq_cpuinfo`](./cpufreqCpuinfoMetric.md) * [`numastat`](./numastatMetric.md) * [`gpfs`](./gpfsMetric.md) +* [`beegfs_meta`](./beegfsmetaMetric.md) +* [`beegfs_storage`](./beegfsstorageMetric.md) ## Todos diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go new file mode 100644 index 0000000..57b1e39 --- /dev/null +++ b/collectors/beegfsmetaMetric.go @@ -0,0 +1,229 @@ +package collectors + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "os/user" + "regexp" + "strconv" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +const DEFAULT_BEEGFS_CMD = "beegfs-ctl" + +// Struct for the collector-specific JSON config +type BeegfsMetaCollectorConfig struct { + Beegfs string `json:"beegfs_path"` + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeFilesystem []string `json:"exclude_filesystem"` +} + +type BeegfsMetaCollector struct { + metricCollector + tags map[string]string + matches map[string]string + config BeegfsMetaCollectorConfig + skipFS map[string]struct{} +} + +func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { + // Check if already initialized + if m.init { + return nil + } + // Metrics + var nodeMdstat_array = [39]string{ + "sum", "ack", "close", "entInf", + "fndOwn", "mkdir", "create", "rddir", + "refrEn", "mdsInf", "rmdir", "rmLnk", + "mvDirIns", "mvFiIns", "open", "ren", + "sChDrct", "sAttr", "sDirPat", "stat", + "statfs", "trunc", "symlnk", "unlnk", + "lookLI", "statLI", "revalLI", "openLI", + "createLI", "hardlnk", "flckAp", "flckEn", + "flckRg", "dirparent", "listXA", "getXA", + "rmXA", "setXA", "mirror"} + + m.name = "BeegfsMetaCollector" + m.setup() + // Set default beegfs-ctl 
binary + + m.config.Beegfs = DEFAULT_BEEGFS_CMD + + // Read JSON configuration + if len(config) > 0 { + err := json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } + + //create map with possible variables + m.matches = make(map[string]string) + for _, value := range nodeMdstat_array { + _, skip := stringArrayContains(m.config.ExcludeMetrics, value) + if skip { + m.matches["other"] = "0" + } else { + m.matches["beegfs_cmeta_"+value] = "0" + } + } + + m.meta = map[string]string{ + "source": m.name, + "group": "BeegfsMeta", + } + m.tags = map[string]string{ + "type": "node", + "filesystem": "", + } + m.skipFS = make(map[string]struct{}) + for _, fs := range m.config.ExcludeFilesystem { + m.skipFS[fs] = struct{}{} + } + + // Beegfs file system statistics can only be queried by user root + user, err := user.Current() + if err != nil { + return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %v", err) + } + if user.Uid != "0" { + return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root") + } + + // Check if beegfs-ctl is in executable search path + _, err = exec.LookPath(m.config.Beegfs) + if err != nil { + return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) + } + m.init = true + return nil +} + +func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + //get mounpoint + buffer, _ := ioutil.ReadFile(string("/proc/mounts")) + mounts := strings.Split(string(buffer), "\n") + var mountpoints []string + for _, line := range mounts { + if len(line) == 0 { + continue + } + f := strings.Fields(line) + if strings.Contains(f[0], "beegfs_ondemand") { + // Skip excluded filesystems + if _, skip := m.skipFS[f[1]]; skip { + continue + } + mountpoints = append(mountpoints, f[1]) + } + } + + if len(mountpoints) == 0 { + return + } + + for _, mountpoint := range mountpoints { + 
m.tags["filesystem"] = mountpoint + + // bwwgfs-ctl: + // --clientstats: Show client IO statistics. + // --nodetype=meta: The node type to query (meta, storage). + // --interval: + // --mount=/mnt/beeond/: Which mount point + //cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt") + mountoption := "--mount=" + mountpoint + cmd := exec.Command(m.config.Beegfs, "--clientstats", + "--nodetype=meta", mountoption, "--allstats") + cmd.Stdin = strings.NewReader("\n") + cmdStdout := new(bytes.Buffer) + cmdStderr := new(bytes.Buffer) + cmd.Stdout = cmdStdout + cmd.Stderr = cmdStderr + err := cmd.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) + fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) + data, _ := ioutil.ReadAll(cmdStderr) + fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stderr: \"%s\"\n", string(data)) + data, _ = ioutil.ReadAll(cmdStdout) + fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stdout: \"%s\"\n", string(data)) + return + } + // Read I/O statistics + scanner := bufio.NewScanner(cmdStdout) + + sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) + //Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`) + statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) + singleSpacePattern := regexp.MustCompile(`\s+`) + removePattern := regexp.MustCompile(`[\[|\]]`) + + for scanner.Scan() { + readLine := scanner.Text() + //fmt.Println(readLine) + // Jump few lines, we only want the I/O stats from nodes + if !sumLine.MatchString(readLine) { + continue + } + + match := statsLine.FindStringSubmatch(readLine) + // nodeName = "Sum:" or would be nodes + // nodeName := match[1] + //Remove multiple whitespaces + dummy := removePattern.ReplaceAllString(match[2], " ") + metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) + split := 
strings.Split(metaStats, " ") + + // fill map with values + // split[i+1] = mdname + // split[i] = amount of md operations + for i := 0; i <= len(split)-1; i += 2 { + if _, ok := m.matches[split[i+1]]; ok { + m.matches["beegfs_cmeta_"+split[i+1]] = split[i] + } else { + f1, err := strconv.ParseFloat(m.matches["other"], 32) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) + continue + } + f2, err := strconv.ParseFloat(split[i], 32) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) + continue + } + //mdStat["other"] = fmt.Sprintf("%f", f1+f2) + m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2) + } + } + + for key, data := range m.matches { + value, _ := strconv.ParseFloat(data, 32) + y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) + if err == nil { + output <- y + } + } + } + } +} + +func (m *BeegfsMetaCollector) Close() { + m.init = false +} diff --git a/collectors/beegfsmetaMetric.md b/collectors/beegfsmetaMetric.md new file mode 100644 index 0000000..932e72f --- /dev/null +++ b/collectors/beegfsmetaMetric.md @@ -0,0 +1,75 @@ +## `BeeGFS on Demand` collector +This Collector is to collect BeeGFS on Demand (BeeOND) metadata clientstats. + +```json + "beegfs_meta": { + "beegfs_path": "/usr/bin/beegfs-ctl", + "exclude_filesystem": [ + "/mnt/ignore_me" + ], + "exclude_metrics": [ + "ack", + "entInf", + "fndOwn" + ] + } +``` + +The `BeeGFS On Demand (BeeOND)` collector uses the `beegfs-ctl` command to read performance metrics for +BeeGFS filesystems. + +The reported filesystems can be filtered with the `exclude_filesystem` option +in the configuration. + +The path to the `beegfs-ctl` command can be configured with the `beegfs_path` option +in the configuration. 
+ +When using the `exclude_metrics` option, the excluded metrics are summed as `other`. + +Important: The metrics listed below are similar to the naming of BeeGFS. The Collector prefixes these with `beegfs_cmeta_` (BeeGFS client metadata). + +For example beegfs metric `open` -> `beegfs_cmeta_open` + +Available Metrics: + +* sum +* ack +* close +* entInf +* fndOwn +* mkdir +* create +* rddir +* refrEn +* mdsInf +* rmdir +* rmLnk +* mvDirIns +* mvFiIns +* open +* ren +* sChDrct +* sAttr +* sDirPat +* stat +* statfs +* trunc +* symlnk +* unlnk +* lookLI +* statLI +* revalLI +* openLI +* createLI +* hardlnk +* flckAp +* flckEn +* flckRg +* dirparent +* listXA +* getXA +* rmXA +* setXA +* mirror + +The collector adds a `filesystem` tag to all metrics \ No newline at end of file diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go new file mode 100644 index 0000000..cbc8314 --- /dev/null +++ b/collectors/beegfsstorageMetric.go @@ -0,0 +1,221 @@ +package collectors + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "os/user" + "regexp" + "strconv" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +// Struct for the collector-specific JSON config +type BeegfsStorageCollectorConfig struct { + Beegfs string `json:"beegfs_path"` + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeFilesystem []string `json:"exclude_filesystem"` +} + +type BeegfsStorageCollector struct { + metricCollector + tags map[string]string + matches map[string]string + config BeegfsStorageCollectorConfig + skipFS map[string]struct{} +} + +func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { + // Check if already initialized + if m.init { + return nil + } + // Metrics + var storageStat_array = [18]string{ + "sum", "ack", "sChDrct", "getFSize", + "sAttr", "statfs", "trunc",
"close", + "fsync", "ops-rd", "MiB-rd/s", "ops-wr", + "MiB-wr/s", "gendbg", "hrtbeat", "remNode", + "storInf", "unlnk"} + + m.name = "BeegfsStorageCollector" + m.setup() + // Set default beegfs-ctl binary + + m.config.Beegfs = DEFAULT_BEEGFS_CMD + + // Read JSON configuration + if len(config) > 0 { + err := json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } + println(m.config.Beegfs) + //create map with possible variables + m.matches = make(map[string]string) + for _, value := range storageStat_array { + _, skip := stringArrayContains(m.config.ExcludeMetrics, value) + if skip { + m.matches["other"] = "0" + } else { + m.matches["beegfs_cstorage_"+value] = "0" + } + } + + m.meta = map[string]string{ + "source": m.name, + "group": "BeegfsStorage", + } + m.tags = map[string]string{ + "type": "node", + "filesystem": "", + } + m.skipFS = make(map[string]struct{}) + for _, fs := range m.config.ExcludeFilesystem { + m.skipFS[fs] = struct{}{} + } + + // Beegfs file system statistics can only be queried by user root + user, err := user.Current() + if err != nil { + return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to get current user: %v", err) + } + if user.Uid != "0" { + return fmt.Errorf("BeegfsStorageCollector.Init(): BeeGFS file system statistics can only be queried by user root") + } + + // Check if beegfs-ctl is in executable search path + _, err = exec.LookPath(m.config.Beegfs) + if err != nil { + return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) + } + m.init = true + return nil +} + +func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + //get mounpoint + buffer, _ := ioutil.ReadFile(string("/proc/mounts")) + mounts := strings.Split(string(buffer), "\n") + var mountpoints []string + for _, line := range mounts { + if len(line) == 0 { + continue + } + f := strings.Fields(line) + if strings.Contains(f[0], 
"beegfs_ondemand") { + // Skip excluded filesystems + if _, skip := m.skipFS[f[1]]; skip { + continue + } + mountpoints = append(mountpoints, f[1]) + } + } + if len(mountpoints) == 0 { + return + } + // collects stats for each BeeGFS on Demand FS + for _, mountpoint := range mountpoints { + m.tags["filesystem"] = mountpoint + + // bwwgfs-ctl: + // --clientstats: Show client IO statistics. + // --nodetype=meta: The node type to query (meta, storage). + // --interval: + // --mount=/mnt/beeond/: Which mount point + //cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt") + mountoption := "--mount=" + mountpoint + cmd := exec.Command(m.config.Beegfs, "--clientstats", + "--nodetype=storage", mountoption, "--allstats") + cmd.Stdin = strings.NewReader("\n") + cmdStdout := new(bytes.Buffer) + cmdStderr := new(bytes.Buffer) + cmd.Stdout = cmdStdout + cmd.Stderr = cmdStderr + err := cmd.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) + fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) + data, _ := ioutil.ReadAll(cmdStderr) + fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stderr: \"%s\"\n", string(data)) + data, _ = ioutil.ReadAll(cmdStdout) + fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stdout: \"%s\"\n", string(data)) + return + } + // Read I/O statistics + scanner := bufio.NewScanner(cmdStdout) + + sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`) + //Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`) + statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`) + singleSpacePattern := regexp.MustCompile(`\s+`) + removePattern := regexp.MustCompile(`[\[|\]]`) + + for scanner.Scan() { + readLine := scanner.Text() + //fmt.Println(readLine) + // Jump few lines, we only want the I/O stats from nodes + if !sumLine.MatchString(readLine) { + continue + } + + match 
:= statsLine.FindStringSubmatch(readLine) + // nodeName = "Sum:" or would be nodes + // nodeName := match[1] + //Remove multiple whitespaces + dummy := removePattern.ReplaceAllString(match[2], " ") + metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " ")) + split := strings.Split(metaStats, " ") + + // fill map with values + // split[i+1] = mdname + // split[i] = amount of operations + for i := 0; i <= len(split)-1; i += 2 { + if _, ok := m.matches[split[i+1]]; ok { + m.matches["beegfs_cstorage_"+split[i+1]] = split[i] + //m.matches[split[i+1]] = split[i] + } else { + f1, err := strconv.ParseFloat(m.matches["other"], 32) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) + continue + } + f2, err := strconv.ParseFloat(split[i], 32) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err)) + continue + } + m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2) + } + } + + for key, data := range m.matches { + value, _ := strconv.ParseFloat(data, 32) + y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) + if err == nil { + output <- y + } + } + } + } +} + +func (m *BeegfsStorageCollector) Close() { + m.init = false +} diff --git a/collectors/beegfsstorageMetric.md b/collectors/beegfsstorageMetric.md new file mode 100644 index 0000000..519b5bf --- /dev/null +++ b/collectors/beegfsstorageMetric.md @@ -0,0 +1,55 @@ +## `BeeGFS on Demand` collector +This Collector is to collect BeeGFS on Demand (BeeOND) storage stats. 
+ +```json + "beegfs_storage": { + "beegfs_path": "/usr/bin/beegfs-ctl", + "exclude_filesystem": [ + "/mnt/ignore_me" + ], + "exclude_metrics": [ + "ack", + "storInf", + "unlnk" + ] + } +``` + +The `BeeGFS On Demand (BeeOND)` collector uses the `beegfs-ctl` command to read performance metrics for BeeGFS filesystems. + +The reported filesystems can be filtered with the `exclude_filesystem` option +in the configuration. + +The path to the `beegfs-ctl` command can be configured with the `beegfs_path` option +in the configuration. + +When using the `exclude_metrics` option, the excluded metrics are summed as `other`. + +Important: The metrics listed below are similar to the naming of BeeGFS. The Collector prefixes these with `beegfs_cstorage_` (BeeGFS client storage). +For example beegfs metric `open` -> `beegfs_cstorage_open` + +Note: BeeGFS offers many storage metrics. Probably it makes sense to exclude most of them. Nevertheless, these excluded metrics will be summed as `beegfs_cstorage_other`. 
+ +Available Metrics: + +* "sum" +* "ack" +* "sChDrct" +* "getFSize" +* "sAttr" +* "statfs" +* "trunc" +* "close" +* "fsync" +* "ops-rd" +* "MiB-rd/s" +* "ops-wr" +* "MiB-wr/s" +* "endbg" +* "hrtbeat" +* "remNode" +* "storInf" +* "unlnk" + + +The collector adds a `filesystem` tag to all metrics \ No newline at end of file diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 86b423e..e9ccfe7 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -19,7 +19,6 @@ var AvailableCollectors = map[string]MetricCollector{ "memstat": new(MemstatCollector), "netstat": new(NetstatCollector), "ibstat": new(InfinibandCollector), - "ibstat_perfquery": new(InfinibandPerfQueryCollector), "lustrestat": new(LustreCollector), "cpustat": new(CpustatCollector), "topprocs": new(TopProcsCollector), @@ -35,6 +34,8 @@ var AvailableCollectors = map[string]MetricCollector{ "nfs3stat": new(Nfs3Collector), "nfs4stat": new(Nfs4Collector), "numastats": new(NUMAStatsCollector), + "beegfs_meta": new(BeegfsMetaCollector), + "beegfs_storage": new(BeegfsStorageCollector), } // Metric collector manager data structure diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index adbc7fb..453704c 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -17,6 +17,8 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +const DEFAULT_GPFS_CMD = `mmpmon` + type GpfsCollector struct { metricCollector tags map[string]string @@ -38,7 +40,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { m.setup() // Set default mmpmon binary - m.config.Mmpmon = "/usr/lpp/mmfs/bin/mmpmon" + m.config.Mmpmon = string(DEFAULT_GPFS_CMD) // Read JSON configuration if len(config) > 0 { @@ -64,17 +66,18 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { // GPFS / IBM Spectrum Scale file system statistics can only be queried by user root user, err := user.Current() if err != nil { - return 
fmt.Errorf("Failed to get current user: %v", err) + return fmt.Errorf("failed to get current user: %v", err) } if user.Uid != "0" { return fmt.Errorf("GPFS file system statistics can only be queried by user root") } // Check if mmpmon is in executable search path - _, err = exec.LookPath(m.config.Mmpmon) + p, err := exec.LookPath(m.config.Mmpmon) if err != nil { - return fmt.Errorf("Failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) + return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) } + m.config.Mmpmon = p m.init = true return nil diff --git a/collectors/gpfsMetric.md b/collectors/gpfsMetric.md index 4a6a058..4f2c897 100644 --- a/collectors/gpfsMetric.md +++ b/collectors/gpfsMetric.md @@ -16,7 +16,7 @@ The reported filesystems can be filtered with the `exclude_filesystem` option in the configuration. The path to the `mmpmon` command can be configured with the `mmpmon_path` option -in the configuration. +in the configuration. If nothing is set, the collector searches in `$PATH` for `mmpmon`. 
Metrics: * `bytes_read` diff --git a/collectors/infinibandPerfQueryMetric.go b/collectors/infinibandPerfQueryMetric.go deleted file mode 100644 index 72f701f..0000000 --- a/collectors/infinibandPerfQueryMetric.go +++ /dev/null @@ -1,232 +0,0 @@ -package collectors - -import ( - "fmt" - "io/ioutil" - "log" - "os/exec" - - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - - // "os" - "encoding/json" - "errors" - "path/filepath" - "strconv" - "strings" - "time" -) - -const PERFQUERY = `/usr/sbin/perfquery` - -type InfinibandPerfQueryCollector struct { - metricCollector - tags map[string]string - lids map[string]map[string]string - config struct { - ExcludeDevices []string `json:"exclude_devices,omitempty"` - PerfQueryPath string `json:"perfquery_path"` - } -} - -func (m *InfinibandPerfQueryCollector) Init(config json.RawMessage) error { - var err error - m.name = "InfinibandCollectorPerfQuery" - m.setup() - m.meta = map[string]string{"source": m.name, "group": "Network"} - m.tags = map[string]string{"type": "node"} - if len(config) > 0 { - err = json.Unmarshal(config, &m.config) - if err != nil { - return err - } - } - if len(m.config.PerfQueryPath) == 0 { - path, err := exec.LookPath("perfquery") - if err == nil { - m.config.PerfQueryPath = path - } - } - m.lids = make(map[string]map[string]string) - p := fmt.Sprintf("%s/*/ports/*/lid", string(IB_BASEPATH)) - files, err := filepath.Glob(p) - if err != nil { - return err - } - for _, f := range files { - lid, err := ioutil.ReadFile(f) - if err == nil { - plist := strings.Split(strings.Replace(f, string(IB_BASEPATH), "", -1), "/") - skip := false - for _, d := range m.config.ExcludeDevices { - if d == plist[0] { - skip = true - } - } - if !skip { - m.lids[plist[0]] = make(map[string]string) - m.lids[plist[0]][plist[2]] = string(lid) - } - } - } - - for _, ports := range m.lids { - for port, lid := range ports { - args := fmt.Sprintf("-r %s %s 0xf000", lid, port) - command := 
exec.Command(m.config.PerfQueryPath, args) - command.Wait() - _, err := command.Output() - if err != nil { - return fmt.Errorf("Failed to execute %s: %v", m.config.PerfQueryPath, err) - } - } - } - - if len(m.lids) == 0 { - return errors.New("No usable IB devices") - } - - m.init = true - return nil -} - -func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { - - args := fmt.Sprintf("-r %s %s 0xf000", lid, port) - command := exec.Command(cmd, args) - command.Wait() - stdout, err := command.Output() - if err != nil { - log.Print(err) - return err - } - ll := strings.Split(string(stdout), "\n") - - for _, line := range ll { - if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitData") || strings.HasPrefix(line, "XmtData") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output 
<- y - } - } - } - if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - } - return nil -} - -func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan lp.CCMetric) { - - if m.init { - for dev, ports := range m.lids { - for port, lid := range ports { - tags := map[string]string{ - "type": "node", - "device": dev, - "port": port, - "lid": lid} - path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(IB_BASEPATH), dev, port) - buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_data", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_packets", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, 
map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_packets", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - } - } - } -} - -func (m *InfinibandPerfQueryCollector) Close() { - m.init = false -} diff --git a/collectors/infinibandPerfQueryMetric.md b/collectors/infinibandPerfQueryMetric.md deleted file mode 100644 index 2147963..0000000 --- a/collectors/infinibandPerfQueryMetric.md +++ /dev/null @@ -1,28 +0,0 @@ - -## `ibstat_perfquery` collector - -```json - "ibstat_perfquery": { - "perfquery_path": "/path/to/perfquery", - "exclude_devices": [ - "mlx4" - ] - } -``` - -The `ibstat_perfquery` collector includes all Infiniband devices that can be -found below `/sys/class/infiniband/` and where any of the ports provides a -LID file (`/sys/class/infiniband//ports//lid`) - -The devices can be filtered with the `exclude_devices` option in the configuration. - -For each found LID the collector calls the `perfquery` command. 
The path to the -`perfquery` command can be configured with the `perfquery_path` option in the configuration - -Metrics: -* `ib_recv` -* `ib_xmit` -* `ib_recv_pkts` -* `ib_xmit_pkts` - -The collector adds a `device` tag to all metrics diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 8626d7c..8ab42d5 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -2,7 +2,7 @@ package collectors /* #cgo CFLAGS: -I./likwid -#cgo LDFLAGS: -L./likwid -llikwid -llikwid-hwloc -lm -Wl,--unresolved-symbols=ignore-in-object-files +#cgo LDFLAGS: -Wl,--unresolved-symbols=ignore-in-object-files #include #include */ @@ -71,8 +71,9 @@ func GetAllMetricScopes() []MetricScope { } const ( - LIKWID_LIB_NAME = "liblikwid.so" - LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL + LIKWID_LIB_NAME = "liblikwid.so" + LIKWID_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL + LIKWID_DEF_ACCESSMODE = "direct" ) type LikwidCollectorMetricConfig struct { @@ -95,6 +96,8 @@ type LikwidCollectorConfig struct { Metrics []LikwidCollectorMetricConfig `json:"globalmetrics,omitempty"` ForceOverwrite bool `json:"force_overwrite,omitempty"` InvalidToZero bool `json:"invalid_to_zero,omitempty"` + AccessMode string `json:"access_mode,omitempty"` + DaemonPath string `json:"accessdaemon_path,omitempty"` } type LikwidCollector struct { @@ -260,6 +263,7 @@ func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int { func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" + m.config.AccessMode = LIKWID_DEF_ACCESSMODE if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -270,6 +274,11 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { if lib == nil { return fmt.Errorf("error instantiating DynamicLibrary for %s", LIKWID_LIB_NAME) } + err := lib.Open() + if err != nil { + return fmt.Errorf("error opening %s: %v", LIKWID_LIB_NAME, err) + } + if m.config.ForceOverwrite { 
cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") os.Setenv("LIKWID_FORCE", "1") @@ -301,6 +310,16 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.initGranularity() // Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist) m.scopeRespTids = m.getResponsiblities() + switch m.config.AccessMode { + case "direct": + C.HPMmode(0) + case "accessdaemon": + if len(m.config.DaemonPath) > 0 { + p := os.Getenv("PATH") + os.Setenv("PATH", m.config.DaemonPath+":"+p) + } + C.HPMmode(1) + } cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) diff --git a/collectors/likwidMetric.md b/collectors/likwidMetric.md index 3ef51f3..1aa4242 100644 --- a/collectors/likwidMetric.md +++ b/collectors/likwidMetric.md @@ -8,6 +8,8 @@ The `likwid` configuration consists of two parts, the "eventsets" and "globalmet - The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. 
Additional options: +- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode) +- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin` - `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements - `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaces with `0.0`. @@ -61,7 +63,21 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP } ``` -You can copy this JSON and add it to the `eventsets` list. If you specify multiple event sets, you can add globally derived metrics in the extra `global_metrics` section with the metric names as variables. +You can copy this JSON and add it to the `eventsets` list. If you specify multiple event sets, you can add globally derived metrics in the extra `global_metrics` section with the metric names as variables. + +### Mixed usage between daemon and users + +LIKWID checks the file `/var/run/likwid.lock` before performing any interfering operations. Who is allowed to access the counters is determined by the owner of the file. If it does not exist, it is created for the current user. So, if you want to temporarily allow counter access to a user (e.g. in a job): + +Before (SLURM prolog, ...) +``` +$ chown $JOBUSER /var/run/likwid.lock +``` + +After (SLURM epilog, ...)
+``` +$ chwon $CCUSER /var/run/likwid.lock +``` ### Example configuration diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index 6d6fe26..66fd3fd 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -22,6 +22,7 @@ type LustreCollectorConfig struct { LCtlCommand string `json:"lctl_command"` ExcludeMetrics []string `json:"exclude_metrics"` SendAllMetrics bool `json:"send_all_metrics"` + Sudo bool `json:"use_sudo"` } type LustreCollector struct { @@ -31,11 +32,17 @@ type LustreCollector struct { stats map[string]map[string]int64 config LustreCollectorConfig lctl string + sudoCmd string } func (m *LustreCollector) getDeviceDataCommand(device string) []string { + var command *exec.Cmd statsfile := fmt.Sprintf("llite.%s.stats", device) - command := exec.Command(m.lctl, LCTL_OPTION, statsfile) + if m.config.Sudo { + command = exec.Command(m.sudoCmd, m.lctl, LCTL_OPTION, statsfile) + } else { + command = exec.Command(m.lctl, LCTL_OPTION, statsfile) + } command.Wait() stdout, _ := command.Output() return strings.Split(string(stdout), "\n") @@ -103,14 +110,16 @@ func (m *LustreCollector) Init(config json.RawMessage) error { "inode_permission": {"lustre_inode_permission": 1}} // Lustre file system statistics can only be queried by user root - user, err := user.Current() - if err != nil { - cclog.ComponentError(m.name, "Failed to get current user:", err.Error()) - return err - } - if user.Uid != "0" { - cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root:", err.Error()) - return err + if !m.config.Sudo { + user, err := user.Current() + if err != nil { + cclog.ComponentError(m.name, "Failed to get current user:", err.Error()) + return err + } + if user.Uid != "0" { + cclog.ComponentError(m.name, "Lustre file system statistics can only be queried by user root") + return err + } } m.matches = make(map[string]map[string]int) @@ -136,6 +145,12 @@ func (m *LustreCollector) Init(config 
json.RawMessage) error { } } m.lctl = p + if m.config.Sudo { + p, err := exec.LookPath("sudo") + if err != nil { + m.sudoCmd = p + } + } devices := m.getDevices() if len(devices) == 0 { diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index c71ae16..747772f 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -13,29 +13,30 @@ import ( ) type MetricCollector interface { - Name() string - Init(config json.RawMessage) error - Initialized() bool - Read(duration time.Duration, output chan lp.CCMetric) - Close() + Name() string // Name of the metric collector + Init(config json.RawMessage) error // Initialize metric collector + Initialized() bool // Is metric collector initialized? + Read(duration time.Duration, output chan lp.CCMetric) // Read metrics from metric collector + Close() // Close / finish metric collector } type metricCollector struct { - name string - init bool - meta map[string]string + name string // name of the metric + init bool // is metric collector initialized? + meta map[string]string // static meta data tags } -// Name() returns the name of the metric collector +// Name returns the name of the metric collector func (c *metricCollector) Name() string { return c.name } +// Setup is for future use func (c *metricCollector) setup() error { return nil } -// Initialized() indicates whether the metric collector has been initialized. 
+// Initialized indicates whether the metric collector has been initialized func (c *metricCollector) Initialized() bool { return c.init } @@ -64,6 +65,7 @@ func stringArrayContains(array []string, str string) (int, bool) { return -1, false } +// SocketList returns the list of physical sockets as read from /proc/cpuinfo func SocketList() []int { buffer, err := ioutil.ReadFile("/proc/cpuinfo") if err != nil { @@ -89,6 +91,7 @@ func SocketList() []int { return packs } +// CpuList returns the list of physical CPUs (in contrast to logical CPUs) as read from /proc/cpuinfo func CpuList() []int { buffer, err := ioutil.ReadFile("/proc/cpuinfo") if err != nil { @@ -117,8 +120,8 @@ func CpuList() []int { // RemoveFromStringList removes the string r from the array of strings s // If r is not contained in the array an error is returned func RemoveFromStringList(s []string, r string) ([]string, error) { - for i, item := range s { - if r == item { + for i := range s { + if r == s[i] { return append(s[:i], s[i+1:]...), nil } } diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index cd3b4cc..47078a6 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -22,6 +22,13 @@ type SampleCollector struct { tags map[string]string // default tags } +// Functions to implement MetricCollector interface +// Init(...), Read(...), Close() +// See: metricCollector.go + +// Init initializes the sample collector +// Called once by the collector manager +// All tags, meta data tags and metrics that do not change over the runtime should be set here func (m *SampleCollector) Init(config json.RawMessage) error { var err error = nil // Always set the name early in Init() to use it in cclog.Component* functions @@ -56,12 +63,14 @@ func (m *SampleCollector) Init(config json.RawMessage) error { return err } +// Read collects all metrics belonging to the sample collector +// and sends them through the output channel to the collector manager func (m *SampleCollector) 
Read(interval time.Duration, output chan lp.CCMetric) { // Create a sample metric timestamp := time.Now() value := 1.0 - // If you want to measure something for a specific amout of time, use interval + // If you want to measure something for a specific amount of time, use interval // start := readState() // time.Sleep(interval) // stop := readState() @@ -75,6 +84,8 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) } +// Close metric collector: close network connection, close files, close libraries, ... +// Called once by the collector manager func (m *SampleCollector) Close() { // Unset flag m.init = false diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..a13d3a5 --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,189 @@ +# Configuring the CC metric collector + +The configuration of the CC metric collector consists of five configuration files: one global file and four component related files. + +## Global configuration + +The global file contains the paths to the other four files and some global options. + +```json +{ + "sinks": "sinks.json", + "collectors" : "collectors.json", + "receivers" : "receivers.json", + "router" : "router.json", + "interval": 10, + "duration": 1 +} +``` + +Be aware that the paths are relative to the execution folder of the cc-metric-collector binary, so it is recommended to use absolute paths. + +## Component configuration + +The others are mainly list of of subcomponents: the collectors, the receivers, the router and the sinks. Their role is best shown in a picture: + +```mermaid +flowchart LR + + subgraph col ["Collectors"] + direction TB + cpustat["cpustat"] + memstat["memstat"] + tempstat["tempstat"] + misc["..."] + end + + subgraph Receivers ["Receivers"] + direction TB + nats["NATS"] + httprecv["HTTP"] + miscrecv[...] 
+ end + + subgraph calc["Aggregator"] + direction LR + cache["Cache"] + agg["Calculator"] + end + + subgraph sinks ["Sinks"] + direction RL + influx["InfluxDB"] + ganglia["Ganglia"] + logger["Logfile"] + miscsink["..."] + end + + cpustat --> CollectorManager["CollectorManager"] + memstat --> CollectorManager + tempstat --> CollectorManager + misc --> CollectorManager + + nats --> ReceiverManager["ReceiverManager"] + httprecv --> ReceiverManager + miscrecv --> ReceiverManager + + CollectorManager --> newrouter["Router"] + ReceiverManager -.-> newrouter + calc -.-> newrouter + newrouter --> SinkManager["SinkManager"] + newrouter -.-> calc + + SinkManager --> influx + SinkManager --> ganglia + SinkManager --> logger + SinkManager --> miscsink + + +``` + +There are four parts: +- The collectors read data from files, execute commands and call dynamically loaded library function and send it to the router +- The router can process metrics by cacheing and evaluating functions and conditions on them +- The sinks send the metrics to storage backends +- The receivers can be used to receive metrics from other collectors and forward them to the router. They can be used to create a tree-like structure of collectors. + +(A maybe better differentiation between collectors and receivers is that the collectors are called periodically while the receivers have their own logic and submit metrics at any time) + + +### Collectors configuration file + +The collectors configuration file tells which metrics should be queried from the system. The metric gathering is logically grouped in so called 'Collectors'. So there are Collectors to read CPU, memory or filesystem statistics. The collectors configuration file is a list of these collectors with collector-specific configurations: + +```json +{ + "cpustat" : {}, + "diskstat": { + "exclude_metrics": [ + "disk_total" + ] + } +} +``` + +The first one is the CPU statistics collector without any collector-specific setting. 
The second one enables disk mount statistics but excludes the metric `disk_total`. + +All names and possible collector-specific configuration options can be found [here](../collectors/README.md). + +Some collectors might dynamically load shared libraries. In order to enable these collectors, make sure that the shared library path is part of the `LD_LIBRARY_PATH` environment variable. + +### Sinks configuration file + +The sinks define the output/sending of metrics. The metrics can be forwarded to multiple sinks, even to sinks of the same type. The sinks configuration file is a list of these sinks, each with an individual name. + +```json +{ + "myinflux" : { + "type" : "influxasync", + "host": "localhost", + "port": "8086", + "organization" : "testorga", + "database" : "testbucket", + "password" : "" + }, + "companyinflux" : { + "type" : "influxasync", + "host": "companyhost", + "port": "8086", + "organization" : "company", + "database" : "main", + "password" : "" + } +} +``` + +The above example configuration file defines two sinks, both of type `influxasync`. They are differentiated internally by the names: `myinflux` and `companyinflux`. + +All types and possible sink-specific configuration options can be found [here](../sinks/README.md). + +Some sinks might dynamically load shared libraries. In order to enable these sinks, make sure that the shared library path is part of the `LD_LIBRARY_PATH` environment variable. + +### Router configuration file + +The collectors and the sinks are connected through the router. The router forwards the metrics to the sinks but enables some data processing. A common example is to tag all passing metrics like adding `cluster=mycluster`. But also aggregations like "take the average of all 'ipc' metrics" (ipc -> Instructions Per Cycle). Since the configurations of these aggregations can be quite complicated, we refer to the router's [README](../internal/metricRouter/README.md).
+ +A simple router configuration file to start with looks like this: + +```json +{ + "add_tags" : [ + { + "key" : "cluster", + "value" : "mycluster", + "if" : "*" + } + ], + "interval_timestamp" : false, + "num_cache_intervals" : 0 +} +``` + +With the `add_tags` section, we tell the router to attach the `cluster=mycluster` tag to each (`*`) metric. The `interval_timestamp` option tells the router not to touch the timestamp of metrics. It is possible to send all metrics within an interval with a common time stamp to avoid later alignment issues. The `num_cache_intervals` option disables the cache completely. The cache is only required if you want to do complex metric aggregations. + +All configuration options can be found [here](../internal/metricRouter/README.md). + +### Receivers configuration file + +The receivers are a special feature of the CC Metric Collector to enable simpler integration into existing setups. While collectors query data from the local system, the receivers commonly get data from other systems through some network technology like HTTP or NATS. The idea is to keep the current setup but send the data to a CC Metric Collector which forwards it to the destination system (if a sink exists for it). For most setups, the receivers are not required and the receiver config file should contain only an empty JSON map (`{}`). + +```json +{ + "nats_rack0": { + "type": "nats", + "address" : "nats-server.example.org", + "port" : "4222", + "subject" : "rack0" + }, + "nats_rack1": { + "type": "nats", + "address" : "nats-server.example.org", + "port" : "4222", + "subject" : "rack1" + } +} +``` + +This example configuration creates two receivers with the names `nats_rack0` and `nats_rack1`. While one subscribes to metrics published with the `rack0` subject, the other one subscribes to the `rack1` subject. The NATS server is the same as it manages all subjects in a subnet. (As an example, the router could add the tags `rack=0` and `rack=1` respectively to the received metrics.)
+ +All types and possible receiver-specific configuration options can be found [here](../receivers/README.md). diff --git a/receivers/README.md b/receivers/README.md index 808dc74..3599a93 100644 --- a/receivers/README.md +++ b/receivers/README.md @@ -9,32 +9,21 @@ The configuration file for the receivers is a list of configurations. The `type` ```json { "myreceivername" : { + "type": "receiver-type", } } ``` -## Type `nats` +This allows to specify -```json -{ - "type": "nats", - "address": "", - "port" : "", - "subject": "" -} -``` +## Available receivers -The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. +- [`nats`](./natsReceiver.md): Receive metrics from the NATS network +- [`prometheus`](./prometheusReceiver.md): Scrape data from a Prometheus client +- [`http`](./httpReceiver.md): Listen for HTTP Post requests transporting metrics in InfluxDB line protocol # Contributing own receivers A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): -* `Start() error` -* `Close()` -* `Name() string` -* `SetSink(sink chan lp.CCMetric)` -* `New(name string, config json.RawMessage)` -The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`. - -Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver. 
+For an example, check the [sample receiver](./sampleReceiver.go) diff --git a/receivers/httpReceiver.go b/receivers/httpReceiver.go new file mode 100644 index 0000000..e66ad5e --- /dev/null +++ b/receivers/httpReceiver.go @@ -0,0 +1,118 @@ +package receivers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strings" + "sync" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "github.com/gorilla/mux" + influx "github.com/influxdata/line-protocol" +) + +const HTTP_RECEIVER_PORT = "8080" + +type HttpReceiverConfig struct { + Type string `json:"type"` + Addr string `json:"address"` + Port string `json:"port"` + Path string `json:"path"` +} + +type HttpReceiver struct { + receiver + handler *influx.MetricHandler + parser *influx.Parser + meta map[string]string + config HttpReceiverConfig + router *mux.Router + server *http.Server + wg sync.WaitGroup +} + +func (r *HttpReceiver) Init(name string, config json.RawMessage) error { + r.name = fmt.Sprintf("HttpReceiver(%s)", name) + r.config.Port = HTTP_RECEIVER_PORT + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return err + } + } + if len(r.config.Port) == 0 { + return errors.New("not all configuration variables set required by HttpReceiver") + } + r.meta = map[string]string{"source": r.name} + p := r.config.Path + if !strings.HasPrefix(p, "/") { + p = "/" + p + } + uri := fmt.Sprintf("%s:%s%s", r.config.Addr, r.config.Port, p) + cclog.ComponentDebug(r.name, "INIT", uri) + r.handler = influx.NewMetricHandler() + r.parser = influx.NewParser(r.handler) + r.parser.SetTimeFunc(DefaultTime) + + r.router = mux.NewRouter() + r.router.Path(p).HandlerFunc(r.ServerHttp) + r.server = &http.Server{Addr: uri, Handler: r.router} + return nil +} + +func (r *HttpReceiver) Start() { + 
cclog.ComponentDebug(r.name, "START") + r.wg.Add(1) + go func() { + err := r.server.ListenAndServe() + if err != nil && err.Error() != "http: Server closed" { + cclog.ComponentError(r.name, err.Error()) + } + r.wg.Done() + }() +} + +func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + metrics, err := r.parser.Parse(body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + + for _, m := range metrics { + y := lp.FromInfluxMetric(m) + for k, v := range r.meta { + y.AddMeta(k, v) + } + if r.sink != nil { + r.sink <- y + } + } + + w.WriteHeader(http.StatusOK) +} + +func (r *HttpReceiver) Close() { + r.server.Shutdown(context.Background()) +} + +func NewHttpReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(HttpReceiver) + err := r.Init(name, config) + return r, err +} diff --git a/receivers/httpReceiver.md b/receivers/httpReceiver.md new file mode 100644 index 0000000..ed1e1bf --- /dev/null +++ b/receivers/httpReceiver.md @@ -0,0 +1,23 @@ +## `http` receiver + +The `http` receiver can be used receive metrics through HTTP POST requests. + +### Configuration structure + +```json +{ + "": { + "type": "http", + "address" : "", + "port" : "8080", + "path" : "/write" + } +} +``` + +- `type`: makes the receiver a `http` receiver +- `address`: Listen address +- `port`: Listen port +- `path`: URL path for the write endpoint + +The HTTP endpoint listens to `http://
:/` diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index e133354..6b85fd4 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -8,6 +8,7 @@ type defaultReceiverConfig struct { Type string `json:"type"` } +// Receiver configuration: Listen address, port type ReceiverConfig struct { Addr string `json:"address"` Port string `json:"port"` @@ -23,15 +24,17 @@ type receiver struct { type Receiver interface { Start() - Close() - Name() string - SetSink(sink chan lp.CCMetric) + Close() // Close / finish metric receiver + Name() string // Name of the metric receiver + SetSink(sink chan lp.CCMetric) // Set sink channel } +// Name returns the name of the metric receiver func (r *receiver) Name() string { return r.name } +// SetSink set the sink channel func (r *receiver) SetSink(sink chan lp.CCMetric) { r.sink = sink } diff --git a/receivers/natsReceiver.md b/receivers/natsReceiver.md new file mode 100644 index 0000000..4a7b7a4 --- /dev/null +++ b/receivers/natsReceiver.md @@ -0,0 +1,21 @@ +## `nats` receiver + +The `nats` receiver can be used receive metrics from the NATS network. The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. 
+ +### Configuration structure + +```json +{ + "": { + "type": "nats", + "address" : "nats-server.example.org", + "port" : "4222", + "subject" : "subject" + } +} +``` + +- `type`: makes the receiver a `nats` receiver +- `address`: Address of the NATS control server +- `port`: Port of the NATS control server +- `subject`: Subscribes to this subject and receive metrics diff --git a/receivers/prometheusReceiver.md b/receivers/prometheusReceiver.md index 5fe6e46..83de7f6 100644 --- a/receivers/prometheusReceiver.md +++ b/receivers/prometheusReceiver.md @@ -10,7 +10,7 @@ The `prometheus` receiver can be used to scrape the metrics of a single `prometh "type": "prometheus", "address" : "testpromhost", "port" : "12345", - "port" : "/prometheus", + "path" : "/prometheus", "interval": "5s", "ssl" : true, } diff --git a/receivers/sampleReceiver.go b/receivers/sampleReceiver.go index e9edc90..2892d56 100644 --- a/receivers/sampleReceiver.go +++ b/receivers/sampleReceiver.go @@ -7,6 +7,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" ) +// SampleReceiver configuration: receiver type, listen address, port type SampleReceiverConfig struct { Type string `json:"type"` Addr string `json:"address"` @@ -24,6 +25,10 @@ type SampleReceiver struct { // wg sync.WaitGroup } +// Implement functions required for Receiver interface +// Start(), Close() +// See: metricReceiver.go + func (r *SampleReceiver) Start() { cclog.ComponentDebug(r.name, "START") @@ -44,6 +49,7 @@ func (r *SampleReceiver) Start() { // }() } +// Close receiver: close network connection, close files, close libraries, ... 
func (r *SampleReceiver) Close() { cclog.ComponentDebug(r.name, "CLOSE") @@ -54,13 +60,21 @@ func (r *SampleReceiver) Close() { // r.wg.Wait() } +// New function to create a new instance of the receiver +// Initialize the receiver by giving it a name and reading in the config JSON func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) { r := new(SampleReceiver) - r.name = fmt.Sprintf("HttpReceiver(%s)", name) + + // Set name of SampleReceiver + // The name should be chosen in such a way that different instances of SampleReceiver can be distinguished + r.name = fmt.Sprintf("SampleReceiver(%s)", name) // Set static information r.meta = map[string]string{"source": r.name} + // Set defaults in r.config + // Allow overwriting these defaults by reading config JSON + // Read the sample receiver specific JSON config if len(config) > 0 { err := json.Unmarshal(config, &r.config) diff --git a/scripts/cc-metric-collector.spec b/scripts/cc-metric-collector.spec index 65a9b55..9d55b4f 100644 --- a/scripts/cc-metric-collector.spec +++ b/scripts/cc-metric-collector.spec @@ -1,5 +1,5 @@ Name: cc-metric-collector -Version: 0.2 +Version: %{VERS} Release: 1%{?dist} Summary: Metric collection daemon from the ClusterCockpit suite @@ -7,8 +7,9 @@ License: MIT Source0: %{name}-%{version}.tar.gz BuildRequires: go-toolset -# for internal LIKWID installation -BuildRequires: wget perl-Data-Dumper +BuildRequires: systemd-rpm-macros +# for header downloads +BuildRequires: wget Provides: %{name} = %{version} @@ -34,11 +35,15 @@ install -Dpm 0600 receivers.json %{buildroot}%{_sysconfdir}/%{name}/receivers.js install -Dpm 0600 router.json %{buildroot}%{_sysconfdir}/%{name}/router.json install -Dpm 0644 scripts/%{name}.service %{buildroot}%{_unitdir}/%{name}.service install -Dpm 0600 scripts/%{name}.config %{buildroot}%{_sysconfdir}/default/%{name} +install -Dpm 0644 scripts/%{name}.sysusers %{buildroot}%{_sysusersdir}/%{name}.conf %check # go test should be here... 
:) +%pre +%sysusers_create_package scripts/%{name}.sysusers + %post %systemd_post %{name}.service @@ -46,17 +51,23 @@ install -Dpm 0600 scripts/%{name}.config %{buildroot}%{_sysconfdir}/default/%{na %systemd_preun %{name}.service %files +# Binary +%attr(-,clustercockpit,clustercockpit) %{_sbindir}/%{name} +# Config %dir %{_sysconfdir}/%{name} -%{_sbindir}/%{name} +%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/%{name}.json +%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/collectors.json +%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/sinks.json +%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/receivers.json +%attr(0600,clustercockpit,clustercockpit) %config(noreplace) %{_sysconfdir}/%{name}/router.json +# Systemd %{_unitdir}/%{name}.service %{_sysconfdir}/default/%{name} -%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/%{name}.json -%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/collectors.json -%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/sinks.json -%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/receivers.json -%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/%{name}/router.json +%{_sysusersdir}/%{name}.conf %changelog +* Thu Mar 03 2022 Thomas Gruber - 0.3 +- Add clustercockpit user installation * Mon Feb 14 2022 Thomas Gruber - 0.2 - Add component specific configuration files - Add %attr to config files diff --git a/scripts/cc-metric-collector.sysusers b/scripts/cc-metric-collector.sysusers new file mode 100644 index 0000000..6ce3700 --- /dev/null +++ b/scripts/cc-metric-collector.sysusers @@ -0,0 +1,2 @@ +#Type Name ID GECOS Home directory Shell +u clustercockpit - "User for ClusterCockpit" /run/cc-metric-collector /sbin/nologin diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 8fe02d7..d5356d0 100644 --- a/sinks/metricSink.go +++ 
b/sinks/metricSink.go @@ -10,17 +10,18 @@ type defaultSinkConfig struct { } type sink struct { - meta_as_tags bool - name string + meta_as_tags bool // Use meta data tags as tags + name string // Name of the sink } type Sink interface { - Write(point lp.CCMetric) error - Flush() error - Close() - Name() string + Write(point lp.CCMetric) error // Write metric to the sink + Flush() error // Flush buffered metrics + Close() // Close / finish metric sink + Name() string // Name of the metric sink } +// Name returns the name of the metric sink func (s *sink) Name() string { return s.name } diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go index be0196a..3913a29 100644 --- a/sinks/sampleSink.go +++ b/sinks/sampleSink.go @@ -10,15 +10,22 @@ import ( ) type SampleSinkConfig struct { - defaultSinkConfig // defines JSON tags for 'type' and 'meta_as_tags' - // Add additional options + // defines JSON tags for 'type' and 'meta_as_tags' + // See: metricSink.go + defaultSinkConfig + // Additional config options, for SampleSink } type SampleSink struct { - sink // declarate 'name' and 'meta_as_tags' + // declares elements 'name' and 'meta_as_tags' + sink config SampleSinkConfig // entry point to the SampleSinkConfig } +// Implement functions required for Sink interface +// Write(...), Flush(), Close() +// See: metricSink.go + // Code to submit a single CCMetric to the sink func (s *SampleSink) Write(point lp.CCMetric) error { log.Print(point) @@ -39,9 +46,13 @@ func (s *SampleSink) Close() { // Initialize the sink by giving it a name and reading in the config JSON func NewSampleSink(name string, config json.RawMessage) (Sink, error) { s := new(SampleSink) + + // Set name of sampleSink + // The name should be chosen in such a way that different instances of SampleSink can be distinguished s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here // Set defaults in s.config + // Allow overwriting these defaults by reading config JSON // Read in the config JSON 
if len(config) > 0 { @@ -52,7 +63,7 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { } // Check if all required fields in the config are set - // Use 'len(s.config.Option) > 0' for string settings + // E.g. use 'len(s.config.Option) > 0' for string settings // Establish connection to the server, library, ... // Check required files exist and lookup path(s) of executable(s)