Merge branch 'develop' into self_collector

Thomas Gruber 2022-10-10 12:06:11 +02:00 committed by GitHub
commit b0dce35289
72 changed files with 1393 additions and 424 deletions


@@ -133,13 +133,63 @@ jobs:
name: cc-metric-collector SRPM for UBI 8
path: ${{ steps.rpmbuild.outputs.SRPM }}
#
# Build on Ubuntu 20.04 using official go package
#
Ubuntu-focal-build:
runs-on: ubuntu-latest
container: ubuntu:20.04
# The job outputs link to the outputs of the 'debrename' step
# Only job outputs can be used in child jobs
outputs:
deb : ${{steps.debrename.outputs.DEB}}
steps:
# Use apt to install development packages
- name: Install development packages
run: |
apt update && apt --assume-yes upgrade
apt --assume-yes install build-essential sed git wget bash
# Checkout git repository and submodules
# fetch-depth must be 0 to use git describe
# See: https://github.com/marketplace/actions/checkout
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
# Use official golang package
- name: Install Golang
run: |
wget -q https://go.dev/dl/go1.19.1.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.19.1.linux-amd64.tar.gz
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
go version
- name: DEB build MetricCollector
id: dpkg-build
run: |
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
make DEB
- name: Rename DEB (add '_ubuntu20.04')
id: debrename
run: |
OLD_DEB_NAME=$(echo "${{steps.dpkg-build.outputs.DEB}}" | rev | cut -d '.' -f 2- | rev)
NEW_DEB_FILE="${OLD_DEB_NAME}_ubuntu20.04.deb"
mv "${{steps.dpkg-build.outputs.DEB}}" "${NEW_DEB_FILE}"
echo "::set-output name=DEB::${NEW_DEB_FILE}"
# See: https://github.com/actions/upload-artifact
- name: Save DEB as artifact
uses: actions/upload-artifact@v2
with:
name: cc-metric-collector DEB for Ubuntu 20.04
path: ${{ steps.debrename.outputs.DEB }}
#
# Create release with fresh RPMs
#
Release:
runs-on: ubuntu-latest
# We need the RPMs, so add dependency
- needs: [AlmaLinux-RPM-build, UBI-8-RPM-build]
+ needs: [AlmaLinux-RPM-build, UBI-8-RPM-build, Ubuntu-focal-build]
steps:
# See: https://github.com/actions/download-artifact
@@ -161,6 +211,11 @@ jobs:
with:
name: cc-metric-collector SRPM for UBI 8
+ - name: Download Ubuntu 20.04 DEB
+   uses: actions/download-artifact@v2
+   with:
+     name: cc-metric-collector DEB for Ubuntu 20.04
# The download actions do not publish the name of the downloaded file,
# so we re-use the job outputs of the parent jobs. The files are all
# downloaded to the current folder.
@@ -174,14 +229,17 @@ jobs:
ALMA_85_SRPM=$(basename "${{ needs.AlmaLinux-RPM-build.outputs.srpm}}")
UBI_8_RPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.rpm}}")
UBI_8_SRPM=$(basename "${{ needs.UBI-8-RPM-build.outputs.srpm}}")
+ U_2004_DEB=$(basename "${{ needs.Ubuntu-focal-build.outputs.deb}}")
echo "ALMA_85_RPM::${ALMA_85_RPM}"
echo "ALMA_85_SRPM::${ALMA_85_SRPM}"
echo "UBI_8_RPM::${UBI_8_RPM}"
echo "UBI_8_SRPM::${UBI_8_SRPM}"
+ echo "U_2004_DEB::${U_2004_DEB}"
echo "::set-output name=ALMA_85_RPM::${ALMA_85_RPM}"
echo "::set-output name=ALMA_85_SRPM::${ALMA_85_SRPM}"
echo "::set-output name=UBI_8_RPM::${UBI_8_RPM}"
echo "::set-output name=UBI_8_SRPM::${UBI_8_SRPM}"
+ echo "::set-output name=U_2004_DEB::${U_2004_DEB}"
# See: https://github.com/softprops/action-gh-release
- name: Release
@@ -194,3 +252,4 @@ jobs:
${{ steps.files.outputs.ALMA_85_SRPM }}
${{ steps.files.outputs.UBI_8_RPM }}
${{ steps.files.outputs.UBI_8_SRPM }}
+ ${{ steps.files.outputs.U_2004_DEB }}


@@ -32,3 +32,29 @@ jobs:
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json
#
# Job build-1-19
# Build on latest Ubuntu using golang version 1.19
#
build-1-19:
runs-on: ubuntu-latest
steps:
# See: https://github.com/marketplace/actions/checkout
# Checkout git repository and submodules
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v3
with:
go-version: '1.19'
- name: Build MetricCollector
run: make
- name: Run MetricCollector once
run: ./cc-metric-collector --once --config .github/ci-config.json


@@ -1,6 +1,6 @@
# cc-metric-collector
- A node agent for measuring, processing and forwarding node level metrics. It is part of the ClusterCockpit ecosystem.
+ A node agent for measuring, processing and forwarding node level metrics. It is part of the [ClusterCockpit ecosystem](./docs/introduction.md).
The metric collector sends (and receives) metric in the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/) as it provides flexibility while providing a separation between tags (like index columns in relational databases) and fields (like data columns).
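For orientation, a single (hypothetical) measurement in this format puts the tag set after the metric name, followed by the fields and an optional timestamp:
```
mem_used,hostname=node001,type=node value=12345678 1665396371000000000
```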
@@ -11,7 +11,7 @@ The receiver runs as a go routine side-by-side with the timer loop and asynchron
# Configuration
Configuration is implemented using a single json document that is distributed over network and may be persisted as file.
- Supported metrics are documented [here](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md).
+ Supported metrics are documented [here](https://github.com/ClusterCockpit/cc-specifications/blob/master/interfaces/lineprotocol/README.md).
There is a main configuration file with basic settings that point to the other configuration files for the different components.
@@ -26,7 +26,7 @@ There is a main configuration file with basic settings that point to the other c
}
```
- The `interval` defines how often the metrics should be read and send to the sink. The `duration` tells collectors how long one measurement has to take. This is important for some collectors, like the `likwid` collector.
+ The `interval` defines how often the metrics should be read and send to the sink. The `duration` tells collectors how long one measurement has to take. This is important for some collectors, like the `likwid` collector. For more information, see [here](./docs/configuration.md).
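As a rough, non-authoritative sketch of such a main configuration file (file names and value formats here are illustrative only), it points to the component configs and sets the two timing options:
```json
{
  "sinks": "sinks.json",
  "collectors": "collectors.json",
  "receivers": "receivers.json",
  "router": "router.json",
  "interval": "10s",
  "duration": "1s"
}
```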
See the component READMEs for their configuration:
@@ -44,6 +44,8 @@ $ go get (requires at least golang 1.16)
$ make
```
+ For more information, see [here](./docs/building.md).
# Running
```


@@ -15,10 +15,10 @@ import (
"sync"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
- mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
+ mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
)
type CentralConfigFile struct { type CentralConfigFile struct {


@@ -35,7 +35,7 @@ In contrast to the configuration files for sinks and receivers, the collectors c
* [`nfs4stat`](./nfs4Metric.md)
* [`cpufreq`](./cpufreqMetric.md)
* [`cpufreq_cpuinfo`](./cpufreqCpuinfoMetric.md)
- * [`numastat`](./numastatMetric.md)
+ * [`numastats`](./numastatsMetric.md)
* [`gpfs`](./gpfsMetric.md)
* [`beegfs_meta`](./beegfsmetaMetric.md)
* [`beegfs_storage`](./beegfsstorageMetric.md)


@@ -5,7 +5,7 @@ import (
"bytes"
"encoding/json"
"fmt"
- "io/ioutil"
+ "io"
"os"
"os/exec"
"os/user"
@@ -14,8 +14,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
@@ -115,7 +115,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
return
}
//get mounpoint
- buffer, _ := ioutil.ReadFile(string("/proc/mounts"))
+ buffer, _ := os.ReadFile(string("/proc/mounts"))
mounts := strings.Split(string(buffer), "\n")
var mountpoints []string
for _, line := range mounts {
@@ -157,9 +157,9 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
if err != nil {
fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error())
fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode())
- data, _ := ioutil.ReadAll(cmdStderr)
+ data, _ := io.ReadAll(cmdStderr)
fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stderr: \"%s\"\n", string(data))
- data, _ = ioutil.ReadAll(cmdStdout)
+ data, _ = io.ReadAll(cmdStdout)
fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stdout: \"%s\"\n", string(data))
return
}


@@ -5,7 +5,7 @@ import (
"bytes"
"encoding/json"
"fmt"
- "io/ioutil"
+ "io"
"os"
"os/exec"
"os/user"
@@ -14,8 +14,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// Struct for the collector-specific JSON config
@@ -108,7 +108,7 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
return
}
//get mounpoint
- buffer, _ := ioutil.ReadFile(string("/proc/mounts"))
+ buffer, _ := os.ReadFile(string("/proc/mounts"))
mounts := strings.Split(string(buffer), "\n")
var mountpoints []string
for _, line := range mounts {
@@ -149,9 +149,9 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
if err != nil {
fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error())
fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode())
- data, _ := ioutil.ReadAll(cmdStderr)
+ data, _ := io.ReadAll(cmdStderr)
fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stderr: \"%s\"\n", string(data))
- data, _ = ioutil.ReadAll(cmdStdout)
+ data, _ = io.ReadAll(cmdStdout)
fmt.Fprintf(os.Stderr, "BeegfsStorageCollector.Read(): command stdout: \"%s\"\n", string(data))
return
}


@@ -6,9 +6,9 @@ import (
"sync"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
- mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
+ mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
)
// Map of all available metric collectors
@@ -38,6 +38,7 @@ var AvailableCollectors = map[string]MetricCollector{
"beegfs_storage": new(BeegfsStorageCollector),
"rocm_smi": new(RocmSmiCollector),
"self": new(SelfCollector),
+ "schedstat": new(SchedstatCollector),
}
// Metric collector manager data structure


@@ -10,8 +10,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
//


@@ -3,14 +3,14 @@ package collectors
import (
"encoding/json"
"fmt"
- "io/ioutil"
+ "os"
"path/filepath"
"strconv"
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"golang.org/x/sys/unix"
)
@@ -88,7 +88,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
// Read package ID
physicalPackageIDFile := filepath.Join(cpuDir, "topology", "physical_package_id")
- line, err := ioutil.ReadFile(physicalPackageIDFile)
+ line, err := os.ReadFile(physicalPackageIDFile)
if err != nil {
return fmt.Errorf("unable to read physical package ID from file '%s': %v", physicalPackageIDFile, err)
}
@@ -100,7 +100,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
// Read core ID
coreIDFile := filepath.Join(cpuDir, "topology", "core_id")
- line, err = ioutil.ReadFile(coreIDFile)
+ line, err = os.ReadFile(coreIDFile)
if err != nil {
return fmt.Errorf("unable to read core ID from file '%s': %v", coreIDFile, err)
}
@@ -188,7 +188,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
// Read current frequency
- line, err := ioutil.ReadFile(t.scalingCurFreqFile)
+ line, err := os.ReadFile(t.scalingCurFreqFile)
if err != nil {
cclog.ComponentError(
m.name,


@@ -9,8 +9,9 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
+ sysconf "github.com/tklauser/go-sysconf"
)
const CPUSTATFILE = `/proc/stat`
@@ -22,9 +23,11 @@ type CpustatCollectorConfig struct {
type CpustatCollector struct {
metricCollector
config CpustatCollectorConfig
+ lastTimestamp time.Time // Store time stamp of last tick to derive values
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
+ olddata map[string]map[string]int64
}
func (m *CpustatCollector) Init(config json.RawMessage) error {
@@ -76,36 +79,48 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
// Pre-generate tags for all CPUs
num_cpus := 0
m.cputags = make(map[string]map[string]string)
+ m.olddata = make(map[string]map[string]int64)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
linefields := strings.Fields(line)
- if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
+ if strings.Compare(linefields[0], "cpu") == 0 {
+ m.olddata["cpu"] = make(map[string]int64)
+ for k, v := range m.matches {
+ m.olddata["cpu"][k], _ = strconv.ParseInt(linefields[v], 0, 64)
+ }
+ } else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
cpustr := strings.TrimLeft(linefields[0], "cpu")
cpu, _ := strconv.Atoi(cpustr)
m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
+ m.olddata[linefields[0]] = make(map[string]int64)
+ for k, v := range m.matches {
+ m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64)
+ }
num_cpus++
}
}
+ m.lastTimestamp = time.Now()
m.init = true
return nil
}
- func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric) {
+ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) {
values := make(map[string]float64)
- total := 0.0
+ clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK)
for match, index := range m.matches {
if len(match) > 0 {
x, err := strconv.ParseInt(linefields[index], 0, 64)
if err == nil {
- values[match] = float64(x)
- total += values[match]
+ vdiff := x - m.olddata[linefields[0]][match]
+ m.olddata[linefields[0]][match] = x // Store new value for next run
+ values[match] = float64(vdiff) / float64(tsdelta.Seconds()) / float64(clktck)
}
}
}
- t := time.Now()
for name, value := range values {
- y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
+ y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now)
if err == nil {
output <- y
}
@@ -117,6 +132,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
return
}
num_cpus := 0
+ now := time.Now()
+ tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(CPUSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
@@ -128,9 +146,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
line := scanner.Text()
linefields := strings.Fields(line)
if strings.Compare(linefields[0], "cpu") == 0 {
- m.parseStatLine(linefields, m.nodetags, output)
+ m.parseStatLine(linefields, m.nodetags, output, now, tsdelta)
} else if strings.HasPrefix(linefields[0], "cpu") {
- m.parseStatLine(linefields, m.cputags[linefields[0]], output)
+ m.parseStatLine(linefields, m.cputags[linefields[0]], output, now, tsdelta)
num_cpus++
}
}
@@ -139,11 +157,13 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
m.nodetags,
m.meta,
map[string]interface{}{"value": int(num_cpus)},
- time.Now(),
+ now,
)
if err == nil {
output <- num_cpus_metric
}
+ m.lastTimestamp = now
}
func (m *CpustatCollector) Close() {


@@ -3,13 +3,13 @@ package collectors
import (
"encoding/json"
"errors"
- "io/ioutil"
"log"
+ "os"
"os/exec"
"strings"
"time"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol"
)
@@ -53,7 +53,7 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
}
}
for _, f := range m.config.Files {
- _, err = ioutil.ReadFile(f)
+ _, err = os.ReadFile(f)
if err == nil {
m.files = append(m.files, f)
} else {
@@ -106,7 +106,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
}
}
for _, file := range m.files {
- buffer, err := ioutil.ReadFile(file)
+ buffer, err := os.ReadFile(file)
if err != nil {
log.Print(err)
return


@@ -8,8 +8,8 @@ import (
"syscall"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// "log"


@@ -5,7 +5,7 @@ import (
"bytes"
"encoding/json"
"fmt"
- "io/ioutil"
+ "io"
"log"
"os/exec"
"os/user"
@@ -13,8 +13,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const DEFAULT_GPFS_CMD = "mmpmon"
@@ -118,8 +118,8 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
cmd.Stderr = cmdStderr
err := cmd.Run()
if err != nil {
- dataStdErr, _ := ioutil.ReadAll(cmdStderr)
- dataStdOut, _ := ioutil.ReadAll(cmdStdout)
+ dataStdErr, _ := io.ReadAll(cmdStderr)
+ dataStdOut, _ := io.ReadAll(cmdStdout)
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err),


@@ -2,11 +2,10 @@ package collectors
import (
"fmt"
- "io/ioutil"
"os"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"golang.org/x/sys/unix"
"encoding/json"
@@ -21,6 +20,7 @@ const IB_BASEPATH = "/sys/class/infiniband/"
type InfinibandCollectorMetric struct {
path string
unit string
+ scale int64
}
type InfinibandCollectorInfo struct {
@@ -84,7 +84,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
for _, path := range ibDirs {
// Skip, when no LID is assigned
- line, err := ioutil.ReadFile(filepath.Join(path, "lid"))
+ line, err := os.ReadFile(filepath.Join(path, "lid"))
if err != nil {
continue
}
@@ -113,10 +113,10 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Check access to counter files
countersDir := filepath.Join(path, "counters")
portCounterFiles := map[string]InfinibandCollectorMetric{
- "ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"},
- "ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"},
- "ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"},
- "ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"},
+ "ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes", scale: 4},
+ "ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes", scale: 4},
+ "ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets", scale: 1},
+ "ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets", scale: 1},
}
for _, counter := range portCounterFiles {
err := unix.Access(counter.path, unix.R_OK)
@@ -174,7 +174,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
for counterName, counterDef := range info.portCounterFiles {
// Read counter file
- line, err := ioutil.ReadFile(counterDef.path)
+ line, err := os.ReadFile(counterDef.path)
if err != nil {
cclog.ComponentError(
m.name,
@@ -191,6 +191,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterName, data, err))
continue
}
+ // Scale raw value
+ v *= counterDef.scale
// Send absolut values
if m.config.SendAbsoluteValues {


@@ -4,8 +4,8 @@ import (
"bufio"
"os"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
// "log"
"encoding/json"


@@ -10,7 +10,7 @@ import (
"strings"
"time"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const IPMITOOL_PATH = `ipmitool`


@@ -12,7 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "io/ioutil"
"math"
"os"
"os/signal"
@@ -24,10 +23,10 @@ import (
"time"
"unsafe"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
- topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
+ topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@@ -154,12 +153,13 @@ func getBaseFreq() float64 {
}
var freq float64 = math.NaN()
for _, f := range files {
- buffer, err := ioutil.ReadFile(f)
+ buffer, err := os.ReadFile(f)
if err == nil {
data := strings.Replace(string(buffer), "\n", "", -1)
x, err := strconv.ParseInt(data, 0, 64)
if err == nil {
- freq = float64(x) * 1e6
+ freq = float64(x)
+ break
}
}
}
@@ -168,11 +168,11 @@ func getBaseFreq() float64 {
C.power_init(0)
info := C.get_powerInfo()
if float64(info.baseFrequency) != 0 {
- freq = float64(info.baseFrequency) * 1e6
+ freq = float64(info.baseFrequency)
}
C.power_finalize()
}
- return freq
+ return freq * 1e3
}
func (m *LikwidCollector) Init(config json.RawMessage) error {


@@ -7,6 +7,9 @@ The `likwid` collector is probably the most complicated collector. The LIKWID li
"likwid": {
"force_overwrite" : false,
"invalid_to_zero" : false,
+ "liblikwid_path" : "/path/to/liblikwid.so",
+ "accessdaemon_path" : "/folder/that/contains/likwid-accessD",
+ "access_mode" : "direct or accessdaemon or perf_event",
"eventsets": [
{
"events" : {


@@ -3,13 +3,13 @@ package collectors
import (
"encoding/json"
"fmt"
- "io/ioutil"
+ "os"
"strconv"
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
//
@@ -72,7 +72,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
if !m.init {
return
}
- buffer, err := ioutil.ReadFile(LOADAVGFILE)
+ buffer, err := os.ReadFile(LOADAVGFILE)
if err != nil {
if err != nil {
cclog.ComponentError(


@@ -10,8 +10,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const LUSTRE_SYSFS = `/sys/fs/lustre`


@@ -12,8 +12,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const MEMSTATFILE = "/proc/meminfo"
@@ -68,7 +68,8 @@ func getStats(filename string) map[string]MemstatStats {
} else if len(linefields) == 5 {
v, err := strconv.ParseFloat(linefields[3], 64)
if err == nil {
- stats[strings.Trim(linefields[0], ":")] = MemstatStats{
+ cclog.ComponentDebug("getStats", strings.Trim(linefields[2], ":"), v, linefields[4])
+ stats[strings.Trim(linefields[2], ":")] = MemstatStats{
value: v,
unit: linefields[4],
}
@@ -160,7 +161,6 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init {
- cclog.ComponentPrint(m.name, "Here")
return
}
@@ -188,16 +188,20 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
unit := ""
if totalVal, total := stats["MemTotal"]; total {
if freeVal, free := stats["MemFree"]; free {
- if bufVal, buffers := stats["Buffers"]; buffers {
- if cacheVal, cached := stats["Cached"]; cached {
- memUsed = totalVal.value - (freeVal.value + bufVal.value + cacheVal.value)
+ memUsed = totalVal.value - freeVal.value
if len(totalVal.unit) > 0 {
unit = totalVal.unit
} else if len(freeVal.unit) > 0 {
unit = freeVal.unit
- } else if len(bufVal.unit) > 0 {
- unit = bufVal.unit
- } else if len(cacheVal.unit) > 0 {
- unit = cacheVal.unit
+ }
+ if bufVal, buffers := stats["Buffers"]; buffers {
+ memUsed -= bufVal.value
+ if len(bufVal.unit) > 0 && len(unit) == 0 {
+ unit = bufVal.unit
+ }
+ if cacheVal, cached := stats["Cached"]; cached {
+ memUsed -= cacheVal.value
+ if len(cacheVal.unit) > 0 && len(unit) == 0 {
+ unit = cacheVal.unit
}
}


@@ -5,7 +5,7 @@ import (
"fmt"
"time"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
type MetricCollector interface {


@@ -9,8 +9,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const NETSTATFILE = "/proc/net/dev"


@@ -11,7 +11,7 @@ import (
"strings"
"time"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// First part contains the code for the general NfsCollector.


@@ -10,8 +10,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
//


@@ -8,8 +8,8 @@ import (
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)


@@ -6,8 +6,8 @@ import (
"fmt"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/ClusterCockpit/go-rocm-smi/pkg/rocm_smi"
)
@@ -66,14 +66,14 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
ret := rocm_smi.Init()
if ret != rocm_smi.STATUS_SUCCESS {
- err = errors.New("Failed to initialize ROCm SMI library")
+ err = errors.New("failed to initialize ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
}
numDevs, ret := rocm_smi.NumMonitorDevices()
if ret != rocm_smi.STATUS_SUCCESS {
- err = errors.New("Failed to get number of GPUs from ROCm SMI library")
+ err = errors.New("failed to get number of GPUs from ROCm SMI library")
cclog.ComponentError(m.name, err.Error())
return err
}
@@ -98,14 +98,14 @@ func (m *RocmSmiCollector) Init(config json.RawMessage) error {
}
device, ret := rocm_smi.DeviceGetHandleByIndex(i)
if ret != rocm_smi.STATUS_SUCCESS {
- err = fmt.Errorf("Failed to get handle for GPU %d", i)
+ err = fmt.Errorf("failed to get handle for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
}
pciInfo, ret := rocm_smi.DeviceGetPciInfo(device)
if ret != rocm_smi.STATUS_SUCCESS {
- err = fmt.Errorf("Failed to get PCI information for GPU %d", i)
+ err = fmt.Errorf("failed to get PCI information for GPU %d", i)
cclog.ComponentError(m.name, err.Error())
return err
}


@@ -4,8 +4,8 @@ import (
"encoding/json"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// These are the fields we read from the JSON configuration


@@ -5,8 +5,8 @@ import (
"sync"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// These are the fields we read from the JSON configuration


@@ -0,0 +1,154 @@
package collectors
import (
"bufio"
"encoding/json"
"fmt"
"math"
"os"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const SCHEDSTATFILE = `/proc/schedstat`
// These are the fields we read from the JSON configuration
type SchedstatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type SchedstatCollector struct {
metricCollector
config SchedstatCollectorConfig // the configuration structure
lastTimestamp time.Time // Store time stamp of last tick to derive values
meta map[string]string // default meta information
cputags map[string]map[string]string // pre-generated tags per hwthread
olddata map[string]map[string]int64 // previous counter values to compute differences
}
// Functions to implement MetricCollector interface
// Init(...), Read(...), Close()
// See: metricCollector.go
// Init initializes the schedstat collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *SchedstatCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SchedstatCollector"
// This is for later use, also call it early
m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors actually doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
// Check input file
file, err := os.Open(string(SCHEDSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
defer file.Close()
// Pre-generate tags for all CPUs
num_cpus := 0
m.cputags = make(map[string]map[string]string)
m.olddata = make(map[string]map[string]int64)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
linefields := strings.Fields(line)
if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
cpustr := strings.TrimLeft(linefields[0], "cpu")
cpu, _ := strconv.Atoi(cpustr)
running, _ := strconv.ParseInt(linefields[7], 10, 64)
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
m.olddata[linefields[0]] = map[string]int64{"running": running, "waiting": waiting}
num_cpus++
}
}
// Save current timestamp
m.lastTimestamp = time.Now()
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) {
running, _ := strconv.ParseInt(linefields[7], 10, 64)
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
diff_running := running - m.olddata[linefields[0]]["running"]
diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3))
var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3))
m.olddata[linefields[0]]["running"] = running
m.olddata[linefields[0]]["waiting"] = waiting
value := l_running + l_waiting
y, err := lp.New("cpu_load_core", tags, m.meta, map[string]interface{}{"value": value}, now)
if err == nil {
// Send it to output channel
output <- y
}
}
// Read collects all metrics belonging to the schedstat collector
// and sends them through the output channel to the collector manager
func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init {
return
}
//timestamps
now := time.Now()
tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(SCHEDSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
linefields := strings.Fields(line)
if strings.HasPrefix(linefields[0], "cpu") {
m.ParseProcLine(linefields, m.cputags[linefields[0]], output, now, tsdelta)
}
}
m.lastTimestamp = now
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *SchedstatCollector) Close() {
// Unset flag
m.init = false
}


@@ -0,0 +1,11 @@
## `schedstat` collector
```json
"schedstat": {
}
```
The `schedstat` collector reads data from `/proc/schedstat` and calculates a load value per hwthread. This might be useful to detect bad CPU pinning on shared nodes, etc.
Metric:
* `cpu_load_core`
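For reference, a sketch of the per-hwthread derivation matching the collector code in this commit: the two counters read from a `cpuN` line of `/proc/schedstat` are the cumulative time spent running and the cumulative time spent waiting in the run queue, both in nanoseconds, so the published value is the growth of both counters over the collection interval, normalised to seconds:
```
cpu_load_core = ((Δ running_ns + Δ waiting_ns) / 1e9) / interval_seconds
```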


@@ -3,14 +3,14 @@ package collectors
import (
"encoding/json"
"fmt"
- "io/ioutil"
+ "os"
"path/filepath"
"strconv"
"strings"
"time"
- cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
// See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html
@@ -83,14 +83,14 @@ func (m *TempCollector) Init(config json.RawMessage) error {
// sensor name
nameFile := filepath.Join(filepath.Dir(file), "name")
- name, err := ioutil.ReadFile(nameFile)
+ name, err := os.ReadFile(nameFile)
if err == nil {
sensor.name = strings.TrimSpace(string(name))
}
// sensor label
labelFile := strings.TrimSuffix(file, "_input") + "_label"
- label, err := ioutil.ReadFile(labelFile)
+ label, err := os.ReadFile(labelFile)
if err == nil {
sensor.label = strings.TrimSpace(string(label))
}
@@ -117,7 +117,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
}
// Sensor file
- _, err = ioutil.ReadFile(file)
+ _, err = os.ReadFile(file)
if err != nil {
continue
}
@@ -139,7 +139,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
// max temperature
if m.config.ReportMaxTemp {
maxTempFile := strings.TrimSuffix(file, "_input") + "_max"
- if buffer, err := ioutil.ReadFile(maxTempFile); err == nil {
+ if buffer, err := os.ReadFile(maxTempFile); err == nil {
if x, err := strconv.ParseInt(strings.TrimSpace(string(buffer)), 10, 64); err == nil {
sensor.maxTempName = strings.Replace(sensor.metricName, "temp", "max_temp", 1)
sensor.maxTemp = x / 1000
@@ -150,7 +150,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
// critical temperature
if m.config.ReportCriticalTemp {
criticalTempFile := strings.TrimSuffix(file, "_input") + "_crit"
- if buffer, err := ioutil.ReadFile(criticalTempFile); err == nil {
+ if buffer, err := os.ReadFile(criticalTempFile); err == nil {
if x, err := strconv.ParseInt(strings.TrimSpace(string(buffer)), 10, 64); err == nil {
sensor.critTempName = strings.Replace(sensor.metricName, "temp", "crit_temp", 1)
sensor.critTemp = x / 1000
@@ -175,7 +175,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
for _, sensor := range m.sensors {
// Read sensor file
- buffer, err := ioutil.ReadFile(sensor.file)
+ buffer, err := os.ReadFile(sensor.file)
if err != nil {
cclog.ComponentError(
m.name,


@@ -9,7 +9,7 @@ import (
"strings"
"time"
- lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+ lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
const MAX_NUM_PROCS = 10

docs/building.md (new file)

@@ -0,0 +1,60 @@
# Building the cc-metric-collector
In most cases, a simple `make` in the main folder is enough to get a `cc-metric-collector` binary. It is basically a `go build`, but some collectors require additional tasks. There is currently no Golang interface to LIKWID, so it uses `cgo` to create bindings, and `cgo` requires the LIKWID header files; the build therefore checks whether LIKWID is installed and, if not, downloads LIKWID and copies the headers.
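For orientation, a typical from-scratch build could look like this (standard git and make invocations; the `--once` test run and CI config file are taken from the repository's CI workflow):
```bash
$ git clone --recursive https://github.com/ClusterCockpit/cc-metric-collector.git
$ cd cc-metric-collector
$ make
$ ./cc-metric-collector --once --config .github/ci-config.json
```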
## System integration
The main configuration settings for system integration are pre-defined in `scripts/cc-metric-collector.config`. The file contains the UNIX user and group used for execution, the PID file location and other settings. Adjust it accordingly and copy it to `/etc/default/cc-metric-collector`
```bash
$ install --mode 644 \
--owner $CC_USER \
--group $CC_GROUP \
scripts/cc-metric-collector.config /etc/default/cc-metric-collector
$ edit /etc/default/cc-metric-collector
```
### SysVinit and similar
If you are using an init system based on `/etc/init.d` daemons, you can use the sample `scripts/cc-metric-collector.init`. It reads the basic configuration from `/etc/default/cc-metric-collector`.
```bash
$ install --mode 755 \
--owner $CC_USER \
--group $CC_GROUP \
scripts/cc-metric-collector.init /etc/init.d/cc-metric-collector
```
### Systemd
If you are using `systemd` as init system, you can use the sample systemd service file `scripts/cc-metric-collector.service` together with the configuration file `scripts/cc-metric-collector.config`.
```bash
$ install --mode 644 \
--owner $CC_USER \
--group $CC_GROUP \
scripts/cc-metric-collector.service /etc/systemd/system/cc-metric-collector.service
$ systemctl enable cc-metric-collector
```
## RPM
In order to get an RPM package for cc-metric-collector, just use:
```bash
$ make RPM
```
It uses the RPM SPEC file `scripts/cc-metric-collector.spec` and requires the RPM tools (`rpm` and `rpmspec`) and `git`.
## DEB
In order to get very simple Debian packages for cc-metric-collector, just use:
```bash
$ make DEB
```
It uses the DEB control file `scripts/cc-metric-collector.control` and requires `dpkg-deb`, `awk`, `sed` and `git`. It creates only a binary deb package.
_This option is not well tested and therefore experimental_

docs/introduction.md (new file)

@@ -0,0 +1,23 @@
# The ClusterCockpit Project
The ClusterCockpit project is a joint project of computing centers in Europe to set up a cluster monitoring stack for small to mid-sized computing centers under the lead of NHR@FAU.
# The ClusterCockpit Stack
In a cluster environment, there are commonly many systems dedicated to computation, backend servers for file systems, and frontend servers for user interaction and cluster control. The ClusterCockpit Stack is mainly used for monitoring the compute systems, with some interaction with the frontend servers. It consists of multiple components:
- cc-metric-collector: Monitor resource usage on the compute systems
- cc-metric-store: In-memory database
- cc-backend & cc-frontend: The web-based visualizer
# CC Metric Collector
The CC Metric Collector project was started to provide a useful set of metrics for HPC and data science related compute systems. It runs as a system daemon and gathers system data periodically to forward the metrics to one or more databases. One of the provided backends can be used for the cc-metric-store but many others exist like InfluxDB time-series databases, the Ganglia Monitoring System or the Prometheus Monitoring System.
The data is gathered by so-called "Collectors" and forwarded to an internal router for on-the-fly manipulation (tagging, aggregation, ...), which pushes the metrics to the different metric writers called "Sinks". There is a fourth component, the "Receivers", which receive data at any time through a networking system such as an HTTP server.
# CC Metric Store
The CC Metric Store is a data management system with short-term in-memory and long-term file-based metric storage.
# CC Backend and CC Frontend
The CC Backend and Frontend together form the web interface for ClusterCockpit.

go.mod

@@ -13,6 +13,7 @@ require (
github.com/nats-io/nats.go v1.16.0
github.com/prometheus/client_golang v1.12.2
github.com/stmcginnis/gofish v0.13.0
+ github.com/tklauser/go-sysconf v0.3.10
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e
)
@@ -31,6 +32,7 @@ require (
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
+ github.com/tklauser/numcpus v0.4.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
google.golang.org/protobuf v1.28.0 // indirect

go.sum

@@ -287,6 +287,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+ github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
+ github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
+ github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
+ github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
@@ -445,6 +449,7 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew=
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=


@ -1,32 +0,0 @@
# ClusterCockpit metrics
As described in the [ClusterCockpit specifications](https://github.com/ClusterCockpit/cc-specifications), the whole ClusterCockpit stack uses metrics in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMetric.
It is basically a copy of the [InfluxDB line protocol](https://github.com/influxdata/line-protocol) `MutableMetric` interface with one extension. Besides the tags and fields, it contains a list of meta information (re-using the `Tag` structure of the original protocol):
```golang
type ccMetric struct {
name string // same as
tags []*influx.Tag // original
fields []*influx.Field // Influx
tm time.Time // line-protocol
meta []*influx.Tag
}
type CCMetric interface {
influx.MutableMetric // the same functions as defined by influx.MutableMetric
RemoveTag(key string) // this is not published by the original influx.MutableMetric
Meta() map[string]string
MetaList() []*inlux.Tag
AddMeta(key, value string)
HasMeta(key string) bool
GetMeta(key string) (string, bool)
RemoveMeta(key string)
}
```
The `CCMetric` interface provides the same functions as the `MutableMetric` like `{Add, Remove, Has}{Tag, Field}` and additionally provides `{Add, Remove, Has}Meta`.
The InfluxDB protocol creates a new metric with `influx.New(name, tags, fields, time)` while CCMetric uses `ccMetric.New(name, tags, meta, fields, time)` where `tags` and `meta` are both of type `map[string]string`.
You can copy a CCMetric with `FromMetric(other CCMetric) CCMetric`. If you get an `influx.Metric` from a function, like the line protocol parser, you can use `FromInfluxMetric(other influx.Metric) CCMetric` to get a CCMetric out of it (see `NatsReceiver` for an example).


@ -9,10 +9,10 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/PaesslerAG/gval" "github.com/PaesslerAG/gval"
) )


@ -8,8 +8,8 @@ import (
"sort" "sort"
"strings" "strings"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
) )
/* /*


@ -4,11 +4,11 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
) )
type metricCachePeriod struct { type metricCachePeriod struct {


@ -7,11 +7,11 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
units "github.com/ClusterCockpit/cc-units" units "github.com/ClusterCockpit/cc-units"
) )

pkg/ccMetric/README.md

@ -0,0 +1,57 @@
# ClusterCockpit metrics
As described in the [ClusterCockpit specifications](https://github.com/ClusterCockpit/cc-specifications), the whole ClusterCockpit stack uses metrics in the InfluxDB line protocol format. This is also the input and output format of the ClusterCockpit Metric Collector, but internally it uses an extended format, named CCMetric, for processing.
It is basically a copy of the [InfluxDB line protocol](https://github.com/influxdata/line-protocol) `MutableMetric` interface with one extension: besides the tags and fields, it carries an additional map of meta information tags:
```golang
type ccMetric struct {
name string // Measurement name
meta map[string]string // map of meta data tags
tags map[string]string // map of tags
fields map[string]interface{} // map of fields
tm time.Time // timestamp
}
type CCMetric interface {
ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric
ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric
String() string // Return line-protocol like string
Name() string // Get metric name
SetName(name string) // Set metric name
Time() time.Time // Get timestamp
SetTime(t time.Time) // Set timestamp
Tags() map[string]string // Map of tags
AddTag(key, value string) // Add a tag
GetTag(key string) (value string, ok bool) // Get a tag by its key
HasTag(key string) (ok bool) // Check if a tag key is present
RemoveTag(key string) // Remove a tag by its key
Meta() map[string]string // Map of meta data tags
AddMeta(key, value string) // Add a meta data tag
GetMeta(key string) (value string, ok bool) // Get a meta data tag addressed by its key
HasMeta(key string) (ok bool) // Check if a meta data key is present
RemoveMeta(key string) // Remove a meta data tag by its key
Fields() map[string]interface{} // Map of fields
AddField(key string, value interface{}) // Add a field
GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
HasField(key string) (ok bool) // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key
}
func New(name string, tags map[string]string, meta map[string]string, fields map[string]interface{}, tm time.Time) (CCMetric, error)
func FromMetric(other CCMetric) CCMetric
func FromInfluxMetric(other lp.Metric) CCMetric
```
The `CCMetric` interface provides the same functions as the `MutableMetric` like `{Add, Get, Remove, Has}{Tag, Field}` and additionally provides `{Add, Get, Remove, Has}Meta`.
The InfluxDB protocol creates a new metric with `influx.New(name, tags, fields, time)` while CCMetric uses `ccMetric.New(name, tags, meta, fields, time)` where `tags` and `meta` are both of type `map[string]string`.
You can copy a CCMetric with `FromMetric(other CCMetric) CCMetric`. If you get an `influx.Metric` from a function, like the line protocol parser, you can use `FromInfluxMetric(other influx.Metric) CCMetric` to get a CCMetric out of it (see `NatsReceiver` for an example).
Although the [cc-specifications](https://github.com/ClusterCockpit/cc-specifications/blob/master/interfaces/lineprotocol/README.md) define only a single `value` field for the metric value, a CCMetric can still carry multiple fields, similar to the InfluxDB line protocol.
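A minimal usage sketch (not part of this commit; host name and values are placeholders) of how a metric can be created and inspected through this interface:
```golang
package main

import (
	"fmt"
	"time"

	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)

func main() {
	// Create a new metric with tags, meta information and a single value field
	m, err := lp.New(
		"temperature",
		map[string]string{"hostname": "node01", "type": "node"},
		map[string]string{"source": "ExampleCollector", "group": "Temperature", "unit": "degC"},
		map[string]interface{}{"value": 42.0},
		time.Now(),
	)
	if err != nil {
		return
	}

	// Tags and meta information can be modified after creation
	m.AddTag("cluster", "testcluster")
	m.AddMeta("scope", "example")

	// String() returns a line-protocol like representation of the metric
	fmt.Println(m.String())
}
```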


@ -50,6 +50,7 @@ type CCMetric interface {
GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
HasField(key string) (ok bool) // Check if a field key is present HasField(key string) (ok bool) // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key RemoveField(key string) // Remove a field addressed by its key
String() string // Return line-protocol like string
} }
// String implements the stringer interface for data type ccMetric // String implements the stringer interface for data type ccMetric
@ -217,23 +218,26 @@ func New(
} }
// FromMetric copies the metric <other> // FromMetric copies the metric <other>
func FromMetric(other ccMetric) CCMetric { func FromMetric(other CCMetric) CCMetric {
otags := other.Tags()
ometa := other.Meta()
ofields := other.Fields()
m := &ccMetric{ m := &ccMetric{
name: other.Name(), name: other.Name(),
tags: make(map[string]string, len(other.tags)), tags: make(map[string]string, len(otags)),
meta: make(map[string]string, len(other.meta)), meta: make(map[string]string, len(ometa)),
fields: make(map[string]interface{}, len(other.fields)), fields: make(map[string]interface{}, len(ofields)),
tm: other.Time(), tm: other.Time(),
} }
// deep copy tags, meta data tags and fields // deep copy tags, meta data tags and fields
for key, value := range other.tags { for key, value := range otags {
m.tags[key] = value m.tags[key] = value
} }
for key, value := range other.meta { for key, value := range ometa {
m.meta[key] = value m.meta[key] = value
} }
for key, value := range other.fields { for key, value := range ofields {
m.fields[key] = value m.fields[key] = value
} }
return m return m


@ -10,7 +10,7 @@ import (
"strconv" "strconv"
"strings" "strings"
cclogger "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclogger "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
) )
const SYSFS_NUMABASE = `/sys/devices/system/node` const SYSFS_NUMABASE = `/sys/devices/system/node`


@ -3,7 +3,7 @@ package multiChanTicker
import ( import (
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
) )
type multiChanTicker struct { type multiChanTicker struct {


@ -5,13 +5,13 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/gorilla/mux" "github.com/gorilla/mux"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
) )
@ -84,7 +84,7 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) {
return return
} }
body, err := ioutil.ReadAll(req.Body) body, err := io.ReadAll(req.Body)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return


@ -1,7 +1,7 @@
package receivers package receivers
import ( import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
type defaultReceiverConfig struct { type defaultReceiverConfig struct {


@ -6,8 +6,8 @@ import (
"fmt" "fmt"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go" nats "github.com/nats-io/nats.go"
) )


@ -12,8 +12,8 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
type PrometheusReceiverConfig struct { type PrometheusReceiverConfig struct {


@ -5,8 +5,8 @@ import (
"os" "os"
"sync" "sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){


@ -1,129 +1,272 @@
package receivers package receivers
import ( import (
"crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
// See: https://pkg.go.dev/github.com/stmcginnis/gofish // See: https://pkg.go.dev/github.com/stmcginnis/gofish
"github.com/stmcginnis/gofish" "github.com/stmcginnis/gofish"
"github.com/stmcginnis/gofish/common"
"github.com/stmcginnis/gofish/redfish"
) )
type RedfishReceiverClientConfig struct {
// Hostname the redfish service belongs to
Hostname string
// is metric excluded globally or per client
isExcluded map[string](bool)
doPowerMetric bool
doProcessorMetrics bool
doThermalMetrics bool
skipProcessorMetricsURL map[string]bool
gofish gofish.ClientConfig
}
// RedfishReceiver configuration: // RedfishReceiver configuration:
type RedfishReceiver struct { type RedfishReceiver struct {
receiver receiver
config struct { config struct {
Type string `json:"type"` fanout int
Fanout int `json:"fanout,omitempty"` // Default fanout: 64 Interval time.Duration
Interval int `json:"interval,omitempty"` // Default interval: 30s HttpTimeout time.Duration
// Client config for each redfish service // Client config for each redfish service
ClientConfigs []struct { ClientConfigs []RedfishReceiverClientConfig
Hostname *string `json:"hostname"`
Username *string `json:"username"`
Password *string `json:"password"`
Endpoint *string `json:"endpoint"`
Insecure *bool `json:"insecure,omitempty"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
gofish gofish.ClientConfig
} `json:"client_config"`
} }
done chan bool // channel to finish / stop redfish receiver done chan bool // channel to finish / stop redfish receiver
wg sync.WaitGroup // wait group for redfish receiver wg sync.WaitGroup // wait group for redfish receiver
} }
// Start starts the redfish receiver // readThermalMetrics reads thermal metrics from a redfish device
func (r *RedfishReceiver) Start() { func (r *RedfishReceiver) readThermalMetrics(
cclog.ComponentDebug(r.name, "START") clientConfig *RedfishReceiverClientConfig,
chassis *redfish.Chassis) error {
// readPowerMetric reads readfish power metric from the endpoint configured in conf // Get thermal information for each chassis
readPowerMetric := func(clientConfigIndex int) error { thermal, err := chassis.Thermal()
clientConfig := &r.config.ClientConfigs[clientConfigIndex]
// Connect to redfish service
c, err := gofish.Connect(clientConfig.gofish)
if err != nil { if err != nil {
c := struct { return fmt.Errorf("readMetrics: chassis.Thermal() failed: %v", err)
Username string }
Endpoint string
BasicAuth bool // Skip empty thermal information
Insecure bool if thermal == nil {
}{ return nil
Username: clientConfig.gofish.Username,
Endpoint: clientConfig.gofish.Endpoint,
BasicAuth: clientConfig.gofish.BasicAuth,
Insecure: clientConfig.gofish.Insecure,
}
return fmt.Errorf("readPowerMetric: gofish.Connect(%+v) failed: %v", c, err)
}
defer c.Logout()
// Get all chassis managed by this service
chassis_list, err := c.Service.Chassis()
if err != nil {
return fmt.Errorf("readPowerMetric: c.Service.Chassis() failed: %v", err)
} }
for _, chassis := range chassis_list {
timestamp := time.Now() timestamp := time.Now()
for _, temperature := range thermal.Temperatures {
// Skip, when temperature metric is excluded
if clientConfig.isExcluded["temperature"] {
break
}
// Skip all temperatures which are not in enabled state
if temperature.Status.State != "" && temperature.Status.State != common.EnabledState {
continue
}
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource
"temperature_id": temperature.ID,
// MemberID shall uniquely identify the member within the collection. For
// services supporting Redfish v1.6 or higher, this value shall be the
// zero-based array index.
"temperature_member_id": temperature.MemberID,
// PhysicalContext shall be a description of the affected device or region
// within the chassis to which this temperature measurement applies
"temperature_physical_context": string(temperature.PhysicalContext),
// Name
"temperature_name": temperature.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "Temperature",
"unit": "degC",
}
// ReadingCelsius shall be the current value of the temperature sensor's reading.
value := temperature.ReadingCelsius
y, err := lp.New("temperature", tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
for _, fan := range thermal.Fans {
// Skip, when fan_speed metric is excluded
if clientConfig.isExcluded["fan_speed"] {
break
}
// Skip all fans which are not in enabled state
if fan.Status.State != common.EnabledState {
continue
}
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource
"fan_id": fan.ID,
// MemberID shall uniquely identify the member within the collection. For
// services supporting Redfish v1.6 or higher, this value shall be the
// zero-based array index.
"fan_member_id": fan.MemberID,
// PhysicalContext shall be a description of the affected device or region
// within the chassis to which this fan is associated
"fan_physical_context": string(fan.PhysicalContext),
// Name
"fan_name": fan.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "FanSpeed",
"unit": string(fan.ReadingUnits),
}
// Reading shall be the current value of the fan sensor's reading
value := fan.Reading
y, err := lp.New("fan_speed", tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
return nil
}
// readPowerMetrics reads power metrics from a redfish device
func (r *RedfishReceiver) readPowerMetrics(
clientConfig *RedfishReceiverClientConfig,
chassis *redfish.Chassis) error {
// Get power information for each chassis // Get power information for each chassis
power, err := chassis.Power() power, err := chassis.Power()
if err != nil { if err != nil {
return fmt.Errorf("readPowerMetric: chassis.Power() failed: %v", err) return fmt.Errorf("readMetrics: chassis.Power() failed: %v", err)
} }
// Skip empty power information
if power == nil { if power == nil {
continue return nil
} }
timestamp := time.Now()
// Read min, max and average consumed watts for each power control // Read min, max and average consumed watts for each power control
for _, pc := range power.PowerControl { for _, pc := range power.PowerControl {
// Skip all power controls which are not in enabled state
if pc.Status.State != "" && pc.Status.State != common.EnabledState {
continue
}
// Map of collected metrics // Map of collected metrics
metrics := map[string]float32{ metrics := make(map[string]float32)
// PowerConsumedWatts shall represent the actual power being consumed (in // PowerConsumedWatts shall represent the actual power being consumed (in
// Watts) by the chassis // Watts) by the chassis
"consumed_watts": pc.PowerConsumedWatts, if !clientConfig.isExcluded["consumed_watts"] {
metrics["consumed_watts"] = pc.PowerConsumedWatts
}
// AverageConsumedWatts shall represent the // AverageConsumedWatts shall represent the
// average power level that occurred averaged over the last IntervalInMin // average power level that occurred averaged over the last IntervalInMin
// minutes. // minutes.
"average_consumed_watts": pc.PowerMetrics.AverageConsumedWatts, if !clientConfig.isExcluded["average_consumed_watts"] {
metrics["average_consumed_watts"] = pc.PowerMetrics.AverageConsumedWatts
}
// MinConsumedWatts shall represent the // MinConsumedWatts shall represent the
// minimum power level in watts that occurred within the last // minimum power level in watts that occurred within the last
// IntervalInMin minutes. // IntervalInMin minutes.
"min_consumed_watts": pc.PowerMetrics.MinConsumedWatts, if !clientConfig.isExcluded["min_consumed_watts"] {
metrics["min_consumed_watts"] = pc.PowerMetrics.MinConsumedWatts
}
// MaxConsumedWatts shall represent the // MaxConsumedWatts shall represent the
// maximum power level in watts that occurred within the last // maximum power level in watts that occurred within the last
// IntervalInMin minutes // IntervalInMin minutes
"max_consumed_watts": pc.PowerMetrics.MaxConsumedWatts, if !clientConfig.isExcluded["max_consumed_watts"] {
} metrics["max_consumed_watts"] = pc.PowerMetrics.MaxConsumedWatts
intervalInMin := strconv.FormatFloat(float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32)
// Metrics to exclude
for _, key := range clientConfig.ExcludeMetrics {
delete(metrics, key)
} }
// IntervalInMin shall represent the time interval (or window), in minutes,
// in which the PowerMetrics properties are measured over.
// Should be an integer, but some Dell implementations return as a float
intervalInMin :=
strconv.FormatFloat(
float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32)
// Set tags // Set tags
tags := map[string]string{ tags := map[string]string{
"hostname": *clientConfig.Hostname, "hostname": clientConfig.Hostname,
"type": "node", "type": "node",
// ChassisType shall indicate the physical form factor for the type of chassis
"chassis_typ": string(chassis.ChassisType),
// Chassis name
"chassis_name": chassis.Name,
// ID uniquely identifies the resource // ID uniquely identifies the resource
"id": pc.ID, "power_control_id": pc.ID,
// MemberID shall uniquely identify the member within the collection. For // MemberID shall uniquely identify the member within the collection. For
// services supporting Redfish v1.6 or higher, this value shall be the // services supporting Redfish v1.6 or higher, this value shall be the
// zero-based array index. // zero-based array index.
"member_id": pc.MemberID, "power_control_member_id": pc.MemberID,
// PhysicalContext shall be a description of the affected device(s) or region // PhysicalContext shall be a description of the affected device(s) or region
// within the chassis to which this power control applies. // within the chassis to which this power control applies.
"physical_context": string(pc.PhysicalContext), "power_control_physical_context": string(pc.PhysicalContext),
// Name // Name
"power_control_name": pc.Name, "power_control_name": pc.Name,
} }
@ -162,27 +305,213 @@ func (r *RedfishReceiver) Start() {
} }
} }
} }
return nil
}
// readProcessorMetrics reads processor metrics from a redfish device
// See: https://redfish.dmtf.org/schemas/v1/ProcessorMetrics.json
func (r *RedfishReceiver) readProcessorMetrics(
clientConfig *RedfishReceiverClientConfig,
processor *redfish.Processor) error {
timestamp := time.Now()
// URL to processor metrics
URL := processor.ODataID + "/ProcessorMetrics"
// Skip previously detected non existing URLs
if clientConfig.skipProcessorMetricsURL[URL] {
return nil
}
resp, err := processor.Client.Get(URL)
if err != nil {
// Skip non existing URLs
if statusCode := err.(*common.Error).HTTPReturnedStatusCode; statusCode == http.StatusNotFound {
clientConfig.skipProcessorMetricsURL[URL] = true
return nil
}
return fmt.Errorf("processor.Client.Get(%v) failed: %+w", URL, err)
}
var processorMetrics struct {
common.Entity
ODataType string `json:"@odata.type"`
ODataEtag string `json:"@odata.etag"`
Description string `json:"Description"`
// This property shall contain the power, in watts, that the processor has consumed.
ConsumedPowerWatt float32 `json:"ConsumedPowerWatt"`
// This property shall contain the temperature, in Celsius, of the processor.
TemperatureCelsius float32 `json:"TemperatureCelsius"`
}
err = json.NewDecoder(resp.Body).Decode(&processorMetrics)
if err != nil {
return fmt.Errorf("unable to decode JSON for processor metrics: %+w", err)
}
processorMetrics.SetClient(processor.Client)
// Set tags
tags := map[string]string{
"hostname": clientConfig.Hostname,
"type": "socket",
// ProcessorType shall contain the string which identifies the type of processor contained in this Socket
"processor_typ": string(processor.ProcessorType),
// Processor name
"processor_name": processor.Name,
// ID uniquely identifies the resource
"processor_id": processor.ID,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
metaPower := map[string]string{
"source": r.name,
"group": "Energy",
"unit": "watts",
}
namePower := "consumed_power"
if !clientConfig.isExcluded[namePower] {
y, err := lp.New(namePower, tags, metaPower,
map[string]interface{}{
"value": processorMetrics.ConsumedPowerWatt,
},
timestamp)
if err == nil {
r.sink <- y
}
}
// Set meta data tags
metaThermal := map[string]string{
"source": r.name,
"group": "Temperature",
"unit": "degC",
}
nameThermal := "temperature"
if !clientConfig.isExcluded[nameThermal] {
y, err := lp.New(nameThermal, tags, metaThermal,
map[string]interface{}{
"value": processorMetrics.TemperatureCelsius,
},
timestamp)
if err == nil {
r.sink <- y
}
}
return nil
}
// readMetrics reads redfish thermal, power and processor metrics from the redfish device
// configured in clientConfig
func (r *RedfishReceiver) readMetrics(clientConfig *RedfishReceiverClientConfig) error {
// Connect to redfish service
c, err := gofish.Connect(clientConfig.gofish)
if err != nil {
return fmt.Errorf(
"readMetrics: gofish.Connect({Username: %v, Endpoint: %v, BasicAuth: %v, HttpTimeout: %v, HttpInsecure: %v}) failed: %v",
clientConfig.gofish.Username,
clientConfig.gofish.Endpoint,
clientConfig.gofish.BasicAuth,
clientConfig.gofish.HTTPClient.Timeout,
clientConfig.gofish.HTTPClient.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify,
err)
}
defer c.Logout()
// Create a session, when required
if _, err = c.GetSession(); err != nil {
c, err = c.CloneWithSession()
if err != nil {
return fmt.Errorf("readMetrics: Failed to create a session: %+w", err)
}
}
// Get all chassis managed by this service
isChassisListRequired :=
clientConfig.doThermalMetrics ||
clientConfig.doPowerMetric
var chassisList []*redfish.Chassis
if isChassisListRequired {
chassisList, err = c.Service.Chassis()
if err != nil {
return fmt.Errorf("readMetrics: c.Service.Chassis() failed: %v", err)
}
}
// Get all computer systems managed by this service
isComputerSystemListRequired := clientConfig.doProcessorMetrics
var computerSystemList []*redfish.ComputerSystem
if isComputerSystemListRequired {
computerSystemList, err = c.Service.Systems()
if err != nil {
return fmt.Errorf("readMetrics: c.Service.Systems() failed: %v", err)
}
}
// read thermal metrics
if clientConfig.doThermalMetrics {
for _, chassis := range chassisList {
err := r.readThermalMetrics(clientConfig, chassis)
if err != nil {
return err
}
}
}
// read power metrics
if clientConfig.doPowerMetric {
for _, chassis := range chassisList {
err = r.readPowerMetrics(clientConfig, chassis)
if err != nil {
return err
}
}
}
// read processor metrics
if clientConfig.doProcessorMetrics {
// loop for all computer systems
for _, system := range computerSystemList {
// loop for all processors
processors, err := system.Processors()
if err != nil {
return fmt.Errorf("readMetrics: system.Processors() failed: %v", err)
}
for _, processor := range processors {
err := r.readProcessorMetrics(clientConfig, processor)
if err != nil {
return err
}
}
}
} }
return nil return nil
} }
// doReadPowerMetric read power metrics for all configure redfish services. // doReadMetrics reads metrics from all configure redfish devices.
// To compensate latencies of the Redfish services a fanout is used. // To compensate latencies of the Redfish devices a fanout is used.
doReadPowerMetric := func() { func (r *RedfishReceiver) doReadMetric() {
// Compute fanout to use
realFanout := r.config.Fanout
if len(r.config.ClientConfigs) < realFanout {
realFanout = len(r.config.ClientConfigs)
}
// Create wait group and input channel for workers // Create wait group and input channel for workers
var workerWaitGroup sync.WaitGroup var workerWaitGroup sync.WaitGroup
workerInput := make(chan int, realFanout) workerInput := make(chan *RedfishReceiverClientConfig, r.config.fanout)
// Create worker go routines // Create worker go routines
for i := 0; i < realFanout; i++ { for i := 0; i < r.config.fanout; i++ {
// Increment worker wait group counter // Increment worker wait group counter
workerWaitGroup.Add(1) workerWaitGroup.Add(1)
go func() { go func() {
@ -190,8 +519,8 @@ func (r *RedfishReceiver) Start() {
defer workerWaitGroup.Done() defer workerWaitGroup.Done()
// Read power metrics for each client config // Read power metrics for each client config
for clientConfigIndex := range workerInput { for clientConfig := range workerInput {
err := readPowerMetric(clientConfigIndex) err := r.readMetrics(clientConfig)
if err != nil { if err != nil {
cclog.ComponentError(r.name, err) cclog.ComponentError(r.name, err)
} }
@ -201,9 +530,10 @@ func (r *RedfishReceiver) Start() {
// Distribute client configs to workers // Distribute client configs to workers
for i := range r.config.ClientConfigs { for i := range r.config.ClientConfigs {
// Check done channel status // Check done channel status
select { select {
case workerInput <- i: case workerInput <- &r.config.ClientConfigs[i]:
case <-r.done: case <-r.done:
// process done event // process done event
// Stop workers, clear channel and wait for all workers to finish // Stop workers, clear channel and wait for all workers to finish
@ -220,20 +550,29 @@ func (r *RedfishReceiver) Start() {
workerWaitGroup.Wait() workerWaitGroup.Wait()
} }
// Start starts the redfish receiver
func (r *RedfishReceiver) Start() {
cclog.ComponentDebug(r.name, "START")
// Start redfish receiver // Start redfish receiver
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() defer r.wg.Done()
// Create ticker // Create ticker
ticker := time.NewTicker(time.Duration(r.config.Interval) * time.Second) ticker := time.NewTicker(r.config.Interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
doReadPowerMetric() r.doReadMetric()
select { select {
case <-ticker.C: case tickerTime := <-ticker.C:
// Check if we missed the ticker event
if since := time.Since(tickerTime); since > 5*time.Second {
cclog.ComponentInfo(r.name, "Missed ticker event for more than", since)
}
// process ticker event -> continue // process ticker event -> continue
continue continue
case <-r.done: case <-r.done:
@ -246,7 +585,7 @@ func (r *RedfishReceiver) Start() {
cclog.ComponentDebug(r.name, "STARTED") cclog.ComponentDebug(r.name, "STARTED")
} }
// Close redfish receiver // Close closes the redfish receiver
func (r *RedfishReceiver) Close() { func (r *RedfishReceiver) Close() {
cclog.ComponentDebug(r.name, "CLOSE") cclog.ComponentDebug(r.name, "CLOSE")
@ -257,68 +596,227 @@ func (r *RedfishReceiver) Close() {
cclog.ComponentDebug(r.name, "DONE") cclog.ComponentDebug(r.name, "DONE")
} }
// New function to create a new instance of the receiver // NewRedfishReceiver creates a new instance of the redfish receiver
// Initialize the receiver by giving it a name and reading in the config JSON // Initialize the receiver by giving it a name and reading in the config JSON
func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
r := new(RedfishReceiver) r := new(RedfishReceiver)
// Config options from config file
configJSON := struct {
Type string `json:"type"`
// Maximum number of simultaneous redfish connections (default: 64)
Fanout int `json:"fanout,omitempty"`
// How often the redfish metrics should be read and sent to the sink (default: 30 s)
IntervalString string `json:"interval,omitempty"`
// Control whether a client verifies the server's certificate
// (default: true == do not verify server's certificate)
HttpInsecure bool `json:"http_insecure,omitempty"`
// Time limit for requests made by this HTTP client (default: 10 s)
HttpTimeoutString string `json:"http_timeout,omitempty"`
// Default client username, password and endpoint
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the redfish service
// Globally disable collection of power, processor or thermal metrics
DisablePowerMetrics bool `json:"disable_power_metrics"`
DisableProcessorMetrics bool `json:"disable_processor_metrics"`
DisableThermalMetrics bool `json:"disable_thermal_metrics"`
// Globally excluded metrics
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
ClientConfigs []struct {
HostList []string `json:"host_list"` // List of hosts with the same client configuration
Username *string `json:"username"` // User name to authenticate with
Password *string `json:"password"` // Password to use for authentication
Endpoint *string `json:"endpoint"` // URL of the redfish service
// Per client disable collection of power,processor or thermal metrics
DisablePowerMetrics bool `json:"disable_power_metrics"`
DisableProcessorMetrics bool `json:"disable_processor_metrics"`
DisableThermalMetrics bool `json:"disable_thermal_metrics"`
// Per client excluded metrics
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
} `json:"client_config"`
}{
// Set defaults values
// Allow overwriting these defaults by reading config JSON
Fanout: 64,
IntervalString: "30s",
HttpTimeoutString: "10s",
HttpInsecure: true,
}
// Set name // Set name
r.name = fmt.Sprintf("RedfishReceiver(%s)", name) r.name = fmt.Sprintf("RedfishReceiver(%s)", name)
// Create done channel // Create done channel
r.done = make(chan bool) r.done = make(chan bool)
// Set defaults in r.config
// Allow overwriting these defaults by reading config JSON
r.config.Fanout = 64
r.config.Interval = 30
// Read the redfish receiver specific JSON config // Read the redfish receiver specific JSON config
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &r.config) err := json.Unmarshal(config, &configJSON)
if err != nil { if err != nil {
cclog.ComponentError(r.name, "Error reading config:", err.Error()) cclog.ComponentError(r.name, "Error reading config:", err.Error())
return nil, err return nil, err
} }
} }
// Create gofish client config // Convert interval string representation to duration
for i := range r.config.ClientConfigs { var err error
clientConfig := &r.config.ClientConfigs[i] r.config.Interval, err = time.ParseDuration(configJSON.IntervalString)
gofishConfig := &clientConfig.gofish if err != nil {
err := fmt.Errorf(
if clientConfig.Hostname == nil { "Failed to parse duration string interval='%s': %w",
err := fmt.Errorf("client config number %v requires hostname", i) configJSON.IntervalString,
cclog.ComponentError(r.name, err) err,
)
cclog.Error(r.name, err)
return nil, err return nil, err
} }
if clientConfig.Endpoint == nil { // HTTP timeout duration
r.config.HttpTimeout, err = time.ParseDuration(configJSON.HttpTimeoutString)
if err != nil {
err := fmt.Errorf(
"Failed to parse duration string http_timeout='%s': %w",
configJSON.HttpTimeoutString,
err,
)
cclog.Error(r.name, err)
return nil, err
}
// Create new http client
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: configJSON.HttpInsecure,
}
httpClient := &http.Client{
Timeout: r.config.HttpTimeout,
Transport: customTransport,
}
// Initialize client configurations
r.config.ClientConfigs = make([]RedfishReceiverClientConfig, 0)
// Create client config from JSON config
for i := range configJSON.ClientConfigs {
clientConfigJSON := &configJSON.ClientConfigs[i]
var endpoint_pattern string
if clientConfigJSON.Endpoint != nil {
endpoint_pattern = *clientConfigJSON.Endpoint
} else if configJSON.Endpoint != nil {
endpoint_pattern = *configJSON.Endpoint
} else {
err := fmt.Errorf("client config number %v requires endpoint", i) err := fmt.Errorf("client config number %v requires endpoint", i)
cclog.ComponentError(r.name, err) cclog.ComponentError(r.name, err)
return nil, err return nil, err
} }
gofishConfig.Endpoint = *clientConfig.Endpoint
if clientConfig.Username == nil { var username string
if clientConfigJSON.Username != nil {
username = *clientConfigJSON.Username
} else if configJSON.Username != nil {
username = *configJSON.Username
} else {
err := fmt.Errorf("client config number %v requires username", i) err := fmt.Errorf("client config number %v requires username", i)
cclog.ComponentError(r.name, err) cclog.ComponentError(r.name, err)
return nil, err return nil, err
} }
gofishConfig.Username = *clientConfig.Username
if clientConfig.Password == nil { var password string
if clientConfigJSON.Password != nil {
password = *clientConfigJSON.Password
} else if configJSON.Password != nil {
password = *configJSON.Password
} else {
err := fmt.Errorf("client config number %v requires password", i) err := fmt.Errorf("client config number %v requires password", i)
cclog.ComponentError(r.name, err) cclog.ComponentError(r.name, err)
return nil, err return nil, err
} }
gofishConfig.Password = *clientConfig.Password
gofishConfig.Insecure = true // Which metrics should be collected
if clientConfig.Insecure != nil { doPowerMetric :=
gofishConfig.Insecure = *clientConfig.Insecure !(configJSON.DisablePowerMetrics ||
clientConfigJSON.DisablePowerMetrics)
doProcessorMetrics :=
!(configJSON.DisableProcessorMetrics ||
clientConfigJSON.DisableProcessorMetrics)
doThermalMetrics :=
!(configJSON.DisableThermalMetrics ||
clientConfigJSON.DisableThermalMetrics)
// Is metrics excluded globally or per client
isExcluded := make(map[string]bool)
for _, key := range clientConfigJSON.ExcludeMetrics {
isExcluded[key] = true
} }
for _, key := range configJSON.ExcludeMetrics {
isExcluded[key] = true
} }
for _, host := range clientConfigJSON.HostList {
// Endpoint of the redfish service
endpoint := strings.Replace(endpoint_pattern, "%h", host, -1)
r.config.ClientConfigs = append(
r.config.ClientConfigs,
RedfishReceiverClientConfig{
Hostname: host,
isExcluded: isExcluded,
doPowerMetric: doPowerMetric,
doProcessorMetrics: doProcessorMetrics,
doThermalMetrics: doThermalMetrics,
skipProcessorMetricsURL: make(map[string]bool),
gofish: gofish.ClientConfig{
Username: username,
Password: password,
Endpoint: endpoint,
HTTPClient: httpClient,
},
})
}
}
// Compute parallel fanout to use
numClients := len(r.config.ClientConfigs)
r.config.fanout = configJSON.Fanout
if numClients < r.config.fanout {
r.config.fanout = numClients
}
if numClients == 0 {
err := fmt.Errorf("at least one client config is required")
cclog.ComponentError(r.name, err)
return nil, err
}
// Check for duplicate client configurations
isDuplicate := make(map[string]bool)
for i := range r.config.ClientConfigs {
host := r.config.ClientConfigs[i].Hostname
if isDuplicate[host] {
err := fmt.Errorf("Found duplicate client config for host %s", host)
cclog.ComponentError(r.name, err)
return nil, err
}
isDuplicate[host] = true
}
// Give some basic info about redfish receiver status
cclog.ComponentInfo(r.name, "Monitoring", numClients, "clients")
cclog.ComponentInfo(r.name, "Monitoring interval:", r.config.Interval)
cclog.ComponentInfo(r.name, "Monitoring parallel fanout:", r.config.fanout)
return r, nil return r, nil
} }


@ -0,0 +1,54 @@
## Redfish receiver
The Redfish receiver uses the [Redfish specification](https://www.dmtf.org/standards/redfish) to query thermal, power and processor metrics. Thermal metrics may include various fan speeds and temperatures. Power metrics may include the current power consumption of various hardware components, as well as the minimum, maximum and average power consumption of these components over a given time interval. The receiver polls each configured Redfish device once per interval; multiple devices can be queried in parallel to increase throughput.
### Configuration structure
```json
{
"<redfish receiver name>": {
"type": "redfish",
"username": "<user A>",
"password": "<password A>",
"endpoint": "https://%h-bmc",
"exclude_metrics": [ "min_consumed_watts" ],
"client_config": [
{
"host_list": [ "<host 1>", "<host 2>" ]
},
{
"host_list": [ "<host 3>", "<host 4>" ]
"disable_power_metrics": true
},
{
"host_list": [ "<host 5>" ],
"username": "<user B>",
"password": "<password B>",
"endpoint": "https://%h-BMC",
"disable_thermal_metrics": true
}
]
}
}
```
Global settings:
- `fanout`: Maximum number of simultaneous redfish connections (default: 64)
- `interval`: How often the redfish metrics should be read and sent to the sink (default: 30 s)
- `http_insecure`: Control whether a client verifies the server's certificate (default: true == do not verify server's certificate)
- `http_timeout`: Time limit for requests made by this HTTP client (default: 10 s)
Global and per redfish device settings (per redfish device settings overwrite the global settings):
- `disable_power_metrics`: disable collection of power metrics
- `disable_processor_metrics`: disable collection of processor metrics
- `disable_thermal_metrics`: disable collection of thermal metrics
- `exclude_metrics`: list of excluded metrics
- `username`: User name to authenticate with
- `password`: Password to use for authentication
- `endpoint`: URL of the redfish service (placeholder `%h` gets replaced by the hostname)
Per redfish device settings:
- `host_list`: List of hosts with the same client configuration
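For illustration, a minimal sketch (not part of this commit; host name, credentials and the `receivers` import path are assumptions) of how such a configuration could be passed to `NewRedfishReceiver`:
```golang
package main

import (
	"encoding/json"
	"log"

	"github.com/ClusterCockpit/cc-metric-collector/receivers"
)

func main() {
	// Example configuration for a single BMC; the "%h" placeholder in the
	// endpoint is replaced by each entry of "host_list"
	config := json.RawMessage(`{
		"type": "redfish",
		"username": "admin",
		"password": "secret",
		"endpoint": "https://%h-bmc",
		"client_config": [
			{ "host_list": [ "node01" ] }
		]
	}`)

	r, err := receivers.NewRedfishReceiver("myredfish", config)
	if err != nil {
		log.Fatal(err)
	}

	// In cc-metric-collector the receive manager attaches the sink channel
	// and calls Start()/Close(); the receiver is only constructed here.
	_ = r
}
```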


@ -4,7 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
) )
// SampleReceiver configuration: receiver type, listen address, port // SampleReceiver configuration: receiver type, listen address, port


@ -6,7 +6,7 @@ CC_HOME=/tmp
LOG_DIR=/var/log LOG_DIR=/var/log
DATA_DIR=/var/lib/grafana DATA_DIR=/var/lib/cc-metric-collector
MAX_OPEN_FILES=10000 MAX_OPEN_FILES=10000


@ -0,0 +1,16 @@
#!/usr/bin/make -f
# You must remove unused comment lines for the released package.
#export DH_VERBOSE = 1
#export DEB_BUILD_MAINT_OPTIONS = hardening=+all
#export DEB_CFLAGS_MAINT_APPEND = -Wall -pedantic
#export DEB_LDFLAGS_MAINT_APPEND = -Wl,--as-needed
%:
dh $@
override_dh_auto_build:
make
override_dh_auto_install:
make PREFIX=/usr install


@ -19,7 +19,7 @@
PATH=/bin:/usr/bin:/sbin:/usr/sbin PATH=/bin:/usr/bin:/sbin:/usr/sbin
NAME=cc-metric-collector NAME=cc-metric-collector
DESC="ClusterCockpit metric collector" DESC="ClusterCockpit metric collector"
DEFAULT=/etc/default/${NAME}.json DEFAULT=/etc/default/${NAME}
CC_USER=clustercockpit CC_USER=clustercockpit
CC_GROUP=clustercockpit CC_GROUP=clustercockpit


@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
func GangliaMetricName(point lp.CCMetric) string { func GangliaMetricName(point lp.CCMetric) string {


@ -9,8 +9,8 @@ import (
// "time" // "time"
"os/exec" "os/exec"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
const GMETRIC_EXEC = `gmetric` const GMETRIC_EXEC = `gmetric`


@ -9,8 +9,8 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
) )


@ -9,8 +9,8 @@ import (
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http" influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"


@ -9,8 +9,8 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"


@ -71,8 +71,8 @@ import (
"fmt" "fmt"
"unsafe" "unsafe"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/NVIDIA/go-nvml/pkg/dl" "github.com/NVIDIA/go-nvml/pkg/dl"
) )


@ -1,7 +1,7 @@
package sinks package sinks
import ( import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
type defaultSinkConfig struct { type defaultSinkConfig struct {


@ -8,8 +8,8 @@ import (
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go" nats "github.com/nats-io/nats.go"
) )


@ -9,8 +9,8 @@ import (
"strings" "strings"
"sync" "sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"


@ -5,8 +5,8 @@ import (
"fmt" "fmt"
"log" "log"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
type SampleSinkConfig struct { type SampleSinkConfig struct {


@ -6,8 +6,8 @@ import (
"os" "os"
"sync" "sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
const SINK_MAX_FORWARD = 50 const SINK_MAX_FORWARD = 50


@ -7,7 +7,7 @@ import (
"strings" "strings"
// "time" // "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
type StdoutSink struct { type StdoutSink struct {