Compare commits

..

21 Commits

Author SHA1 Message Date
Thomas Roehl
9dd6ff1a76 Add StatsAPI to README 2022-04-02 16:07:13 +02:00
Thomas Roehl
257b4a64b5 Add missing main API file 2022-04-02 16:06:51 +02:00
Thomas Roehl
5eeb097136 Add stats counters to sinks 2022-04-02 16:06:03 +02:00
Thomas Roehl
4a4992877c Add stats counters to collectors 2022-04-02 16:05:52 +02:00
Thomas Roehl
9447685a69 Add StatsApi. Started if a configuration file is set in global config.json 2022-04-02 16:05:27 +02:00
Thomas Roehl
28348bd108 InfluxSink: Use batch&flush logic from HttpSink 2022-04-01 18:37:45 +02:00
Thomas Roehl
a3b9d8a90b HttpSink: Use sink name in error outputs 2022-04-01 18:36:54 +02:00
Thomas Roehl
7e43e9171e Use default options. Overwrite if anything is configured differently. Use seconds as precision 2022-04-01 17:26:56 +02:00
Thomas Roehl
5d25a7bf12 Add units to InfiniBandCollector 2022-04-01 17:14:26 +02:00
Thomas Roehl
83b4343310 Likwid receives signal at first Read, check when re-initializing 2022-04-01 17:10:31 +02:00
Thomas Roehl
f1d3cabdc6 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-04-01 12:45:25 +02:00
Thomas Roehl
43bcce6fb5 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-22 18:05:38 +01:00
Thomas Roehl
beebcd7145 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-15 16:16:39 +01:00
Thomas Roehl
082eea525a Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-15 16:10:41 +01:00
Thomas Roehl
2b8266d1d2 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-15 15:24:29 +01:00
Thomas Roehl
d835724d93 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-09 17:02:58 +01:00
Thomas Roehl
c5082bbffe Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-03-07 16:13:08 +01:00
Thomas Roehl
4c1263312b Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-02-25 15:06:06 +01:00
Thomas Roehl
940623585c Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-02-25 13:53:34 +01:00
Thomas Roehl
87ecb12c6f Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2022-02-24 18:28:52 +01:00
Thomas Roehl
ae64eddcc8 Remove doubled import 2022-02-21 14:50:53 +01:00
37 changed files with 864 additions and 202 deletions

View File

@@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c
"collectors" : "collectors.json",
"receivers" : "receivers.json",
"router" : "router.json",
"stats_api" : "api.json",
"interval": 10,
"duration": 1
}
@@ -32,6 +33,7 @@ See the component READMEs for their configuration:
* [`sinks`](./sinks/README.md)
* [`receivers`](./receivers/README.md)
* [`router`](./internal/metricRouter/README.md)
* [`stats_api`](./internal/metricRouter/StatsApi.md)
# Installation

View File

