diff --git a/.github/workflows/Release.yml b/.github/workflows/Release.yml index f7143cb..1d1906b 100644 --- a/.github/workflows/Release.yml +++ b/.github/workflows/Release.yml @@ -133,13 +133,63 @@ jobs: name: cc-metric-collector SRPM for UBI 8 path: ${{ steps.rpmbuild.outputs.SRPM }} + # + # Build on Ubuntu 20.04 using official go package + # + Ubuntu-focal-build: + runs-on: ubuntu-latest + container: ubuntu:20.04 + # The job outputs link to the outputs of the 'debrename' step + # Only job outputs can be used in child jobs + outputs: + deb : ${{steps.debrename.outputs.DEB}} + steps: + # Use apt to install development packages + - name: Install development packages + run: | + apt update && apt --assume-yes upgrade + apt --assume-yes install build-essential sed git wget bash + # Checkout git repository and submodules + # fetch-depth must be 0 to use git describe + # See: https://github.com/marketplace/actions/checkout + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: recursive + fetch-depth: 0 + # Use official golang package + - name: Install Golang + run: | + wget -q https://go.dev/dl/go1.19.1.linux-amd64.tar.gz + tar -C /usr/local -xzf go1.19.1.linux-amd64.tar.gz + export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH + go version + - name: DEB build MetricCollector + id: dpkg-build + run: | + export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH + make DEB + - name: Rename DEB (add '_ubuntu20.04') + id: debrename + run: | + OLD_DEB_NAME=$(echo "${{steps.dpkg-build.outputs.DEB}}" | rev | cut -d '.' 
-f 2- | rev) + NEW_DEB_FILE="${OLD_DEB_NAME}_ubuntu20.04.deb" + mv "${{steps.dpkg-build.outputs.DEB}}" "${NEW_DEB_FILE}" + echo "::set-output name=DEB::${NEW_DEB_FILE}" + # See: https://github.com/actions/upload-artifact + - name: Save DEB as artifact + uses: actions/upload-artifact@v2 + with: + name: cc-metric-collector DEB for Ubuntu 20.04 + path: ${{ steps.debrename.outputs.DEB }} + # # Create release with fresh RPMs # Release: runs-on: ubuntu-latest # We need the RPMs, so add dependency - needs: [AlmaLinux-RPM-build, UBI-8-RPM-build] + needs: [AlmaLinux-RPM-build, UBI-8-RPM-build, Ubuntu-focal-build] steps: # See: https://github.com/actions/download-artifact @@ -161,6 +211,11 @@ jobs: with: name: cc-metric-collector SRPM for UBI 8 + - name: Download Ubuntu 20.04 DEB + uses: actions/download-artifact@v2 + with: + name: cc-metric-collector DEB for Ubuntu 20.04 + # The download actions do not publish the name of the downloaded file, # so we re-use the job outputs of the parent jobs. The files are all # downloaded to the current folder. 
@@ -174,14 +229,17 @@ jobs: ALMA_85_SRPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.srpm}}") UBI_8_RPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.rpm}}") UBI_8_SRPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.srpm}}") + U_2004_DEB=$(basename "${{ needs.Ubuntu-focal-build.outputs.deb}}") echo "ALMA_85_RPM::${ALMA_85_RPM}" echo "ALMA_85_SRPM::${ALMA_85_SRPM}" echo "UBI_8_RPM::${UBI_8_RPM}" echo "UBI_8_SRPM::${UBI_8_SRPM}" + echo "U_2004_DEB::${U_2004_DEB}" echo "::set-output name=ALMA_85_RPM::${ALMA_85_RPM}" echo "::set-output name=ALMA_85_SRPM::${ALMA_85_SRPM}" echo "::set-output name=UBI_8_RPM::${UBI_8_RPM}" echo "::set-output name=UBI_8_SRPM::${UBI_8_SRPM}" + echo "::set-output name=U_2004_DEB::${U_2004_DEB}" # See: https://github.com/softprops/action-gh-release - name: Release @@ -194,3 +252,4 @@ jobs: ${{ steps.files.outputs.ALMA_85_SRPM }} ${{ steps.files.outputs.UBI_8_RPM }} ${{ steps.files.outputs.UBI_8_SRPM }} + ${{ steps.files.outputs.U_2004_DEB }} diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index 2036179..5218145 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -31,4 +31,30 @@ jobs: run: make - name: Run MetricCollector once - run: ./cc-metric-collector --once --config .github/ci-config.json \ No newline at end of file + run: ./cc-metric-collector --once --config .github/ci-config.json + + # + # Job build-1-19 + # Build on latest Ubuntu using golang version 1.19 + # + build-1-19: + runs-on: ubuntu-latest + steps: + # See: https://github.com/marketplace/actions/checkout + # Checkout git repository and submodules + - name: Checkout + uses: actions/checkout@v2 + with: + submodules: recursive + + # See: https://github.com/marketplace/actions/setup-go-environment + - name: Setup Golang + uses: actions/setup-go@v3 + with: + go-version: '1.19' + + - name: Build MetricCollector + run: make + + - name: Run MetricCollector once + run: ./cc-metric-collector --once --config 
.github/ci-config.json diff --git a/README.md b/README.md index 530989e..65bde1d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # cc-metric-collector -A node agent for measuring, processing and forwarding node level metrics. It is part of the ClusterCockpit ecosystem. +A node agent for measuring, processing and forwarding node level metrics. It is part of the [ClusterCockpit ecosystem](./docs/introduction.md). The metric collector sends (and receives) metric in the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/) as it provides flexibility while providing a separation between tags (like index columns in relational databases) and fields (like data columns). @@ -11,7 +11,7 @@ The receiver runs as a go routine side-by-side with the timer loop and asynchron # Configuration Configuration is implemented using a single json document that is distributed over network and may be persisted as file. -Supported metrics are documented [here](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md). +Supported metrics are documented [here](https://github.com/ClusterCockpit/cc-specifications/blob/master/interfaces/lineprotocol/README.md). There is a main configuration file with basic settings that point to the other configuration files for the different components. @@ -26,7 +26,7 @@ There is a main configuration file with basic settings that point to the other c } ``` -The `interval` defines how often the metrics should be read and send to the sink. The `duration` tells collectors how long one measurement has to take. This is important for some collectors, like the `likwid` collector. +The `interval` defines how often the metrics should be read and send to the sink. The `duration` tells collectors how long one measurement has to take. This is important for some collectors, like the `likwid` collector. For more information, see [here](./docs/configuration.md). 
See the component READMEs for their configuration: @@ -44,6 +44,8 @@ $ go get (requires at least golang 1.16) $ make ``` +For more information, see [here](./docs/building.md). + # Running ``` diff --git a/cc-metric-collector.go b/cc-metric-collector.go index 42f7843..5544af8 100644 --- a/cc-metric-collector.go +++ b/cc-metric-collector.go @@ -15,10 +15,10 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker" ) type CentralConfigFile struct { diff --git a/collectors/README.md b/collectors/README.md index 10e5105..8002ed2 100644 --- a/collectors/README.md +++ b/collectors/README.md @@ -35,7 +35,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c * [`nfs4stat`](./nfs4Metric.md) * [`cpufreq`](./cpufreqMetric.md) * [`cpufreq_cpuinfo`](./cpufreqCpuinfoMetric.md) -* [`numastat`](./numastatMetric.md) +* [`numastats`](./numastatsMetric.md) * [`gpfs`](./gpfsMetric.md) * [`beegfs_meta`](./beegfsmetaMetric.md) * [`beegfs_storage`](./beegfsstorageMetric.md) diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index a27faf2..d202773 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -5,7 +5,7 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" + "io" "os" "os/exec" "os/user" @@ -14,8 +14,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog 
"github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const DEFAULT_BEEGFS_CMD = "beegfs-ctl" @@ -115,7 +115,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr return } //get mounpoint - buffer, _ := ioutil.ReadFile(string("/proc/mounts")) + buffer, _ := os.ReadFile(string("/proc/mounts")) mounts := strings.Split(string(buffer), "\n") var mountpoints []string for _, line := range mounts { @@ -157,9 +157,9 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr if err != nil { fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) - data, _ := ioutil.ReadAll(cmdStderr) + data, _ := io.ReadAll(cmdStderr) fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stderr: \"%s\"\n", string(data)) - data, _ = ioutil.ReadAll(cmdStdout) + data, _ = io.ReadAll(cmdStdout) fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stdout: \"%s\"\n", string(data)) return } diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index 1160664..be57e0f 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -5,7 +5,7 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" + "io" "os" "os/exec" "os/user" @@ -14,8 +14,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // Struct for the collector-specific JSON config @@ -108,7 +108,7 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM return } //get mounpoint - buffer, _ 
:= ioutil.ReadFile(string("/proc/mounts")) + buffer, _ := os.ReadFile(string("/proc/mounts")) mounts := strings.Split(string(buffer), "\n") var mountpoints []string for _, line := range mounts { @@ -149,9 +149,9 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM if err != nil { fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) - data, _ := ioutil.ReadAll(cmdStderr) + data, _ := io.ReadAll(cmdStderr) fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stderr: \"%s\"\n", string(data)) - data, _ = ioutil.ReadAll(cmdStdout) + data, _ = io.ReadAll(cmdStdout) fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stdout: \"%s\"\n", string(data)) return } diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index b341b1b..ea648ef 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -6,9 +6,9 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker" ) // Map of all available metric collectors @@ -38,6 +38,7 @@ var AvailableCollectors = map[string]MetricCollector{ "beegfs_storage": new(BeegfsStorageCollector), "rocm_smi": new(RocmSmiCollector), "self": new(SelfCollector), + "schedstat": new(SchedstatCollector), } // Metric collector manager data structure diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 80732ff..a6716d3 100644 --- 
a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -10,8 +10,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index e6a0081..cf67457 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -3,14 +3,14 @@ package collectors import ( "encoding/json" "fmt" - "io/ioutil" + "os" "path/filepath" "strconv" "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "golang.org/x/sys/unix" ) @@ -88,7 +88,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { // Read package ID physicalPackageIDFile := filepath.Join(cpuDir, "topology", "physical_package_id") - line, err := ioutil.ReadFile(physicalPackageIDFile) + line, err := os.ReadFile(physicalPackageIDFile) if err != nil { return fmt.Errorf("unable to read physical package ID from file '%s': %v", physicalPackageIDFile, err) } @@ -100,7 +100,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { // Read core ID coreIDFile := filepath.Join(cpuDir, "topology", "core_id") - line, err = ioutil.ReadFile(coreIDFile) + line, err = os.ReadFile(coreIDFile) if err != nil { return fmt.Errorf("unable to read core ID from file '%s': %v", coreIDFile, err) } @@ -188,7 +188,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) } // Read current frequency - line, err := ioutil.ReadFile(t.scalingCurFreqFile) + line, err := os.ReadFile(t.scalingCurFreqFile) if err != 
nil { cclog.ComponentError( m.name, diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index c0dcf13..bd6ec2f 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -9,8 +9,9 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + sysconf "github.com/tklauser/go-sysconf" ) const CPUSTATFILE = `/proc/stat` @@ -21,10 +22,12 @@ type CpustatCollectorConfig struct { type CpustatCollector struct { metricCollector - config CpustatCollectorConfig - matches map[string]int - cputags map[string]map[string]string - nodetags map[string]string + config CpustatCollectorConfig + lastTimestamp time.Time // Store time stamp of last tick to derive values + matches map[string]int + cputags map[string]map[string]string + nodetags map[string]string + olddata map[string]map[string]int64 } func (m *CpustatCollector) Init(config json.RawMessage) error { @@ -76,36 +79,48 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { // Pre-generate tags for all CPUs num_cpus := 0 m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() linefields := strings.Fields(line) - if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + if strings.Compare(linefields[0], "cpu") == 0 { + m.olddata["cpu"] = make(map[string]int64) + for k, v := range m.matches { + m.olddata["cpu"][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } + } else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { cpustr := strings.TrimLeft(linefields[0], "cpu") cpu, _ := strconv.Atoi(cpustr) m.cputags[linefields[0]] = 
map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = make(map[string]int64) + for k, v := range m.matches { + m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } num_cpus++ } } + m.lastTimestamp = time.Now() m.init = true return nil } -func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric) { +func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { values := make(map[string]float64) - total := 0.0 + clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK) for match, index := range m.matches { if len(match) > 0 { x, err := strconv.ParseInt(linefields[index], 0, 64) if err == nil { - values[match] = float64(x) - total += values[match] + vdiff := x - m.olddata[linefields[0]][match] + m.olddata[linefields[0]][match] = x // Store new value for next run + values[match] = float64(vdiff) / float64(tsdelta.Seconds()) / float64(clktck) } } } - t := time.Now() + for name, value := range values { - y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now) if err == nil { output <- y } @@ -117,6 +132,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) return } num_cpus := 0 + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + file, err := os.Open(string(CPUSTATFILE)) if err != nil { cclog.ComponentError(m.name, err.Error()) @@ -128,9 +146,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) line := scanner.Text() linefields := strings.Fields(line) if strings.Compare(linefields[0], "cpu") == 0 { - m.parseStatLine(linefields, m.nodetags, output) + m.parseStatLine(linefields, m.nodetags, output, now, tsdelta) } else if strings.HasPrefix(linefields[0], "cpu") { - 
m.parseStatLine(linefields, m.cputags[linefields[0]], output) + m.parseStatLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) num_cpus++ } } @@ -139,11 +157,13 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) m.nodetags, m.meta, map[string]interface{}{"value": int(num_cpus)}, - time.Now(), + now, ) if err == nil { output <- num_cpus_metric } + + m.lastTimestamp = now } func (m *CpustatCollector) Close() { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 492dd48..e150014 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -3,13 +3,13 @@ package collectors import ( "encoding/json" "errors" - "io/ioutil" "log" + "os" "os/exec" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influx "github.com/influxdata/line-protocol" ) @@ -53,7 +53,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { } } for _, f := range m.config.Files { - _, err = ioutil.ReadFile(f) + _, err = os.ReadFile(f) if err == nil { m.files = append(m.files, f) } else { @@ -106,7 +106,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri } } for _, file := range m.files { - buffer, err := ioutil.ReadFile(file) + buffer, err := os.ReadFile(file) if err != nil { log.Print(err) return diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 69ffe07..d1ec4fc 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -8,8 +8,8 @@ import ( "syscall" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // "log" diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go 
index ca9affe..b3f9fc4 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -5,7 +5,7 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" + "io" "log" "os/exec" "os/user" @@ -13,8 +13,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const DEFAULT_GPFS_CMD = "mmpmon" @@ -118,8 +118,8 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { cmd.Stderr = cmdStderr err := cmd.Run() if err != nil { - dataStdErr, _ := ioutil.ReadAll(cmdStderr) - dataStdOut, _ := ioutil.ReadAll(cmdStdout) + dataStdErr, _ := io.ReadAll(cmdStderr) + dataStdOut, _ := io.ReadAll(cmdStdout) cclog.ComponentError( m.name, fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err), diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 92ea911..7560af9 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -2,11 +2,10 @@ package collectors import ( "fmt" - "io/ioutil" "os" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "golang.org/x/sys/unix" "encoding/json" @@ -19,8 +18,9 @@ import ( const IB_BASEPATH = "/sys/class/infiniband/" type InfinibandCollectorMetric struct { - path string - unit string + path string + unit string + scale int64 } type InfinibandCollectorInfo struct { @@ -84,7 +84,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { for _, path := range ibDirs { // Skip, when no LID is assigned - line, err := ioutil.ReadFile(filepath.Join(path, "lid")) + 
line, err := os.ReadFile(filepath.Join(path, "lid")) if err != nil { continue } @@ -113,10 +113,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { // Check access to counter files countersDir := filepath.Join(path, "counters") portCounterFiles := map[string]InfinibandCollectorMetric{ - "ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"}, - "ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"}, - "ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"}, - "ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"}, + "ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes", scale: 4}, + "ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes", scale: 4}, + "ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets", scale: 1}, + "ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets", scale: 1}, } for _, counter := range portCounterFiles { err := unix.Access(counter.path, unix.R_OK) @@ -174,7 +174,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr for counterName, counterDef := range info.portCounterFiles { // Read counter file - line, err := ioutil.ReadFile(counterDef.path) + line, err := os.ReadFile(counterDef.path) if err != nil { cclog.ComponentError( m.name, @@ -191,6 +191,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err)) continue } + // Scale raw value + v *= counterDef.scale // Send absolut values if m.config.SendAbsoluteValues { diff --git a/collectors/iostatMetric.go b/collectors/iostatMetric.go index 19b4157..4d1dbd1 100644 --- a/collectors/iostatMetric.go +++ b/collectors/iostatMetric.go @@ -4,8 +4,8 @@ import ( "bufio" "os" - cclog 
"github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" // "log" "encoding/json" diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 50605ac..32c4c45 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -10,7 +10,7 @@ import ( "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const IPMITOOL_PATH = `ipmitool` diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index c036415..265d84c 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -12,7 +12,6 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" "math" "os" "os/signal" @@ -24,10 +23,10 @@ import ( "time" "unsafe" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" "github.com/NVIDIA/go-nvml/pkg/dl" ) @@ -154,12 +153,13 @@ func getBaseFreq() float64 { } var freq float64 = math.NaN() for _, f := range files { - buffer, err := ioutil.ReadFile(f) + buffer, err := os.ReadFile(f) if err == nil { data := strings.Replace(string(buffer), "\n", "", -1) x, err := strconv.ParseInt(data, 0, 64) if err == nil { - freq = float64(x) * 1e6 + freq = float64(x) + break } } } @@ -168,11 +168,11 @@ func getBaseFreq() float64 { C.power_init(0) info := C.get_powerInfo() if 
float64(info.baseFrequency) != 0 { - freq = float64(info.baseFrequency) * 1e6 + freq = float64(info.baseFrequency) } C.power_finalize() } - return freq + return freq * 1e3 } func (m *LikwidCollector) Init(config json.RawMessage) error { diff --git a/collectors/likwidMetric.md b/collectors/likwidMetric.md index 1bb211f..54640dc 100644 --- a/collectors/likwidMetric.md +++ b/collectors/likwidMetric.md @@ -7,6 +7,9 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li "likwid": { "force_overwrite" : false, "invalid_to_zero" : false, + "liblikwid_path" : "/path/to/liblikwid.so", + "accessdaemon_path" : "/folder/that/contains/likwid-accessD", + "access_mode" : "direct or accessdaemon or perf_event", "eventsets": [ { "events" : { diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 58fb102..887e63e 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -3,13 +3,13 @@ package collectors import ( "encoding/json" "fmt" - "io/ioutil" + "os" "strconv" "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // @@ -72,7 +72,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) if !m.init { return } - buffer, err := ioutil.ReadFile(LOADAVGFILE) + buffer, err := os.ReadFile(LOADAVGFILE) if err != nil { if err != nil { cclog.ComponentError( diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index eade2ca..bcb9ca6 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -10,8 +10,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog 
"github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const LUSTRE_SYSFS = `/sys/fs/lustre` diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 9841a01..4aec4c8 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -12,8 +12,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const MEMSTATFILE = "/proc/meminfo" @@ -68,7 +68,8 @@ func getStats(filename string) map[string]MemstatStats { } else if len(linefields) == 5 { v, err := strconv.ParseFloat(linefields[3], 64) if err == nil { - stats[strings.Trim(linefields[0], ":")] = MemstatStats{ + cclog.ComponentDebug("getStats", strings.Trim(linefields[2], ":"), v, linefields[4]) + stats[strings.Trim(linefields[2], ":")] = MemstatStats{ value: v, unit: linefields[4], } @@ -160,7 +161,6 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { - cclog.ComponentPrint(m.name, "Here") return } @@ -188,16 +188,20 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) unit := "" if totalVal, total := stats["MemTotal"]; total { if freeVal, free := stats["MemFree"]; free { + memUsed = totalVal.value - freeVal.value + if len(totalVal.unit) > 0 { + unit = totalVal.unit + } else if len(freeVal.unit) > 0 { + unit = freeVal.unit + } if bufVal, buffers := stats["Buffers"]; buffers { + memUsed -= bufVal.value + if len(bufVal.unit) > 0 && len(unit) == 0 { + unit = bufVal.unit + } if cacheVal, cached := stats["Cached"]; cached { - memUsed = totalVal.value - (freeVal.value + bufVal.value + cacheVal.value) - if len(totalVal.unit) > 
0 { - unit = totalVal.unit - } else if len(freeVal.unit) > 0 { - unit = freeVal.unit - } else if len(bufVal.unit) > 0 { - unit = bufVal.unit - } else if len(cacheVal.unit) > 0 { + memUsed -= cacheVal.value + if len(cacheVal.unit) > 0 && len(unit) == 0 { unit = cacheVal.unit } } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 4d52571..f09fa61 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type MetricCollector interface { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 8cfb34e..8428df1 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -9,8 +9,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const NETSTATFILE = "/proc/net/dev" diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index 6b15784..7dca096 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -11,7 +11,7 @@ import ( "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // First part contains the code for the general NfsCollector. 
diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index f65a019..8eaac67 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -10,8 +10,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 0eb7c8a..a1fed03 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -8,8 +8,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/NVIDIA/go-nvml/pkg/nvml" ) diff --git a/collectors/rocmsmiMetric.go b/collectors/rocmsmiMetric.go index c717a5d..9d8625d 100644 --- a/collectors/rocmsmiMetric.go +++ b/collectors/rocmsmiMetric.go @@ -6,8 +6,8 @@ import ( "fmt" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/ClusterCockpit/go-rocm-smi/pkg/rocm_smi" ) @@ -66,14 +66,14 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error { ret := rocm_smi.Init() if ret != rocm_smi.STATUS_SUCCESS { - err = errors.New("Failed to initialize ROCm SMI library") + err = errors.New("failed to initialize ROCm SMI library") cclog.ComponentError(m.name, err.Error()) return err } numDevs, ret := rocm_smi.NumMonitorDevices() if ret != rocm_smi.STATUS_SUCCESS { - err = errors.New("Failed 
to get number of GPUs from ROCm SMI library") + err = errors.New("failed to get number of GPUs from ROCm SMI library") cclog.ComponentError(m.name, err.Error()) return err } @@ -98,14 +98,14 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error { } device, ret := rocm_smi.DeviceGetHandleByIndex(i) if ret != rocm_smi.STATUS_SUCCESS { - err = fmt.Errorf("Failed to get handle for GPU %d", i) + err = fmt.Errorf("failed to get handle for GPU %d", i) cclog.ComponentError(m.name, err.Error()) return err } pciInfo, ret := rocm_smi.DeviceGetPciInfo(device) if ret != rocm_smi.STATUS_SUCCESS { - err = fmt.Errorf("Failed to get PCI information for GPU %d", i) + err = fmt.Errorf("failed to get PCI information for GPU %d", i) cclog.ComponentError(m.name, err.Error()) return err } diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index 47ec296..056ed85 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -4,8 +4,8 @@ import ( "encoding/json" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // These are the fields we read from the JSON configuration diff --git a/collectors/sampleTimerMetric.go b/collectors/sampleTimerMetric.go index aa6807e..8b09bc1 100644 --- a/collectors/sampleTimerMetric.go +++ b/collectors/sampleTimerMetric.go @@ -5,8 +5,8 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // These are the fields we read from the JSON configuration diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go new file mode 
100644 index 0000000..8c010ed --- /dev/null +++ b/collectors/schedstatMetric.go @@ -0,0 +1,154 @@ +package collectors + +import ( + "bufio" + "encoding/json" + "fmt" + "math" + "os" + "strconv" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" +) + +const SCHEDSTATFILE = `/proc/schedstat` + +// These are the fields we read from the JSON configuration +type SchedstatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type SchedstatCollector struct { + metricCollector + config SchedstatCollectorConfig // the configuration structure + lastTimestamp time.Time // Store time stamp of last tick to derive values + meta map[string]string // default meta information + cputags map[string]map[string]string // default tags + olddata map[string]map[string]int64 // default tags +} + +// Functions to implement MetricCollector interface +// Init(...), Read(...), Close() +// See: metricCollector.go + +// Init initializes the sample collector +// Called once by the collector manager +// All tags, meta data tags and metrics that do not change over the runtime should be set here +func (m *SchedstatCollector) Init(config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "SchedstatCollector" + // This is for later use, also call it early + m.setup() + // Tell whether the collector should be run in parallel with others (reading files, ...) 
+ // or it should be run serially, mostly for collectors acutally doing measurements + // because they should not measure the execution of the other collectors + m.parallel = true + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"} + + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + // Check input file + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + // Pre-generate tags for all CPUs + num_cpus := 0 + m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + cpustr := strings.TrimLeft(linefields[0], "cpu") + cpu, _ := strconv.Atoi(cpustr) + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = map[string]int64{"running": running, "waiting": waiting} + num_cpus++ + } + } + + // Save current timestamp + m.lastTimestamp = time.Now() + + // Set this flag only if everything is initialized properly, all required files exist, ... 
+ m.init = true + return err +} + +func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + diff_running := running - m.olddata[linefields[0]]["running"] + diff_waiting := waiting - m.olddata[linefields[0]]["waiting"] + + var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3)) + var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3)) + + m.olddata[linefields[0]]["running"] = running + m.olddata[linefields[0]]["waiting"] = waiting + value := l_running + l_waiting + + y, err := lp.New("cpu_load_core", tags, m.meta, map[string]interface{}{"value": value}, now) + if err == nil { + // Send it to output channel + output <- y + } +} + +// Read collects all metrics belonging to the sample collector +// and sends them through the output channel to the collector manager +func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + + //timestamps + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") { + m.ParseProcLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) + } + } + + m.lastTimestamp = now + +} + +// Close metric collector: close network connection, close files, close libraries, ... 
+// Called once by the collector manager +func (m *SchedstatCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/schedstatMetric.md b/collectors/schedstatMetric.md new file mode 100644 index 0000000..6369eca --- /dev/null +++ b/collectors/schedstatMetric.md @@ -0,0 +1,11 @@ + +## `schedstat` collector +```json + "schedstat": { + } +``` + +The `schedstat` collector reads data from /proc/schedstat and calculates a load value, separated by hwthread. This might be useful to detect bad cpu pinning on shared nodes etc. + +Metric: +* `cpu_load_core` \ No newline at end of file diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index af9d7fd..303be4a 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -3,14 +3,14 @@ package collectors import ( "encoding/json" "fmt" - "io/ioutil" + "os" "path/filepath" "strconv" "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) // See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html @@ -83,14 +83,14 @@ func (m *TempCollector) Init(config json.RawMessage) error { // sensor name nameFile := filepath.Join(filepath.Dir(file), "name") - name, err := ioutil.ReadFile(nameFile) + name, err := os.ReadFile(nameFile) if err == nil { sensor.name = strings.TrimSpace(string(name)) } // sensor label labelFile := strings.TrimSuffix(file, "_input") + "_label" - label, err := ioutil.ReadFile(labelFile) + label, err := os.ReadFile(labelFile) if err == nil { sensor.label = strings.TrimSpace(string(label)) } @@ -117,7 +117,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { } // Sensor file - _, err = ioutil.ReadFile(file) + _, err = os.ReadFile(file) if err != nil { continue } @@ -139,7 +139,7 @@ func (m *TempCollector) 
Init(config json.RawMessage) error { // max temperature if m.config.ReportMaxTemp { maxTempFile := strings.TrimSuffix(file, "_input") + "_max" - if buffer, err := ioutil.ReadFile(maxTempFile); err == nil { + if buffer, err := os.ReadFile(maxTempFile); err == nil { if x, err := strconv.ParseInt(strings.TrimSpace(string(buffer)), 10, 64); err == nil { sensor.maxTempName = strings.Replace(sensor.metricName, "temp", "max_temp", 1) sensor.maxTemp = x / 1000 @@ -150,7 +150,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { // critical temperature if m.config.ReportCriticalTemp { criticalTempFile := strings.TrimSuffix(file, "_input") + "_crit" - if buffer, err := ioutil.ReadFile(criticalTempFile); err == nil { + if buffer, err := os.ReadFile(criticalTempFile); err == nil { if x, err := strconv.ParseInt(strings.TrimSpace(string(buffer)), 10, 64); err == nil { sensor.critTempName = strings.Replace(sensor.metricName, "temp", "crit_temp", 1) sensor.critTemp = x / 1000 @@ -175,7 +175,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { for _, sensor := range m.sensors { // Read sensor file - buffer, err := ioutil.ReadFile(sensor.file) + buffer, err := os.ReadFile(sensor.file) if err != nil { cclog.ComponentError( m.name, diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 1f4aaca..08dbae0 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -9,7 +9,7 @@ import ( "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const MAX_NUM_PROCS = 10 diff --git a/docs/building.md b/docs/building.md new file mode 100644 index 0000000..968c454 --- /dev/null +++ b/docs/building.md @@ -0,0 +1,60 @@ +# Building the cc-metric-collector + +In most cases, a simple `make` in the main folder is enough to get a `cc-metric-collector` binary. 
It is basically a `go build` but some collectors require additional tasks. There is currently no Golang interface to LIKWID, so it uses `cgo` to create bindings but `cgo` requires the LIKWID header files. Therefore, it checks whether LIKWID is installed and if not it downloads LIKWID and copies the headers. + +## System integration + +The main configuration settings for system integration are pre-defined in `scripts/cc-metric-collector.config`. The file contains the UNIX user and group used for execution, the PID file location and other settings. Adjust it accordingly and copy it to `/etc/default/cc-metric-collector` + +```bash +$ install --mode 644 \ + --owner $CC_USER \ + --group $CC_GROUP \ + scripts/cc-metric-collector.config /etc/default/cc-metric-collector +$ edit /etc/default/cc-metric-collector +``` + +### SysVinit and similar + +If you are using a init system based in `/etc/init.d` daemons, you can use the sample `scripts/cc-metric-collector.init`. It reads the basic configuration from `/etc/default/cc-metric-collector` + +```bash +$ install --mode 755 \ + --owner $CC_USER \ + --group $CC_GROUP \ + scripts/cc-metric-collector.init /etc/init.d/cc-metric-collector +``` + +### Systemd + +If you are using `systemd` as init system, you can use the sample systemd service file `scripts/cc-metric-collector.service`, the configuration file `scripts/cc-metric-collector.config`. + +```bash +$ install --mode 644 \ + --owner $CC_USER \ + --group $CC_GROUP \ + scripts/cc-metric-collector.service /etc/systemd/system/cc-metric-collector.service +$ systemctl enable cc-metric-collector +``` + +## RPM + +In order to get a RPM packages for cc-metric-collector, just use: + +```bash +$ make RPM +``` + +It uses the RPM SPEC file `scripts/cc-metric-collector.spec` and requires the RPM tools (`rpm` and `rpmspec`) and `git`. 
+ +## DEB + +In order to get very simple Debian packages for cc-metric-collector, just use: + +```bash +$ make DEB +``` + +It uses the DEB control file `scripts/cc-metric-collector.control` and requires `dpkg-deb`, `awk`, `sed` and `git`. It creates only a binary deb package. + +_This option is not well tested and therefore experimental_ \ No newline at end of file diff --git a/docs/introduction.md b/docs/introduction.md new file mode 100644 index 0000000..d8c351a --- /dev/null +++ b/docs/introduction.md @@ -0,0 +1,23 @@ +# The ClusterCockpit Project + +The ClusterCockpit project is a joined project of computing centers in Europe to set up a cluster monitoring stack for small to mid-sized computing centers under the lead of NHR@FAU. + +# The ClusterCockpit Stack + +In cluster environment, there are commonly a lot of systems dedicated for computation, backend servers for file systems and frontend servers for the user interaction and cluster control. The ClusterCockpit Stack is mainly used for monitoring the compute systems with some interaction to the frontend servers. It consists of multiple components: + +- cc-metric-collector: Monitor resource usage on the compute systems +- cc-metric-store: In-memory database +- cc-backend & cc-frontend: The web-based visualizer + +# CC Metric Collector + +The CC Metric Collector project was started to provide a useful set of metrics for HPC and data science related compute systems. It runs as a system daemon and gathers system data periodically to forward the metrics to one or more databases. One of the provided backends can be used for the cc-metric-store but many others exist like InfluxDB time-series databases, the Ganglia Monitoring System or the Prometheus Monitoring System. + +The data is gathered by so-called "Collectors", forwarded to an internal router for on-the-fly manipulation (tagging, aggregation, ...) which pushes the metrics to the different metric writers called "Sinks". 
There is a forth component, the "Receivers", which receive data through some networking system like a HTTP server at any time. + +# CC Metric Store +The CC Metric Store is a data management system with short-term in-memory and long-term file-base metric storage. + +# CC Backend and CC Frontend +The CC Backend and Frontend form together the web interface for ClusterCockpit. \ No newline at end of file diff --git a/go.mod b/go.mod index c1d66f2..c6f53dc 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/nats-io/nats.go v1.16.0 github.com/prometheus/client_golang v1.12.2 github.com/stmcginnis/gofish v0.13.0 + github.com/tklauser/go-sysconf v0.3.10 golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e ) @@ -31,6 +32,7 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/shopspring/decimal v1.3.1 // indirect + github.com/tklauser/numcpus v0.4.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/go.sum b/go.sum index 9f65f7e..03d7b08 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= +github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= +github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o= +github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/ugorji/go v1.1.7/go.mod 
h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= @@ -445,6 +449,7 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew= golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/ccMetric/README.md b/internal/ccMetric/README.md deleted file mode 100644 index 1787ff0..0000000 --- a/internal/ccMetric/README.md +++ /dev/null @@ -1,32 +0,0 @@ -# ClusterCockpit metrics - -As described in the [ClusterCockpit specifications](https://github.com/ClusterCockpit/cc-specifications), the whole ClusterCockpit stack uses metrics in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMetric. - -It is basically a copy of the [InfluxDB line protocol](https://github.com/influxdata/line-protocol) `MutableMetric` interface with one extension. 
Besides the tags and fields, it contains a list of meta information (re-using the `Tag` structure of the original protocol): - -```golang -type ccMetric struct { - name string // same as - tags []*influx.Tag // original - fields []*influx.Field // Influx - tm time.Time // line-protocol - meta []*influx.Tag -} - -type CCMetric interface { - influx.MutableMetric // the same functions as defined by influx.MutableMetric - RemoveTag(key string) // this is not published by the original influx.MutableMetric - Meta() map[string]string - MetaList() []*inlux.Tag - AddMeta(key, value string) - HasMeta(key string) bool - GetMeta(key string) (string, bool) - RemoveMeta(key string) -} -``` - -The `CCMetric` interface provides the same functions as the `MutableMetric` like `{Add, Remove, Has}{Tag, Field}` and additionally provides `{Add, Remove, Has}Meta`. - -The InfluxDB protocol creates a new metric with `influx.New(name, tags, fields, time)` while CCMetric uses `ccMetric.New(name, tags, meta, fields, time)` where `tags` and `meta` are both of type `map[string]string`. - -You can copy a CCMetric with `FromMetric(other CCMetric) CCMetric`. If you get an `influx.Metric` from a function, like the line protocol parser, you can use `FromInfluxMetric(other influx.Metric) CCMetric` to get a CCMetric out of it (see `NatsReceiver` for an example). 
diff --git a/internal/metricAggregator/metricAggregator.go b/internal/metricAggregator/metricAggregator.go index f5c7ada..50bc963 100644 --- a/internal/metricAggregator/metricAggregator.go +++ b/internal/metricAggregator/metricAggregator.go @@ -9,10 +9,10 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" "github.com/PaesslerAG/gval" ) diff --git a/internal/metricAggregator/metricAggregatorFunctions.go b/internal/metricAggregator/metricAggregatorFunctions.go index 945dc6d..68d205b 100644 --- a/internal/metricAggregator/metricAggregatorFunctions.go +++ b/internal/metricAggregator/metricAggregatorFunctions.go @@ -8,8 +8,8 @@ import ( "sort" "strings" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" ) /* diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go index 8886f47..81e69a9 100644 --- a/internal/metricRouter/metricCache.go +++ b/internal/metricRouter/metricCache.go @@ -4,11 +4,11 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + lp 
"github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker" ) type metricCachePeriod struct { diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 2614ced..32ac0f3 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -7,11 +7,11 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker" units "github.com/ClusterCockpit/cc-units" ) diff --git a/internal/ccLogger/cclogger.go b/pkg/ccLogger/cclogger.go similarity index 100% rename from internal/ccLogger/cclogger.go rename to pkg/ccLogger/cclogger.go diff --git a/pkg/ccMetric/README.md b/pkg/ccMetric/README.md new file mode 100644 index 0000000..71a3a6e --- /dev/null +++ b/pkg/ccMetric/README.md @@ -0,0 +1,57 @@ +# ClusterCockpit metrics + +As described in the [ClusterCockpit specifications](https://github.com/ClusterCockpit/cc-specifications), the whole ClusterCockpit stack uses metrics in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMetric. + +It is basically a copy of the [InfluxDB line protocol](https://github.com/influxdata/line-protocol) `MutableMetric` interface with one extension. 
Besides the tags and fields, it contains a list of meta information (re-using the `Tag` structure of the original protocol): + +```golang +type ccMetric struct { + name string // Measurement name + meta map[string]string // map of meta data tags + tags map[string]string // map of of tags + fields map[string]interface{} // map of of fields + tm time.Time // timestamp +} + +type CCMetric interface { + ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric + ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric + String() string // Return line-protocol like string + + Name() string // Get metric name + SetName(name string) // Set metric name + + Time() time.Time // Get timestamp + SetTime(t time.Time) // Set timestamp + + Tags() map[string]string // Map of tags + AddTag(key, value string) // Add a tag + GetTag(key string) (value string, ok bool) // Get a tag by its key + HasTag(key string) (ok bool) // Check if a tag key is present + RemoveTag(key string) // Remove a tag by its key + + Meta() map[string]string // Map of meta data tags + AddMeta(key, value string) // Add a meta data tag + GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key + HasMeta(key string) (ok bool) // Check if a meta data key is present + RemoveMeta(key string) // Remove a meta data tag by its key + + Fields() map[string]interface{} // Map of fields + AddField(key string, value interface{}) // Add a field + GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key + HasField(key string) (ok bool) // Check if a field key is present + RemoveField(key string) // Remove a field addressed by its key +} + +func New(name string, tags map[string]string, meta map[string]string, fields map[string]interface{}, tm time.Time) (CCMetric, error) +func FromMetric(other CCMetric) CCMetric +func FromInfluxMetric(other lp.Metric) CCMetric +``` + +The 
`CCMetric` interface provides the same functions as the `MutableMetric` like `{Add, Get, Remove, Has}{Tag, Field}` and additionally provides `{Add, Get, Remove, Has}Meta`. + +The InfluxDB protocol creates a new metric with `influx.New(name, tags, fields, time)` while CCMetric uses `ccMetric.New(name, tags, meta, fields, time)` where `tags` and `meta` are both of type `map[string]string`. + +You can copy a CCMetric with `FromMetric(other CCMetric) CCMetric`. If you get an `influx.Metric` from a function, like the line protocol parser, you can use `FromInfluxMetric(other influx.Metric) CCMetric` to get a CCMetric out of it (see `NatsReceiver` for an example). + +Although the [cc-specifications](https://github.com/ClusterCockpit/cc-specifications/blob/master/interfaces/lineprotocol/README.md) defines that there is only a `value` field for the metric value, the CCMetric still can have multiple values similar to the InfluxDB line protocol. diff --git a/internal/ccMetric/ccMetric.go b/pkg/ccMetric/ccMetric.go similarity index 95% rename from internal/ccMetric/ccMetric.go rename to pkg/ccMetric/ccMetric.go index 661b9a4..8ad18cc 100644 --- a/internal/ccMetric/ccMetric.go +++ b/pkg/ccMetric/ccMetric.go @@ -50,6 +50,7 @@ type CCMetric interface { GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key HasField(key string) (ok bool) // Check if a field key is present RemoveField(key string) // Remove a field addressed by its key + String() string // Return line-protocol like string } // String implements the stringer interface for data type ccMetric @@ -217,23 +218,26 @@ func New( } // FromMetric copies the metric -func FromMetric(other ccMetric) CCMetric { +func FromMetric(other CCMetric) CCMetric { + otags := other.Tags() + ometa := other.Meta() + ofields := other.Fields() m := &ccMetric{ name: other.Name(), - tags: make(map[string]string, len(other.tags)), - meta: make(map[string]string, len(other.meta)), - fields: 
make(map[string]interface{}, len(other.fields)), + tags: make(map[string]string, len(otags)), + meta: make(map[string]string, len(ometa)), + fields: make(map[string]interface{}, len(ofields)), tm: other.Time(), } // deep copy tags, meta data tags and fields - for key, value := range other.tags { + for key, value := range otags { m.tags[key] = value } - for key, value := range other.meta { + for key, value := range ometa { m.meta[key] = value } - for key, value := range other.fields { + for key, value := range ofields { m.fields[key] = value } return m diff --git a/internal/ccTopology/ccTopology.go b/pkg/ccTopology/ccTopology.go similarity index 99% rename from internal/ccTopology/ccTopology.go rename to pkg/ccTopology/ccTopology.go index 0ed8883..f7d7092 100644 --- a/internal/ccTopology/ccTopology.go +++ b/pkg/ccTopology/ccTopology.go @@ -10,7 +10,7 @@ import ( "strconv" "strings" - cclogger "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclogger "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" ) const SYSFS_NUMABASE = `/sys/devices/system/node` diff --git a/internal/multiChanTicker/README.md b/pkg/multiChanTicker/README.md similarity index 100% rename from internal/multiChanTicker/README.md rename to pkg/multiChanTicker/README.md diff --git a/internal/multiChanTicker/multiChanTicker.go b/pkg/multiChanTicker/multiChanTicker.go similarity index 93% rename from internal/multiChanTicker/multiChanTicker.go rename to pkg/multiChanTicker/multiChanTicker.go index e0eca43..1791474 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/pkg/multiChanTicker/multiChanTicker.go @@ -3,7 +3,7 @@ package multiChanTicker import ( "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" ) type multiChanTicker struct { diff --git a/receivers/httpReceiver.go b/receivers/httpReceiver.go index e66ad5e..efd31ac 100644 --- a/receivers/httpReceiver.go +++ 
b/receivers/httpReceiver.go @@ -5,13 +5,13 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "net/http" "strings" "sync" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/gorilla/mux" influx "github.com/influxdata/line-protocol" ) @@ -84,7 +84,7 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { return } - body, err := ioutil.ReadAll(req.Body) + body, err := io.ReadAll(req.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index 6b85fd4..1edef4e 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -1,7 +1,7 @@ package receivers import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type defaultReceiverConfig struct { diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 1a5f47b..095a7ee 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -6,8 +6,8 @@ import ( "fmt" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" ) diff --git a/receivers/prometheusReceiver.go b/receivers/prometheusReceiver.go index c22969d..7846c1d 100644 --- a/receivers/prometheusReceiver.go +++ b/receivers/prometheusReceiver.go @@ -12,8 +12,8 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - 
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type PrometheusReceiverConfig struct { diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 7a20fac..c47c3cc 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -5,8 +5,8 @@ import ( "os" "sync" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ diff --git a/receivers/redfishReceiver.go b/receivers/redfishReceiver.go index d50cbc7..de9744d 100644 --- a/receivers/redfishReceiver.go +++ b/receivers/redfishReceiver.go @@ -1,239 +1,578 @@ package receivers import ( + "crypto/tls" "encoding/json" "fmt" + "net/http" "strconv" + "strings" "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" // See: https://pkg.go.dev/github.com/stmcginnis/gofish "github.com/stmcginnis/gofish" + "github.com/stmcginnis/gofish/common" + "github.com/stmcginnis/gofish/redfish" ) +type RedfishReceiverClientConfig struct { + + // Hostname the redfish service belongs to + Hostname string + + // is metric excluded globally or per client + isExcluded map[string](bool) + + doPowerMetric bool + doProcessorMetrics bool + doThermalMetrics bool + + skipProcessorMetricsURL map[string]bool + + gofish gofish.ClientConfig +} + // RedfishReceiver configuration: type RedfishReceiver struct { receiver 
+ config struct { - Type string `json:"type"` - Fanout int `json:"fanout,omitempty"` // Default fanout: 64 - Interval int `json:"interval,omitempty"` // Default interval: 30s + fanout int + Interval time.Duration + HttpTimeout time.Duration // Client config for each redfish service - ClientConfigs []struct { - Hostname *string `json:"hostname"` - Username *string `json:"username"` - Password *string `json:"password"` - Endpoint *string `json:"endpoint"` - Insecure *bool `json:"insecure,omitempty"` - ExcludeMetrics []string `json:"exclude_metrics,omitempty"` - gofish gofish.ClientConfig - } `json:"client_config"` + ClientConfigs []RedfishReceiverClientConfig } done chan bool // channel to finish / stop redfish receiver wg sync.WaitGroup // wait group for redfish receiver } -// Start starts the redfish receiver -func (r *RedfishReceiver) Start() { - cclog.ComponentDebug(r.name, "START") +// readThermalMetrics reads thermal metrics from a redfish device +func (r *RedfishReceiver) readThermalMetrics( + clientConfig *RedfishReceiverClientConfig, + chassis *redfish.Chassis) error { - // readPowerMetric reads readfish power metric from the endpoint configured in conf - readPowerMetric := func(clientConfigIndex int) error { - - clientConfig := &r.config.ClientConfigs[clientConfigIndex] - - // Connect to redfish service - c, err := gofish.Connect(clientConfig.gofish) - if err != nil { - c := struct { - Username string - Endpoint string - BasicAuth bool - Insecure bool - }{ - Username: clientConfig.gofish.Username, - Endpoint: clientConfig.gofish.Endpoint, - BasicAuth: clientConfig.gofish.BasicAuth, - Insecure: clientConfig.gofish.Insecure, - } - return fmt.Errorf("readPowerMetric: gofish.Connect(%+v) failed: %v", c, err) - } - defer c.Logout() - - // Get all chassis managed by this service - chassis_list, err := c.Service.Chassis() - if err != nil { - return fmt.Errorf("readPowerMetric: c.Service.Chassis() failed: %v", err) - } - - for _, chassis := range chassis_list { - 
timestamp := time.Now() - - // Get power information for each chassis - power, err := chassis.Power() - if err != nil { - return fmt.Errorf("readPowerMetric: chassis.Power() failed: %v", err) - } - if power == nil { - continue - } - - // Read min, max and average consumed watts for each power control - for _, pc := range power.PowerControl { - - // Map of collected metrics - metrics := map[string]float32{ - // PowerConsumedWatts shall represent the actual power being consumed (in - // Watts) by the chassis - "consumed_watts": pc.PowerConsumedWatts, - // AverageConsumedWatts shall represent the - // average power level that occurred averaged over the last IntervalInMin - // minutes. - "average_consumed_watts": pc.PowerMetrics.AverageConsumedWatts, - // MinConsumedWatts shall represent the - // minimum power level in watts that occurred within the last - // IntervalInMin minutes. - "min_consumed_watts": pc.PowerMetrics.MinConsumedWatts, - // MaxConsumedWatts shall represent the - // maximum power level in watts that occurred within the last - // IntervalInMin minutes - "max_consumed_watts": pc.PowerMetrics.MaxConsumedWatts, - } - intervalInMin := strconv.FormatFloat(float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32) - - // Metrics to exclude - for _, key := range clientConfig.ExcludeMetrics { - delete(metrics, key) - } - - // Set tags - tags := map[string]string{ - "hostname": *clientConfig.Hostname, - "type": "node", - // ID uniquely identifies the resource - "id": pc.ID, - // MemberID shall uniquely identify the member within the collection. For - // services supporting Redfish v1.6 or higher, this value shall be the - // zero-based array index. - "member_id": pc.MemberID, - // PhysicalContext shall be a description of the affected device(s) or region - // within the chassis to which this power control applies. 
- "physical_context": string(pc.PhysicalContext), - // Name - "power_control_name": pc.Name, - } - - // Delete empty tags - for key, value := range tags { - if value == "" { - delete(tags, key) - } - } - - // Set meta data tags - meta := map[string]string{ - "source": r.name, - "group": "Energy", - "interval_in_minutes": intervalInMin, - "unit": "watts", - } - - // Delete empty meta data tags - for key, value := range meta { - if value == "" { - delete(meta, key) - } - } - - for name, value := range metrics { - - y, err := lp.New(name, tags, meta, - map[string]interface{}{ - "value": value, - }, - timestamp) - if err == nil { - r.sink <- y - } - } - } - } + // Get thermal information for each chassis + thermal, err := chassis.Thermal() + if err != nil { + return fmt.Errorf("readMetrics: chassis.Thermal() failed: %v", err) + } + // Skip empty thermal information + if thermal == nil { return nil } - // doReadPowerMetric read power metrics for all configure redfish services. - // To compensate latencies of the Redfish services a fanout is used. 
- doReadPowerMetric := func() { + timestamp := time.Now() - // Compute fanout to use - realFanout := r.config.Fanout - if len(r.config.ClientConfigs) < realFanout { - realFanout = len(r.config.ClientConfigs) + for _, temperature := range thermal.Temperatures { + + // Skip, when temperature metric is excluded + if clientConfig.isExcluded["temperature"] { + break } - // Create wait group and input channel for workers - var workerWaitGroup sync.WaitGroup - workerInput := make(chan int, realFanout) - - // Create worker go routines - for i := 0; i < realFanout; i++ { - // Increment worker wait group counter - workerWaitGroup.Add(1) - go func() { - // Decrement worker wait group counter - defer workerWaitGroup.Done() - - // Read power metrics for each client config - for clientConfigIndex := range workerInput { - err := readPowerMetric(clientConfigIndex) - if err != nil { - cclog.ComponentError(r.name, err) - } - } - }() + // Skip all temperatures which are not in enabled state + if temperature.Status.State != "" && temperature.Status.State != common.EnabledState { + continue } - // Distribute client configs to workers - for i := range r.config.ClientConfigs { - // Check done channel status - select { - case workerInput <- i: - case <-r.done: - // process done event - // Stop workers, clear channel and wait for all workers to finish - close(workerInput) - for range workerInput { - } - workerWaitGroup.Wait() - return + tags := map[string]string{ + "hostname": clientConfig.Hostname, + "type": "node", + // ChassisType shall indicate the physical form factor for the type of chassis + "chassis_typ": string(chassis.ChassisType), + // Chassis name + "chassis_name": chassis.Name, + // ID uniquely identifies the resource + "temperature_id": temperature.ID, + // MemberID shall uniquely identify the member within the collection. For + // services supporting Redfish v1.6 or higher, this value shall be the + // zero-based array index. 
+ "temperature_member_id": temperature.MemberID, + // PhysicalContext shall be a description of the affected device or region + // within the chassis to which this temperature measurement applies + "temperature_physical_context": string(temperature.PhysicalContext), + // Name + "temperature_name": temperature.Name, + } + + // Delete empty tags + for key, value := range tags { + if value == "" { + delete(tags, key) } } - // Stop workers and wait for all workers to finish - close(workerInput) - workerWaitGroup.Wait() + // Set meta data tags + meta := map[string]string{ + "source": r.name, + "group": "Temperature", + "unit": "degC", + } + + // ReadingCelsius shall be the current value of the temperature sensor's reading. + value := temperature.ReadingCelsius + + y, err := lp.New("temperature", tags, meta, + map[string]interface{}{ + "value": value, + }, + timestamp) + if err == nil { + r.sink <- y + } } + for _, fan := range thermal.Fans { + // Skip, when fan_speed metric is excluded + if clientConfig.isExcluded["fan_speed"] { + break + } + + // Skip all fans which are not in enabled state + if fan.Status.State != common.EnabledState { + continue + } + + tags := map[string]string{ + "hostname": clientConfig.Hostname, + "type": "node", + // ChassisType shall indicate the physical form factor for the type of chassis + "chassis_typ": string(chassis.ChassisType), + // Chassis name + "chassis_name": chassis.Name, + // ID uniquely identifies the resource + "fan_id": fan.ID, + // MemberID shall uniquely identify the member within the collection. For + // services supporting Redfish v1.6 or higher, this value shall be the + // zero-based array index. 
+ "fan_member_id": fan.MemberID, + // PhysicalContext shall be a description of the affected device or region + // within the chassis to which this fan is associated + "fan_physical_context": string(fan.PhysicalContext), + // Name + "fan_name": fan.Name, + } + + // Delete empty tags + for key, value := range tags { + if value == "" { + delete(tags, key) + } + } + + // Set meta data tags + meta := map[string]string{ + "source": r.name, + "group": "FanSpeed", + "unit": string(fan.ReadingUnits), + } + + // Reading shall be the current value of the fan sensor's reading + value := fan.Reading + + y, err := lp.New("fan_speed", tags, meta, + map[string]interface{}{ + "value": value, + }, + timestamp) + if err == nil { + r.sink <- y + } + } + + return nil +} + +// readPowerMetrics reads power metrics from a redfish device +func (r *RedfishReceiver) readPowerMetrics( + clientConfig *RedfishReceiverClientConfig, + chassis *redfish.Chassis) error { + + // Get power information for each chassis + power, err := chassis.Power() + if err != nil { + return fmt.Errorf("readMetrics: chassis.Power() failed: %v", err) + } + + // Skip empty power information + if power == nil { + return nil + } + + timestamp := time.Now() + + // Read min, max and average consumed watts for each power control + for _, pc := range power.PowerControl { + + // Skip all power controls which are not in enabled state + if pc.Status.State != "" && pc.Status.State != common.EnabledState { + continue + } + + // Map of collected metrics + metrics := make(map[string]float32) + + // PowerConsumedWatts shall represent the actual power being consumed (in + // Watts) by the chassis + if !clientConfig.isExcluded["consumed_watts"] { + metrics["consumed_watts"] = pc.PowerConsumedWatts + } + // AverageConsumedWatts shall represent the + // average power level that occurred averaged over the last IntervalInMin + // minutes. 
+ if !clientConfig.isExcluded["average_consumed_watts"] { + metrics["average_consumed_watts"] = pc.PowerMetrics.AverageConsumedWatts + } + // MinConsumedWatts shall represent the + // minimum power level in watts that occurred within the last + // IntervalInMin minutes. + if !clientConfig.isExcluded["min_consumed_watts"] { + metrics["min_consumed_watts"] = pc.PowerMetrics.MinConsumedWatts + } + // MaxConsumedWatts shall represent the + // maximum power level in watts that occurred within the last + // IntervalInMin minutes + if !clientConfig.isExcluded["max_consumed_watts"] { + metrics["max_consumed_watts"] = pc.PowerMetrics.MaxConsumedWatts + } + // IntervalInMin shall represent the time interval (or window), in minutes, + // in which the PowerMetrics properties are measured over. + // Should be an integer, but some Dell implementations return as a float + intervalInMin := + strconv.FormatFloat( + float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32) + + // Set tags + tags := map[string]string{ + "hostname": clientConfig.Hostname, + "type": "node", + // ChassisType shall indicate the physical form factor for the type of chassis + "chassis_typ": string(chassis.ChassisType), + // Chassis name + "chassis_name": chassis.Name, + // ID uniquely identifies the resource + "power_control_id": pc.ID, + // MemberID shall uniquely identify the member within the collection. For + // services supporting Redfish v1.6 or higher, this value shall be the + // zero-based array index. + "power_control_member_id": pc.MemberID, + // PhysicalContext shall be a description of the affected device(s) or region + // within the chassis to which this power control applies. 
+ "power_control_physical_context": string(pc.PhysicalContext), + // Name + "power_control_name": pc.Name, + } + + // Delete empty tags + for key, value := range tags { + if value == "" { + delete(tags, key) + } + } + + // Set meta data tags + meta := map[string]string{ + "source": r.name, + "group": "Energy", + "interval_in_minutes": intervalInMin, + "unit": "watts", + } + + // Delete empty meta data tags + for key, value := range meta { + if value == "" { + delete(meta, key) + } + } + + for name, value := range metrics { + + y, err := lp.New(name, tags, meta, + map[string]interface{}{ + "value": value, + }, + timestamp) + if err == nil { + r.sink <- y + } + } + } + + return nil +} + +// readProcessorMetrics reads processor metrics from a redfish device +// See: https://redfish.dmtf.org/schemas/v1/ProcessorMetrics.json +func (r *RedfishReceiver) readProcessorMetrics( + clientConfig *RedfishReceiverClientConfig, + processor *redfish.Processor) error { + + timestamp := time.Now() + + // URL to processor metrics + URL := processor.ODataID + "/ProcessorMetrics" + + // Skip previously detected non existing URLs + if clientConfig.skipProcessorMetricsURL[URL] { + return nil + } + + resp, err := processor.Client.Get(URL) + if err != nil { + // Skip non existing URLs + if statusCode := err.(*common.Error).HTTPReturnedStatusCode; statusCode == http.StatusNotFound { + clientConfig.skipProcessorMetricsURL[URL] = true + return nil + } + + return fmt.Errorf("processor.Client.Get(%v) failed: %+w", URL, err) + } + + var processorMetrics struct { + common.Entity + ODataType string `json:"@odata.type"` + ODataEtag string `json:"@odata.etag"` + Description string `json:"Description"` + // This property shall contain the power, in watts, that the processor has consumed. + ConsumedPowerWatt float32 `json:"ConsumedPowerWatt"` + // This property shall contain the temperature, in Celsius, of the processor. 
+ TemperatureCelsius float32 `json:"TemperatureCelsius"` + } + err = json.NewDecoder(resp.Body).Decode(&processorMetrics) + if err != nil { + return fmt.Errorf("unable to decode JSON for processor metrics: %+w", err) + } + processorMetrics.SetClient(processor.Client) + + // Set tags + tags := map[string]string{ + "hostname": clientConfig.Hostname, + "type": "socket", + // ProcessorType shall contain the string which identifies the type of processor contained in this Socket + "processor_typ": string(processor.ProcessorType), + // Processor name + "processor_name": processor.Name, + // ID uniquely identifies the resource + "processor_id": processor.ID, + } + + // Delete empty tags + for key, value := range tags { + if value == "" { + delete(tags, key) + } + } + + // Set meta data tags + metaPower := map[string]string{ + "source": r.name, + "group": "Energy", + "unit": "watts", + } + + namePower := "consumed_power" + + if !clientConfig.isExcluded[namePower] { + y, err := lp.New(namePower, tags, metaPower, + map[string]interface{}{ + "value": processorMetrics.ConsumedPowerWatt, + }, + timestamp) + if err == nil { + r.sink <- y + } + } + // Set meta data tags + metaThermal := map[string]string{ + "source": r.name, + "group": "Temperature", + "unit": "degC", + } + + nameThermal := "temperature" + + if !clientConfig.isExcluded[nameThermal] { + y, err := lp.New(nameThermal, tags, metaThermal, + map[string]interface{}{ + "value": processorMetrics.TemperatureCelsius, + }, + timestamp) + if err == nil { + r.sink <- y + } + } + return nil +} + +// readMetrics reads redfish thermal, power and processor metrics from the redfish device +// configured in clientConfig +func (r *RedfishReceiver) readMetrics(clientConfig *RedfishReceiverClientConfig) error { + + // Connect to redfish service + c, err := gofish.Connect(clientConfig.gofish) + if err != nil { + return fmt.Errorf( + "readMetrics: gofish.Connect({Username: %v, Endpoint: %v, BasicAuth: %v, HttpTimeout: %v, HttpInsecure: 
%v}) failed: %v", + clientConfig.gofish.Username, + clientConfig.gofish.Endpoint, + clientConfig.gofish.BasicAuth, + clientConfig.gofish.HTTPClient.Timeout, + clientConfig.gofish.HTTPClient.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify, + err) + } + defer c.Logout() + + // Create a session, when required + if _, err = c.GetSession(); err != nil { + c, err = c.CloneWithSession() + if err != nil { + return fmt.Errorf("readMetrics: Failed to create a session: %+w", err) + } + } + + // Get all chassis managed by this service + isChassisListRequired := + clientConfig.doThermalMetrics || + clientConfig.doPowerMetric + var chassisList []*redfish.Chassis + if isChassisListRequired { + chassisList, err = c.Service.Chassis() + if err != nil { + return fmt.Errorf("readMetrics: c.Service.Chassis() failed: %v", err) + } + } + + // Get all computer systems managed by this service + isComputerSystemListRequired := clientConfig.doProcessorMetrics + var computerSystemList []*redfish.ComputerSystem + if isComputerSystemListRequired { + computerSystemList, err = c.Service.Systems() + if err != nil { + return fmt.Errorf("readMetrics: c.Service.Systems() failed: %v", err) + } + } + + // read thermal metrics + if clientConfig.doThermalMetrics { + for _, chassis := range chassisList { + err := r.readThermalMetrics(clientConfig, chassis) + if err != nil { + return err + } + } + } + + // read power metrics + if clientConfig.doPowerMetric { + for _, chassis := range chassisList { + err = r.readPowerMetrics(clientConfig, chassis) + if err != nil { + return err + } + } + } + + // read processor metrics + if clientConfig.doProcessorMetrics { + // loop for all computer systems + for _, system := range computerSystemList { + + // loop for all processors + processors, err := system.Processors() + if err != nil { + return fmt.Errorf("readMetrics: system.Processors() failed: %v", err) + } + for _, processor := range processors { + err := r.readProcessorMetrics(clientConfig, 
processor) + if err != nil { + return err + } + } + } + } + + return nil +} + +// doReadMetrics reads metrics from all configure redfish devices. +// To compensate latencies of the Redfish devices a fanout is used. +func (r *RedfishReceiver) doReadMetric() { + + // Create wait group and input channel for workers + var workerWaitGroup sync.WaitGroup + workerInput := make(chan *RedfishReceiverClientConfig, r.config.fanout) + + // Create worker go routines + for i := 0; i < r.config.fanout; i++ { + // Increment worker wait group counter + workerWaitGroup.Add(1) + go func() { + // Decrement worker wait group counter + defer workerWaitGroup.Done() + + // Read power metrics for each client config + for clientConfig := range workerInput { + err := r.readMetrics(clientConfig) + if err != nil { + cclog.ComponentError(r.name, err) + } + } + }() + } + + // Distribute client configs to workers + for i := range r.config.ClientConfigs { + + // Check done channel status + select { + case workerInput <- &r.config.ClientConfigs[i]: + case <-r.done: + // process done event + // Stop workers, clear channel and wait for all workers to finish + close(workerInput) + for range workerInput { + } + workerWaitGroup.Wait() + return + } + } + + // Stop workers and wait for all workers to finish + close(workerInput) + workerWaitGroup.Wait() +} + +// Start starts the redfish receiver +func (r *RedfishReceiver) Start() { + cclog.ComponentDebug(r.name, "START") + // Start redfish receiver r.wg.Add(1) go func() { defer r.wg.Done() // Create ticker - ticker := time.NewTicker(time.Duration(r.config.Interval) * time.Second) + ticker := time.NewTicker(r.config.Interval) defer ticker.Stop() for { - doReadPowerMetric() + r.doReadMetric() select { - case <-ticker.C: + case tickerTime := <-ticker.C: + // Check if we missed the ticker event + if since := time.Since(tickerTime); since > 5*time.Second { + cclog.ComponentInfo(r.name, "Missed ticker event for more then", since) + } + // process ticker event -> 
continue continue case <-r.done: @@ -246,7 +585,7 @@ func (r *RedfishReceiver) Start() { cclog.ComponentDebug(r.name, "STARTED") } -// Close redfish receiver +// Close closes the redfish receiver func (r *RedfishReceiver) Close() { cclog.ComponentDebug(r.name, "CLOSE") @@ -257,68 +596,227 @@ func (r *RedfishReceiver) Close() { cclog.ComponentDebug(r.name, "DONE") } -// New function to create a new instance of the receiver +// NewRedfishReceiver creates a new instance of the redfish receiver // Initialize the receiver by giving it a name and reading in the config JSON func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { r := new(RedfishReceiver) + // Config options from config file + configJSON := struct { + Type string `json:"type"` + + // Maximum number of simultaneous redfish connections (default: 64) + Fanout int `json:"fanout,omitempty"` + // How often the redfish power metrics should be read and send to the sink (default: 30 s) + IntervalString string `json:"interval,omitempty"` + + // Control whether a client verifies the server's certificate + // (default: true == do not verify server's certificate) + HttpInsecure bool `json:"http_insecure,omitempty"` + // Time limit for requests made by this HTTP client (default: 10 s) + HttpTimeoutString string `json:"http_timeout,omitempty"` + + // Default client username, password and endpoint + Username *string `json:"username"` // User name to authenticate with + Password *string `json:"password"` // Password to use for authentication + Endpoint *string `json:"endpoint"` // URL of the redfish service + + // Globally disable collection of power, processor or thermal metrics + DisablePowerMetrics bool `json:"disable_power_metrics"` + DisableProcessorMetrics bool `json:"disable_processor_metrics"` + DisableThermalMetrics bool `json:"disable_thermal_metrics"` + + // Globally excluded metrics + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + + ClientConfigs []struct { + HostList 
[]string `json:"host_list"` // List of hosts with the same client configuration + Username *string `json:"username"` // User name to authenticate with + Password *string `json:"password"` // Password to use for authentication + Endpoint *string `json:"endpoint"` // URL of the redfish service + + // Per client disable collection of power,processor or thermal metrics + DisablePowerMetrics bool `json:"disable_power_metrics"` + DisableProcessorMetrics bool `json:"disable_processor_metrics"` + DisableThermalMetrics bool `json:"disable_thermal_metrics"` + + // Per client excluded metrics + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + } `json:"client_config"` + }{ + // Set defaults values + // Allow overwriting these defaults by reading config JSON + Fanout: 64, + IntervalString: "30s", + HttpTimeoutString: "10s", + HttpInsecure: true, + } + // Set name r.name = fmt.Sprintf("RedfishReceiver(%s)", name) // Create done channel r.done = make(chan bool) - // Set defaults in r.config - // Allow overwriting these defaults by reading config JSON - r.config.Fanout = 64 - r.config.Interval = 30 - // Read the redfish receiver specific JSON config if len(config) > 0 { - err := json.Unmarshal(config, &r.config) + err := json.Unmarshal(config, &configJSON) if err != nil { cclog.ComponentError(r.name, "Error reading config:", err.Error()) return nil, err } } - // Create gofish client config - for i := range r.config.ClientConfigs { - clientConfig := &r.config.ClientConfigs[i] - gofishConfig := &clientConfig.gofish + // Convert interval string representation to duration + var err error + r.config.Interval, err = time.ParseDuration(configJSON.IntervalString) + if err != nil { + err := fmt.Errorf( + "Failed to parse duration string interval='%s': %w", + configJSON.IntervalString, + err, + ) + cclog.Error(r.name, err) + return nil, err + } - if clientConfig.Hostname == nil { - err := fmt.Errorf("client config number %v requires hostname", i) - cclog.ComponentError(r.name, 
err) - return nil, err - } + // HTTP timeout duration + r.config.HttpTimeout, err = time.ParseDuration(configJSON.HttpTimeoutString) + if err != nil { + err := fmt.Errorf( + "Failed to parse duration string http_timeout='%s': %w", + configJSON.HttpTimeoutString, + err, + ) + cclog.Error(r.name, err) + return nil, err + } - if clientConfig.Endpoint == nil { + // Create new http client + customTransport := http.DefaultTransport.(*http.Transport).Clone() + customTransport.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: configJSON.HttpInsecure, + } + httpClient := &http.Client{ + Timeout: r.config.HttpTimeout, + Transport: customTransport, + } + + // Initialize client configurations + r.config.ClientConfigs = make([]RedfishReceiverClientConfig, 0) + + // Create client config from JSON config + for i := range configJSON.ClientConfigs { + + clientConfigJSON := &configJSON.ClientConfigs[i] + + var endpoint_pattern string + if clientConfigJSON.Endpoint != nil { + endpoint_pattern = *clientConfigJSON.Endpoint + } else if configJSON.Endpoint != nil { + endpoint_pattern = *configJSON.Endpoint + } else { err := fmt.Errorf("client config number %v requires endpoint", i) cclog.ComponentError(r.name, err) return nil, err } - gofishConfig.Endpoint = *clientConfig.Endpoint - if clientConfig.Username == nil { + var username string + if clientConfigJSON.Username != nil { + username = *clientConfigJSON.Username + } else if configJSON.Username != nil { + username = *configJSON.Username + } else { err := fmt.Errorf("client config number %v requires username", i) cclog.ComponentError(r.name, err) return nil, err } - gofishConfig.Username = *clientConfig.Username - if clientConfig.Password == nil { + var password string + if clientConfigJSON.Password != nil { + password = *clientConfigJSON.Password + } else if configJSON.Password != nil { + password = *configJSON.Password + } else { err := fmt.Errorf("client config number %v requires password", i) cclog.ComponentError(r.name, err) 
return nil, err } - gofishConfig.Password = *clientConfig.Password - gofishConfig.Insecure = true - if clientConfig.Insecure != nil { - gofishConfig.Insecure = *clientConfig.Insecure + // Which metrics should be collected + doPowerMetric := + !(configJSON.DisablePowerMetrics || + clientConfigJSON.DisablePowerMetrics) + doProcessorMetrics := + !(configJSON.DisableProcessorMetrics || + clientConfigJSON.DisableProcessorMetrics) + doThermalMetrics := + !(configJSON.DisableThermalMetrics || + clientConfigJSON.DisableThermalMetrics) + + // Is metrics excluded globally or per client + isExcluded := make(map[string]bool) + for _, key := range clientConfigJSON.ExcludeMetrics { + isExcluded[key] = true } + for _, key := range configJSON.ExcludeMetrics { + isExcluded[key] = true + } + + for _, host := range clientConfigJSON.HostList { + + // Endpoint of the redfish service + endpoint := strings.Replace(endpoint_pattern, "%h", host, -1) + + r.config.ClientConfigs = append( + r.config.ClientConfigs, + RedfishReceiverClientConfig{ + Hostname: host, + isExcluded: isExcluded, + doPowerMetric: doPowerMetric, + doProcessorMetrics: doProcessorMetrics, + doThermalMetrics: doThermalMetrics, + skipProcessorMetricsURL: make(map[string]bool), + gofish: gofish.ClientConfig{ + Username: username, + Password: password, + Endpoint: endpoint, + HTTPClient: httpClient, + }, + }) + } + } + // Compute parallel fanout to use + numClients := len(r.config.ClientConfigs) + r.config.fanout = configJSON.Fanout + if numClients < r.config.fanout { + r.config.fanout = numClients + } + + if numClients == 0 { + err := fmt.Errorf("at least one client config is required") + cclog.ComponentError(r.name, err) + return nil, err + } + + // Check for duplicate client configurations + isDuplicate := make(map[string]bool) + for i := range r.config.ClientConfigs { + host := r.config.ClientConfigs[i].Hostname + if isDuplicate[host] { + err := fmt.Errorf("Found duplicate client config for host %s", host) + 
cclog.ComponentError(r.name, err) + return nil, err + } + isDuplicate[host] = true + } + + // Give some basic info about redfish receiver status + cclog.ComponentInfo(r.name, "Monitoring", numClients, "clients") + cclog.ComponentInfo(r.name, "Monitoring interval:", r.config.Interval) + cclog.ComponentInfo(r.name, "Monitoring parallel fanout:", r.config.fanout) + return r, nil } diff --git a/receivers/redfishReceiver.md b/receivers/redfishReceiver.md new file mode 100644 index 0000000..1bc3ed8 --- /dev/null +++ b/receivers/redfishReceiver.md @@ -0,0 +1,54 @@ +## Redfish receiver + +The Redfish receiver uses the [Redfish (specification)](https://www.dmtf.org/standards/redfish) to query thermal and power metrics. Thermal metrics may include various fan speeds and temperatures. Power metrics may include the current power consumption of various hardware components. It may also include the minimum, maximum and average power consumption of these components in a given time interval. The receiver will poll each configured redfish device once in a given interval. Multiple devices can be accessed in parallel to increase throughput. 
+ +### Configuration structure + +```json +{ + "": { + "type": "redfish", + "username": "", + "password": "", + "endpoint": "https://%h-bmc", + "exclude_metrics": [ "min_consumed_watts" ], + "client_config": [ + { + "host_list": [ "", "" ] + }, + { + "host_list": [ "", "" ] + "disable_power_metrics": true + }, + { + "host_list": [ "" ], + "username": "", + "password": "", + "endpoint": "https://%h-BMC", + "disable_thermal_metrics": true + } + ] + } +} +``` + +Global settings: + +- `fanout`: Maximum number of simultaneous redfish connections (default: 64) +- `interval`: How often the redfish power metrics should be read and send to the sink (default: 30 s) +- `http_insecure`: Control whether a client verifies the server's certificate (default: true == do not verify server's certificate) +- `http_timeout`: Time limit for requests made by this HTTP client (default: 10 s) + +Global and per redfish device settings (per redfish device settings overwrite the global settings): + +- `disable_power_metrics`: disable collection of power metrics +- `disable_processor_metrics`: disable collection of processor metrics +- `disable_thermal_metrics`: disable collection of thermal metrics +- `exclude_metrics`: list of excluded metrics +- `username`: User name to authenticate with +- `password`: Password to use for authentication +- `endpoint`: URL of the redfish service (placeholder `%h` gets replaced by the hostname) + +Per redfish device settings: + +- `host_list`: List of hosts with the same client configuration diff --git a/receivers/sampleReceiver.go b/receivers/sampleReceiver.go index 19d6f25..86e68cd 100644 --- a/receivers/sampleReceiver.go +++ b/receivers/sampleReceiver.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" ) // SampleReceiver configuration: receiver type, listen address, port diff --git 
a/scripts/cc-metric-collector.config b/scripts/cc-metric-collector.config index 988b0ff..0f5e1ba 100644 --- a/scripts/cc-metric-collector.config +++ b/scripts/cc-metric-collector.config @@ -6,7 +6,7 @@ CC_HOME=/tmp LOG_DIR=/var/log -DATA_DIR=/var/lib/grafana +DATA_DIR=/var/lib/cc-metric-collector MAX_OPEN_FILES=10000 diff --git a/scripts/cc-metric-collector.deb.rules b/scripts/cc-metric-collector.deb.rules new file mode 100644 index 0000000..18e2ea8 --- /dev/null +++ b/scripts/cc-metric-collector.deb.rules @@ -0,0 +1,15 @@ +#!/usr/bin/make -f +# You must remove unused comment lines for the released package. +#export DH_VERBOSE = 1 +#export DEB_BUILD_MAINT_OPTIONS = hardening=+all +#export DEB_CFLAGS_MAINT_APPEND = -Wall -pedantic +#export DEB_LDFLAGS_MAINT_APPEND = -Wl,--as-needed + +%: + dh $@ + +override_dh_auto_build: + make + +override_dh_auto_install: + make PREFIX=/usr install \ No newline at end of file diff --git a/scripts/cc-metric-collector.init b/scripts/cc-metric-collector.init index acb82eb..1a7993b 100755 --- a/scripts/cc-metric-collector.init +++ b/scripts/cc-metric-collector.init @@ -19,7 +19,7 @@ PATH=/bin:/usr/bin:/sbin:/usr/sbin NAME=cc-metric-collector DESC="ClusterCockpit metric collector" -DEFAULT=/etc/default/${NAME}.json +DEFAULT=/etc/default/${NAME} CC_USER=clustercockpit CC_GROUP=clustercockpit diff --git a/sinks/gangliaCommon.go b/sinks/gangliaCommon.go index f92550b..1c846f0 100644 --- a/sinks/gangliaCommon.go +++ b/sinks/gangliaCommon.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) func GangliaMetricName(point lp.CCMetric) string { diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 5324123..b6a0646 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -9,8 +9,8 @@ import ( // "time" "os/exec" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp
"github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const GMETRIC_EXEC = `gmetric` diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 466915d..49d98b0 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -9,8 +9,8 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influx "github.com/influxdata/line-protocol" ) diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index bf88079..ccf5302 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -9,8 +9,8 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http" diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 90f8da8..94704cd 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -9,8 +9,8 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" 
"github.com/influxdata/influxdb-client-go/v2/api/write" diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 3651584..bb5488a 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -71,8 +71,8 @@ import ( "fmt" "unsafe" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/NVIDIA/go-nvml/pkg/dl" ) diff --git a/sinks/metricSink.go b/sinks/metricSink.go index c6c6860..2fd429c 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,7 +1,7 @@ package sinks import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type defaultSinkConfig struct { diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 4d43454..a9a980d 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -8,8 +8,8 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" ) diff --git a/sinks/prometheusSink.go b/sinks/prometheusSink.go index 5011ac0..7a1163d 100644 --- a/sinks/prometheusSink.go +++ b/sinks/prometheusSink.go @@ -9,8 +9,8 @@ import ( "strings" "sync" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promhttp" diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go index 2a823e6..9d9e991 100644 --- a/sinks/sampleSink.go +++ b/sinks/sampleSink.go @@ -5,8 +5,8 @@ import ( "fmt" "log" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type SampleSinkConfig struct { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 6af8614..392346d 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -6,8 +6,8 @@ import ( "os" "sync" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) const SINK_MAX_FORWARD = 50 diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index e091af3..eadef4e 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -7,7 +7,7 @@ import ( "strings" // "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" ) type StdoutSink struct {