Add stats counters to collectors

This commit is contained in:
Thomas Roehl 2022-04-02 16:05:52 +02:00
parent 9447685a69
commit 4a4992877c
22 changed files with 249 additions and 83 deletions

View File

@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
@ -33,6 +34,7 @@ type BeegfsMetaCollector struct {
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsMetaCollector) Close() {

View File

@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// Struct for the collector-specific JSON config
@ -31,6 +32,7 @@ type BeegfsStorageCollector struct {
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsStorageCollector) Close() {

View File

@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@ -37,6 +38,7 @@ type CPUFreqCpuInfoCollectorTopology struct {
type CPUFreqCpuInfoCollector struct {
metricCollector
topology []*CPUFreqCpuInfoCollectorTopology
statsProcessedMetrics int64
}
func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
return
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCpuInfoCollector) Close() {

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
)
@ -40,6 +41,7 @@ type CPUFreqCollectorTopology struct {
type CPUFreqCollector struct {
metricCollector
topology []CPUFreqCollectorTopology
statsProcessedMetrics int64
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCollector) Close() {

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const CPUSTATFILE = `/proc/stat`
@ -25,6 +26,7 @@ type CpustatCollector struct {
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
statsProcessedMetrics int64
}
func (m *CpustatCollector) Init(config json.RawMessage) error {
@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
for name, value := range values {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
if err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
time.Now(),
)
if err == nil {
m.statsProcessedMetrics++
output <- num_cpus_metric
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CpustatCollector) Close() {

View File

@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)
@ -28,6 +29,9 @@ type CustomCmdCollector struct {
config CustomCmdCollectorConfig
commands []string
files []string
statsProcessedMetrics int64
statsProcessedCommands int64
statsProcessedFiles int64
}
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)
m.parser.SetTimeFunc(DefaultTime)
m.statsProcessedMetrics = 0
m.statsProcessedFiles = 0
m.statsProcessedCommands = 0
m.init = true
return nil
}
@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
y := lp.FromInfluxMetric(c)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedCommands++
stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands)
}
for _, file := range m.files {
buffer, err := ioutil.ReadFile(file)
@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
}
y := lp.FromInfluxMetric(f)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedFiles++
stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles)
}
}

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// "log"
@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct {
type DiskstatCollector struct {
metricCollector
//matches map[string]int
config IOstatCollectorConfig
//devices map[string]IOstatCollectorEntry
config DiskstatCollectorConfig
statsProcessedMetrics int64
}
func (m *DiskstatCollector) Init(config json.RawMessage) error {
@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return err
}
defer file.Close()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
perc := (100 * (total - free)) / total
@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
if err == nil {
y.AddMeta("unit", "percent")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}

View File

@ -15,6 +15,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_GPFS_CMD = "mmpmon"
@ -35,6 +36,7 @@ type GpfsCollector struct {
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
statsProcessedMetrics int64
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
}
m.config.Mmpmon = p
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of closes
@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of reads
@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of writes
@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of read directories
@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// Number of inode updates
@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
continue
}
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *GpfsCollector) Close() {

View File

@ -7,6 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
"encoding/json"
@ -41,6 +42,7 @@ type InfinibandCollector struct {
}
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
statsProcessedMetrics int64
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@ -149,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
if len(m.info) == 0 {
return fmt.Errorf("found no IB devices")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -196,6 +198,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
y.AddMeta("unit", counterDef.unit)
output <- y
m.statsProcessedMetrics++
}
}
@ -206,6 +209,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
y.AddMeta("unit", counterDef.unit+"/sec")
output <- y
m.statsProcessedMetrics++
}
}
// Save current state
@ -214,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *InfinibandCollector) Close() {

View File

@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
// "log"
"encoding/json"
@ -32,6 +33,7 @@ type IOstatCollector struct {
matches map[string]int
config IOstatCollectorConfig
devices map[string]IOstatCollectorEntry
statsProcessedMetrics int64
}
func (m *IOstatCollector) Init(config json.RawMessage) error {
@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
lastValues: values,
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
entry.lastValues[name] = x
@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
m.devices[device] = entry
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IOstatCollector) Close() {

View File

@ -11,6 +11,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const IPMITOOL_PATH = `ipmitool`
@ -29,6 +30,7 @@ type IpmiCollector struct {
config IpmiCollectorConfig
ipmitool string
ipmisensors string
statsProcessedMetrics int64
}
func (m *IpmiCollector) Init(config json.RawMessage) error {
@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no IPMI reader found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("unit", unit)
output <- y
m.statsProcessedMetrics++
}
}
}
@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
y.AddMeta("unit", lv[4])
}
output <- y
m.statsProcessedMetrics++
}
}
}
@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
m.readIpmiSensors(m.config.IpmisensorsPath, output)
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IpmiCollector) Close() {

View File

@ -28,6 +28,7 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@ -84,6 +85,9 @@ type LikwidCollector struct {
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
statsMeasurements int64
statsProcessedMetrics int64
statsPublishedMetrics int64
}
type LikwidMetric struct {
@ -267,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, err.Error())
return err
}
m.statsMeasurements = 0
m.statsProcessedMetrics = 0
m.statsPublishedMetrics = 0
m.init = true
return nil
}
@ -274,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
// take a measurement for 'interval' seconds of event set index 'group'
func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
var ret C.int
m.lock.Lock()
if m.initialized {
ret = C.perfmon_setupCounters(evset.gid)
@ -317,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
}
}
m.lock.Unlock()
m.statsMeasurements++
stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements)
return false, nil
}
@ -357,6 +367,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@ -369,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}
@ -409,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@ -422,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}

