mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-20 20:01:40 +02:00
Compare commits
28 Commits
v0.4
...
http_stats
Author | SHA1 | Date | |
---|---|---|---|
|
9dd6ff1a76 | ||
|
257b4a64b5 | ||
|
5eeb097136 | ||
|
4a4992877c | ||
|
9447685a69 | ||
|
28348bd108 | ||
|
a3b9d8a90b | ||
|
7e43e9171e | ||
|
5d25a7bf12 | ||
|
83b4343310 | ||
|
f1d3cabdc6 | ||
|
2a014b6fba | ||
|
50479f9325 | ||
|
e0e91844bc | ||
|
296225f3a8 | ||
|
43bcce6fb5 | ||
|
622e94ae0e | ||
|
c506114480 | ||
|
657543dded | ||
|
beebcd7145 | ||
|
082eea525a | ||
|
2b8266d1d2 | ||
|
d835724d93 | ||
|
c5082bbffe | ||
|
4c1263312b | ||
|
940623585c | ||
|
87ecb12c6f | ||
|
ae64eddcc8 |
@@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c
|
||||
"collectors" : "collectors.json",
|
||||
"receivers" : "receivers.json",
|
||||
"router" : "router.json",
|
||||
"stats_api" : "api.json",
|
||||
"interval": 10,
|
||||
"duration": 1
|
||||
}
|
||||
@@ -32,6 +33,7 @@ See the component READMEs for their configuration:
|
||||
* [`sinks`](./sinks/README.md)
|
||||
* [`receivers`](./receivers/README.md)
|
||||
* [`router`](./internal/metricRouter/README.md)
|
||||
* [`stats_api`](./internal/metricRouter/StatsApi.md)
|
||||
|
||||
|
||||
# Installation
|
||||
|
@@ -28,6 +28,7 @@ type CentralConfigFile struct {
|
||||
RouterConfigFile string `json:"router"`
|
||||
SinkConfigFile string `json:"sinks"`
|
||||
ReceiverConfigFile string `json:"receivers,omitempty"`
|
||||
StatsApiConfigFile string `json:"stats_api,omitempty"`
|
||||
}
|
||||
|
||||
func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
|
||||
@@ -52,6 +53,7 @@ type RuntimeConfig struct {
|
||||
CollectManager collectors.CollectorManager
|
||||
SinkManager sinks.SinkManager
|
||||
ReceiveManager receivers.ReceiveManager
|
||||
StatsApi mr.StatsApi
|
||||
MultiChanTicker mct.MultiChanTicker
|
||||
|
||||
Channels []chan lp.CCMetric
|
||||
@@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
|
||||
cclog.Debug("Shutdown SinkManager...")
|
||||
config.SinkManager.Close()
|
||||
}
|
||||
if config.StatsApi != nil {
|
||||
cclog.Debug("Shutdown StatsApi...")
|
||||
config.StatsApi.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func mainFunc() int {
|
||||
var err error
|
||||
use_recv := false
|
||||
use_api := false
|
||||
|
||||
// Initialize runtime configuration
|
||||
rcfg := RuntimeConfig{
|
||||
@@ -164,6 +171,7 @@ func mainFunc() int {
|
||||
CollectManager: nil,
|
||||
SinkManager: nil,
|
||||
ReceiveManager: nil,
|
||||
StatsApi: nil,
|
||||
CliArgs: ReadCli(),
|
||||
}
|
||||
|
||||
@@ -253,6 +261,16 @@ func mainFunc() int {
|
||||
use_recv = true
|
||||
}
|
||||
|
||||
// Create new statistics API manager
|
||||
if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 {
|
||||
rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
use_api = true
|
||||
}
|
||||
|
||||
// Create shutdown handler
|
||||
shutdownSignal := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdownSignal, os.Interrupt)
|
||||
@@ -260,6 +278,11 @@ func mainFunc() int {
|
||||
rcfg.Sync.Add(1)
|
||||
go shutdownHandler(&rcfg, shutdownSignal)
|
||||
|
||||
// Start the stats api early to be prepared for init settings
|
||||
if use_api {
|
||||
rcfg.StatsApi.Start()
|
||||
}
|
||||
|
||||
// Start the managers
|
||||
rcfg.MetricRouter.Start()
|
||||
rcfg.SinkManager.Start()
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
|
||||
@@ -33,6 +34,7 @@ type BeegfsMetaCollector struct {
|
||||
matches map[string]string
|
||||
config BeegfsMetaCollectorConfig
|
||||
skipFS map[string]struct{}
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
||||
@@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
|
||||
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *BeegfsMetaCollector) Close() {
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
// Struct for the collector-specific JSON config
|
||||
@@ -31,6 +32,7 @@ type BeegfsStorageCollector struct {
|
||||
matches map[string]string
|
||||
config BeegfsStorageCollectorConfig
|
||||
skipFS map[string]struct{}
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
||||
@@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
|
||||
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *BeegfsStorageCollector) Close() {
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
//
|
||||
@@ -37,6 +38,7 @@ type CPUFreqCpuInfoCollectorTopology struct {
|
||||
type CPUFreqCpuInfoCollector struct {
|
||||
metricCollector
|
||||
topology []*CPUFreqCpuInfoCollectorTopology
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
||||
@@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
|
||||
"package_id": t.physicalPackageID,
|
||||
}
|
||||
}
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
|
||||
return
|
||||
}
|
||||
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *CPUFreqCpuInfoCollector) Close() {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
@@ -40,6 +41,7 @@ type CPUFreqCollectorTopology struct {
|
||||
type CPUFreqCollector struct {
|
||||
metricCollector
|
||||
topology []CPUFreqCollectorTopology
|
||||
statsProcessedMetrics int64
|
||||
config struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
}
|
||||
@@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
|
||||
"package_id": t.physicalPackageID,
|
||||
}
|
||||
}
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
}
|
||||
|
||||
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *CPUFreqCollector) Close() {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const CPUSTATFILE = `/proc/stat`
|
||||
@@ -25,6 +26,7 @@ type CpustatCollector struct {
|
||||
matches map[string]int
|
||||
cputags map[string]map[string]string
|
||||
nodetags map[string]string
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||
@@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
|
||||
num_cpus++
|
||||
}
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
|
||||
for name, value := range values {
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
|
||||
if err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
time.Now(),
|
||||
)
|
||||
if err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
output <- num_cpus_metric
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *CpustatCollector) Close() {
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
influx "github.com/influxdata/line-protocol"
|
||||
)
|
||||
|
||||
@@ -28,6 +29,9 @@ type CustomCmdCollector struct {
|
||||
config CustomCmdCollectorConfig
|
||||
commands []string
|
||||
files []string
|
||||
statsProcessedMetrics int64
|
||||
statsProcessedCommands int64
|
||||
statsProcessedFiles int64
|
||||
}
|
||||
|
||||
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
||||
@@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
|
||||
m.handler = influx.NewMetricHandler()
|
||||
m.parser = influx.NewParser(m.handler)
|
||||
m.parser.SetTimeFunc(DefaultTime)
|
||||
m.statsProcessedMetrics = 0
|
||||
m.statsProcessedFiles = 0
|
||||
m.statsProcessedCommands = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
|
||||
|
||||
y := lp.FromInfluxMetric(c)
|
||||
if err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
m.statsProcessedCommands++
|
||||
stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands)
|
||||
}
|
||||
for _, file := range m.files {
|
||||
buffer, err := ioutil.ReadFile(file)
|
||||
@@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
|
||||
}
|
||||
y := lp.FromInfluxMetric(f)
|
||||
if err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
m.statsProcessedFiles++
|
||||
stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
// "log"
|
||||
@@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct {
|
||||
|
||||
type DiskstatCollector struct {
|
||||
metricCollector
|
||||
//matches map[string]int
|
||||
config IOstatCollectorConfig
|
||||
//devices map[string]IOstatCollectorEntry
|
||||
config DiskstatCollectorConfig
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||
@@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
|
||||
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
output <- y
|
||||
}
|
||||
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
|
||||
y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
output <- y
|
||||
}
|
||||
perc := (100 * (total - free)) / total
|
||||
@@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
|
||||
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "percent")
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
|
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const DEFAULT_GPFS_CMD = "mmpmon"
|
||||
@@ -35,6 +36,7 @@ type GpfsCollector struct {
|
||||
skipFS map[string]struct{}
|
||||
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
|
||||
lastState map[string]GpfsCollectorLastState
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
@@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
|
||||
return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
|
||||
}
|
||||
m.config.Mmpmon = p
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
if m.config.SendBandwidths {
|
||||
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
|
||||
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
|
||||
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
if m.config.SendBandwidths {
|
||||
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
|
||||
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
|
||||
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// number of closes
|
||||
@@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// number of reads
|
||||
@@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// number of writes
|
||||
@@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// number of read directories
|
||||
@@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// Number of inode updates
|
||||
@@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
continue
|
||||
}
|
||||
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *GpfsCollector) Close() {
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"encoding/json"
|
||||
@@ -18,11 +19,16 @@ import (
|
||||
|
||||
const IB_BASEPATH = "/sys/class/infiniband/"
|
||||
|
||||
// InfinibandCollectorMetric describes a single InfiniBand port counter: the
// sysfs file the counter value is read from and the unit attached to the
// resulting metric as "unit" meta information.
type InfinibandCollectorMetric struct {
	path string // sysfs counter file
	unit string // unit of the raw counter value, e.g. "bytes" or "packets"
}
|
||||
|
||||
type InfinibandCollectorInfo struct {
|
||||
LID string // IB local Identifier (LID)
|
||||
device string // IB device
|
||||
port string // IB device port
|
||||
portCounterFiles map[string]string // mapping counter name -> sysfs file
|
||||
portCounterFiles map[string]InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
|
||||
tagSet map[string]string // corresponding tag list
|
||||
lastState map[string]int64 // State from last measurement
|
||||
}
|
||||
@@ -36,6 +42,7 @@ type InfinibandCollector struct {
|
||||
}
|
||||
info []*InfinibandCollectorInfo
|
||||
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
|
||||
@@ -106,16 +113,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
|
||||
|
||||
// Check access to counter files
|
||||
countersDir := filepath.Join(path, "counters")
|
||||
portCounterFiles := map[string]string{
|
||||
"ib_recv": filepath.Join(countersDir, "port_rcv_data"),
|
||||
"ib_xmit": filepath.Join(countersDir, "port_xmit_data"),
|
||||
"ib_recv_pkts": filepath.Join(countersDir, "port_rcv_packets"),
|
||||
"ib_xmit_pkts": filepath.Join(countersDir, "port_xmit_packets"),
|
||||
portCounterFiles := map[string]InfinibandCollectorMetric{
|
||||
"ib_recv": {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"},
|
||||
"ib_xmit": {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"},
|
||||
"ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"},
|
||||
"ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"},
|
||||
}
|
||||
for _, counterFile := range portCounterFiles {
|
||||
err := unix.Access(counterFile, unix.R_OK)
|
||||
for _, counter := range portCounterFiles {
|
||||
err := unix.Access(counter.path, unix.R_OK)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to access %s: %v", counterFile, err)
|
||||
return fmt.Errorf("unable to access %s: %v", counter.path, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
|
||||
if len(m.info) == 0 {
|
||||
return fmt.Errorf("found no IB devices")
|
||||
}
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -165,14 +172,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
|
||||
m.lastTimestamp = now
|
||||
|
||||
for _, info := range m.info {
|
||||
for counterName, counterFile := range info.portCounterFiles {
|
||||
for counterName, counterDef := range info.portCounterFiles {
|
||||
|
||||
// Read counter file
|
||||
line, err := ioutil.ReadFile(counterFile)
|
||||
line, err := ioutil.ReadFile(counterDef.path)
|
||||
if err != nil {
|
||||
cclog.ComponentError(
|
||||
m.name,
|
||||
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterFile, err))
|
||||
fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
|
||||
continue
|
||||
}
|
||||
data := strings.TrimSpace(string(line))
|
||||
@@ -189,7 +196,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
|
||||
// Send absolute values
|
||||
if m.config.SendAbsoluteValues {
|
||||
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
|
||||
y.AddMeta("unit", counterDef.unit)
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,7 +207,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
|
||||
if info.lastState[counterName] >= 0 {
|
||||
rate := float64((v - info.lastState[counterName])) / timeDiff
|
||||
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
|
||||
y.AddMeta("unit", counterDef.unit+"/sec")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
// Save current state
|
||||
@@ -207,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
|
||||
}
|
||||
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *InfinibandCollector) Close() {
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
|
||||
// "log"
|
||||
"encoding/json"
|
||||
@@ -32,6 +33,7 @@ type IOstatCollector struct {
|
||||
matches map[string]int
|
||||
config IOstatCollectorConfig
|
||||
devices map[string]IOstatCollectorEntry
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
@@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
|
||||
lastValues: values,
|
||||
}
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
@@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
entry.lastValues[name] = x
|
||||
@@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
}
|
||||
m.devices[device] = entry
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *IOstatCollector) Close() {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const IPMITOOL_PATH = `ipmitool`
|
||||
@@ -29,6 +30,7 @@ type IpmiCollector struct {
|
||||
config IpmiCollectorConfig
|
||||
ipmitool string
|
||||
ipmisensors string
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
@@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
||||
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
|
||||
return errors.New("no IPMI reader found")
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
||||
if err == nil {
|
||||
y.AddMeta("unit", unit)
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
|
||||
y.AddMeta("unit", lv[4])
|
||||
}
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
m.readIpmiSensors(m.config.IpmisensorsPath, output)
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *IpmiCollector) Close() {
|
||||
|
@@ -15,8 +15,12 @@ import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
@@ -24,6 +28,7 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
|
||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"github.com/NVIDIA/go-nvml/pkg/dl"
|
||||
)
|
||||
|
||||
@@ -46,6 +51,16 @@ type LikwidCollectorEventsetConfig struct {
|
||||
Metrics []LikwidCollectorMetricConfig `json:"metrics"`
|
||||
}
|
||||
|
||||
type LikwidEventsetConfig struct {
|
||||
internal int
|
||||
gid C.int
|
||||
eorder []*C.char
|
||||
estr *C.char
|
||||
go_estr string
|
||||
results map[int]map[string]interface{}
|
||||
metrics map[int]map[string]float64
|
||||
}
|
||||
|
||||
type LikwidCollectorConfig struct {
|
||||
Eventsets []LikwidCollectorEventsetConfig `json:"eventsets"`
|
||||
Metrics []LikwidCollectorMetricConfig `json:"globalmetrics,omitempty"`
|
||||
@@ -64,11 +79,15 @@ type LikwidCollector struct {
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
results map[int]map[int]map[string]interface{}
|
||||
mresults map[int]map[int]map[string]float64
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
initialized bool
|
||||
likwidGroups map[C.int]LikwidEventsetConfig
|
||||
lock sync.Mutex
|
||||
statsMeasurements int64
|
||||
statsProcessedMetrics int64
|
||||
statsPublishedMetrics int64
|
||||
}
|
||||
|
||||
type LikwidMetric struct {
|
||||
@@ -86,14 +105,60 @@ func eventsToEventStr(events map[string]string) string {
|
||||
return strings.Join(elist, ",")
|
||||
}
|
||||
|
||||
func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig {
|
||||
tmplist := make([]string, 0)
|
||||
clist := make([]string, 0)
|
||||
for k := range input.Events {
|
||||
clist = append(clist, k)
|
||||
}
|
||||
sort.Strings(clist)
|
||||
elist := make([]*C.char, 0)
|
||||
for _, k := range clist {
|
||||
v := input.Events[k]
|
||||
tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k))
|
||||
c_counter := C.CString(k)
|
||||
elist = append(elist, c_counter)
|
||||
}
|
||||
estr := strings.Join(tmplist, ",")
|
||||
res := make(map[int]map[string]interface{})
|
||||
met := make(map[int]map[string]float64)
|
||||
for _, i := range topo.CpuList() {
|
||||
res[i] = make(map[string]interface{})
|
||||
for k := range input.Events {
|
||||
res[i][k] = 0.0
|
||||
}
|
||||
met[i] = make(map[string]float64)
|
||||
for _, v := range input.Metrics {
|
||||
res[i][v.Name] = 0.0
|
||||
}
|
||||
}
|
||||
return LikwidEventsetConfig{
|
||||
gid: -1,
|
||||
eorder: elist,
|
||||
estr: C.CString(estr),
|
||||
go_estr: estr,
|
||||
results: res,
|
||||
metrics: met,
|
||||
}
|
||||
}
|
||||
|
||||
func testLikwidMetricFormula(formula string, params []string) bool {
|
||||
myparams := make(map[string]interface{})
|
||||
for _, p := range params {
|
||||
myparams[p] = float64(1.0)
|
||||
}
|
||||
_, err := agg.EvalFloat64Condition(formula, myparams)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func getBaseFreq() float64 {
|
||||
files := []string{
|
||||
"/sys/devices/system/cpu/cpu0/cpufreq/bios_limit",
|
||||
"/sys/devices/system/cpu/cpu0/cpufreq/base_frequency",
|
||||
}
|
||||
var freq float64 = math.NaN()
|
||||
C.power_init(0)
|
||||
info := C.get_powerInfo()
|
||||
if float64(info.baseFrequency) != 0 {
|
||||
freq = float64(info.baseFrequency) * 1e6
|
||||
} else {
|
||||
buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
|
||||
for _, f := range files {
|
||||
buffer, err := ioutil.ReadFile(f)
|
||||
if err == nil {
|
||||
data := strings.Replace(string(buffer), "\n", "", -1)
|
||||
x, err := strconv.ParseInt(data, 0, 64)
|
||||
@@ -102,12 +167,22 @@ func getBaseFreq() float64 {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if math.IsNaN(freq) {
|
||||
C.power_init(0)
|
||||
info := C.get_powerInfo()
|
||||
if float64(info.baseFrequency) != 0 {
|
||||
freq = float64(info.baseFrequency) * 1e6
|
||||
}
|
||||
C.power_finalize()
|
||||
}
|
||||
return freq
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
var ret C.int
|
||||
m.name = "LikwidCollector"
|
||||
m.initialized = false
|
||||
m.running = false
|
||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||
m.config.LibraryPath = LIKWID_LIB_NAME
|
||||
if len(config) > 0 {
|
||||
@@ -131,7 +206,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
m.setup()
|
||||
|
||||
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
|
||||
m.meta = map[string]string{"group": "PerfCounter"}
|
||||
cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
|
||||
cpulist := topo.CpuList()
|
||||
m.cpulist = make([]C.int, len(cpulist))
|
||||
@@ -140,172 +215,138 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
m.cpulist[i] = C.int(c)
|
||||
m.cpu2tid[c] = i
|
||||
}
|
||||
m.sock2tid = make(map[int]int)
|
||||
tmp := make([]C.int, 1)
|
||||
for _, sid := range topo.SocketList() {
|
||||
cstr := C.CString(fmt.Sprintf("S%d:0", sid))
|
||||
ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
|
||||
if ret > 0 {
|
||||
m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
|
||||
}
|
||||
C.free(unsafe.Pointer(cstr))
|
||||
}
|
||||
m.results = make(map[int]map[int]map[string]interface{})
|
||||
m.mresults = make(map[int]map[int]map[string]float64)
|
||||
|
||||
m.likwidGroups = make(map[C.int]LikwidEventsetConfig)
|
||||
|
||||
// m.results = make(map[int]map[int]map[string]interface{})
|
||||
// m.mresults = make(map[int]map[int]map[string]float64)
|
||||
m.gmresults = make(map[int]map[string]float64)
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
|
||||
ret = C.topology_init()
|
||||
if ret != 0 {
|
||||
err := errors.New("failed to initialize LIKWID topology")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
switch m.config.AccessMode {
|
||||
case "direct":
|
||||
C.HPMmode(0)
|
||||
case "accessdaemon":
|
||||
if len(m.config.DaemonPath) > 0 {
|
||||
p := os.Getenv("PATH")
|
||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
||||
}
|
||||
C.HPMmode(1)
|
||||
}
|
||||
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
C.topology_finalize()
|
||||
err := errors.New("failed to initialize LIKWID topology")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
for _, tid := range m.cpu2tid {
|
||||
m.gmresults[tid] = make(map[string]float64)
|
||||
}
|
||||
|
||||
// This is for the global metrics computation test
|
||||
globalParams := make(map[string]interface{})
|
||||
globalParams["time"] = float64(1.0)
|
||||
globalParams["inverseClock"] = float64(1.0)
|
||||
// While adding the events, we test the metrics whether they can be computed at all
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
var cstr *C.char
|
||||
if len(evset.Events) > 0 {
|
||||
estr := eventsToEventStr(evset.Events)
|
||||
totalMetrics := 0
|
||||
// Generate parameter list for the metric computing test
|
||||
params := make(map[string]interface{})
|
||||
params["time"] = float64(1.0)
|
||||
params["inverseClock"] = float64(1.0)
|
||||
params := make([]string, 0)
|
||||
params = append(params, "time", "inverseClock")
|
||||
// Generate parameter list for the global metric computing test
|
||||
globalParams := make([]string, 0)
|
||||
globalParams = append(globalParams, "time", "inverseClock")
|
||||
// We test the eventset metrics whether they can be computed at all
|
||||
for _, evset := range m.config.Eventsets {
|
||||
if len(evset.Events) > 0 {
|
||||
params = params[:2]
|
||||
for counter := range evset.Events {
|
||||
params[counter] = float64(1.0)
|
||||
params = append(params, counter)
|
||||
}
|
||||
for _, metric := range evset.Metrics {
|
||||
// Try to evaluate the metric
|
||||
_, err := agg.EvalFloat64Condition(metric.Calc, params)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||
continue
|
||||
}
|
||||
// If the metric is not in the parameter list for the global metrics, add it
|
||||
if _, ok := globalParams[metric.Name]; !ok {
|
||||
globalParams[metric.Name] = float64(1.0)
|
||||
if testLikwidMetricFormula(metric.Calc, params) {
|
||||
// Add the computable metric to the parameter list for the global metrics
|
||||
globalParams = append(globalParams, metric.Name)
|
||||
totalMetrics++
|
||||
} else {
|
||||
metric.Calc = ""
|
||||
}
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
cstr = C.CString(estr)
|
||||
gid = C.perfmon_addEventSet(cstr)
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
continue
|
||||
}
|
||||
if gid >= 0 {
|
||||
m.groups = append(m.groups, gid)
|
||||
}
|
||||
C.free(unsafe.Pointer(cstr))
|
||||
m.results[i] = make(map[int]map[string]interface{})
|
||||
m.mresults[i] = make(map[int]map[string]float64)
|
||||
for tid := range m.cpulist {
|
||||
m.results[i][tid] = make(map[string]interface{})
|
||||
m.mresults[i][tid] = make(map[string]float64)
|
||||
if i == 0 {
|
||||
m.gmresults[tid] = make(map[string]float64)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, metric := range m.config.Metrics {
|
||||
// Try to evaluate the global metric
|
||||
_, err := agg.EvalFloat64Condition(metric.Calc, globalParams)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||
continue
|
||||
if !testLikwidMetricFormula(metric.Calc, globalParams) {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
|
||||
metric.Calc = ""
|
||||
} else {
|
||||
totalMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
// If no event set could be added, shut down LikwidCollector
|
||||
if len(m.groups) == 0 {
|
||||
C.perfmon_finalize()
|
||||
C.topology_finalize()
|
||||
err := errors.New("no LIKWID performance group initialized")
|
||||
if totalMetrics == 0 {
|
||||
err := errors.New("no LIKWID eventset or metric usable")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
m.basefreq = getBaseFreq()
|
||||
cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
|
||||
m.statsMeasurements = 0
|
||||
m.statsProcessedMetrics = 0
|
||||
m.statsPublishedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// take a measurement for 'interval' seconds of event set index 'group'
|
||||
func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error {
|
||||
func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
|
||||
var ret C.int
|
||||
gid := m.groups[group]
|
||||
ret = C.perfmon_setupCounters(gid)
|
||||
|
||||
m.lock.Lock()
|
||||
if m.initialized {
|
||||
ret = C.perfmon_setupCounters(evset.gid)
|
||||
if ret != 0 {
|
||||
gctr := C.GoString(C.perfmon_getGroupName(gid))
|
||||
err := fmt.Errorf("failed to setup performance group %d (%s)", gid, gctr)
|
||||
return err
|
||||
var err error = nil
|
||||
var skip bool = false
|
||||
if ret == -37 {
|
||||
skip = true
|
||||
} else {
|
||||
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
|
||||
}
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
ret = C.perfmon_startCounters()
|
||||
if ret != 0 {
|
||||
gctr := C.GoString(C.perfmon_getGroupName(gid))
|
||||
err := fmt.Errorf("failed to start performance group %d (%s)", gid, gctr)
|
||||
return err
|
||||
var err error = nil
|
||||
var skip bool = false
|
||||
if ret == -37 {
|
||||
skip = true
|
||||
} else {
|
||||
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
|
||||
}
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = true
|
||||
time.Sleep(interval)
|
||||
m.running = false
|
||||
ret = C.perfmon_stopCounters()
|
||||
if ret != 0 {
|
||||
gctr := C.GoString(C.perfmon_getGroupName(gid))
|
||||
err := fmt.Errorf("failed to stop performance group %d (%s)", gid, gctr)
|
||||
return err
|
||||
var err error = nil
|
||||
var skip bool = false
|
||||
if ret == -37 {
|
||||
skip = true
|
||||
} else {
|
||||
err = fmt.Errorf("failed to setup performance group %d", evset.gid)
|
||||
}
|
||||
return nil
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
}
|
||||
m.lock.Unlock()
|
||||
m.statsMeasurements++
|
||||
stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Get all measurement results for an event set, derive the metric values out of the measurement results and send it
|
||||
func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error {
|
||||
var eidx C.int
|
||||
evset := m.config.Eventsets[group]
|
||||
gid := m.groups[group]
|
||||
func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
|
||||
invClock := float64(1.0 / m.basefreq)
|
||||
|
||||
// Go over events and get the results
|
||||
for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
|
||||
ctr := C.perfmon_getCounterName(gid, eidx)
|
||||
gctr := C.GoString(ctr)
|
||||
|
||||
for eidx, counter := range evset.eorder {
|
||||
gctr := C.GoString(counter)
|
||||
for _, tid := range m.cpu2tid {
|
||||
if tid >= 0 {
|
||||
m.results[group][tid]["time"] = interval.Seconds()
|
||||
m.results[group][tid]["inverseClock"] = invClock
|
||||
res := C.perfmon_getLastResult(gid, eidx, C.int(tid))
|
||||
m.results[group][tid][gctr] = float64(res)
|
||||
}
|
||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||
evset.results[tid][gctr] = float64(res)
|
||||
evset.results[tid]["time"] = interval.Seconds()
|
||||
evset.results[tid]["inverseClock"] = invClock
|
||||
}
|
||||
}
|
||||
|
||||
// Go over the event set metrics, derive the value out of the event:counter values and send it
|
||||
for _, metric := range evset.Metrics {
|
||||
for _, metric := range m.config.Eventsets[evset.internal].Metrics {
|
||||
// The metric scope is determined in the Init() function
|
||||
// Get the map scope-id -> tids
|
||||
scopemap := m.cpu2tid
|
||||
@@ -313,19 +354,21 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
scopemap = m.sock2tid
|
||||
}
|
||||
for domain, tid := range scopemap {
|
||||
if tid >= 0 {
|
||||
value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
|
||||
if tid >= 0 && len(metric.Calc) > 0 {
|
||||
value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||
continue
|
||||
}
|
||||
m.mresults[group][tid][metric.Name] = value
|
||||
evset.metrics[tid][metric.Name] = value
|
||||
if m.config.InvalidToZero && math.IsNaN(value) {
|
||||
value = 0.0
|
||||
}
|
||||
if m.config.InvalidToZero && math.IsInf(value, 0) {
|
||||
value = 0.0
|
||||
}
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
// Now we have the result, send it with the proper tags
|
||||
if !math.IsNaN(value) {
|
||||
if metric.Publish {
|
||||
@@ -338,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
|
||||
if len(metric.Unit) > 0 {
|
||||
y.AddMeta("unit", metric.Unit)
|
||||
}
|
||||
m.statsPublishedMetrics++
|
||||
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@@ -360,8 +405,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
if tid >= 0 {
|
||||
// Here we generate parameter list
|
||||
params := make(map[string]interface{})
|
||||
for j := range m.groups {
|
||||
for mname, mres := range m.mresults[j][tid] {
|
||||
for _, evset := range m.likwidGroups {
|
||||
for mname, mres := range evset.metrics[tid] {
|
||||
params[mname] = mres
|
||||
}
|
||||
}
|
||||
@@ -378,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
if m.config.InvalidToZero && math.IsInf(value, 0) {
|
||||
value = 0.0
|
||||
}
|
||||
m.statsProcessedMetrics++
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
// Now we have the result, send it with the proper tags
|
||||
if !math.IsNaN(value) {
|
||||
if metric.Publish {
|
||||
@@ -391,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
if len(metric.Unit) > 0 {
|
||||
y.AddMeta("unit", metric.Unit)
|
||||
}
|
||||
m.statsPublishedMetrics++
|
||||
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@@ -401,38 +450,163 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) LateInit() error {
|
||||
var ret C.int
|
||||
if m.initialized {
|
||||
return nil
|
||||
}
|
||||
switch m.config.AccessMode {
|
||||
case "direct":
|
||||
C.HPMmode(0)
|
||||
case "accessdaemon":
|
||||
if len(m.config.DaemonPath) > 0 {
|
||||
p := os.Getenv("PATH")
|
||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
||||
}
|
||||
C.HPMmode(1)
|
||||
}
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
|
||||
ret = C.topology_init()
|
||||
if ret != 0 {
|
||||
err := errors.New("failed to initialize LIKWID topology")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
m.sock2tid = make(map[int]int)
|
||||
tmp := make([]C.int, 1)
|
||||
for _, sid := range topo.SocketList() {
|
||||
cstr := C.CString(fmt.Sprintf("S%d:0", sid))
|
||||
ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
|
||||
if ret > 0 {
|
||||
m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
|
||||
}
|
||||
C.free(unsafe.Pointer(cstr))
|
||||
}
|
||||
|
||||
m.basefreq = getBaseFreq()
|
||||
cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
|
||||
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
C.topology_finalize()
|
||||
if ret != -22 {
|
||||
err = errors.New("failed to initialize LIKWID perfmon")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
} else {
|
||||
err = errors.New("access to LIKWID perfmon locked")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// While adding the events, we test the metrics whether they can be computed at all
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
if len(evset.Events) > 0 {
|
||||
skip := false
|
||||
likwidGroup := genLikwidEventSet(evset)
|
||||
for _, g := range m.likwidGroups {
|
||||
if likwidGroup.go_estr == g.go_estr {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
if gid >= 0 {
|
||||
likwidGroup.gid = gid
|
||||
likwidGroup.internal = i
|
||||
m.likwidGroups[gid] = likwidGroup
|
||||
}
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// If no event set could be added, shut down LikwidCollector
|
||||
if len(m.likwidGroups) == 0 {
|
||||
C.perfmon_finalize()
|
||||
C.topology_finalize()
|
||||
err := errors.New("no LIKWID performance group initialized")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
sigchan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigchan, syscall.SIGCHLD)
|
||||
signal.Notify(sigchan, os.Interrupt)
|
||||
go func() {
|
||||
<-sigchan
|
||||
|
||||
signal.Stop(sigchan)
|
||||
m.initialized = false
|
||||
}()
|
||||
m.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// main read function taking multiple measurement rounds, each 'interval' seconds long
|
||||
func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
var skip bool = false
|
||||
var err error
|
||||
if !m.init {
|
||||
return
|
||||
}
|
||||
|
||||
for i := range m.groups {
|
||||
if !m.initialized {
|
||||
m.lock.Lock()
|
||||
err = m.LateInit()
|
||||
if err != nil {
|
||||
m.lock.Unlock()
|
||||
return
|
||||
}
|
||||
m.initialized = true
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
if m.initialized && !skip {
|
||||
for _, evset := range m.likwidGroups {
|
||||
if !skip {
|
||||
// measure event set 'i' for 'interval' seconds
|
||||
err := m.takeMeasurement(i, interval)
|
||||
skip, err = m.takeMeasurement(evset, interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
}
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(i, interval, output)
|
||||
}
|
||||
|
||||
if !skip {
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(evset, interval, output)
|
||||
}
|
||||
}
|
||||
if !skip {
|
||||
// use the event set metrics to derive the global metrics
|
||||
m.calcGlobalMetrics(interval, output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) Close() {
|
||||
if m.init {
|
||||
cclog.ComponentDebug(m.name, "Closing ...")
|
||||
m.init = false
|
||||
if m.running {
|
||||
cclog.ComponentDebug(m.name, "Stopping counters")
|
||||
C.perfmon_stopCounters()
|
||||
}
|
||||
cclog.ComponentDebug(m.name, "Closing ...")
|
||||
m.lock.Lock()
|
||||
if m.initialized {
|
||||
cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
|
||||
C.perfmon_finalize()
|
||||
m.initialized = false
|
||||
}
|
||||
m.lock.Unlock()
|
||||
cclog.ComponentDebug(m.name, "Finalize LIKWID topology module")
|
||||
C.topology_finalize()
|
||||
|
||||
cclog.ComponentDebug(m.name, "Closing done")
|
||||
}
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
//
|
||||
@@ -32,6 +33,7 @@ type LoadavgCollector struct {
|
||||
config struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
}
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||
@@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
|
||||
for i, name := range m.proc_matches {
|
||||
_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *LoadavgCollector) Close() {
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const LUSTRE_SYSFS = `/sys/fs/lustre`
|
||||
@@ -44,6 +45,7 @@ type LustreCollector struct {
|
||||
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
|
||||
definitions []LustreMetricDefinition // Combined list without excluded metrics
|
||||
stats map[string]map[string]int64 // Data for last value per device and metric
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
|
||||
@@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
m.lastTimestamp = time.Now()
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y.AddMeta("unit", def.unit)
|
||||
}
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
devData[def.name] = use_x
|
||||
}
|
||||
}
|
||||
m.lastTimestamp = now
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *LustreCollector) Close() {
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const MEMSTATFILE = "/proc/meminfo"
|
||||
@@ -38,10 +39,16 @@ type MemstatCollector struct {
|
||||
config MemstatCollectorConfig
|
||||
nodefiles map[int]MemstatCollectorNode
|
||||
sendMemUsed bool
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func getStats(filename string) map[string]float64 {
|
||||
stats := make(map[string]float64)
|
||||
type MemstatStats struct {
|
||||
value float64
|
||||
unit string
|
||||
}
|
||||
|
||||
func getStats(filename string) map[string]MemstatStats {
|
||||
stats := make(map[string]MemstatStats)
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
@@ -55,12 +62,18 @@ func getStats(filename string) map[string]float64 {
|
||||
if len(linefields) == 3 {
|
||||
v, err := strconv.ParseFloat(linefields[1], 64)
|
||||
if err == nil {
|
||||
stats[strings.Trim(linefields[0], ":")] = v
|
||||
stats[strings.Trim(linefields[0], ":")] = MemstatStats{
|
||||
value: v,
|
||||
unit: linefields[2],
|
||||
}
|
||||
}
|
||||
} else if len(linefields) == 5 {
|
||||
v, err := strconv.ParseFloat(linefields[3], 64)
|
||||
if err == nil {
|
||||
stats[strings.Trim(linefields[0], ":")] = v
|
||||
stats[strings.Trim(linefields[0], ":")] = MemstatStats{
|
||||
value: v,
|
||||
unit: linefields[4],
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -78,7 +91,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"}
|
||||
m.meta = map[string]string{"source": m.name, "group": "Memory"}
|
||||
m.stats = make(map[string]int64)
|
||||
m.matches = make(map[string]string)
|
||||
m.tags = map[string]string{"type": "node"}
|
||||
@@ -142,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return err
|
||||
}
|
||||
@@ -151,30 +165,53 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
return
|
||||
}
|
||||
|
||||
sendStats := func(stats map[string]float64, tags map[string]string) {
|
||||
sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
|
||||
for match, name := range m.matches {
|
||||
var value float64 = 0
|
||||
var unit string = ""
|
||||
if v, ok := stats[match]; ok {
|
||||
value = v
|
||||
value = v.value
|
||||
if len(v.unit) > 0 {
|
||||
unit = v.unit
|
||||
}
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now())
|
||||
}
|
||||
|
||||
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
|
||||
if err == nil {
|
||||
if len(unit) > 0 {
|
||||
y.AddMeta("unit", unit)
|
||||
}
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if m.sendMemUsed {
|
||||
memUsed := 0.0
|
||||
unit := ""
|
||||
if totalVal, total := stats["MemTotal"]; total {
|
||||
if freeVal, free := stats["MemFree"]; free {
|
||||
if bufVal, buffers := stats["Buffers"]; buffers {
|
||||
if cacheVal, cached := stats["Cached"]; cached {
|
||||
memUsed = totalVal - (freeVal + bufVal + cacheVal)
|
||||
memUsed = totalVal.value - (freeVal.value + bufVal.value + cacheVal.value)
|
||||
if len(totalVal.unit) > 0 {
|
||||
unit = totalVal.unit
|
||||
} else if len(freeVal.unit) > 0 {
|
||||
unit = freeVal.unit
|
||||
} else if len(bufVal.unit) > 0 {
|
||||
unit = bufVal.unit
|
||||
} else if len(cacheVal.unit) > 0 {
|
||||
unit = cacheVal.unit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now())
|
||||
}
|
||||
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
|
||||
if err == nil {
|
||||
if len(unit) > 0 {
|
||||
y.AddMeta("unit", unit)
|
||||
}
|
||||
m.statsProcessedMetrics++
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
@@ -191,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
sendStats(stats, nodeConf.tags)
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *MemstatCollector) Close() {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const NETSTATFILE = "/proc/net/dev"
|
||||
@@ -35,6 +36,7 @@ type NetstatCollector struct {
|
||||
config NetstatCollectorConfig
|
||||
matches map[string][]NetstatCollectorMetric
|
||||
lastTimestamp time.Time
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
@@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
if len(m.matches) == 0 {
|
||||
return errors.New("no devices to collector metrics found")
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if m.config.SendAbsoluteValues {
|
||||
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
if m.config.SendDerivedValues {
|
||||
@@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
rate := float64(v-metric.lastValue) / timeDiff
|
||||
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
metric.lastValue = v
|
||||
@@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
}
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *NetstatCollector) Close() {
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
// First part contains the code for the general NfsCollector.
|
||||
@@ -33,10 +34,11 @@ type nfsCollector struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
}
|
||||
data map[string]NfsCollectorData
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *nfsCollector) initStats() error {
|
||||
cmd := exec.Command(m.config.Nfsstats, `-l`)
|
||||
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
||||
cmd.Wait()
|
||||
buffer, err := cmd.Output()
|
||||
if err == nil {
|
||||
@@ -52,7 +54,7 @@ func (m *nfsCollector) initStats() error {
|
||||
if err == nil {
|
||||
x := m.data[name]
|
||||
x.current = value
|
||||
x.last = 0
|
||||
x.last = value
|
||||
m.data[name] = x
|
||||
}
|
||||
}
|
||||
@@ -63,7 +65,7 @@ func (m *nfsCollector) initStats() error {
|
||||
}
|
||||
|
||||
func (m *nfsCollector) updateStats() error {
|
||||
cmd := exec.Command(m.config.Nfsstats, `-l`)
|
||||
cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
|
||||
cmd.Wait()
|
||||
buffer, err := cmd.Output()
|
||||
if err == nil {
|
||||
@@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
|
||||
}
|
||||
m.data = make(map[string]NfsCollectorData)
|
||||
m.initStats()
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
if err == nil {
|
||||
y.AddMeta("version", m.version)
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *nfsCollector) Close() {
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
//
|
||||
@@ -45,6 +46,7 @@ type NUMAStatsCollectorTopolgy struct {
|
||||
type NUMAStatsCollector struct {
|
||||
metricCollector
|
||||
topology []NUMAStatsCollectorTopolgy
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
||||
@@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
|
||||
tagSet: map[string]string{"memoryDomain": node},
|
||||
})
|
||||
}
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri
|
||||
)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
file.Close()
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *NUMAStatsCollector) Close() {
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"github.com/NVIDIA/go-nvml/pkg/nvml"
|
||||
)
|
||||
|
||||
@@ -29,6 +30,7 @@ type NvidiaCollector struct {
|
||||
num_gpus int
|
||||
config NvidiaCollectorConfig
|
||||
gpus []NvidiaCollectorDevice
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *NvidiaCollector) CatchPanic() {
|
||||
@@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
|
||||
pciInfo.Device)
|
||||
}
|
||||
}
|
||||
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "%")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
if !device.excludeMetrics["nv_mem_util"] {
|
||||
@@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "%")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MByte")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MByte")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "degC")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "%")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
}
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
} else if ret == nvml.ERROR_NOT_SUPPORTED {
|
||||
y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "watts")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "MHz")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "watts")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "%")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "%")
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *NvidiaCollector) Close() {
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
// These are the fields we read from the JSON configuration
|
||||
@@ -20,6 +21,7 @@ type SampleCollector struct {
|
||||
config SampleTimerCollectorConfig // the configuration structure
|
||||
meta map[string]string // default meta information
|
||||
tags map[string]string // default tags
|
||||
statsCount int64
|
||||
}
|
||||
|
||||
// Functions to implement MetricCollector interface
|
||||
@@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
|
||||
// for all topological entities (sockets, NUMA domains, ...)
|
||||
// Return some useful error message in case of any failures
|
||||
|
||||
// Initialize counts for statistics
|
||||
m.statsCount = 0
|
||||
|
||||
// Set this flag only if everything is initialized properly, all required files exist, ...
|
||||
m.init = true
|
||||
return err
|
||||
@@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
if err == nil {
|
||||
// Send it to output channel
|
||||
output <- y
|
||||
// increment count for each sent metric or any other operation
|
||||
m.statsCount++
|
||||
}
|
||||
|
||||
// Set stats for the component
|
||||
stats.ComponentStatInt(m.name, "count", m.statsCount)
|
||||
}
|
||||
|
||||
// Close metric collector: close network connection, close files, close libraries, ...
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
// See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html
|
||||
@@ -41,6 +42,7 @@ type TempCollector struct {
|
||||
ReportCriticalTemp bool `json:"report_critical_temperature"`
|
||||
}
|
||||
sensors []*TempCollectorSensor
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *TempCollector) Init(config json.RawMessage) error {
|
||||
@@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
|
||||
// Finished initialization
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
|
||||
// max temperature
|
||||
@@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
)
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *TempCollector) Close() {
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const MAX_NUM_PROCS = 10
|
||||
@@ -23,6 +24,7 @@ type TopProcsCollector struct {
|
||||
metricCollector
|
||||
tags map[string]string
|
||||
config TopProcsCollectorConfig
|
||||
statsProcessedMetrics int64
|
||||
}
|
||||
|
||||
func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
||||
@@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
|
||||
if err != nil {
|
||||
return errors.New("failed to execute command")
|
||||
}
|
||||
m.statsProcessedMetrics = 0
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric
|
||||
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now())
|
||||
if err == nil {
|
||||
output <- y
|
||||
m.statsProcessedMetrics++
|
||||
}
|
||||
}
|
||||
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
|
||||
}
|
||||
|
||||
func (m *TopProcsCollector) Close() {
|
||||
|
@@ -169,7 +169,10 @@ func DieList() []int {
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(dielist) > 0 {
|
||||
return dielist
|
||||
}
|
||||
return SocketList()
|
||||
}
|
||||
|
||||
type CpuEntry struct {
|
||||
@@ -261,7 +264,7 @@ func CpuData() []CpuEntry {
|
||||
for _, c := range CpuList() {
|
||||
clist = append(clist, CpuEntry{Cpuid: c})
|
||||
}
|
||||
for _, centry := range clist {
|
||||
for i, centry := range clist {
|
||||
centry.Socket = -1
|
||||
centry.Numadomain = -1
|
||||
centry.Die = -1
|
||||
@@ -289,6 +292,8 @@ func CpuData() []CpuEntry {
|
||||
// Lookup NUMA domain id
|
||||
centry.Numadomain = getNumaDomain(base)
|
||||
|
||||
// Update values in output list
|
||||
clist[i] = centry
|
||||
}
|
||||
return clist
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
|
||||
{
|
||||
"num_cache_intervals" : 1,
|
||||
"interval_timestamp" : true,
|
||||
"hostname_tag" : "hostname",
|
||||
"max_forward" : 50,
|
||||
"add_tags" : [
|
||||
{
|
||||
"key" : "cluster",
|
||||
@@ -55,6 +57,20 @@ The CCMetric router sits in between the collectors and the sinks and can be used
|
||||
```
|
||||
|
||||
There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval.
|
||||
|
||||
# Processing order in the router
|
||||
|
||||
- Add the `hostname_tag` tag (if sent by collectors or cache)
|
||||
- If `interval_timestamp == true`, change time of metrics
|
||||
- Check if metric should be dropped (`drop_metrics` and `drop_metrics_if`)
|
||||
- Add tags from `add_tags`
|
||||
- Delete tags from `delete_tags`
|
||||
- Rename metric based on `rename_metrics` and store old name as `oldname` in meta information
|
||||
- Add tags from `add_tags` (if you used the new name in the `if` condition)
|
||||
- Delete tags from `delete_tags` (if you used the new name in the `if` condition)
|
||||
- Send to sinks
|
||||
- Move to cache (if `num_cache_intervals > 0`)
|
||||
|
||||
# The `interval_timestamp` option
|
||||
|
||||
The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time.
|
||||
@@ -65,6 +81,14 @@ If the MetricRouter should buffer metrics of intervals in a MetricCache, this op
|
||||
|
||||
A `num_cache_intervals > 0` is required to use the `interval_aggregates` option.
|
||||
|
||||
# The `hostname_tag` option
|
||||
|
||||
By default, the router tags all locally created metrics with the hostname. The default tag name is `hostname`, but it can be changed if your organization prefers a different name.
|
||||
|
||||
# The `max_forward` option
|
||||
|
||||
Every time the router receives a metric through any of the channels, it tries to directly read up to `max_forward` metrics from the same channel. This avoids the router thread going to sleep and waking up again for every arriving metric. The default is `50` metrics at once, and `max_forward` needs to be greater than `1`.
|
||||
|
||||
# The `rename_metrics` option
|
||||
|
||||
In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, they might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`.
|
||||
|
17
internal/metricRouter/StatsApi.md
Normal file
17
internal/metricRouter/StatsApi.md
Normal file
@@ -0,0 +1,17 @@
|
||||
# Stats API
|
||||
|
||||
The Stats API can be used for debugging. It publishes counts from different components of the CC Metric Collector as JSON at an HTTP endpoint.
|
||||
|
||||
# Configuration
|
||||
|
||||
The Stats API has its own configuration file to specify the listen host and port. The defaults are `localhost` and `8080`.
|
||||
|
||||
```json
|
||||
{
|
||||
"bindhost" : "",
|
||||
"port" : "8080",
|
||||
"publish_collectorstate" : true
|
||||
}
|
||||
```
|
||||
|
||||
The `bindhost` and `port` options specify the listen host and port. The `publish_collectorstate` option needs to be `true`, otherwise nothing is presented. This option is for future use if we need to publish more information using different domains.
|
232
internal/metricRouter/metricApi.go
Normal file
232
internal/metricRouter/metricApi.go
Normal file
@@ -0,0 +1,232 @@
|
||||
package metricRouter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// statsApiConfig holds the settings decoded from the Stats API JSON
// configuration file.
type statsApiConfig struct {
	// Host is the address the HTTP server binds to (JSON key "bindhost").
	Host string `json:"bindhost"`
	// Port is the TCP port the HTTP server listens on (JSON key "port").
	Port string `json:"port"`
	// PublishCollectorState enables the "/" endpoint that serves the
	// collected component statistics (JSON key "publish_collectorstate").
	PublishCollectorState bool `json:"publish_collectorstate"`
}
|
||||
|
||||
// Metric cache data structure
|
||||
type statsApi struct {
|
||||
name string
|
||||
input chan lp.CCMetric
|
||||
indone chan bool
|
||||
outdone chan bool
|
||||
config statsApiConfig
|
||||
wg *sync.WaitGroup
|
||||
statsWg sync.WaitGroup
|
||||
ticker mct.MultiChanTicker
|
||||
tickchan chan time.Time
|
||||
server *http.Server
|
||||
router *mux.Router
|
||||
lock sync.Mutex
|
||||
baseurl string
|
||||
stats map[string]map[string]int64
|
||||
outStats map[string]map[string]int64
|
||||
}
|
||||
|
||||
// StatsApi is the public interface of the statistics HTTP endpoint.
type StatsApi interface {
	// Start launches the background goroutines and the HTTP server.
	Start()
	// Close stops the background goroutines and shuts the HTTP server down.
	Close()
	// StatsFunc is the HTTP handler that writes the statistics as JSON.
	StatsFunc(w http.ResponseWriter, r *http.Request)
}
|
||||
|
||||
// statsApiServer is the package-wide Stats API instance. It stays nil until
// NewStatsApi succeeds; ComponentStatInt and ComponentStatString are no-ops
// while it is nil.
var statsApiServer *statsApi
|
||||
|
||||
func (a *statsApi) updateStats(point lp.CCMetric) {
|
||||
switch point.Name() {
|
||||
case "_stats":
|
||||
if name, nok := point.GetMeta("source"); nok {
|
||||
var compStats map[string]int64
|
||||
var ok bool
|
||||
|
||||
if compStats, ok = a.stats[name]; !ok {
|
||||
a.stats[name] = make(map[string]int64)
|
||||
compStats = a.stats[name]
|
||||
}
|
||||
for k, v := range point.Fields() {
|
||||
switch value := v.(type) {
|
||||
case int:
|
||||
compStats[k] = int64(value)
|
||||
case uint:
|
||||
compStats[k] = int64(value)
|
||||
case int32:
|
||||
compStats[k] = int64(value)
|
||||
case uint32:
|
||||
compStats[k] = int64(value)
|
||||
case int64:
|
||||
compStats[k] = int64(value)
|
||||
case uint64:
|
||||
compStats[k] = int64(value)
|
||||
default:
|
||||
cclog.ComponentDebug(a.name, "Unusable stats for", k, ". Values should be int64")
|
||||
}
|
||||
}
|
||||
a.stats[name] = compStats
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *statsApi) Start() {
|
||||
a.ticker.AddChannel(a.tickchan)
|
||||
a.wg.Add(1)
|
||||
a.statsWg.Add(1)
|
||||
go func() {
|
||||
a.stats = make(map[string]map[string]int64)
|
||||
defer a.statsWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-a.indone:
|
||||
cclog.ComponentDebug(a.name, "INPUT DONE")
|
||||
close(a.indone)
|
||||
return
|
||||
case p := <-a.input:
|
||||
a.lock.Lock()
|
||||
a.updateStats(p)
|
||||
a.lock.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
a.statsWg.Add(1)
|
||||
go func() {
|
||||
a.outStats = make(map[string]map[string]int64)
|
||||
defer a.statsWg.Done()
|
||||
a.lock.Lock()
|
||||
for comp, compData := range a.stats {
|
||||
var outData map[string]int64
|
||||
var ok bool
|
||||
if outData, ok = a.outStats[comp]; !ok {
|
||||
outData = make(map[string]int64)
|
||||
}
|
||||
for k, v := range compData {
|
||||
outData[k] = v
|
||||
}
|
||||
a.outStats[comp] = outData
|
||||
}
|
||||
a.lock.Unlock()
|
||||
for {
|
||||
select {
|
||||
case <-a.outdone:
|
||||
cclog.ComponentDebug(a.name, "OUTPUT DONE")
|
||||
close(a.outdone)
|
||||
return
|
||||
case <-a.tickchan:
|
||||
a.lock.Lock()
|
||||
for comp, compData := range a.stats {
|
||||
var outData map[string]int64
|
||||
var ok bool
|
||||
if outData, ok = a.outStats[comp]; !ok {
|
||||
outData = make(map[string]int64)
|
||||
}
|
||||
for k, v := range compData {
|
||||
outData[k] = v
|
||||
}
|
||||
a.outStats[comp] = outData
|
||||
}
|
||||
a.lock.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
a.statsWg.Add(1)
|
||||
go func() {
|
||||
defer a.statsWg.Done()
|
||||
err := a.server.ListenAndServe()
|
||||
if err != nil && err.Error() != "http: Server closed" {
|
||||
cclog.ComponentError(a.name, err.Error())
|
||||
}
|
||||
cclog.ComponentDebug(a.name, "SERVER DONE")
|
||||
}()
|
||||
cclog.ComponentDebug(a.name, "STARTED")
|
||||
}
|
||||
|
||||
func (a *statsApi) StatsFunc(w http.ResponseWriter, r *http.Request) {
|
||||
data, err := json.Marshal(a.outStats)
|
||||
if err == nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
io.WriteString(w, string(data))
|
||||
}
|
||||
}
|
||||
|
||||
// Close finishes / stops the metric cache
|
||||
func (a *statsApi) Close() {
|
||||
cclog.ComponentDebug(a.name, "CLOSE")
|
||||
a.indone <- true
|
||||
a.outdone <- true
|
||||
a.server.Shutdown(context.Background())
|
||||
// wait for close of channel r.done
|
||||
<-a.indone
|
||||
<-a.outdone
|
||||
a.statsWg.Wait()
|
||||
a.wg.Done()
|
||||
|
||||
//a.wg.Wait()
|
||||
}
|
||||
|
||||
func NewStatsApi(ticker mct.MultiChanTicker, wg *sync.WaitGroup, statsApiConfigfile string) (StatsApi, error) {
|
||||
a := new(statsApi)
|
||||
a.name = "StatsApi"
|
||||
a.config.Host = "localhost"
|
||||
a.config.Port = "8080"
|
||||
configFile, err := os.Open(statsApiConfigfile)
|
||||
if err != nil {
|
||||
cclog.ComponentError(a.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
defer configFile.Close()
|
||||
jsonParser := json.NewDecoder(configFile)
|
||||
err = jsonParser.Decode(&a.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(a.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
a.input = make(chan lp.CCMetric)
|
||||
a.ticker = ticker
|
||||
a.tickchan = make(chan time.Time)
|
||||
a.wg = wg
|
||||
a.indone = make(chan bool)
|
||||
a.outdone = make(chan bool)
|
||||
a.router = mux.NewRouter()
|
||||
a.baseurl = fmt.Sprintf("%s:%s", a.config.Host, a.config.Port)
|
||||
a.server = &http.Server{Addr: a.baseurl, Handler: a.router}
|
||||
if a.config.PublishCollectorState {
|
||||
a.router.HandleFunc("/", a.StatsFunc)
|
||||
}
|
||||
statsApiServer = a
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func ComponentStatInt(component string, key string, value int64) {
|
||||
if statsApiServer == nil {
|
||||
return
|
||||
}
|
||||
y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now())
|
||||
if err == nil {
|
||||
statsApiServer.input <- y
|
||||
}
|
||||
}
|
||||
|
||||
func ComponentStatString(component string, key string, value int64) {
|
||||
if statsApiServer == nil {
|
||||
return
|
||||
}
|
||||
y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now())
|
||||
if err == nil {
|
||||
statsApiServer.input <- y
|
||||
}
|
||||
}
|
@@ -54,6 +54,12 @@ type metricRouter struct {
|
||||
cache MetricCache // pointer to MetricCache
|
||||
cachewg sync.WaitGroup // wait group for MetricCache
|
||||
maxForward int // number of metrics to forward maximally in one iteration
|
||||
statsCollForward int64
|
||||
statsRecvForward int64
|
||||
statsCacheForward int64
|
||||
statsTotalForward int64
|
||||
statsDropped int64
|
||||
statsRenamed int64
|
||||
}
|
||||
|
||||
// MetricRouter access functions
|
||||
@@ -103,7 +109,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
return err
|
||||
}
|
||||
r.maxForward = 1
|
||||
if r.config.MaxForward > r.maxForward {
|
||||
r.maxForward = r.config.MaxForward
|
||||
}
|
||||
if r.config.NumCacheIntervals > 0 {
|
||||
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
|
||||
if err != nil {
|
||||
@@ -118,6 +127,12 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
for _, mname := range r.config.DropMetrics {
|
||||
r.config.dropMetrics[mname] = true
|
||||
}
|
||||
r.statsCollForward = 0
|
||||
r.statsRecvForward = 0
|
||||
r.statsCacheForward = 0
|
||||
r.statsTotalForward = 0
|
||||
r.statsDropped = 0
|
||||
r.statsRenamed = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -137,6 +152,7 @@ func (r *metricRouter) StartTimer() {
|
||||
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
|
||||
return
|
||||
case t := <-m:
|
||||
cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", t.Unix())
|
||||
r.timestamp = t
|
||||
}
|
||||
}
|
||||
@@ -250,6 +266,8 @@ func (r *metricRouter) Start() {
|
||||
r.DoDelTags(point)
|
||||
name := point.Name()
|
||||
if new, ok := r.config.RenameMetrics[name]; ok {
|
||||
r.statsRenamed++
|
||||
ComponentStatInt("MetricRouter", "renamed", r.statsRenamed)
|
||||
point.SetName(new)
|
||||
point.AddMeta("oldname", name)
|
||||
}
|
||||
@@ -269,7 +287,14 @@ func (r *metricRouter) Start() {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
if !r.dropMetric(p) {
|
||||
r.statsCollForward++
|
||||
r.statsTotalForward++
|
||||
ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward)
|
||||
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
|
||||
forward(p)
|
||||
} else {
|
||||
r.statsDropped++
|
||||
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
|
||||
}
|
||||
// even if the metric is dropped, it is stored in the cache for
|
||||
// aggregations
|
||||
@@ -285,7 +310,14 @@ func (r *metricRouter) Start() {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
if !r.dropMetric(p) {
|
||||
r.statsRecvForward++
|
||||
r.statsTotalForward++
|
||||
ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward)
|
||||
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
|
||||
forward(p)
|
||||
} else {
|
||||
r.statsDropped++
|
||||
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,7 +326,14 @@ func (r *metricRouter) Start() {
|
||||
// receive from metric collector
|
||||
if !r.dropMetric(p) {
|
||||
p.AddTag(r.config.HostnameTagName, r.hostname)
|
||||
r.statsCacheForward++
|
||||
r.statsTotalForward++
|
||||
ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward)
|
||||
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
|
||||
forward(p)
|
||||
} else {
|
||||
r.statsDropped++
|
||||
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
const GMETRIC_EXEC = `gmetric`
|
||||
@@ -32,6 +33,7 @@ type GangliaSink struct {
|
||||
gmetric_path string
|
||||
gmetric_config string
|
||||
config GangliaSinkConfig
|
||||
statsSentMetrics int64
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
@@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
command := exec.Command(s.gmetric_path, argstr...)
|
||||
command.Wait()
|
||||
_, err = command.Output()
|
||||
s.statsSentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
if len(s.config.GmetricConfig) > 0 {
|
||||
s.gmetric_config = s.config.GmetricConfig
|
||||
}
|
||||
s.statsSentMetrics = 0
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
influx "github.com/influxdata/line-protocol"
|
||||
)
|
||||
|
||||
@@ -36,19 +37,21 @@ type HttpSink struct {
|
||||
idleConnTimeout time.Duration
|
||||
timeout time.Duration
|
||||
flushDelay time.Duration
|
||||
statsProcessed int64
|
||||
statsFlushes int64
|
||||
}
|
||||
|
||||
func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||
if s.buffer.Len() == 0 && s.flushDelay != 0 {
|
||||
// This is the first write since the last flush, start the flushTimer!
|
||||
if s.flushTimer != nil && s.flushTimer.Stop() {
|
||||
cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?")
|
||||
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
|
||||
}
|
||||
|
||||
// Run a batched flush for all lines that have arrived in the last second
|
||||
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
|
||||
if err := s.Flush(); err != nil {
|
||||
cclog.ComponentError("HttpSink", "flush failed:", err.Error())
|
||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -60,8 +63,11 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
|
||||
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "encoding failed:", err.Error())
|
||||
return err
|
||||
}
|
||||
s.statsProcessed++
|
||||
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed)
|
||||
|
||||
// Flush synchronously if "flush_delay" is zero
|
||||
if s.flushDelay == 0 {
|
||||
@@ -84,6 +90,7 @@ func (s *HttpSink) Flush() error {
|
||||
// Create new request to send buffer
|
||||
req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "failed to create request:", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -100,13 +107,18 @@ func (s *HttpSink) Flush() error {
|
||||
|
||||
// Handle transport/tcp errors
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// Handle application errors
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return errors.New(res.Status)
|
||||
err = errors.New(res.Status)
|
||||
cclog.ComponentError(s.name, "application error:", err.Error())
|
||||
return err
|
||||
}
|
||||
s.statsFlushes++
|
||||
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -114,7 +126,7 @@ func (s *HttpSink) Flush() error {
|
||||
func (s *HttpSink) Close() {
|
||||
s.flushTimer.Stop()
|
||||
if err := s.Flush(); err != nil {
|
||||
cclog.ComponentError("HttpSink", "flush failed:", err.Error())
|
||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
||||
}
|
||||
s.client.CloseIdleConnections()
|
||||
}
|
||||
@@ -172,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
s.statsFlushes = 0
|
||||
s.statsProcessed = 0
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
|
||||
)
|
||||
@@ -28,10 +29,10 @@ type InfluxAsyncSinkConfig struct {
|
||||
BatchSize uint `json:"batch_size,omitempty"`
|
||||
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
|
||||
FlushInterval uint `json:"flush_interval,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
InfluxRetryInterval string `json:"retry_interval,omitempty"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
|
||||
InfluxMaxRetries uint `json:"max_retries,omitempty"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
||||
}
|
||||
|
||||
type InfluxAsyncSink struct {
|
||||
@@ -42,6 +43,9 @@ type InfluxAsyncSink struct {
|
||||
config InfluxAsyncSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
sentMetrics int64
|
||||
statsFlushes int64
|
||||
statsErrors int64
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) connect() error {
|
||||
@@ -60,20 +64,34 @@ func (s *InfluxAsyncSink) connect() error {
|
||||
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
|
||||
clientOptions := influxdb2.DefaultOptions()
|
||||
if s.config.BatchSize != 0 {
|
||||
cclog.ComponentDebug(s.name, "Batch size", s.config.BatchSize)
|
||||
clientOptions.SetBatchSize(s.config.BatchSize)
|
||||
}
|
||||
if s.config.FlushInterval != 0 {
|
||||
cclog.ComponentDebug(s.name, "Flush interval", s.config.FlushInterval)
|
||||
clientOptions.SetFlushInterval(s.config.FlushInterval)
|
||||
}
|
||||
if s.influxRetryInterval != 0 {
|
||||
cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
}
|
||||
if s.influxMaxRetryTime != 0 {
|
||||
cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
}
|
||||
if s.config.InfluxExponentialBase != 0 {
|
||||
cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
}
|
||||
if s.config.InfluxMaxRetries != 0 {
|
||||
cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
}
|
||||
clientOptions.SetTLSConfig(
|
||||
&tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
).SetPrecision(time.Second)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
||||
@@ -91,11 +109,15 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
||||
s.writeApi.WritePoint(
|
||||
m.ToPoint(s.meta_as_tags),
|
||||
)
|
||||
s.sentMetrics++
|
||||
stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) Flush() error {
|
||||
s.writeApi.Flush()
|
||||
s.statsFlushes++
|
||||
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -110,13 +132,14 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
s.config.BatchSize = 0
|
||||
s.influxRetryInterval = 0
|
||||
//s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = 0
|
||||
//s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 0
|
||||
s.config.InfluxExponentialBase = 0
|
||||
s.config.FlushInterval = 0
|
||||
|
||||
// Default retry intervals (in seconds)
|
||||
// 1 2
|
||||
@@ -174,12 +197,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
}
|
||||
|
||||
// Start background: Read from error channel
|
||||
s.statsErrors = 0
|
||||
s.errors = s.writeApi.Errors()
|
||||
go func() {
|
||||
for err := range s.errors {
|
||||
s.statsErrors++
|
||||
stats.ComponentStatInt(s.name, "errors", s.statsErrors)
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
s.sentMetrics = 0
|
||||
s.statsFlushes = 0
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -6,12 +6,15 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
|
||||
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||
)
|
||||
|
||||
type InfluxSinkConfig struct {
|
||||
@@ -23,11 +26,13 @@ type InfluxSinkConfig struct {
|
||||
Password string `json:"password,omitempty"`
|
||||
Organization string `json:"organization,omitempty"`
|
||||
SSL bool `json:"ssl,omitempty"`
|
||||
FlushDelay string `json:"flush_delay,omitempty"`
|
||||
BatchSize int `json:"batch_size,omitempty"`
|
||||
RetentionPol string `json:"retention_policy,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
// InfluxRetryInterval string `json:"retry_interval"`
|
||||
// InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
// InfluxMaxRetries uint `json:"max_retries"`
|
||||
// InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
|
||||
}
|
||||
|
||||
@@ -38,6 +43,12 @@ type InfluxSink struct {
|
||||
config InfluxSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
batch []*write.Point
|
||||
flushTimer *time.Timer
|
||||
flushDelay time.Duration
|
||||
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
|
||||
statsSentMetrics int64
|
||||
statsProcessedMetrics int64
|
||||
//influxMaxRetryDelay uint
|
||||
}
|
||||
|
||||
@@ -56,16 +67,31 @@ func (s *InfluxSink) connect() error {
|
||||
}
|
||||
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
|
||||
clientOptions := influxdb2.DefaultOptions()
|
||||
|
||||
// if s.influxRetryInterval != 0 {
|
||||
// cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
|
||||
// clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
// }
|
||||
// if s.influxMaxRetryTime != 0 {
|
||||
// cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
|
||||
// clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
// }
|
||||
// if s.config.InfluxExponentialBase != 0 {
|
||||
// cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
|
||||
// clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
// }
|
||||
// if s.config.InfluxMaxRetries != 0 {
|
||||
// cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
|
||||
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
// }
|
||||
|
||||
clientOptions.SetTLSConfig(
|
||||
&tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
clientOptions.SetPrecision(time.Second)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
|
||||
@@ -80,38 +106,80 @@ func (s *InfluxSink) connect() error {
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
||||
err :=
|
||||
s.writeApi.WritePoint(
|
||||
context.Background(),
|
||||
m.ToPoint(s.meta_as_tags),
|
||||
)
|
||||
return err
|
||||
// err :=
|
||||
// s.writeApi.WritePoint(
|
||||
// context.Background(),
|
||||
// m.ToPoint(s.meta_as_tags),
|
||||
// )
|
||||
if len(s.batch) == 0 && s.flushDelay != 0 {
|
||||
// This is the first write since the last flush, start the flushTimer!
|
||||
if s.flushTimer != nil && s.flushTimer.Stop() {
|
||||
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
|
||||
}
|
||||
|
||||
// Run a batched flush for all lines that have arrived in the last second
|
||||
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
|
||||
if err := s.Flush(); err != nil {
|
||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
p := m.ToPoint(s.meta_as_tags)
|
||||
s.lock.Lock()
|
||||
s.statsProcessedMetrics++
|
||||
s.batch = append(s.batch, p)
|
||||
s.lock.Unlock()
|
||||
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics)
|
||||
|
||||
// Flush synchronously if "flush_delay" is zero
|
||||
if s.flushDelay == 0 {
|
||||
return s.Flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Flush() error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if len(s.batch) == 0 {
|
||||
return nil
|
||||
}
|
||||
err := s.writeApi.WritePoint(context.Background(), s.batch...)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
||||
return err
|
||||
}
|
||||
s.statsSentMetrics += int64(len(s.batch))
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
s.batch = s.batch[:0]
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Close() {
|
||||
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
|
||||
s.flushTimer.Stop()
|
||||
s.Flush()
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(InfluxSink)
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
s.config.BatchSize = 100
|
||||
s.config.FlushDelay = "1s"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
s.influxRetryInterval = 0
|
||||
s.influxMaxRetryTime = 0
|
||||
// s.config.InfluxRetryInterval = ""
|
||||
// s.config.InfluxMaxRetryTime = ""
|
||||
// s.config.InfluxMaxRetries = 0
|
||||
// s.config.InfluxExponentialBase = 0
|
||||
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
@@ -126,19 +194,31 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s.meta_as_tags[k] = true
|
||||
}
|
||||
|
||||
toUint := func(duration string, def uint) uint {
|
||||
t, err := time.ParseDuration(duration)
|
||||
// toUint := func(duration string, def uint) uint {
|
||||
// if len(duration) > 0 {
|
||||
// t, err := time.ParseDuration(duration)
|
||||
// if err == nil {
|
||||
// return uint(t.Milliseconds())
|
||||
// }
|
||||
// }
|
||||
// return def
|
||||
// }
|
||||
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
|
||||
if len(s.config.FlushDelay) > 0 {
|
||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||
if err == nil {
|
||||
return uint(t.Milliseconds())
|
||||
s.flushDelay = t
|
||||
}
|
||||
return def
|
||||
}
|
||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
s.batch = make([]*write.Point, 0, s.config.BatchSize)
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
}
|
||||
s.statsSentMetrics = 0
|
||||
s.statsProcessedMetrics = 0
|
||||
return s, nil
|
||||
}
|
||||
|
@@ -17,10 +17,8 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
"password" : "examplepw",
|
||||
"organization": "myorg",
|
||||
"ssl": true,
|
||||
"retry_interval" : "1s",
|
||||
"retry_exponential_base" : 2,
|
||||
"max_retries": 20,
|
||||
"max_retry_time" : "168h"
|
||||
"flush_delay" : "1s",
|
||||
"batch_size" : 100
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -34,9 +32,6 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
- `password`: Password for basic authentication
|
||||
- `organization`: Organization in the InfluxDB
|
||||
- `ssl`: Use SSL connection
|
||||
- `retry_interval`: Base retry interval for failed write requests, default 1s
|
||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||
- `max_retries`: Maximal number of retry attempts
|
||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||
- `flush_delay`: Group metrics coming in to a single batch
|
||||
- `batch_size`: Maximal batch size
|
||||
|
||||
For information about the calculation of the retry interval settings, see the [official influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
@@ -73,6 +73,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"github.com/NVIDIA/go-nvml/pkg/dl"
|
||||
)
|
||||
|
||||
@@ -107,6 +108,7 @@ type LibgangliaSink struct {
|
||||
gmond_config C.Ganglia_gmond_config
|
||||
send_channels C.Ganglia_udp_send_channels
|
||||
cstrCache map[string]*C.char
|
||||
statsSentMetrics int64
|
||||
}
|
||||
|
||||
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
@@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
C.Ganglia_metric_destroy(gmetric)
|
||||
// Free the value C string, the only one not stored in the cache
|
||||
C.free(unsafe.Pointer(c_value))
|
||||
s.statsSentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||
}
|
||||
|
||||
s.statsSentMetrics = 0
|
||||
// Set up cache for the C strings
|
||||
s.cstrCache = make(map[string]*C.char)
|
||||
// s.cstrCache["globals"] = C.CString("globals")
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
@@ -34,6 +35,7 @@ type PrometheusSink struct {
|
||||
nodeMetrics map[string]prometheus.Gauge
|
||||
promWg sync.WaitGroup
|
||||
promServer *http.Server
|
||||
statsSentMetrics int64
|
||||
}
|
||||
|
||||
func intToFloat64(input interface{}) (float64, error) {
|
||||
@@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error {
|
||||
s.nodeMetrics[name] = new
|
||||
prometheus.Register(new)
|
||||
}
|
||||
s.statsSentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error {
|
||||
}
|
||||
s.nodeMetrics[name].Set(value)
|
||||
}
|
||||
s.statsSentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
type SampleSinkConfig struct {
|
||||
@@ -14,12 +15,15 @@ type SampleSinkConfig struct {
|
||||
// See: metricSink.go
|
||||
defaultSinkConfig
|
||||
// Additional config options, for SampleSink
|
||||
|
||||
}
|
||||
|
||||
type SampleSink struct {
|
||||
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
||||
sink
|
||||
config SampleSinkConfig // entry point to the SampleSinkConfig
|
||||
// Stats counters
|
||||
statsSentMetrics int64
|
||||
}
|
||||
|
||||
// Implement functions required for Sink interface
|
||||
@@ -30,6 +34,8 @@ type SampleSink struct {
|
||||
func (s *SampleSink) Write(point lp.CCMetric) error {
|
||||
// based on s.meta_as_tags use meta infos as tags
|
||||
log.Print(point)
|
||||
s.statsSentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize stats counters
|
||||
s.statsSentMetrics = 0
|
||||
|
||||
// Create lookup map to use meta infos as tags in the output metric
|
||||
s.meta_as_tags = make(map[string]bool)
|
||||
for _, k := range s.config.MetaAsTags {
|
||||
|
@@ -102,13 +102,19 @@ func (sm *sinkManager) Start() {
|
||||
}
|
||||
|
||||
toTheSinks := func(p lp.CCMetric) {
|
||||
var wg sync.WaitGroup
|
||||
// Send received metric to all outputs
|
||||
cclog.ComponentDebug("SinkManager", "WRITE", p)
|
||||
for _, s := range sm.sinks {
|
||||
wg.Add(1)
|
||||
go func(s Sink) {
|
||||
if err := s.Write(p); err != nil {
|
||||
cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
|
||||
}
|
||||
wg.Done()
|
||||
}(s)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
for {
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
// "time"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
|
||||
)
|
||||
|
||||
type StdoutSink struct {
|
||||
@@ -17,6 +18,7 @@ type StdoutSink struct {
|
||||
defaultSinkConfig
|
||||
Output string `json:"output_file,omitempty"`
|
||||
}
|
||||
sentMetrics int64
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Write(m lp.CCMetric) error {
|
||||
@@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error {
|
||||
s.output,
|
||||
m.ToLineProtocol(s.meta_as_tags),
|
||||
)
|
||||
s.sentMetrics++
|
||||
stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
|
||||
for _, k := range s.config.MetaAsTags {
|
||||
s.meta_as_tags[k] = true
|
||||
}
|
||||
s.sentMetrics = 0
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user