@@ -28,6 +28,7 @@ type CentralConfigFile struct {
RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"`
ReceiverConfigFile string `json:"receivers,omitempty"`
StatsApiConfigFile string `json:"stats_api,omitempty"`
}
func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
@@ -52,6 +53,7 @@ type RuntimeConfig struct {
CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager
StatsApi mr.StatsApi
MultiChanTicker mct.MultiChanTicker
Channels []chan lp.CCMetric
@@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
cclog.Debug("Shutdown SinkManager...")
config.SinkManager.Close()
}
if config.StatsApi != nil {
cclog.Debug("Shutdown StatsApi...")
config.StatsApi.Close()
}
}
func mainFunc() int {
var err error
use_recv := false
use_api := false
// Initialize runtime configuration
rcfg := RuntimeConfig{
@@ -164,6 +171,7 @@ func mainFunc() int {
CollectManager: nil,
SinkManager: nil,
ReceiveManager: nil,
StatsApi: nil,
CliArgs: ReadCli(),
}
@@ -253,6 +261,16 @@ func mainFunc() int {
use_recv = true
}
// Create new statistics API manager
if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 {
rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile)
if err != nil {
cclog.Error(err.Error())
return 1
}
use_api = true
}
// Create shutdown handler
shutdownSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt)
@@ -260,6 +278,11 @@ func mainFunc() int {
rcfg.Sync.Add(1)
go shutdownHandler(&rcfg, shutdownSignal)
// Start the stats api early to be prepared for init settings
if use_api {
rcfg.StatsApi.Start()
}
// Start the managers
rcfg.MetricRouter.Start()
rcfg.SinkManager.Start()

View File

@@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
@@ -29,10 +30,11 @@ type BeegfsMetaCollectorConfig struct {
type BeegfsMetaCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
@@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsMetaCollector) Close() {

View File

@@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// Struct for the collector-specific JSON config
@@ -27,10 +28,11 @@ type BeegfsStorageCollectorConfig struct {
type BeegfsStorageCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
@@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsStorageCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -36,7 +37,8 @@ type CPUFreqCpuInfoCollectorTopology struct {
type CPUFreqCpuInfoCollector struct {
metricCollector
topology []*CPUFreqCpuInfoCollectorTopology
topology []*CPUFreqCpuInfoCollectorTopology
statsProcessedMetrics int64
}
func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
@@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
return
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCpuInfoCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
)
@@ -39,8 +40,9 @@ type CPUFreqCollectorTopology struct {
//
type CPUFreqCollector struct {
metricCollector
topology []CPUFreqCollectorTopology
config struct {
topology []CPUFreqCollectorTopology
statsProcessedMetrics int64
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
}
@@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const CPUSTATFILE = `/proc/stat`
@@ -21,10 +22,11 @@ type CpustatCollectorConfig struct {
type CpustatCollector struct {
metricCollector
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
statsProcessedMetrics int64
}
func (m *CpustatCollector) Init(config json.RawMessage) error {
@@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
for name, value := range values {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
if err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
time.Now(),
)
if err == nil {
m.statsProcessedMetrics++
output <- num_cpus_metric
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CpustatCollector) Close() {

View File

@@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)
@@ -23,11 +24,14 @@ type CustomCmdCollectorConfig struct {
type CustomCmdCollector struct {
metricCollector
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
statsProcessedMetrics int64
statsProcessedCommands int64
statsProcessedFiles int64
}
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
@@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)
m.parser.SetTimeFunc(DefaultTime)
m.statsProcessedMetrics = 0
m.statsProcessedFiles = 0
m.statsProcessedCommands = 0
m.init = true
return nil
}
@@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
y := lp.FromInfluxMetric(c)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedCommands++
stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands)
}
for _, file := range m.files {
buffer, err := ioutil.ReadFile(file)
@@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
}
y := lp.FromInfluxMetric(f)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedFiles++
stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles)
}
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// "log"
@@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct {
type DiskstatCollector struct {
metricCollector
//matches map[string]int
config IOstatCollectorConfig
//devices map[string]IOstatCollectorEntry
config DiskstatCollectorConfig
statsProcessedMetrics int64
}
func (m *DiskstatCollector) Init(config json.RawMessage) error {
@@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return err
}
defer file.Close()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
perc := (100 * (total - free)) / total
@@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
if err == nil {
y.AddMeta("unit", "percent")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}

View File

@@ -15,6 +15,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_GPFS_CMD = "mmpmon"
@@ -32,9 +33,10 @@ type GpfsCollector struct {
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
SendBandwidths bool `json:"send_bandwidths"`
}
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
statsProcessedMetrics int64
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
@@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
}
m.config.Mmpmon = p
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of closes
@@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of reads
@@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of writes
@@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of read directories
@@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// Number of inode updates
@@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
continue
}
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *GpfsCollector) Close() {

View File

@@ -7,6 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
"encoding/json"
@@ -18,13 +19,18 @@ import (
const IB_BASEPATH = "/sys/class/infiniband/"
type InfinibandCollectorMetric struct {
path string
unit string
}
type InfinibandCollectorInfo struct {
LID string // IB local Identifier (LID)
device string // IB device
port string // IB device port
portCounterFiles map[string]string // mapping counter name -> sysfs file
tagSet map[string]string // corresponding tag list
lastState map[string]int64 // State from last measurement
LID string // IB local Identifier (LID)
device string // IB device
port string // IB device port
portCounterFiles map[string]InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
tagSet map[string]string // corresponding tag list
lastState map[string]int64 // State from last measurement
}
type InfinibandCollector struct {
@@ -34,8 +40,9 @@ type InfinibandCollector struct {
SendAbsoluteValues bool `json:"send_abs_values"` // Send absolut values as read from sys filesystem
SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. rates
}
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
statsProcessedMetrics int64
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@@ -106,16 +113,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
// Check access to counter files
countersDir := filepath.Join(path, "counters")
portCounterFiles := map[string]string{
"ib_recv": filepath.Join(countersDir, "port_rcv_data"),
"ib_xmit": filepath.Join(countersDir, "port_xmit_data"),
"ib_recv_pkts": filepath.Join(countersDir, "port_rcv_packets"),
"ib_xmit_pkts": filepath.Join(countersDir, "port_xmit_packets"),
portCounterFiles := map[string]InfinibandCollectorMetric{
"ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"},
"ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"},
"ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"},
"ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"},
}
for _, counterFile := range portCounterFiles {
err := unix.Access(counterFile, unix.R_OK)
for _, counter := range portCounterFiles {
err := unix.Access(counter.path, unix.R_OK)
if err != nil {
return fmt.Errorf("unable to access %s: %v", counterFile, err)
return fmt.Errorf("unable to access %s: %v", counter.path, err)
}
}
@@ -144,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
if len(m.info) == 0 {
return fmt.Errorf("found no IB devices")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -165,14 +172,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
m.lastTimestamp = now
for _, info := range m.info {
for counterName, counterFile := range info.portCounterFiles {
for counterName, counterDef := range info.portCounterFiles {
// Read counter file
line, err := ioutil.ReadFile(counterFile)
line, err := ioutil.ReadFile(counterDef.path)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterFile, err))
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
continue
}
data := strings.TrimSpace(string(line))
@@ -189,7 +196,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
// Send absolut values
if m.config.SendAbsoluteValues {
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
y.AddMeta("unit", counterDef.unit)
output <- y
m.statsProcessedMetrics++
}
}
@@ -198,7 +207,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
if info.lastState[counterName] >= 0 {
rate := float64((v - info.lastState[counterName])) / timeDiff
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
y.AddMeta("unit", counterDef.unit+"/sec")
output <- y
m.statsProcessedMetrics++
}
}
// Save current state
@@ -207,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *InfinibandCollector) Close() {

View File

@@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
// "log"
"encoding/json"
@@ -29,9 +30,10 @@ type IOstatCollectorEntry struct {
type IOstatCollector struct {
metricCollector
matches map[string]int
config IOstatCollectorConfig
devices map[string]IOstatCollectorEntry
matches map[string]int
config IOstatCollectorConfig
devices map[string]IOstatCollectorEntry
statsProcessedMetrics int64
}
func (m *IOstatCollector) Init(config json.RawMessage) error {
@@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
lastValues: values,
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
entry.lastValues[name] = x
@@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
m.devices[device] = entry
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IOstatCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const IPMITOOL_PATH = `ipmitool`
@@ -26,9 +27,10 @@ type IpmiCollector struct {
metricCollector
//tags map[string]string
//matches map[string]string
config IpmiCollectorConfig
ipmitool string
ipmisensors string
config IpmiCollectorConfig
ipmitool string
ipmisensors string
statsProcessedMetrics int64
}
func (m *IpmiCollector) Init(config json.RawMessage) error {
@@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no IPMI reader found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("unit", unit)
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
y.AddMeta("unit", lv[4])
}
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
m.readIpmiSensors(m.config.IpmisensorsPath, output)
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IpmiCollector) Close() {

View File

@@ -16,6 +16,7 @@ import (
"math"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
@@ -27,6 +28,7 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@@ -54,6 +56,7 @@ type LikwidEventsetConfig struct {
gid C.int
eorder []*C.char
estr *C.char
go_estr string
results map[int]map[string]interface{}
metrics map[int]map[string]float64
}
@@ -70,18 +73,21 @@ type LikwidCollectorConfig struct {
type LikwidCollector struct {
metricCollector
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
statsMeasurements int64
statsProcessedMetrics int64
statsPublishedMetrics int64
}
type LikwidMetric struct {
@@ -101,8 +107,14 @@ func eventsToEventStr(events map[string]string) string {
func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig {
tmplist := make([]string, 0)
clist := make([]string, 0)
for k := range input.Events {
clist = append(clist, k)
}
sort.Strings(clist)
elist := make([]*C.char, 0)
for k, v := range input.Events {
for _, k := range clist {
v := input.Events[k]
tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k))
c_counter := C.CString(k)
elist = append(elist, c_counter)
@@ -124,6 +136,7 @@ func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig
gid: -1,
eorder: elist,
estr: C.CString(estr),
go_estr: estr,
results: res,
metrics: met,
}
@@ -193,7 +206,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
}
m.setup()
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
m.meta = map[string]string{"group": "PerfCounter"}
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
cpulist := topo.CpuList()
m.cpulist = make([]C.int, len(cpulist))
@@ -258,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, err.Error())
return err
}
m.statsMeasurements = 0
m.statsProcessedMetrics = 0
m.statsPublishedMetrics = 0
m.init = true
return nil
}
@@ -265,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
// take a measurement for 'interval' seconds of event set index 'group'
func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
var ret C.int
m.lock.Lock()
if m.initialized {
ret = C.perfmon_setupCounters(evset.gid)
@@ -308,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
}
}
m.lock.Unlock()
m.statsMeasurements++
stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements)
return false, nil
}
@@ -348,6 +367,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@@ -360,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}
@@ -400,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@@ -413,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}
@@ -425,6 +452,9 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
func (m *LikwidCollector) LateInit() error {
var ret C.int
if m.initialized {
return nil
}
switch m.config.AccessMode {
case "direct":
C.HPMmode(0)
@@ -475,7 +505,17 @@ func (m *LikwidCollector) LateInit() error {
for i, evset := range m.config.Eventsets {
var gid C.int
if len(evset.Events) > 0 {
skip := false
likwidGroup := genLikwidEventSet(evset)
for _, g := range m.likwidGroups {
if likwidGroup.go_estr == g.go_estr {
skip = true
break
}
}
if skip {
continue
}
// Now we add the list of events to likwid
gid = C.perfmon_addEventSet(likwidGroup.estr)
if gid >= 0 {
@@ -520,9 +560,14 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if !m.initialized {
if m.LateInit() != nil {
m.lock.Lock()
err = m.LateInit()
if err != nil {
m.lock.Unlock()
return
}
m.initialized = true
m.lock.Unlock()
}
if m.initialized && !skip {

View File

@@ -10,6 +10,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -32,6 +33,7 @@ type LoadavgCollector struct {
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
statsProcessedMetrics int64
}
func (m *LoadavgCollector) Init(config json.RawMessage) error {
@@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
for i, name := range m.proc_matches {
_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LoadavgCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const LUSTRE_SYSFS = `/sys/fs/lustre`
@@ -37,13 +38,14 @@ type LustreMetricDefinition struct {
type LustreCollector struct {
metricCollector
tags map[string]string
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
tags map[string]string
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
statsProcessedMetrics int64
}
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
m.lastTimestamp = time.Now()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
y.AddMeta("unit", def.unit)
}
output <- y
m.statsProcessedMetrics++
}
devData[def.name] = use_x
}
}
m.lastTimestamp = now
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LustreCollector) Close() {

View File

@@ -14,6 +14,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MEMSTATFILE = "/proc/meminfo"
@@ -32,12 +33,13 @@ type MemstatCollectorNode struct {
type MemstatCollector struct {
metricCollector
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
statsProcessedMetrics int64
}
type MemstatStats struct {
@@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
}
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
sendStats(stats, nodeConf.tags)
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *MemstatCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const NETSTATFILE = "/proc/net/dev"
@@ -32,9 +33,10 @@ type NetstatCollectorMetric struct {
type NetstatCollector struct {
metricCollector
config NetstatCollectorConfig
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
config NetstatCollectorConfig
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
statsProcessedMetrics int64
}
func (m *NetstatCollector) Init(config json.RawMessage) error {
@@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
if len(m.matches) == 0 {
return errors.New("no devices to collector metrics found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if m.config.SendAbsoluteValues {
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
if m.config.SendDerivedValues {
@@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
rate := float64(v-metric.lastValue) / timeDiff
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
metric.lastValue = v
@@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *NetstatCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// First part contains the code for the general NfsCollector.
@@ -32,7 +33,8 @@ type nfsCollector struct {
Nfsstats string `json:"nfsstat"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
data map[string]NfsCollectorData
data map[string]NfsCollectorData
statsProcessedMetrics int64
}
func (m *nfsCollector) initStats() error {
@@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
}
m.data = make(map[string]NfsCollectorData)
m.initStats()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("version", m.version)
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *nfsCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -44,7 +45,8 @@ type NUMAStatsCollectorTopolgy struct {
type NUMAStatsCollector struct {
metricCollector
topology []NUMAStatsCollectorTopolgy
topology []NUMAStatsCollectorTopolgy
statsProcessedMetrics int64
}
func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
@@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
tagSet: map[string]string{"memoryDomain": node},
})
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
file.Close()
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NUMAStatsCollector) Close() {

View File

@@ -9,6 +9,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)
@@ -26,9 +27,10 @@ type NvidiaCollectorDevice struct {
type NvidiaCollector struct {
metricCollector
num_gpus int
config NvidiaCollectorConfig
gpus []NvidiaCollectorDevice
num_gpus int
config NvidiaCollectorConfig
gpus []NvidiaCollectorDevice
statsProcessedMetrics int64
}
func (m *NvidiaCollector) CatchPanic() {
@@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
pciInfo.Device)
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
if !device.excludeMetrics["nv_mem_util"] {
@@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
@@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "degC")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if err == nil {
output <- y
m.statsProcessedMetrics++
}
} else if ret == nvml.ERROR_NOT_SUPPORTED {
y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NvidiaCollector) Close() {

View File

@@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// These are the fields we read from the JSON configuration
@@ -17,9 +18,10 @@ type SampleCollectorConfig struct {
// defined by metricCollector (name, init, ...)
type SampleCollector struct {
metricCollector
config SampleTimerCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
config SampleTimerCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
statsCount int64
}
// Functions to implement MetricCollector interface
@@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
// Initialize counts for statistics
m.statsCount = 0
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
@@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
// Send it to output channel
output <- y
// increment count for each sent metric or any other operation
m.statsCount++
}
// Set stats for the component
stats.ComponentStatInt(m.name, "count", m.statsCount)
}
// Close metric collector: close network connection, close files, close libraries, ...

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html
@@ -40,7 +41,8 @@ type TempCollector struct {
ReportMaxTemp bool `json:"report_max_temperature"`
ReportCriticalTemp bool `json:"report_critical_temperature"`
}
sensors []*TempCollectorSensor
sensors []*TempCollectorSensor
statsProcessedMetrics int64
}
func (m *TempCollector) Init(config json.RawMessage) error {
@@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
}
// Finished initialization
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
// max temperature
@@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TempCollector) Close() {

View File

@@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MAX_NUM_PROCS = 10
@@ -21,8 +22,9 @@ type TopProcsCollectorConfig struct {
type TopProcsCollector struct {
metricCollector
tags map[string]string
config TopProcsCollectorConfig
tags map[string]string
config TopProcsCollectorConfig
statsProcessedMetrics int64
}
func (m *TopProcsCollector) Init(config json.RawMessage) error {
@@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
if err != nil {
return errors.New("failed to execute command")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TopProcsCollector) Close() {

View File

@@ -0,0 +1,17 @@
# Stats API
The Stats API can be used for debugging. It publishes counts at an HTTP endpoint as JSON from different componenets of the CC Metric Collector.
# Configuration
The Stats API has an own configuration file to specify the listen host and port. The defaults are `localhost` and `8080`.
```json
{
"bindhost" : "",
"port" : "8080",
"publish_collectorstate" : true
}
```
The `bindhost` and `port` can be used to specify the listen host and port. The `publish_collectorstate` needs to be `true`, otherwise nothing is presented. This option is for future use if we need to publish more infos using different domains.

View File

@@ -0,0 +1,232 @@
package metricRouter
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"github.com/gorilla/mux"
)
type statsApiConfig struct {
PublishCollectorState bool `json:"publish_collectorstate"`
Host string `json:"bindhost"`
Port string `json:"port"`
}
// Metric cache data structure
type statsApi struct {
name string
input chan lp.CCMetric
indone chan bool
outdone chan bool
config statsApiConfig
wg *sync.WaitGroup
statsWg sync.WaitGroup
ticker mct.MultiChanTicker
tickchan chan time.Time
server *http.Server
router *mux.Router
lock sync.Mutex
baseurl string
stats map[string]map[string]int64
outStats map[string]map[string]int64
}
type StatsApi interface {
Start()
Close()
StatsFunc(w http.ResponseWriter, r *http.Request)
}
var statsApiServer *statsApi = nil
func (a *statsApi) updateStats(point lp.CCMetric) {
switch point.Name() {
case "_stats":
if name, nok := point.GetMeta("source"); nok {
var compStats map[string]int64
var ok bool
if compStats, ok = a.stats[name]; !ok {
a.stats[name] = make(map[string]int64)
compStats = a.stats[name]
}
for k, v := range point.Fields() {
switch value := v.(type) {
case int:
compStats[k] = int64(value)
case uint:
compStats[k] = int64(value)
case int32:
compStats[k] = int64(value)
case uint32:
compStats[k] = int64(value)
case int64:
compStats[k] = int64(value)
case uint64:
compStats[k] = int64(value)
default:
cclog.ComponentDebug(a.name, "Unusable stats for", k, ". Values should be int64")
}
}
a.stats[name] = compStats
}
}
}
func (a *statsApi) Start() {
a.ticker.AddChannel(a.tickchan)
a.wg.Add(1)
a.statsWg.Add(1)
go func() {
a.stats = make(map[string]map[string]int64)
defer a.statsWg.Done()
for {
select {
case <-a.indone:
cclog.ComponentDebug(a.name, "INPUT DONE")
close(a.indone)
return
case p := <-a.input:
a.lock.Lock()
a.updateStats(p)
a.lock.Unlock()
}
}
}()
a.statsWg.Add(1)
go func() {
a.outStats = make(map[string]map[string]int64)
defer a.statsWg.Done()
a.lock.Lock()
for comp, compData := range a.stats {
var outData map[string]int64
var ok bool
if outData, ok = a.outStats[comp]; !ok {
outData = make(map[string]int64)
}
for k, v := range compData {
outData[k] = v
}
a.outStats[comp] = outData
}
a.lock.Unlock()
for {
select {
case <-a.outdone:
cclog.ComponentDebug(a.name, "OUTPUT DONE")
close(a.outdone)
return
case <-a.tickchan:
a.lock.Lock()
for comp, compData := range a.stats {
var outData map[string]int64
var ok bool
if outData, ok = a.outStats[comp]; !ok {
outData = make(map[string]int64)
}
for k, v := range compData {
outData[k] = v
}
a.outStats[comp] = outData
}
a.lock.Unlock()
}
}
}()
a.statsWg.Add(1)
go func() {
defer a.statsWg.Done()
err := a.server.ListenAndServe()
if err != nil && err.Error() != "http: Server closed" {
cclog.ComponentError(a.name, err.Error())
}
cclog.ComponentDebug(a.name, "SERVER DONE")
}()
cclog.ComponentDebug(a.name, "STARTED")
}
func (a *statsApi) StatsFunc(w http.ResponseWriter, r *http.Request) {
data, err := json.Marshal(a.outStats)
if err == nil {
w.Header().Set("Content-Type", "application/json")
io.WriteString(w, string(data))
}
}
// Close finishes / stops the metric cache
func (a *statsApi) Close() {
cclog.ComponentDebug(a.name, "CLOSE")
a.indone <- true
a.outdone <- true
a.server.Shutdown(context.Background())
// wait for close of channel r.done
<-a.indone
<-a.outdone
a.statsWg.Wait()
a.wg.Done()
//a.wg.Wait()
}
func NewStatsApi(ticker mct.MultiChanTicker, wg *sync.WaitGroup, statsApiConfigfile string) (StatsApi, error) {
a := new(statsApi)
a.name = "StatsApi"
a.config.Host = "localhost"
a.config.Port = "8080"
configFile, err := os.Open(statsApiConfigfile)
if err != nil {
cclog.ComponentError(a.name, err.Error())
return nil, err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
err = jsonParser.Decode(&a.config)
if err != nil {
cclog.ComponentError(a.name, err.Error())
return nil, err
}
a.input = make(chan lp.CCMetric)
a.ticker = ticker
a.tickchan = make(chan time.Time)
a.wg = wg
a.indone = make(chan bool)
a.outdone = make(chan bool)
a.router = mux.NewRouter()
a.baseurl = fmt.Sprintf("%s:%s", a.config.Host, a.config.Port)
a.server = &http.Server{Addr: a.baseurl, Handler: a.router}
if a.config.PublishCollectorState {
a.router.HandleFunc("/", a.StatsFunc)
}
statsApiServer = a
return a, nil
}
func ComponentStatInt(component string, key string, value int64) {
if statsApiServer == nil {
return
}
y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now())
if err == nil {
statsApiServer.input <- y
}
}
func ComponentStatString(component string, key string, value int64) {
if statsApiServer == nil {
return
}
y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now())
if err == nil {
statsApiServer.input <- y
}
}

View File

@@ -40,20 +40,26 @@ type metricRouterConfig struct {
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
maxForward int // number of metrics to forward maximally in one iteration
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
maxForward int // number of metrics to forward maximally in one iteration
statsCollForward int64
statsRecvForward int64
statsCacheForward int64
statsTotalForward int64
statsDropped int64
statsRenamed int64
}
// MetricRouter access functions
@@ -121,6 +127,12 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
for _, mname := range r.config.DropMetrics {
r.config.dropMetrics[mname] = true
}
r.statsCollForward = 0
r.statsRecvForward = 0
r.statsCacheForward = 0
r.statsTotalForward = 0
r.statsDropped = 0
r.statsRenamed = 0
return nil
}
@@ -140,6 +152,7 @@ func (r *metricRouter) StartTimer() {
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
return
case t := <-m:
cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", t.Unix())
r.timestamp = t
}
}
@@ -253,6 +266,8 @@ func (r *metricRouter) Start() {
r.DoDelTags(point)
name := point.Name()
if new, ok := r.config.RenameMetrics[name]; ok {
r.statsRenamed++
ComponentStatInt("MetricRouter", "renamed", r.statsRenamed)
point.SetName(new)
point.AddMeta("oldname", name)
}
@@ -272,7 +287,14 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
r.statsCollForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
// even if the metric is dropped, it is stored in the cache for
// aggregations
@@ -288,7 +310,14 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
r.statsRecvForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
}
@@ -297,7 +326,14 @@ func (r *metricRouter) Start() {
// receive from metric collector
if !r.dropMetric(p) {
p.AddTag(r.config.HostnameTagName, r.hostname)
r.statsCacheForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const GMETRIC_EXEC = `gmetric`
@@ -29,9 +30,10 @@ type GangliaSinkConfig struct {
type GangliaSink struct {
sink
gmetric_path string
gmetric_config string
config GangliaSinkConfig
gmetric_path string
gmetric_config string
config GangliaSinkConfig
statsSentMetrics int64
}
func (s *GangliaSink) Write(point lp.CCMetric) error {
@@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
command := exec.Command(s.gmetric_path, argstr...)
command.Wait()
_, err = command.Output()
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return err
}
@@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
s.statsSentMetrics = 0
return s, nil
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)
@@ -36,19 +37,21 @@ type HttpSink struct {
idleConnTimeout time.Duration
timeout time.Duration
flushDelay time.Duration
statsProcessed int64
statsFlushes int64
}
func (s *HttpSink) Write(m lp.CCMetric) error {
if s.buffer.Len() == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?")
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last second
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError("HttpSink", "flush failed:", err.Error())
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
@@ -60,8 +63,11 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
if err != nil {
cclog.ComponentError(s.name, "encoding failed:", err.Error())
return err
}
s.statsProcessed++
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed)
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
@@ -84,6 +90,7 @@ func (s *HttpSink) Flush() error {
// Create new request to send buffer
req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
if err != nil {
cclog.ComponentError(s.name, "failed to create request:", err.Error())
return err
}
@@ -100,13 +107,18 @@ func (s *HttpSink) Flush() error {
// Handle transport/tcp errors
if err != nil {
cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
return err
}
// Handle application errors
if res.StatusCode != http.StatusOK {
return errors.New(res.Status)
err = errors.New(res.Status)
cclog.ComponentError(s.name, "application error:", err.Error())
return err
}
s.statsFlushes++
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
return nil
}
@@ -114,7 +126,7 @@ func (s *HttpSink) Flush() error {
func (s *HttpSink) Close() {
s.flushTimer.Stop()
if err := s.Flush(); err != nil {
cclog.ComponentError("HttpSink", "flush failed:", err.Error())
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
s.client.CloseIdleConnections()
}
@@ -172,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.statsFlushes = 0
s.statsProcessed = 0
return s, nil
}

View File

@@ -10,6 +10,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
)
@@ -28,10 +29,10 @@ type InfluxAsyncSinkConfig struct {
BatchSize uint `json:"batch_size,omitempty"`
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
FlushInterval uint `json:"flush_interval,omitempty"`
InfluxRetryInterval string `json:"retry_interval"`
InfluxExponentialBase uint `json:"retry_exponential_base"`
InfluxMaxRetries uint `json:"max_retries"`
InfluxMaxRetryTime string `json:"max_retry_time"`
InfluxRetryInterval string `json:"retry_interval,omitempty"`
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
InfluxMaxRetries uint `json:"max_retries,omitempty"`
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
}
type InfluxAsyncSink struct {
@@ -42,6 +43,9 @@ type InfluxAsyncSink struct {
config InfluxAsyncSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
sentMetrics int64
statsFlushes int64
statsErrors int64
}
func (s *InfluxAsyncSink) connect() error {
@@ -60,20 +64,34 @@ func (s *InfluxAsyncSink) connect() error {
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
clientOptions := influxdb2.DefaultOptions()
if s.config.BatchSize != 0 {
cclog.ComponentDebug(s.name, "Batch size", s.config.BatchSize)
clientOptions.SetBatchSize(s.config.BatchSize)
}
if s.config.FlushInterval != 0 {
cclog.ComponentDebug(s.name, "Flush interval", s.config.FlushInterval)
clientOptions.SetFlushInterval(s.config.FlushInterval)
}
if s.influxRetryInterval != 0 {
cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
}
if s.influxMaxRetryTime != 0 {
cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
}
if s.config.InfluxExponentialBase != 0 {
cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
}
if s.config.InfluxMaxRetries != 0 {
cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
}
clientOptions.SetTLSConfig(
&tls.Config{
InsecureSkipVerify: true,
},
)
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
).SetPrecision(time.Second)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
@@ -91,11 +109,15 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
s.writeApi.WritePoint(
m.ToPoint(s.meta_as_tags),
)
s.sentMetrics++
stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics)
return nil
}
func (s *InfluxAsyncSink) Flush() error {
s.writeApi.Flush()
s.statsFlushes++
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
return nil
}
@@ -110,13 +132,14 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 20
s.config.InfluxExponentialBase = 2
s.config.BatchSize = 0
s.influxRetryInterval = 0
//s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = 0
//s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 0
s.config.InfluxExponentialBase = 0
s.config.FlushInterval = 0
// Default retry intervals (in seconds)
// 1 2
@@ -174,12 +197,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
}
// Start background: Read from error channel
s.statsErrors = 0
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
s.statsErrors++
stats.ComponentStatInt(s.name, "errors", s.statsErrors)
cclog.ComponentError(s.name, err.Error())
}
}()
s.sentMetrics = 0
s.statsFlushes = 0
return s, nil
}

View File

@@ -6,38 +6,49 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
type InfluxSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
InfluxRetryInterval string `json:"retry_interval"`
InfluxExponentialBase uint `json:"retry_exponential_base"`
InfluxMaxRetries uint `json:"max_retries"`
InfluxMaxRetryTime string `json:"max_retry_time"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
BatchSize int `json:"batch_size,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
// InfluxRetryInterval string `json:"retry_interval"`
// InfluxExponentialBase uint `json:"retry_exponential_base"`
// InfluxMaxRetries uint `json:"max_retries"`
// InfluxMaxRetryTime string `json:"max_retry_time"`
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
}
type InfluxSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
statsSentMetrics int64
statsProcessedMetrics int64
//influxMaxRetryDelay uint
}
@@ -56,16 +67,31 @@ func (s *InfluxSink) connect() error {
}
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
clientOptions := influxdb2.DefaultOptions()
// if s.influxRetryInterval != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
// clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
// }
// if s.influxMaxRetryTime != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
// clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
// }
// if s.config.InfluxExponentialBase != 0 {
// cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
// clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
// }
// if s.config.InfluxMaxRetries != 0 {
// cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
// }
clientOptions.SetTLSConfig(
&tls.Config{
InsecureSkipVerify: true,
},
)
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
clientOptions.SetPrecision(time.Second)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
@@ -80,38 +106,80 @@ func (s *InfluxSink) connect() error {
}
func (s *InfluxSink) Write(m lp.CCMetric) error {
err :=
s.writeApi.WritePoint(
context.Background(),
m.ToPoint(s.meta_as_tags),
)
return err
// err :=
// s.writeApi.WritePoint(
// context.Background(),
// m.ToPoint(s.meta_as_tags),
// )
if len(s.batch) == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last second
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
p := m.ToPoint(s.meta_as_tags)
s.lock.Lock()
s.statsProcessedMetrics++
s.batch = append(s.batch, p)
s.lock.Unlock()
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics)
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
return nil
}
func (s *InfluxSink) Flush() error {
s.lock.Lock()
defer s.lock.Unlock()
if len(s.batch) == 0 {
return nil
}
err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
return err
}
s.statsSentMetrics += int64(len(s.batch))
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
s.batch = s.batch[:0]
return nil
}
func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.flushTimer.Stop()
s.Flush()
s.client.Close()
}
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
s.config.BatchSize = 100
s.config.FlushDelay = "1s"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 20
s.config.InfluxExponentialBase = 2
s.influxRetryInterval = 0
s.influxMaxRetryTime = 0
// s.config.InfluxRetryInterval = ""
// s.config.InfluxMaxRetryTime = ""
// s.config.InfluxMaxRetries = 0
// s.config.InfluxExponentialBase = 0
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
@@ -126,19 +194,31 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s.meta_as_tags[k] = true
}
toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration)
// toUint := func(duration string, def uint) uint {
// if len(duration) > 0 {
// t, err := time.ParseDuration(duration)
// if err == nil {
// return uint(t.Milliseconds())
// }
// }
// return def
// }
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
return uint(t.Milliseconds())
s.flushDelay = t
}
return def
}
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
s.batch = make([]*write.Point, 0, s.config.BatchSize)
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
s.statsSentMetrics = 0
s.statsProcessedMetrics = 0
return s, nil
}

View File

@@ -17,10 +17,8 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
"password" : "examplepw",
"organization": "myorg",
"ssl": true,
"retry_interval" : "1s",
"retry_exponential_base" : 2,
"max_retries": 20,
"max_retry_time" : "168h"
"flush_delay" : "1s",
"batch_size" : 100
}
}
```
@@ -34,9 +32,6 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
- `password`: Password for basic authentification
- `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection
- `retry_interval`: Base retry interval for failed write requests, default 1s
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
- `max_retries`: Maximal number of retry attempts
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
- `flush_delay`: Group metrics coming in to a single batch
- `batch_size`: Maximal batch size
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)

View File

@@ -73,6 +73,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@@ -102,11 +103,12 @@ type LibgangliaSinkConfig struct {
type LibgangliaSink struct {
sink
config LibgangliaSinkConfig
global_context C.Ganglia_pool
gmond_config C.Ganglia_gmond_config
send_channels C.Ganglia_udp_send_channels
cstrCache map[string]*C.char
config LibgangliaSinkConfig
global_context C.Ganglia_pool
gmond_config C.Ganglia_gmond_config
send_channels C.Ganglia_udp_send_channels
cstrCache map[string]*C.char
statsSentMetrics int64
}
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
@@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
C.Ganglia_metric_destroy(gmetric)
// Free the value C string, the only one not stored in the cache
C.free(unsafe.Pointer(c_value))
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return err
}
@@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
if err != nil {
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
s.statsSentMetrics = 0
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -29,11 +30,12 @@ type PrometheusSinkConfig struct {
type PrometheusSink struct {
sink
config PrometheusSinkConfig
labelMetrics map[string]*prometheus.GaugeVec
nodeMetrics map[string]prometheus.Gauge
promWg sync.WaitGroup
promServer *http.Server
config PrometheusSinkConfig
labelMetrics map[string]*prometheus.GaugeVec
nodeMetrics map[string]prometheus.Gauge
promWg sync.WaitGroup
promServer *http.Server
statsSentMetrics int64
}
func intToFloat64(input interface{}) (float64, error) {
@@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error {
s.nodeMetrics[name] = new
prometheus.Register(new)
}
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}
@@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error {
}
s.nodeMetrics[name].Set(value)
}
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}

View File

@@ -7,6 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
type SampleSinkConfig struct {
@@ -14,12 +15,15 @@ type SampleSinkConfig struct {
// See: metricSink.go
defaultSinkConfig
// Additional config options, for SampleSink
}
type SampleSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink
config SampleSinkConfig // entry point to the SampleSinkConfig
// Stats counters
statsSentMetrics int64
}
// Implement functions required for Sink interface
@@ -30,6 +34,8 @@ type SampleSink struct {
func (s *SampleSink) Write(point lp.CCMetric) error {
// based on s.meta_as_tags use meta infos as tags
log.Print(point)
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}
@@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
}
}
// Initalize stats counters
s.statsSentMetrics = 0
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {

View File

@@ -102,13 +102,19 @@ func (sm *sinkManager) Start() {
}
toTheSinks := func(p lp.CCMetric) {
var wg sync.WaitGroup
// Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.sinks {
if err := s.Write(p); err != nil {
cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
}
wg.Add(1)
go func(s Sink) {
if err := s.Write(p); err != nil {
cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
}
wg.Done()
}(s)
}
wg.Wait()
}
for {

View File

@@ -8,6 +8,7 @@ import (
// "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
type StdoutSink struct {
@@ -17,6 +18,7 @@ type StdoutSink struct {
defaultSinkConfig
Output string `json:"output_file,omitempty"`
}
sentMetrics int64
}
func (s *StdoutSink) Write(m lp.CCMetric) error {
@@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error {
s.output,
m.ToLineProtocol(s.meta_as_tags),
)
s.sentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics)
return nil
}
@@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
s.sentMetrics = 0
return s, nil
}