Compare commits

..

5 Commits

Author SHA1 Message Date
Thomas Roehl
9dd6ff1a76 Add StatsAPI to README 2022-04-02 16:07:13 +02:00
Thomas Roehl
257b4a64b5 Add missing main API file 2022-04-02 16:06:51 +02:00
Thomas Roehl
5eeb097136 Add stats counters to sinks 2022-04-02 16:06:03 +02:00
Thomas Roehl
4a4992877c Add stats counters to collectors 2022-04-02 16:05:52 +02:00
Thomas Roehl
9447685a69 Add StatsApi. Started if a configuration file is set in global config.json 2022-04-02 16:05:27 +02:00
44 changed files with 810 additions and 695 deletions

View File

@@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c
"collectors" : "collectors.json",
"receivers" : "receivers.json",
"router" : "router.json",
"stats_api" : "api.json",
"interval": 10,
"duration": 1
}
@@ -32,6 +33,7 @@ See the component READMEs for their configuration:
* [`sinks`](./sinks/README.md)
* [`receivers`](./receivers/README.md)
* [`router`](./internal/metricRouter/README.md)
* [`stats_api`](./internal/metricRouter/StatsApi.md)
# Installation

View File

@@ -28,6 +28,7 @@ type CentralConfigFile struct {
RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"`
ReceiverConfigFile string `json:"receivers,omitempty"`
StatsApiConfigFile string `json:"stats_api,omitempty"`
}
func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
@@ -52,6 +53,7 @@ type RuntimeConfig struct {
CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager
StatsApi mr.StatsApi
MultiChanTicker mct.MultiChanTicker
Channels []chan lp.CCMetric
@@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
cclog.Debug("Shutdown SinkManager...")
config.SinkManager.Close()
}
if config.StatsApi != nil {
cclog.Debug("Shutdown StatsApi...")
config.StatsApi.Close()
}
}
func mainFunc() int {
var err error
use_recv := false
use_api := false
// Initialize runtime configuration
rcfg := RuntimeConfig{
@@ -164,6 +171,7 @@ func mainFunc() int {
CollectManager: nil,
SinkManager: nil,
ReceiveManager: nil,
StatsApi: nil,
CliArgs: ReadCli(),
}
@@ -253,6 +261,16 @@ func mainFunc() int {
use_recv = true
}
// Create new statistics API manager
if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 {
rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile)
if err != nil {
cclog.Error(err.Error())
return 1
}
use_api = true
}
// Create shutdown handler
shutdownSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt)
@@ -260,6 +278,11 @@ func mainFunc() int {
rcfg.Sync.Add(1)
go shutdownHandler(&rcfg, shutdownSignal)
// Start the stats api early to be prepared for init settings
if use_api {
rcfg.StatsApi.Start()
}
// Start the managers
rcfg.MetricRouter.Start()
rcfg.SinkManager.Start()

View File

@@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
@@ -29,10 +30,11 @@ type BeegfsMetaCollectorConfig struct {
type BeegfsMetaCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
@@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsMetaCollector) Close() {

View File

@@ -16,6 +16,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// Struct for the collector-specific JSON config
@@ -27,10 +28,11 @@ type BeegfsStorageCollectorConfig struct {
type BeegfsStorageCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}
func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
@@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *BeegfsStorageCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -36,7 +37,8 @@ type CPUFreqCpuInfoCollectorTopology struct {
type CPUFreqCpuInfoCollector struct {
metricCollector
topology []*CPUFreqCpuInfoCollectorTopology
topology []*CPUFreqCpuInfoCollectorTopology
statsProcessedMetrics int64
}
func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
@@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
return
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCpuInfoCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
)
@@ -39,8 +40,9 @@ type CPUFreqCollectorTopology struct {
//
type CPUFreqCollector struct {
metricCollector
topology []CPUFreqCollectorTopology
config struct {
topology []CPUFreqCollectorTopology
statsProcessedMetrics int64
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
}
@@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CPUFreqCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const CPUSTATFILE = `/proc/stat`
@@ -21,10 +22,11 @@ type CpustatCollectorConfig struct {
type CpustatCollector struct {
metricCollector
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
statsProcessedMetrics int64
}
func (m *CpustatCollector) Init(config json.RawMessage) error {
@@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
for name, value := range values {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
if err == nil {
m.statsProcessedMetrics++
output <- y
}
}
@@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
time.Now(),
)
if err == nil {
m.statsProcessedMetrics++
output <- num_cpus_metric
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *CpustatCollector) Close() {

View File

@@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)
@@ -23,11 +24,14 @@ type CustomCmdCollectorConfig struct {
type CustomCmdCollector struct {
metricCollector
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
statsProcessedMetrics int64
statsProcessedCommands int64
statsProcessedFiles int64
}
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
@@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)
m.parser.SetTimeFunc(DefaultTime)
m.statsProcessedMetrics = 0
m.statsProcessedFiles = 0
m.statsProcessedCommands = 0
m.init = true
return nil
}
@@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
y := lp.FromInfluxMetric(c)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedCommands++
stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands)
}
for _, file := range m.files {
buffer, err := ioutil.ReadFile(file)
@@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
}
y := lp.FromInfluxMetric(f)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedFiles++
stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles)
}
}

View File

@@ -3,6 +3,7 @@ package collectors
import (
"bufio"
"encoding/json"
"fmt"
"os"
"strings"
"syscall"
@@ -10,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// "log"
@@ -22,9 +24,8 @@ type DiskstatCollectorConfig struct {
type DiskstatCollector struct {
metricCollector
//matches map[string]int
config IOstatCollectorConfig
//devices map[string]IOstatCollectorEntry
config DiskstatCollectorConfig
statsProcessedMetrics int64
}
func (m *DiskstatCollector) Init(config json.RawMessage) error {
@@ -43,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return err
}
defer file.Close()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -80,19 +82,24 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
stat := syscall.Statfs_t{}
err := syscall.Statfs(path, &stat)
if err != nil {
continue
fmt.Println(err.Error())
return
}
tags := map[string]string{"type": "node", "device": linefields[0]}
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
perc := (100 * (total - free)) / total
@@ -103,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
if err == nil {
y.AddMeta("unit", "percent")
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}

View File

@@ -15,6 +15,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const DEFAULT_GPFS_CMD = "mmpmon"
@@ -32,9 +33,10 @@ type GpfsCollector struct {
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
SendBandwidths bool `json:"send_bandwidths"`
}
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
skipFS map[string]struct{}
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
lastState map[string]GpfsCollectorLastState
statsProcessedMetrics int64
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
@@ -70,7 +72,6 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
for _, fs := range m.config.ExcludeFilesystem {
m.skipFS[fs] = struct{}{}
}
m.lastState = make(map[string]GpfsCollectorLastState)
// GPFS / IBM Spectrum Scale file system statistics can only be queried by user root
user, err := user.Current()
@@ -87,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
}
m.config.Mmpmon = p
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -163,16 +164,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
continue
}
// Add filesystem tag
m.tags["filesystem"] = filesystem
// Create initial last state
if m.config.SendBandwidths {
if _, ok := m.lastState[filesystem]; !ok {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: -1,
bytesWritten: -1,
}
if _, ok := m.lastState[filesystem]; !ok {
m.lastState[filesystem] = GpfsCollectorLastState{
bytesRead: -1,
bytesWritten: -1,
}
}
@@ -217,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 {
bwRead := float64(bytesRead-lastBytesRead) / timeDiff
if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -237,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
if m.config.SendBandwidths {
if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 {
bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff
if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -264,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of closes
@@ -276,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of reads
@@ -288,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of writes
@@ -300,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// number of read directories
@@ -312,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
}
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
output <- y
m.statsProcessedMetrics++
}
// Number of inode updates
@@ -323,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
continue
}
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *GpfsCollector) Close() {

View File

@@ -7,6 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
"encoding/json"
@@ -39,8 +40,9 @@ type InfinibandCollector struct {
SendAbsoluteValues bool `json:"send_abs_values"` // Send absolut values as read from sys filesystem
SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. rates
}
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
info []*InfinibandCollectorInfo
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
statsProcessedMetrics int64
}
// Init initializes the Infiniband collector by walking through files below IB_BASEPATH
@@ -149,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
if len(m.info) == 0 {
return fmt.Errorf("found no IB devices")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -196,6 +198,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
y.AddMeta("unit", counterDef.unit)
output <- y
m.statsProcessedMetrics++
}
}
@@ -206,6 +209,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
y.AddMeta("unit", counterDef.unit+"/sec")
output <- y
m.statsProcessedMetrics++
}
}
// Save current state
@@ -214,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *InfinibandCollector) Close() {

View File

@@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
// "log"
"encoding/json"
@@ -29,9 +30,10 @@ type IOstatCollectorEntry struct {
type IOstatCollector struct {
metricCollector
matches map[string]int
config IOstatCollectorConfig
devices map[string]IOstatCollectorEntry
matches map[string]int
config IOstatCollectorConfig
devices map[string]IOstatCollectorEntry
statsProcessedMetrics int64
}
func (m *IOstatCollector) Init(config json.RawMessage) error {
@@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error {
lastValues: values,
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
entry.lastValues[name] = x
@@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
m.devices[device] = entry
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IOstatCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const IPMITOOL_PATH = `ipmitool`
@@ -26,9 +27,10 @@ type IpmiCollector struct {
metricCollector
//tags map[string]string
//matches map[string]string
config IpmiCollectorConfig
ipmitool string
ipmisensors string
config IpmiCollectorConfig
ipmitool string
ipmisensors string
statsProcessedMetrics int64
}
func (m *IpmiCollector) Init(config json.RawMessage) error {
@@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no IPMI reader found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("unit", unit)
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
y.AddMeta("unit", lv[4])
}
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) {
m.readIpmiSensors(m.config.IpmisensorsPath, output)
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *IpmiCollector) Close() {

View File

@@ -28,6 +28,7 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@@ -72,18 +73,21 @@ type LikwidCollectorConfig struct {
type LikwidCollector struct {
metricCollector
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
cpulist []C.int
cpu2tid map[int]int
sock2tid map[int]int
metrics map[C.int]map[string]int
groups []C.int
config LikwidCollectorConfig
gmresults map[int]map[string]float64
basefreq float64
running bool
initialized bool
likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex
statsMeasurements int64
statsProcessedMetrics int64
statsPublishedMetrics int64
}
type LikwidMetric struct {
@@ -267,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, err.Error())
return err
}
m.statsMeasurements = 0
m.statsProcessedMetrics = 0
m.statsPublishedMetrics = 0
m.init = true
return nil
}
@@ -274,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
// take a measurement for 'interval' seconds of event set index 'group'
func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
var ret C.int
m.lock.Lock()
if m.initialized {
ret = C.perfmon_setupCounters(evset.gid)
@@ -317,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
}
}
m.lock.Unlock()
m.statsMeasurements++
stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements)
return false, nil
}
@@ -329,11 +339,7 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
gctr := C.GoString(counter)
for _, tid := range m.cpu2tid {
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
fres := float64(res)
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
fres = 0.0
}
evset.results[tid][gctr] = fres
evset.results[tid][gctr] = float64(res)
evset.results[tid]["time"] = interval.Seconds()
evset.results[tid]["inverseClock"] = invClock
}
@@ -352,12 +358,17 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
value = 0.0
}
if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
value = 0.0
continue
}
evset.metrics[tid][metric.Name] = value
if m.config.InvalidToZero && math.IsNaN(value) {
value = 0.0
}
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@@ -370,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}
@@ -401,12 +414,17 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
value, err := agg.EvalFloat64Condition(metric.Calc, params)
if err != nil {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
value = 0.0
}
if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
value = 0.0
continue
}
m.gmresults[tid][metric.Name] = value
if m.config.InvalidToZero && math.IsNaN(value) {
value = 0.0
}
if m.config.InvalidToZero && math.IsInf(value, 0) {
value = 0.0
}
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
// Now we have the result, send it with the proper tags
if !math.IsNaN(value) {
if metric.Publish {
@@ -420,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
if len(metric.Unit) > 0 {
y.AddMeta("unit", metric.Unit)
}
m.statsPublishedMetrics++
stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics)
output <- y
}
}

View File

@@ -3,63 +3,32 @@
The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.
```json
"likwid": {
"force_overwrite" : false,
"invalid_to_zero" : false,
"eventsets": [
{
"events" : {
"COUNTER0": "EVENT0",
"COUNTER1": "EVENT1",
},
"metrics" : [
{
"name": "sum_01",
"calc": "COUNTER0 + COUNTER1",
"publish": false,
"unit": "myunit",
"type": "cpu"
}
]
}
]
"globalmetrics" : [
{
"name": "global_sum",
"calc": "sum_01",
"publish": true,
"unit": "myunit",
"type": "cpu"
}
]
}
```
The `likwid` configuration consists of two parts, the `eventsets` and `globalmetrics`:
- An event set list itself has two parts, the `events` and a set of derivable `metrics`. Each of the `events` is a `counter:event` pair in LIKWID's syntax. The `metrics` are a list of formulas to derive the metric value from the measurements of the `events`' values. Each metric has a name, the formula, a type and a publish flag. There is an optional `unit` field. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for both events configured in the counters `PMC0` and `PMC1`. You can optionally use `time` for the measurement time and `inverseClock` for `1.0/baseCpuFrequency`. The type tells the LikwidCollector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the LikwidCollector whether a metric should be sent to the router or is only used internally to compute a global metric.
- The `globalmetrics` are metrics which require data from multiple event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. Also `time` and `inverseClock` cannot be used anymore. So, the idea is to derive a metric in the `eventsets` section and reuse it in the `globalmetrics` part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`"publish": false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). You may specify a unit for the metric with `unit`. The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
- The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics (`publish=false`). **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases. Similar to the metrics in the eventset, you can specify a metric unit with the `unit` field.
Additional options:
- `access_mode` : Method to use for hardware performance monitoring (`direct` access as root user, `accessdaemon` for the daemon mode)
- `accessdaemon_path`: Folder with the access daemon `likwid-accessD`, commonly `$LIKWID_INSTALL_LOC/sbin`
- `force_overwrite`: Same as setting `LIKWID_FORCE=1`. In case counters are already in-use, LIKWID overwrites their configuration to do its measurements
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaced with `0.0`. See the [separate section](./likwidMetric.md#invalid_to_zero-option) below.
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`. The access mode `perf_event` is currently untested.
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD` (like `/usr/local/sbin`)
- `liblikwid_path`: Location of `liblikwid.so` including file name like `/usr/local/lib/liblikwid.so`
- `invalid_to_zero`: In some cases, the calculations result in `NaN` or `Inf`. With this option, all `NaN` and `Inf` values are replaced with `0.0`.
- `access_mode`: Specify LIKWID access mode: `direct` for direct register access as root user or `accessdaemon`
- `accessdaemon_path`: Folder of the accessDaemon `likwid-accessD`
- `liblikwid_path`: Location of `liblikwid.so`
### Available metric scopes
Hardware performance counters are scattered all over the system nowadays. A counter covers a specific part of the system. While there are hardware-thread-specific counters for CPU cycles, instructions and so on, some others are specific to a whole CPU socket/package. To address that, the LikwidCollector provides the specification of a `type` for each metric.
Hardware performance counters are scattered all over the system nowadays. A counter covers a specific part of the system. While there are hardware-thread-specific counters for CPU cycles, instructions and so on, some others are specific to a whole CPU socket/package. To address that, the collector provides the specification of a 'scope' for each metric.
- `cpu` : One metric per CPU hardware thread with the tags `"type" : "cpu"` and `"type-id" : "$cpu_id"`
- `socket` : One metric per CPU socket/package with the tags `"type" : "socket"` and `"type-id" : "$socket_id"`
**Note:** You should not specify the `socket` type for a metric that is measured at `cpu` scope and vice versa, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
**Note:** You cannot specify `socket` scope for a metric that is measured at `cpu` scope, so some kind of expert knowledge or lookup work in the [Likwid Wiki](https://github.com/RRZE-HPC/likwid/wiki) is required. Get the scope of each counter from the *Architecture* pages and as soon as one counter in a metric is socket-specific, the whole metric is socket-specific.
As a guideline:
- All counters `FIXCx`, `PMCy` and `TMAz` have the scope `cpu`
- All counters names containing `BOX` have the scope `socket`
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope (AMD Zen)
- All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope
- All `DFCx` counters have scope `socket`
### Help with the configuration
@@ -81,7 +50,6 @@ $ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
{
"events": {
"FIXC0": "INSTR_RETIRED_ANY",
"FIXC1": "CPU_CLK_UNHALTED_CORE",
"..." : "..."
},
"metrics" : [
@@ -107,28 +75,21 @@ LIKWID checks the file `/var/run/likwid.lock` before performing any interfering
Before (SLURM prolog, ...)
```
$ chown $JOBUSER /var/run/likwid.lock
$ chown $JOBUSER /var/run/likwid.lock
```
After (SLURM epilog, ...)
```
$ chown $CCUSER /var/run/likwid.lock
$ chown $CCUSER /var/run/likwid.lock
```
### `invalid_to_zero` option
In some cases LIKWID returns `0.0` for some events that are further used in processing and maybe used as divisor in a calculation. After evaluation of a metric, the result might be `NaN` or `+-Inf`. These resulting metrics are commonly not created and forwarded to the router because the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#float) does not support these special floating-point values. If you want to have them sent, this option forces these metric values to be `0.0` instead.
One might think this does not happen often, but commonly used metrics in performance engineering, like Instructions-per-Cycle (IPC) or, more frequently, the actual CPU clock, are derived from events like `CPU_CLK_UNHALTED_CORE` (Intel) which do not increment in halted state (as the name implies). There are different power management systems in a chip which can cause a hardware thread to enter such a state. Moreover, if no cycles are executed by the core, many other events are not incremented either (like `INSTR_RETIRED_ANY` for retired instructions, which is part of IPC).
### Example configuration
#### AMD Zen3
```json
"likwid": {
"force_overwrite" : false,
"invalid_to_zero" : false,
"nan_to_zero" : false,
"eventsets": [
{
"events": {
@@ -219,3 +180,33 @@ One might think this does not happen often but often used metrics in the world o
}
```
### How to get the eventsets and metrics from LIKWID
The `likwid` collector reads hardware performance counters at a **cpu** and **socket** level. The configuration looks quite complicated but it is basically copy&paste from [LIKWID's performance groups](https://github.com/RRZE-HPC/likwid/tree/master/groups). The collector made multiple iterations and tried to use the performance groups but it lacked flexibility. The current way of configuration provides most flexibility.
The logic is as follows: There are multiple eventsets, each consisting of a list of counters+events and a list of metrics. If you compare a common performance group with the example setting above, there is not much difference:
```
EVENTSET -> "events": {
FIXC1 ACTUAL_CPU_CLOCK -> "FIXC1": "ACTUAL_CPU_CLOCK",
FIXC2 MAX_CPU_CLOCK -> "FIXC2": "MAX_CPU_CLOCK",
PMC0 RETIRED_INSTRUCTIONS -> "PMC0" : "RETIRED_INSTRUCTIONS",
PMC1 CPU_CLOCKS_UNHALTED -> "PMC1" : "CPU_CLOCKS_UNHALTED",
PMC2 RETIRED_SSE_AVX_FLOPS_ALL -> "PMC2": "RETIRED_SSE_AVX_FLOPS_ALL",
PMC3 MERGE -> "PMC3": "MERGE",
-> }
```
The metrics are following the same procedure:
```
METRICS -> "metrics": [
IPC PMC0/PMC1 -> {
-> "name" : "IPC",
-> "calc" : "PMC0/PMC1",
-> "scope": "cpu",
-> "publish": true
-> }
-> ]
```
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.

View File

@@ -10,6 +10,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -32,6 +33,7 @@ type LoadavgCollector struct {
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
statsProcessedMetrics int64
}
func (m *LoadavgCollector) Init(config json.RawMessage) error {
@@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
for i, name := range m.proc_matches {
_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LoadavgCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const LUSTRE_SYSFS = `/sys/fs/lustre`
@@ -37,13 +38,14 @@ type LustreMetricDefinition struct {
type LustreCollector struct {
metricCollector
tags map[string]string
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
tags map[string]string
config LustreCollectorConfig
lctl string
sudoCmd string
lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths
definitions []LustreMetricDefinition // Combined list without excluded metrics
stats map[string]map[string]int64 // Data for last value per device and metric
statsProcessedMetrics int64
}
func (m *LustreCollector) getDeviceDataCommand(device string) []string {
@@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
}
m.lastTimestamp = time.Now()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
y.AddMeta("unit", def.unit)
}
output <- y
m.statsProcessedMetrics++
}
devData[def.name] = use_x
}
}
m.lastTimestamp = now
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *LustreCollector) Close() {

View File

@@ -14,6 +14,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MEMSTATFILE = "/proc/meminfo"
@@ -32,12 +33,13 @@ type MemstatCollectorNode struct {
type MemstatCollector struct {
metricCollector
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
stats map[string]int64
tags map[string]string
matches map[string]string
config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
statsProcessedMetrics int64
}
type MemstatStats struct {
@@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
}
}
}
m.statsProcessedMetrics = 0
m.init = true
return err
}
@@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if len(unit) > 0 {
y.AddMeta("unit", unit)
}
m.statsProcessedMetrics++
output <- y
}
}
@@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
sendStats(stats, nodeConf.tags)
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *MemstatCollector) Close() {

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const NETSTATFILE = "/proc/net/dev"
@@ -32,9 +33,10 @@ type NetstatCollectorMetric struct {
type NetstatCollector struct {
metricCollector
config NetstatCollectorConfig
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
config NetstatCollectorConfig
matches map[string][]NetstatCollectorMetric
lastTimestamp time.Time
statsProcessedMetrics int64
}
func (m *NetstatCollector) Init(config json.RawMessage) error {
@@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
if len(m.matches) == 0 {
return errors.New("no devices to collector metrics found")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if m.config.SendAbsoluteValues {
if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
if m.config.SendDerivedValues {
@@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
rate := float64(v-metric.lastValue) / timeDiff
if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil {
output <- y
m.statsProcessedMetrics++
}
}
metric.lastValue = v
@@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *NetstatCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// First part contains the code for the general NfsCollector.
@@ -32,7 +33,8 @@ type nfsCollector struct {
Nfsstats string `json:"nfsstat"`
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
data map[string]NfsCollectorData
data map[string]NfsCollectorData
statsProcessedMetrics int64
}
func (m *nfsCollector) initStats() error {
@@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error {
}
m.data = make(map[string]NfsCollectorData)
m.initStats()
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if err == nil {
y.AddMeta("version", m.version)
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *nfsCollector) Close() {

View File

@@ -12,6 +12,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
//
@@ -44,7 +45,8 @@ type NUMAStatsCollectorTopolgy struct {
type NUMAStatsCollector struct {
metricCollector
topology []NUMAStatsCollectorTopolgy
topology []NUMAStatsCollectorTopolgy
statsProcessedMetrics int64
}
func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
@@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error {
tagSet: map[string]string{"memoryDomain": node},
})
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
file.Close()
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NUMAStatsCollector) Close() {

View File

@@ -9,6 +9,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)
@@ -26,9 +27,10 @@ type NvidiaCollectorDevice struct {
type NvidiaCollector struct {
metricCollector
num_gpus int
config NvidiaCollectorConfig
gpus []NvidiaCollectorDevice
num_gpus int
config NvidiaCollectorConfig
gpus []NvidiaCollectorDevice
statsProcessedMetrics int64
}
func (m *NvidiaCollector) CatchPanic() {
@@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
pciInfo.Device)
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
if !device.excludeMetrics["nv_mem_util"] {
@@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
@@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MByte")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "degC")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
if err == nil {
output <- y
m.statsProcessedMetrics++
}
} else if ret == nvml.ERROR_NOT_SUPPORTED {
y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "MHz")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "watts")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
@@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y.AddMeta("unit", "%")
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics)
}
func (m *NvidiaCollector) Close() {

View File

@@ -6,6 +6,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// These are the fields we read from the JSON configuration
@@ -17,9 +18,10 @@ type SampleCollectorConfig struct {
// defined by metricCollector (name, init, ...)
type SampleCollector struct {
metricCollector
config SampleTimerCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
config SampleTimerCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
statsCount int64
}
// Functions to implement MetricCollector interface
@@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error {
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
// Initialize counts for statistics
m.statsCount = 0
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
@@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
// Send it to output channel
output <- y
// increment count for each sent metric or any other operation
m.statsCount++
}
// Set stats for the component
stats.ComponentStatInt(m.name, "count", m.statsCount)
}
// Close metric collector: close network connection, close files, close libraries, ...

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
// See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html
@@ -40,7 +41,8 @@ type TempCollector struct {
ReportMaxTemp bool `json:"report_max_temperature"`
ReportCriticalTemp bool `json:"report_critical_temperature"`
}
sensors []*TempCollectorSensor
sensors []*TempCollectorSensor
statsProcessedMetrics int64
}
func (m *TempCollector) Init(config json.RawMessage) error {
@@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error {
}
// Finished initialization
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
// max temperature
@@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
@@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
)
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TempCollector) Close() {

View File

@@ -10,6 +10,7 @@ import (
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const MAX_NUM_PROCS = 10
@@ -21,8 +22,9 @@ type TopProcsCollectorConfig struct {
type TopProcsCollector struct {
metricCollector
tags map[string]string
config TopProcsCollectorConfig
tags map[string]string
config TopProcsCollectorConfig
statsProcessedMetrics int64
}
func (m *TopProcsCollector) Init(config json.RawMessage) error {
@@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
if err != nil {
return errors.New("failed to execute command")
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
@@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}
func (m *TopProcsCollector) Close() {

21
go.mod
View File

@@ -3,14 +3,17 @@ module github.com/ClusterCockpit/cc-metric-collector
go 1.16
require (
github.com/NVIDIA/go-nvml v0.11.6-0
github.com/PaesslerAG/gval v1.1.2
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb-client-go/v2 v2.8.1
github.com/NVIDIA/go-nvml v0.11.1-0
github.com/influxdata/influxdb-client-go/v2 v2.7.0
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf
github.com/nats-io/nats-server/v2 v2.8.0 // indirect
github.com/nats-io/nats.go v1.14.0
github.com/prometheus/client_golang v1.12.1
github.com/stmcginnis/gofish v0.13.0
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
gopkg.in/Knetic/govaluate.v2 v2.3.0
)
require (
github.com/PaesslerAG/gval v1.1.2
github.com/golang/protobuf v1.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.7.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

View File

@@ -0,0 +1,17 @@
# Stats API
The Stats API can be used for debugging. It publishes counts from different components of the CC Metric Collector as JSON at an HTTP endpoint.
# Configuration
The Stats API has its own configuration file to specify the listen host and port. The defaults are `localhost` and `8080`.
```json
{
"bindhost" : "",
"port" : "8080",
"publish_collectorstate" : true
}
```
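The `bindhost` and `port` can be used to specify the listen host and port. Note that an explicitly empty `bindhost` (as in the example above) replaces the `localhost` default, so the server then listens on all interfaces; omit the key to keep the default. The `publish_collectorstate` needs to be `true`, otherwise nothing is presented. This option is for future use if we need to publish more information using different domains.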

View File

@@ -0,0 +1,232 @@
package metricRouter
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"github.com/gorilla/mux"
)
// statsApiConfig holds the settings decoded from the Stats API JSON
// configuration file.
type statsApiConfig struct {
// PublishCollectorState enables registration of the "/" handler that serves
// the collected statistics.
PublishCollectorState bool `json:"publish_collectorstate"`
// Host is the host part of the HTTP listen address.
Host string `json:"bindhost"`
// Port is the port part of the HTTP listen address.
Port string `json:"port"`
}
// statsApi is the Stats API data structure. It receives "_stats" metrics on
// its input channel, keeps the latest value per component and key, and serves
// a periodically refreshed copy of these values over HTTP.
type statsApi struct {
// name is the component name passed to the logger
name string
// input receives the "_stats" metrics sent by ComponentStatInt / ComponentStatString
input chan lp.CCMetric
// indone signals the input goroutine to stop
indone chan bool
// outdone signals the output (snapshot) goroutine to stop
outdone chan bool
// config is the decoded configuration file
config statsApiConfig
// wg is the wait group handed to NewStatsApi; Start adds one, Close marks it done
wg *sync.WaitGroup
// statsWg tracks the goroutines launched by Start
statsWg sync.WaitGroup
// ticker delivers interval ticks on tickchan
ticker mct.MultiChanTicker
// tickchan triggers copying stats into outStats
tickchan chan time.Time
// server is the HTTP server publishing the statistics
server *http.Server
// router is the request router used as the server's handler
router *mux.Router
// lock is held while stats is updated and while it is copied into outStats
lock sync.Mutex
// baseurl is the "host:port" listen address of the server
baseurl string
// stats holds the latest values, keyed by component name and then by stat key
stats map[string]map[string]int64
// outStats is the copy of stats that is marshalled by StatsFunc
outStats map[string]map[string]int64
}
// StatsApi is the interface returned by NewStatsApi.
type StatsApi interface {
// Start launches the goroutines that receive stats, refresh the published
// copy and run the HTTP server.
Start()
// Close stops these goroutines and shuts the HTTP server down.
Close()
// StatsFunc is the HTTP handler that writes the published stats as JSON.
StatsFunc(w http.ResponseWriter, r *http.Request)
}
// statsApiServer is the instance created by the latest successful NewStatsApi
// call. While it is nil, ComponentStatInt and ComponentStatString are no-ops.
var statsApiServer *statsApi = nil
// updateStats merges the integer fields of a "_stats" metric into a.stats.
// The component the values belong to is taken from the metric's "source" meta
// entry; metrics with another name or without that entry are ignored.
// Fields that are not of an integer type are reported with a debug message
// and skipped. The caller is expected to hold a.lock.
func (a *statsApi) updateStats(point lp.CCMetric) {
	// Only the "_stats" pseudo metric carries component statistics
	if point.Name() != "_stats" {
		return
	}
	source, found := point.GetMeta("source")
	if !found {
		return
	}

	// Fetch the component's counters, creating them on first sight
	counters, known := a.stats[source]
	if !known {
		counters = make(map[string]int64)
		a.stats[source] = counters
	}

	for key, raw := range point.Fields() {
		switch n := raw.(type) {
		case int:
			counters[key] = int64(n)
		case uint:
			counters[key] = int64(n)
		case int32:
			counters[key] = int64(n)
		case uint32:
			counters[key] = int64(n)
		case int64:
			counters[key] = int64(n)
		case uint64:
			counters[key] = int64(n)
		default:
			cclog.ComponentDebug(a.name, "Unusable stats for", key, ". Values should be int64")
		}
	}
}
// publishStats copies the current per-component values from a.stats into
// a.outStats while holding a.lock.
func (a *statsApi) publishStats() {
	a.lock.Lock()
	defer a.lock.Unlock()
	for comp, compData := range a.stats {
		outData, ok := a.outStats[comp]
		if !ok {
			outData = make(map[string]int64, len(compData))
			a.outStats[comp] = outData
		}
		for k, v := range compData {
			outData[k] = v
		}
	}
}

// Start registers the tick channel at the ticker and launches three
// goroutines:
//   - the input goroutine, which merges every metric received on a.input
//     into a.stats until a.indone is signalled,
//   - the output goroutine, which copies a.stats into a.outStats once at
//     startup and on every tick until a.outdone is signalled,
//   - the HTTP server goroutine, which runs until the server is shut down.
//
// All three are tracked by a.statsWg; a.wg is incremented once for the whole
// component and released in Close.
func (a *statsApi) Start() {
	a.ticker.AddChannel(a.tickchan)
	a.wg.Add(1)

	// Create both maps before any goroutine runs so that no goroutine
	// replaces a map another one is already reading.
	a.lock.Lock()
	a.stats = make(map[string]map[string]int64)
	a.outStats = make(map[string]map[string]int64)
	a.lock.Unlock()

	a.statsWg.Add(1)
	go func() {
		defer a.statsWg.Done()
		for {
			select {
			case <-a.indone:
				cclog.ComponentDebug(a.name, "INPUT DONE")
				// Closing the channel lets Close's final receive return
				close(a.indone)
				return
			case p := <-a.input:
				a.lock.Lock()
				a.updateStats(p)
				a.lock.Unlock()
			}
		}
	}()

	a.statsWg.Add(1)
	go func() {
		defer a.statsWg.Done()
		// Publish whatever has been received before the first tick
		a.publishStats()
		for {
			select {
			case <-a.outdone:
				cclog.ComponentDebug(a.name, "OUTPUT DONE")
				// Closing the channel lets Close's final receive return
				close(a.outdone)
				return
			case <-a.tickchan:
				a.publishStats()
			}
		}
	}()

	a.statsWg.Add(1)
	go func() {
		defer a.statsWg.Done()
		err := a.server.ListenAndServe()
		// http.ErrServerClosed is the regular result of Shutdown, not a failure
		if err != nil && err != http.ErrServerClosed {
			cclog.ComponentError(a.name, err.Error())
		}
		cclog.ComponentDebug(a.name, "SERVER DONE")
	}()
	cclog.ComponentDebug(a.name, "STARTED")
}
// StatsFunc is the HTTP handler that writes the published statistics
// (a.outStats) as a JSON object keyed by component name. NewStatsApi registers
// it for "/" when publish_collectorstate is enabled.
//
// The maps are marshalled while holding a.lock because the output goroutine
// modifies them on every tick. If encoding fails, the client receives a
// 500 response instead of an empty 200.
func (a *statsApi) StatsFunc(w http.ResponseWriter, r *http.Request) {
	a.lock.Lock()
	data, err := json.Marshal(a.outStats)
	a.lock.Unlock()
	if err != nil {
		cclog.ComponentError(a.name, err.Error())
		http.Error(w, "failed to encode stats", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := io.WriteString(w, string(data)); err != nil {
		// Typically a client that went away; nothing more can be sent
		cclog.ComponentDebug(a.name, err.Error())
	}
}
// Close stops the Stats API. It signals the input and output goroutines to
// finish, shuts the HTTP server down, waits until all goroutines launched by
// Start have returned and finally releases the component's slot in a.wg.
func (a *statsApi) Close() {
	cclog.ComponentDebug(a.name, "CLOSE")
	a.indone <- true
	a.outdone <- true
	if err := a.server.Shutdown(context.Background()); err != nil {
		cclog.ComponentError(a.name, err.Error())
	}
	// Both goroutines close their done channel before returning, so these
	// receives return once the goroutines have acknowledged the signal
	<-a.indone
	<-a.outdone
	a.statsWg.Wait()
	a.wg.Done()
}
// NewStatsApi creates the Stats API from the JSON file statsApiConfigfile.
// Host and port default to "localhost" and "8080" and are replaced by the
// values found in the file. ticker provides the interval ticks and wg is the
// wait group the component registers itself at in Start. The created instance
// is also stored in the package-level statsApiServer so that ComponentStatInt
// and ComponentStatString can reach it.
// An error is logged and returned if the file cannot be opened or decoded.
func NewStatsApi(ticker mct.MultiChanTicker, wg *sync.WaitGroup, statsApiConfigfile string) (StatsApi, error) {
	api := &statsApi{
		name:   "StatsApi",
		config: statsApiConfig{Host: "localhost", Port: "8080"},
	}

	// Read the configuration on top of the defaults
	fh, err := os.Open(statsApiConfigfile)
	if err != nil {
		cclog.ComponentError(api.name, err.Error())
		return nil, err
	}
	defer fh.Close()
	if err = json.NewDecoder(fh).Decode(&api.config); err != nil {
		cclog.ComponentError(api.name, err.Error())
		return nil, err
	}

	// Channels and synchronization
	api.input = make(chan lp.CCMetric)
	api.ticker = ticker
	api.tickchan = make(chan time.Time)
	api.wg = wg
	api.indone = make(chan bool)
	api.outdone = make(chan bool)

	// HTTP endpoint
	api.router = mux.NewRouter()
	api.baseurl = fmt.Sprintf("%s:%s", api.config.Host, api.config.Port)
	api.server = &http.Server{Addr: api.baseurl, Handler: api.router}
	if api.config.PublishCollectorState {
		api.router.HandleFunc("/", api.StatsFunc)
	}

	statsApiServer = api
	return api, nil
}
// ComponentStatInt reports the integer statistic key=value for the given
// component to the Stats API. It does nothing if no Stats API has been created.
// The value is wrapped into a "_stats" metric carrying the component name in
// the "source" meta entry and sent on the Stats API's input channel; the send
// blocks until the metric is received.
func ComponentStatInt(component string, key string, value int64) {
	if statsApiServer == nil {
		return
	}
	tags := map[string]string{}
	meta := map[string]string{"source": component}
	fields := map[string]interface{}{key: value}
	point, err := lp.New("_stats", tags, meta, fields, time.Now())
	if err != nil {
		return
	}
	statsApiServer.input <- point
}
func ComponentStatString(component string, key string, value int64) {
if statsApiServer == nil {
return
}
y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now())
if err == nil {
statsApiServer.input <- y
}
}

View File

@@ -40,19 +40,26 @@ type metricRouterConfig struct {
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
maxForward int // number of metrics to forward maximally in one iteration
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache
cachewg sync.WaitGroup // wait group for MetricCache
maxForward int // number of metrics to forward maximally in one iteration
statsCollForward int64
statsRecvForward int64
statsCacheForward int64
statsTotalForward int64
statsDropped int64
statsRenamed int64
}
// MetricRouter access functions
@@ -120,9 +127,39 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
for _, mname := range r.config.DropMetrics {
r.config.dropMetrics[mname] = true
}
r.statsCollForward = 0
r.statsRecvForward = 0
r.statsCacheForward = 0
r.statsTotalForward = 0
r.statsDropped = 0
r.statsRenamed = 0
return nil
}
// StartTimer registers a fresh tick channel at the router's ticker and starts
// a goroutine that stores the time of every tick in r.timestamp. The goroutine
// is tracked by r.wg and ends when a value is received on r.timerdone; it
// closes that channel before returning.
func (r *metricRouter) StartTimer() {
	tickChan := make(chan time.Time)
	r.ticker.AddChannel(tickChan)
	r.timerdone = make(chan bool)

	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for {
			select {
			case <-r.timerdone:
				// Acknowledge the stop request by closing the channel
				close(r.timerdone)
				cclog.ComponentDebug("MetricRouter", "TIMER DONE")
				return
			case tick := <-tickChan:
				cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", tick.Unix())
				r.timestamp = tick
			}
		}
	}()
	cclog.ComponentDebug("MetricRouter", "TIMER START")
}
func getParamMap(point lp.CCMetric) map[string]interface{} {
params := make(map[string]interface{})
params["metric"] = point
@@ -211,9 +248,8 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
func (r *metricRouter) Start() {
// start timer if configured
r.timestamp = time.Now()
timeChan := make(chan time.Time)
if r.config.IntervalStamp {
r.ticker.AddChannel(timeChan)
r.StartTimer()
}
// Router manager is done
@@ -230,6 +266,8 @@ func (r *metricRouter) Start() {
r.DoDelTags(point)
name := point.Name()
if new, ok := r.config.RenameMetrics[name]; ok {
r.statsRenamed++
ComponentStatInt("MetricRouter", "renamed", r.statsRenamed)
point.SetName(new)
point.AddMeta("oldname", name)
}
@@ -249,7 +287,14 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
r.statsCollForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
// even if the metric is dropped, it is stored in the cache for
// aggregations
@@ -265,7 +310,14 @@ func (r *metricRouter) Start() {
p.SetTime(r.timestamp)
}
if !r.dropMetric(p) {
r.statsRecvForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
}
@@ -274,7 +326,14 @@ func (r *metricRouter) Start() {
// receive from metric collector
if !r.dropMetric(p) {
p.AddTag(r.config.HostnameTagName, r.hostname)
r.statsCacheForward++
r.statsTotalForward++
ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward)
ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
forward(p)
} else {
r.statsDropped++
ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
}
}
@@ -293,10 +352,6 @@ func (r *metricRouter) Start() {
done()
return
case timestamp := <-timeChan:
r.timestamp = timestamp
cclog.ComponentDebug("MetricRouter", "Update timestamp", r.timestamp.UnixNano())
case p := <-r.coll_input:
coll_forward(p)
for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
@@ -342,6 +397,14 @@ func (r *metricRouter) Close() {
// wait for close of channel r.done
<-r.done
// stop timer
if r.config.IntervalStamp {
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
r.timerdone <- true
// wait for close of channel r.timerdone
<-r.timerdone
}
// stop metric cache
if r.config.NumCacheIntervals > 0 {
cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")

View File

@@ -4,22 +4,5 @@
"address": "nats://my-url",
"port" : "4222",
"database": "testcluster"
},
"redfish_recv": {
"type": "redfish",
"client_config": [
{
"hostname": "my-host-1",
"username": "username-1",
"password": "password-1",
"endpoint": "https://my-endpoint-1"
},
{
"hostname": "my-host-2",
"username": "username-2",
"password": "password-2",
"endpoint": "https://my-endpoint-2"
}
]
}
}

View File

@@ -10,13 +10,14 @@ import (
)
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
"nats": NewNatsReceiver,
"redfish": NewRedfishReceiver,
"nats": NewNatsReceiver,
}
type receiveManager struct {
inputs []Receiver
output chan lp.CCMetric
done chan bool
wg *sync.WaitGroup
config []json.RawMessage
}
@@ -32,6 +33,8 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
// Initialize struct fields
rm.inputs = make([]Receiver, 0)
rm.output = nil
rm.done = make(chan bool)
rm.wg = wg
rm.config = make([]json.RawMessage, 0)
configFile, err := os.Open(receiverConfigFile)
@@ -55,7 +58,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
}
func (rm *receiveManager) Start() {
cclog.ComponentDebug("ReceiveManager", "START")
rm.wg.Add(1)
for _, r := range rm.inputs {
cclog.ComponentDebug("ReceiveManager", "START", r.Name())
@@ -94,19 +97,16 @@ func (rm *receiveManager) AddOutput(output chan lp.CCMetric) {
}
func (rm *receiveManager) Close() {
cclog.ComponentDebug("ReceiveManager", "CLOSE")
// Close all receivers
for _, r := range rm.inputs {
cclog.ComponentDebug("ReceiveManager", "CLOSE", r.Name())
r.Close()
}
cclog.ComponentDebug("ReceiveManager", "DONE")
rm.wg.Done()
cclog.ComponentDebug("ReceiveManager", "CLOSE")
}
func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) {
r := new(receiveManager)
r := &receiveManager{}
err := r.Init(wg, receiverConfigFile)
if err != nil {
return nil, err

View File

@@ -1,324 +0,0 @@
package receivers
import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
// See: https://pkg.go.dev/github.com/stmcginnis/gofish
"github.com/stmcginnis/gofish"
)
// RedfishReceiver is a receiver that reads power metrics from redfish
// services. Its config field is filled from the receiver's JSON configuration.
type RedfishReceiver struct {
receiver
config struct {
Type string `json:"type"`
// Fanout is the upper bound on the number of worker goroutines used per
// read cycle
Fanout int `json:"fanout,omitempty"` // Default fanout: 64
// Interval is the time between two read cycles in seconds
Interval int `json:"interval,omitempty"` // Default interval: 30s
// Client config for each redfish service
ClientConfigs []struct {
// Hostname is used as value of the "hostname" tag of the metrics
Hostname *string `json:"hostname"`
Username *string `json:"username"`
Password *string `json:"password"`
Endpoint *string `json:"endpoint"`
// Insecure is optional; when it is omitted the gofish client config is
// created with Insecure set to true
Insecure *bool `json:"insecure,omitempty"`
// ExcludeMetrics lists metric names which are removed before sending
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
// gofish is unexported and therefore not read from JSON; it is
// populated from the fields above when the receiver is created
gofish gofish.ClientConfig
} `json:"client_config"`
}
done chan bool // channel to finish / stop redfish receiver
wg sync.WaitGroup // wait group for redfish receiver
}
// Start starts the redfish receiver.
// It launches one background goroutine (tracked by r.wg) and returns. The
// goroutine reads the power metrics of all configured redfish services once
// immediately and then again after every tick of a ticker with a period of
// r.config.Interval seconds. It returns when r.done becomes readable.
// All metrics that could be created are written to r.sink.
func (r *RedfishReceiver) Start() {
cclog.ComponentDebug(r.name, "START")
// readPowerMetric reads redfish power metrics from the service described by
// the client config with index clientConfigIndex.
// It returns an error if connecting, listing the chassis or reading the
// power information of a chassis fails.
readPowerMetric := func(clientConfigIndex int) error {
clientConfig := &r.config.ClientConfigs[clientConfigIndex]
// Connect to redfish service
c, err := gofish.Connect(clientConfig.gofish)
if err != nil {
// Only these fields of the client config are put into the error
// message; the password is not part of it
c := struct {
Username string
Endpoint string
BasicAuth bool
Insecure bool
}{
Username: clientConfig.gofish.Username,
Endpoint: clientConfig.gofish.Endpoint,
BasicAuth: clientConfig.gofish.BasicAuth,
Insecure: clientConfig.gofish.Insecure,
}
return fmt.Errorf("readPowerMetric: gofish.Connect(%+v) failed: %v", c, err)
}
defer c.Logout()
// Get all chassis managed by this service
chassis_list, err := c.Service.Chassis()
if err != nil {
return fmt.Errorf("readPowerMetric: c.Service.Chassis() failed: %v", err)
}
for _, chassis := range chassis_list {
// All metrics of one chassis share the same timestamp
timestamp := time.Now()
// Get power information for each chassis
power, err := chassis.Power()
if err != nil {
// NOTE: this also skips all chassis which have not been read yet
return fmt.Errorf("readPowerMetric: chassis.Power() failed: %v", err)
}
if power == nil {
continue
}
// Read min, max and average consumed watts for each power control
for _, pc := range power.PowerControl {
// Map of collected metrics
metrics := map[string]float32{
// PowerConsumedWatts shall represent the actual power being consumed (in
// Watts) by the chassis
"consumed_watts": pc.PowerConsumedWatts,
// AverageConsumedWatts shall represent the
// average power level that occurred averaged over the last IntervalInMin
// minutes.
"average_consumed_watts": pc.PowerMetrics.AverageConsumedWatts,
// MinConsumedWatts shall represent the
// minimum power level in watts that occurred within the last
// IntervalInMin minutes.
"min_consumed_watts": pc.PowerMetrics.MinConsumedWatts,
// MaxConsumedWatts shall represent the
// maximum power level in watts that occurred within the last
// IntervalInMin minutes
"max_consumed_watts": pc.PowerMetrics.MaxConsumedWatts,
}
// The interval is attached to each metric as meta data string
intervalInMin := strconv.FormatFloat(float64(pc.PowerMetrics.IntervalInMin), 'f', -1, 32)
// Metrics to exclude
for _, key := range clientConfig.ExcludeMetrics {
delete(metrics, key)
}
// Set tags
tags := map[string]string{
"hostname": *clientConfig.Hostname,
"type": "node",
// ID uniquely identifies the resource
"id": pc.ID,
// MemberID shall uniquely identify the member within the collection. For
// services supporting Redfish v1.6 or higher, this value shall be the
// zero-based array index.
"member_id": pc.MemberID,
// PhysicalContext shall be a description of the affected device(s) or region
// within the chassis to which this power control applies.
"physical_context": string(pc.PhysicalContext),
// Name
"power_control_name": pc.Name,
}
// Delete empty tags
for key, value := range tags {
if value == "" {
delete(tags, key)
}
}
// Set meta data tags
meta := map[string]string{
"source": r.name,
"group": "Energy",
"interval_in_minutes": intervalInMin,
"unit": "watts",
}
// Delete empty meta data tags
for key, value := range meta {
if value == "" {
delete(meta, key)
}
}
// Create one metric per remaining map entry and send it to the sink.
// Metrics which could not be created are dropped without a message
for name, value := range metrics {
y, err := lp.New(name, tags, meta,
map[string]interface{}{
"value": value,
},
timestamp)
if err == nil {
r.sink <- y
}
}
}
}
return nil
}
// doReadPowerMetric reads power metrics for all configured redfish services.
// To compensate latencies of the Redfish services a fanout is used.
// It returns after all workers have finished, either because all client
// configs were processed or because r.done became readable.
doReadPowerMetric := func() {
// Compute fanout to use
// There is no need for more workers than client configs
realFanout := r.config.Fanout
if len(r.config.ClientConfigs) < realFanout {
realFanout = len(r.config.ClientConfigs)
}
// Create wait group and input channel for workers
// The channel transports indices into r.config.ClientConfigs
var workerWaitGroup sync.WaitGroup
workerInput := make(chan int, realFanout)
// Create worker go routines
for i := 0; i < realFanout; i++ {
// Increment worker wait group counter
workerWaitGroup.Add(1)
go func() {
// Decrement worker wait group counter
defer workerWaitGroup.Done()
// Read power metrics for each client config
// The loop ends when workerInput is closed and drained
for clientConfigIndex := range workerInput {
err := readPowerMetric(clientConfigIndex)
if err != nil {
cclog.ComponentError(r.name, err)
}
}
}()
}
// Distribute client configs to workers
for i := range r.config.ClientConfigs {
// Check done channel status
select {
case workerInput <- i:
case <-r.done:
// process done event
// Stop workers, clear channel and wait for all workers to finish
close(workerInput)
for range workerInput {
}
workerWaitGroup.Wait()
return
}
}
// Stop workers and wait for all workers to finish
close(workerInput)
workerWaitGroup.Wait()
}
// Start redfish receiver
r.wg.Add(1)
go func() {
defer r.wg.Done()
// Create ticker
ticker := time.NewTicker(time.Duration(r.config.Interval) * time.Second)
defer ticker.Stop()
for {
// The first read happens immediately, not after the first tick
doReadPowerMetric()
select {
case <-ticker.C:
// process ticker event -> continue
continue
case <-r.done:
// process done event
return
}
}
}()
cclog.ComponentDebug(r.name, "STARTED")
}
// Close stops the redfish receiver.
// It closes the r.done channel and then blocks until the wait group r.wg
// reaches zero.
// NOTE(review): closing an already closed channel panics in Go, so Close
// must not be called more than once per receiver.
func (r *RedfishReceiver) Close() {
cclog.ComponentDebug(r.name, "CLOSE")
// Send the signal and wait
close(r.done)
r.wg.Wait()
cclog.ComponentDebug(r.name, "DONE")
}
// NewRedfishReceiver creates a new redfish receiver.
//
// name is used to build the receiver name "RedfishReceiver(<name>)".
// config is the receiver specific JSON configuration. It may be empty, in
// which case only the defaults (fanout 64, interval 30 seconds) are set.
//
// An error is returned if
//   - the JSON configuration cannot be parsed,
//   - "fanout" or "interval" is not greater than zero, or
//   - a client config lacks one of hostname, endpoint, username or password.
func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
	r := new(RedfishReceiver)

	// Set name
	r.name = fmt.Sprintf("RedfishReceiver(%s)", name)

	// Create done channel
	r.done = make(chan bool)

	// Set defaults in r.config
	// Allow overwriting these defaults by reading config JSON
	r.config.Fanout = 64
	r.config.Interval = 30

	// Read the redfish receiver specific JSON config
	if len(config) > 0 {
		err := json.Unmarshal(config, &r.config)
		if err != nil {
			cclog.ComponentError(r.name, "Error reading config:", err.Error())
			return nil, err
		}
	}

	// Reject values which cannot work at runtime:
	// The fanout is used as the number of workers and as channel capacity,
	// so a negative value would panic in make() and zero would start no
	// worker at all.
	if r.config.Fanout <= 0 {
		err := fmt.Errorf("fanout must be greater than zero, got %d", r.config.Fanout)
		cclog.ComponentError(r.name, err)
		return nil, err
	}
	// The interval is used as ticker period and time.NewTicker panics for
	// non-positive durations.
	if r.config.Interval <= 0 {
		err := fmt.Errorf("interval must be greater than zero, got %d", r.config.Interval)
		cclog.ComponentError(r.name, err)
		return nil, err
	}

	// Create gofish client config
	for i := range r.config.ClientConfigs {
		clientConfig := &r.config.ClientConfigs[i]
		gofishConfig := &clientConfig.gofish

		// The hostname is not part of the gofish config, but it is required
		// as it is dereferenced when the metric tags are set
		if clientConfig.Hostname == nil {
			err := fmt.Errorf("client config number %v requires hostname", i)
			cclog.ComponentError(r.name, err)
			return nil, err
		}

		if clientConfig.Endpoint == nil {
			err := fmt.Errorf("client config number %v requires endpoint", i)
			cclog.ComponentError(r.name, err)
			return nil, err
		}
		gofishConfig.Endpoint = *clientConfig.Endpoint

		if clientConfig.Username == nil {
			err := fmt.Errorf("client config number %v requires username", i)
			cclog.ComponentError(r.name, err)
			return nil, err
		}
		gofishConfig.Username = *clientConfig.Username

		if clientConfig.Password == nil {
			err := fmt.Errorf("client config number %v requires password", i)
			cclog.ComponentError(r.name, err)
			return nil, err
		}
		gofishConfig.Password = *clientConfig.Password

		// Insecure defaults to true and is only changed if the option is
		// explicitly present in the client config
		gofishConfig.Insecure = true
		if clientConfig.Insecure != nil {
			gofishConfig.Insecure = *clientConfig.Insecure
		}
	}

	return r, nil
}

View File

@@ -36,26 +36,16 @@ func (r *SampleReceiver) Start() {
// or use own go routine but always make sure it exits
// as soon as it gets the signal of the r.done channel
//
// r.done = make(chan bool)
// r.wg.Add(1)
// go func() {
// defer r.wg.Done()
//
// // Create ticker
// ticker := time.NewTicker(30 * time.Second)
// defer ticker.Stop()
//
// for {
// readMetric()
// select {
// case <-ticker.C:
// // process ticker event -> continue
// continue
// case <-r.done:
// return
// }
// }
// for {
// select {
// case <-r.done:
// r.wg.Done()
// return
// }
// }
// r.wg.Done()
// }()
}

View File

@@ -1,8 +1,6 @@
{
"mystdout": {
"type": "stdout",
"meta_as_tags": [
"unit"
]
"mystdout" : {
"type" : "stdout",
"meta_as_tags" : true
}
}
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
const GMETRIC_EXEC = `gmetric`
@@ -29,9 +30,10 @@ type GangliaSinkConfig struct {
type GangliaSink struct {
sink
gmetric_path string
gmetric_config string
config GangliaSinkConfig
gmetric_path string
gmetric_config string
config GangliaSinkConfig
statsSentMetrics int64
}
func (s *GangliaSink) Write(point lp.CCMetric) error {
@@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
command := exec.Command(s.gmetric_path, argstr...)
command.Wait()
_, err = command.Output()
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return err
}
@@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
s.statsSentMetrics = 0
return s, nil
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)
@@ -22,7 +23,6 @@ type HttpSinkConfig struct {
MaxIdleConns int `json:"max_idle_connections,omitempty"`
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
BatchSize int `json:"batch_size,omitempty"`
}
type HttpSink struct {
@@ -37,7 +37,8 @@ type HttpSink struct {
idleConnTimeout time.Duration
timeout time.Duration
flushDelay time.Duration
batchSize int
statsProcessed int64
statsFlushes int64
}
func (s *HttpSink) Write(m lp.CCMetric) error {
@@ -59,21 +60,19 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
s.lock.Lock()
_, err := s.encoder.Encode(p)
s.batchSize++
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
if err != nil {
cclog.ComponentError(s.name, "encoding failed:", err.Error())
return err
}
s.statsProcessed++
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed)
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
if s.batchSize == s.config.BatchSize {
return s.Flush()
}
return err
}
@@ -105,7 +104,6 @@ func (s *HttpSink) Flush() error {
// Clear buffer
s.buffer.Reset()
s.batchSize = 0
// Handle transport/tcp errors
if err != nil {
@@ -119,6 +117,8 @@ func (s *HttpSink) Flush() error {
cclog.ComponentError(s.name, "application error:", err.Error())
return err
}
s.statsFlushes++
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
return nil
}
@@ -139,7 +139,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
s.config.FlushDelay = "1s"
s.config.BatchSize = 100
// Read config
if len(config) > 0 {
@@ -185,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.statsFlushes = 0
s.statsProcessed = 0
return s, nil
}

View File

@@ -15,7 +15,6 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
"max_idle_connections" : 10,
"idle_connection_timeout" : "5s",
"flush_delay": "2s",
"batch_size" : 100
}
}
```
@@ -28,4 +27,3 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
- `max_idle_connections`: Maximum number of idle connections (default 10)
- `idle_connection_timeout`: Timeout for idle connections (default '5s')
- `flush_delay`: Batch all writes arriving during this duration (default '1s', batching can be disabled by setting it to 0)
- `batch_size`: Maximal number of batched metrics. Either it is flushed because batch size or the `flush_delay` is reached

View File

@@ -6,14 +6,13 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
)
type InfluxAsyncSinkConfig struct {
@@ -34,8 +33,6 @@ type InfluxAsyncSinkConfig struct {
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
InfluxMaxRetries uint `json:"max_retries,omitempty"`
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
}
type InfluxAsyncSink struct {
@@ -46,8 +43,9 @@ type InfluxAsyncSink struct {
config InfluxAsyncSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
customFlushInterval time.Duration
flushTimer *time.Timer
sentMetrics int64
statsFlushes int64
statsErrors int64
}
func (s *InfluxAsyncSink) connect() error {
@@ -104,35 +102,22 @@ func (s *InfluxAsyncSink) connect() error {
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool {
mlist := strings.Split(batch, "\n")
cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error()))
return retryAttempts <= s.config.MaxRetryAttempts
})
return nil
}
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
if s.customFlushInterval != 0 && s.flushTimer == nil {
// Run a batched flush for all lines that have arrived in the defined interval
s.flushTimer = time.AfterFunc(s.customFlushInterval, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
s.writeApi.WritePoint(
m.ToPoint(s.meta_as_tags),
)
s.sentMetrics++
stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics)
return nil
}
func (s *InfluxAsyncSink) Flush() error {
cclog.ComponentDebug(s.name, "Flushing")
s.writeApi.Flush()
if s.customFlushInterval != 0 && s.flushTimer != nil {
s.flushTimer = nil
}
s.statsFlushes++
stats.ComponentStatInt(s.name, "flushes", s.statsFlushes)
return nil
}
@@ -155,9 +140,6 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.config.InfluxMaxRetries = 0
s.config.InfluxExponentialBase = 0
s.config.FlushInterval = 0
s.config.CustomFlushInterval = ""
s.customFlushInterval = time.Duration(0)
s.config.MaxRetryAttempts = 1
// Default retry intervals (in seconds)
// 1 2
@@ -209,27 +191,23 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Use a own timer for calling Flush()
if len(s.config.CustomFlushInterval) > 0 {
t, err := time.ParseDuration(s.config.CustomFlushInterval)
if err != nil {
return nil, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
}
s.customFlushInterval = t
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
// Start background: Read from error channel
s.statsErrors = 0
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
s.statsErrors++
stats.ComponentStatInt(s.name, "errors", s.statsErrors)
cclog.ComponentError(s.name, err.Error())
}
}()
s.sentMetrics = 0
s.statsFlushes = 0
return s, nil
}

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
@@ -37,43 +38,34 @@ type InfluxSinkConfig struct {
type InfluxSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
statsSentMetrics int64
statsProcessedMetrics int64
//influxMaxRetryDelay uint
}
// connect connects to the InfluxDB server
func (s *InfluxSink) connect() error {
// URI options:
// * http://host:port
// * https://host:port
var auth string
var uri string
if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else {
uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
}
// Authentication options:
// * token
// * username:password
var auth string
if len(s.config.User) == 0 {
auth = s.config.Password
} else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
}
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
// Set influxDB client options
clientOptions := influxdb2.DefaultOptions()
// if s.influxRetryInterval != 0 {
@@ -93,7 +85,6 @@ func (s *InfluxSink) connect() error {
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
// }
// Do not check InfluxDB certificate
clientOptions.SetTLSConfig(
&tls.Config{
InsecureSkipVerify: true,
@@ -102,11 +93,8 @@ func (s *InfluxSink) connect() error {
clientOptions.SetPrecision(time.Second)
// Create new writeAPI
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
// Check InfluxDB server accessibility
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
@@ -118,65 +106,53 @@ func (s *InfluxSink) connect() error {
}
func (s *InfluxSink) Write(m lp.CCMetric) error {
// err :=
// s.writeApi.WritePoint(
// context.Background(),
// m.ToPoint(s.meta_as_tags),
// )
if len(s.batch) == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last flush delay interval
// Run a batched flush for all lines that have arrived in the last second
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
// Append metric to batch slice
p := m.ToPoint(s.meta_as_tags)
s.lock.Lock()
s.statsProcessedMetrics++
s.batch = append(s.batch, p)
s.lock.Unlock()
stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics)
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
// Flush if batch size is reached
if len(s.batch) == s.config.BatchSize {
return s.Flush()
}
return nil
}
// Flush sends all metrics buffered in batch slice to InfluxDB server
func (s *InfluxSink) Flush() error {
// Lock access to batch slice
s.lock.Lock()
defer s.lock.Unlock()
// Nothing to do, batch slice is empty
if len(s.batch) == 0 {
return nil
}
// Send metrics from batch slice
err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
return err
}
// Clear batch slice
for i := range s.batch {
s.batch[i] = nil
}
s.statsSentMetrics += int64(len(s.batch))
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
s.batch = s.batch[:0]
return nil
}
@@ -187,16 +163,11 @@ func (s *InfluxSink) Close() {
s.client.Close()
}
// NewInfluxSink create a new InfluxDB sink
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set config default values
s.config.BatchSize = 100
s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
@@ -210,22 +181,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// s.config.InfluxMaxRetries = 0
// s.config.InfluxExponentialBase = 0
if len(s.config.Host) == 0 {
return nil, errors.New("Missing host configuration required by InfluxSink")
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink")
}
if len(s.config.Port) == 0 {
return nil, errors.New("Missing port configuration required by InfluxSink")
}
if len(s.config.Database) == 0 {
return nil, errors.New("Missing database configuration required by InfluxSink")
}
if len(s.config.Organization) == 0 {
return nil, errors.New("Missing organization configuration required by InfluxSink")
}
if len(s.config.Password) == 0 {
return nil, errors.New("Missing password configuration required by InfluxSink")
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
@@ -244,20 +206,19 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Configure flush delay duration
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
s.flushDelay = t
}
}
// allocate batch slice
s.batch = make([]*write.Point, 0, s.config.BatchSize)
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
s.statsSentMetrics = 0
s.statsProcessedMetrics = 0
return s, nil
}

View File

@@ -73,6 +73,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/NVIDIA/go-nvml/pkg/dl"
)
@@ -102,11 +103,12 @@ type LibgangliaSinkConfig struct {
type LibgangliaSink struct {
sink
config LibgangliaSinkConfig
global_context C.Ganglia_pool
gmond_config C.Ganglia_gmond_config
send_channels C.Ganglia_udp_send_channels
cstrCache map[string]*C.char
config LibgangliaSinkConfig
global_context C.Ganglia_pool
gmond_config C.Ganglia_gmond_config
send_channels C.Ganglia_udp_send_channels
cstrCache map[string]*C.char
statsSentMetrics int64
}
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
@@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
C.Ganglia_metric_destroy(gmetric)
// Free the value C string, the only one not stored in the cache
C.free(unsafe.Pointer(c_value))
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return err
}
@@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
if err != nil {
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
s.statsSentMetrics = 0
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")

View File

@@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -29,11 +30,12 @@ type PrometheusSinkConfig struct {
type PrometheusSink struct {
sink
config PrometheusSinkConfig
labelMetrics map[string]*prometheus.GaugeVec
nodeMetrics map[string]prometheus.Gauge
promWg sync.WaitGroup
promServer *http.Server
config PrometheusSinkConfig
labelMetrics map[string]*prometheus.GaugeVec
nodeMetrics map[string]prometheus.Gauge
promWg sync.WaitGroup
promServer *http.Server
statsSentMetrics int64
}
func intToFloat64(input interface{}) (float64, error) {
@@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error {
s.nodeMetrics[name] = new
prometheus.Register(new)
}
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}
@@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error {
}
s.nodeMetrics[name].Set(value)
}
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}

View File

@@ -7,6 +7,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
type SampleSinkConfig struct {
@@ -14,12 +15,15 @@ type SampleSinkConfig struct {
// See: metricSink.go
defaultSinkConfig
// Additional config options, for SampleSink
}
type SampleSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink
config SampleSinkConfig // entry point to the SampleSinkConfig
// Stats counters
statsSentMetrics int64
}
// Implement functions required for Sink interface
@@ -30,6 +34,8 @@ type SampleSink struct {
func (s *SampleSink) Write(point lp.CCMetric) error {
// based on s.meta_as_tags use meta infos as tags
log.Print(point)
s.statsSentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics)
return nil
}
@@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
}
}
// Initalize stats counters
s.statsSentMetrics = 0
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {

View File

@@ -102,13 +102,19 @@ func (sm *sinkManager) Start() {
}
toTheSinks := func(p lp.CCMetric) {
var wg sync.WaitGroup
// Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.sinks {
if err := s.Write(p); err != nil {
cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
}
wg.Add(1)
go func(s Sink) {
if err := s.Write(p); err != nil {
cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
}
wg.Done()
}(s)
}
wg.Wait()
}
for {

View File

@@ -8,6 +8,7 @@ import (
// "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)
type StdoutSink struct {
@@ -17,6 +18,7 @@ type StdoutSink struct {
defaultSinkConfig
Output string `json:"output_file,omitempty"`
}
sentMetrics int64
}
func (s *StdoutSink) Write(m lp.CCMetric) error {
@@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error {
s.output,
m.ToLineProtocol(s.meta_as_tags),
)
s.sentMetrics++
stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics)
return nil
}
@@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
s.sentMetrics = 0
return s, nil
}