View File

@ -10,6 +10,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@ -32,6 +33,7 @@ type LoadavgCollector struct {
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
statsProcessedMetrics int64
}
func (m *LoadavgCollector) Init(config json.RawMessage) error {
@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
for i, name := range m.proc_matches {
_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LoadavgCollector) Close() {

View File

@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const LUSTRE_SYSFS = `/sys/fs/lustre`
@ -44,6 +45,7 @@ type LustreCollector struct {
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
statsProcessedMetrics int64
}
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
m.lastTimestamp = time.Now()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
y.AddMeta("unit", def.unit)
}
output <- y
m.statsProcessedMetrics++
}
devData[def.name] = use_x
}
}
m.lastTimestamp = now
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LustreCollector) Close() {

View File

@ -14,6 +14,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MEMSTATFILE = "/proc/meminfo"
@ -38,6 +39,7 @@ type MemstatCollector struct {
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
statsProcessedMetrics int64
}
type MemstatStats struct {
@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
}
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
sendStats(stats, nodeConf.tags)
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *MemstatCollector) Close() {

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const NETSTATFILE = "/proc/net/dev"
@ -35,6 +36,7 @@ type NetstatCollector struct {
config NetstatCollectorConfig
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
statsProcessedMetrics int64
}
func (m *NetstatCollector) Init(config json.RawMessage) error {
@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
if len(m.matches) == 0 {
return errors.New("no devices to collector metrics found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if m.config.SendAbsoluteValues {
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
if m.config.SendDerivedValues {
@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
rate := float64(v-metric.lastValue) / timeDiff
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
metric.lastValue = v
@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *NetstatCollector) Close() {

View File

@ -12,6 +12,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// First part contains the code for the general NfsCollector.
@ -33,6 +34,7 @@ type nfsCollector struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
data map[string]NfsCollectorData
statsProcessedMetrics int64
}
func (m *nfsCollector) initStats() error {
@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
}
m.data = make(map[string]NfsCollectorData)
m.initStats()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("version", m.version)
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *nfsCollector) Close() {

View File

@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@ -45,6 +46,7 @@ type NUMAStatsCollectorTopolgy struct {
type NUMAStatsCollector struct {
metricCollector
topology []NUMAStatsCollectorTopolgy
statsProcessedMetrics int64
}
func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
tagSet: map[string]string{"memoryDomain": node},
})
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
file.Close()
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NUMAStatsCollector) Close() {

View File

@ -9,6 +9,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)
@ -29,6 +30,7 @@ type NvidiaCollector struct {
num_gpus int
config NvidiaCollectorConfig
gpus []NvidiaCollectorDevice
statsProcessedMetrics int64
}
func (m *NvidiaCollector) CatchPanic() {
@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
pciInfo.Device)
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
if !device.excludeMetrics["nv_mem_util"] {
@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "degC")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if err == nil {
output <- y
m.statsProcessedMetrics++
}
} else if ret == nvml.ERROR_NOT_SUPPORTED {
y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NvidiaCollector) Close() {

View File

@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// These are the fields we read from the JSON configuration
@ -20,6 +21,7 @@ type SampleCollector struct {
config SampleTimerCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
statsCount int64
}
// Functions to implement MetricCollector interface
@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
// Initialize counts for statistics
m.statsCount = 0
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
// Send it to output channel
output <- y
// increment count for each sent metric or any other operation
m.statsCount++
}
// Set stats for the component
stats.ComponentStatInt(m.name, "count", m.statsCount)
}
// Close metric collector: close network connection, close files, close libraries, ...

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html
@ -41,6 +42,7 @@ type TempCollector struct {
ReportCriticalTemp bool `json:"report_critical_temperature"`
}
sensors []*TempCollectorSensor
statsProcessedMetrics int64
}
func (m *TempCollector) Init(config json.RawMessage) error {
@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
}
// Finished initialization
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
// max temperature
@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TempCollector) Close() {

View File

@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MAX_NUM_PROCS = 10
@ -23,6 +24,7 @@ type TopProcsCollector struct {
metricCollector
tags map[string]string
config TopProcsCollectorConfig
statsProcessedMetrics int64
}
func (m *TopProcsCollector) Init(config json.RawMessage) error {
@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
if err != nil {
return errors.New("failed to execute command")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TopProcsCollector) Close() {