mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-20 21:05:06 +02:00 
			
		
		
		
	Compare commits
	
		
			5 Commits
		
	
	
		
			slurm_cgro
			...
			http_stats
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 9dd6ff1a76 | ||
|  | 257b4a64b5 | ||
|  | 5eeb097136 | ||
|  | 4a4992877c | ||
|  | 9447685a69 | 
| @@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c | |||||||
|   "collectors" : "collectors.json", |   "collectors" : "collectors.json", | ||||||
|   "receivers" : "receivers.json", |   "receivers" : "receivers.json", | ||||||
|   "router" : "router.json", |   "router" : "router.json", | ||||||
|  |   "stats_api" : "api.json", | ||||||
|   "interval": 10, |   "interval": 10, | ||||||
|   "duration": 1 |   "duration": 1 | ||||||
| } | } | ||||||
| @@ -32,6 +33,7 @@ See the component READMEs for their configuration: | |||||||
| * [`sinks`](./sinks/README.md) | * [`sinks`](./sinks/README.md) | ||||||
| * [`receivers`](./receivers/README.md) | * [`receivers`](./receivers/README.md) | ||||||
| * [`router`](./internal/metricRouter/README.md) | * [`router`](./internal/metricRouter/README.md) | ||||||
|  | * [`stats_api`](./internal/metricRouter/StatsApi.md) | ||||||
|  |  | ||||||
|  |  | ||||||
| # Installation | # Installation | ||||||
|   | |||||||
| @@ -28,6 +28,7 @@ type CentralConfigFile struct { | |||||||
| 	RouterConfigFile    string `json:"router"` | 	RouterConfigFile    string `json:"router"` | ||||||
| 	SinkConfigFile      string `json:"sinks"` | 	SinkConfigFile      string `json:"sinks"` | ||||||
| 	ReceiverConfigFile  string `json:"receivers,omitempty"` | 	ReceiverConfigFile  string `json:"receivers,omitempty"` | ||||||
|  | 	StatsApiConfigFile  string `json:"stats_api,omitempty"` | ||||||
| } | } | ||||||
|  |  | ||||||
| func LoadCentralConfiguration(file string, config *CentralConfigFile) error { | func LoadCentralConfiguration(file string, config *CentralConfigFile) error { | ||||||
| @@ -52,6 +53,7 @@ type RuntimeConfig struct { | |||||||
| 	CollectManager  collectors.CollectorManager | 	CollectManager  collectors.CollectorManager | ||||||
| 	SinkManager     sinks.SinkManager | 	SinkManager     sinks.SinkManager | ||||||
| 	ReceiveManager  receivers.ReceiveManager | 	ReceiveManager  receivers.ReceiveManager | ||||||
|  | 	StatsApi        mr.StatsApi | ||||||
| 	MultiChanTicker mct.MultiChanTicker | 	MultiChanTicker mct.MultiChanTicker | ||||||
|  |  | ||||||
| 	Channels []chan lp.CCMetric | 	Channels []chan lp.CCMetric | ||||||
| @@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { | |||||||
| 		cclog.Debug("Shutdown SinkManager...") | 		cclog.Debug("Shutdown SinkManager...") | ||||||
| 		config.SinkManager.Close() | 		config.SinkManager.Close() | ||||||
| 	} | 	} | ||||||
|  | 	if config.StatsApi != nil { | ||||||
|  | 		cclog.Debug("Shutdown StatsApi...") | ||||||
|  | 		config.StatsApi.Close() | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func mainFunc() int { | func mainFunc() int { | ||||||
| 	var err error | 	var err error | ||||||
| 	use_recv := false | 	use_recv := false | ||||||
|  | 	use_api := false | ||||||
|  |  | ||||||
| 	// Initialize runtime configuration | 	// Initialize runtime configuration | ||||||
| 	rcfg := RuntimeConfig{ | 	rcfg := RuntimeConfig{ | ||||||
| @@ -164,6 +171,7 @@ func mainFunc() int { | |||||||
| 		CollectManager: nil, | 		CollectManager: nil, | ||||||
| 		SinkManager:    nil, | 		SinkManager:    nil, | ||||||
| 		ReceiveManager: nil, | 		ReceiveManager: nil, | ||||||
|  | 		StatsApi:       nil, | ||||||
| 		CliArgs:        ReadCli(), | 		CliArgs:        ReadCli(), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -253,6 +261,16 @@ func mainFunc() int { | |||||||
| 		use_recv = true | 		use_recv = true | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Create new statistics API manager | ||||||
|  | 	if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 { | ||||||
|  | 		rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile) | ||||||
|  | 		if err != nil { | ||||||
|  | 			cclog.Error(err.Error()) | ||||||
|  | 			return 1 | ||||||
|  | 		} | ||||||
|  | 		use_api = true | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// Create shutdown handler | 	// Create shutdown handler | ||||||
| 	shutdownSignal := make(chan os.Signal, 1) | 	shutdownSignal := make(chan os.Signal, 1) | ||||||
| 	signal.Notify(shutdownSignal, os.Interrupt) | 	signal.Notify(shutdownSignal, os.Interrupt) | ||||||
| @@ -260,6 +278,11 @@ func mainFunc() int { | |||||||
| 	rcfg.Sync.Add(1) | 	rcfg.Sync.Add(1) | ||||||
| 	go shutdownHandler(&rcfg, shutdownSignal) | 	go shutdownHandler(&rcfg, shutdownSignal) | ||||||
|  |  | ||||||
|  | 	// Start the stats api early to be prepared for init settings | ||||||
|  | 	if use_api { | ||||||
|  | 		rcfg.StatsApi.Start() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// Start the managers | 	// Start the managers | ||||||
| 	rcfg.MetricRouter.Start() | 	rcfg.MetricRouter.Start() | ||||||
| 	rcfg.SinkManager.Start() | 	rcfg.SinkManager.Start() | ||||||
|   | |||||||
| @@ -16,6 +16,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const DEFAULT_BEEGFS_CMD = "beegfs-ctl" | const DEFAULT_BEEGFS_CMD = "beegfs-ctl" | ||||||
| @@ -29,10 +30,11 @@ type BeegfsMetaCollectorConfig struct { | |||||||
|  |  | ||||||
| type BeegfsMetaCollector struct { | type BeegfsMetaCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	tags    map[string]string | 	tags                  map[string]string | ||||||
| 	matches map[string]string | 	matches               map[string]string | ||||||
| 	config  BeegfsMetaCollectorConfig | 	config                BeegfsMetaCollectorConfig | ||||||
| 	skipFS  map[string]struct{} | 	skipFS                map[string]struct{} | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | ||||||
| @@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | 		return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr | |||||||
| 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *BeegfsMetaCollector) Close() { | func (m *BeegfsMetaCollector) Close() { | ||||||
|   | |||||||
| @@ -16,6 +16,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Struct for the collector-specific JSON config | // Struct for the collector-specific JSON config | ||||||
| @@ -27,10 +28,11 @@ type BeegfsStorageCollectorConfig struct { | |||||||
|  |  | ||||||
| type BeegfsStorageCollector struct { | type BeegfsStorageCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	tags    map[string]string | 	tags                  map[string]string | ||||||
| 	matches map[string]string | 	matches               map[string]string | ||||||
| 	config  BeegfsStorageCollectorConfig | 	config                BeegfsStorageCollectorConfig | ||||||
| 	skipFS  map[string]struct{} | 	skipFS                map[string]struct{} | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | ||||||
| @@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | 		return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM | |||||||
| 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *BeegfsStorageCollector) Close() { | func (m *BeegfsStorageCollector) Close() { | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // | // | ||||||
| @@ -36,7 +37,8 @@ type CPUFreqCpuInfoCollectorTopology struct { | |||||||
|  |  | ||||||
| type CPUFreqCpuInfoCollector struct { | type CPUFreqCpuInfoCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	topology []*CPUFreqCpuInfoCollectorTopology | 	topology              []*CPUFreqCpuInfoCollectorTopology | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | ||||||
| @@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | |||||||
| 			"package_id": t.physicalPackageID, | 			"package_id": t.physicalPackageID, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC | |||||||
| 						return | 						return | ||||||
| 					} | 					} | ||||||
| 					if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil { | 					if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil { | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 						output <- y | 						output <- y | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| @@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CPUFreqCpuInfoCollector) Close() { | func (m *CPUFreqCpuInfoCollector) Close() { | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"golang.org/x/sys/unix" | 	"golang.org/x/sys/unix" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -39,8 +40,9 @@ type CPUFreqCollectorTopology struct { | |||||||
| // | // | ||||||
| type CPUFreqCollector struct { | type CPUFreqCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	topology []CPUFreqCollectorTopology | 	topology              []CPUFreqCollectorTopology | ||||||
| 	config   struct { | 	statsProcessedMetrics int64 | ||||||
|  | 	config                struct { | ||||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { | |||||||
| 			"package_id": t.physicalPackageID, | 			"package_id": t.physicalPackageID, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil { | 		if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil { | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 			output <- y | 			output <- y | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CPUFreqCollector) Close() { | func (m *CPUFreqCollector) Close() { | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const CPUSTATFILE = `/proc/stat` | const CPUSTATFILE = `/proc/stat` | ||||||
| @@ -21,10 +22,11 @@ type CpustatCollectorConfig struct { | |||||||
|  |  | ||||||
| type CpustatCollector struct { | type CpustatCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	config   CpustatCollectorConfig | 	config                CpustatCollectorConfig | ||||||
| 	matches  map[string]int | 	matches               map[string]int | ||||||
| 	cputags  map[string]map[string]string | 	cputags               map[string]map[string]string | ||||||
| 	nodetags map[string]string | 	nodetags              map[string]string | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CpustatCollector) Init(config json.RawMessage) error { | func (m *CpustatCollector) Init(config json.RawMessage) error { | ||||||
| @@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { | |||||||
| 			num_cpus++ | 			num_cpus++ | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st | |||||||
| 	for name, value := range values { | 	for name, value := range values { | ||||||
| 		y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) | 		y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 			output <- y | 			output <- y | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 		time.Now(), | 		time.Now(), | ||||||
| 	) | 	) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
|  | 		m.statsProcessedMetrics++ | ||||||
| 		output <- num_cpus_metric | 		output <- num_cpus_metric | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CpustatCollector) Close() { | func (m *CpustatCollector) Close() { | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	influx "github.com/influxdata/line-protocol" | 	influx "github.com/influxdata/line-protocol" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -23,11 +24,14 @@ type CustomCmdCollectorConfig struct { | |||||||
|  |  | ||||||
| type CustomCmdCollector struct { | type CustomCmdCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	handler  *influx.MetricHandler | 	handler                *influx.MetricHandler | ||||||
| 	parser   *influx.Parser | 	parser                 *influx.Parser | ||||||
| 	config   CustomCmdCollectorConfig | 	config                 CustomCmdCollectorConfig | ||||||
| 	commands []string | 	commands               []string | ||||||
| 	files    []string | 	files                  []string | ||||||
|  | 	statsProcessedMetrics  int64 | ||||||
|  | 	statsProcessedCommands int64 | ||||||
|  | 	statsProcessedFiles    int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *CustomCmdCollector) Init(config json.RawMessage) error { | func (m *CustomCmdCollector) Init(config json.RawMessage) error { | ||||||
| @@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { | |||||||
| 	m.handler = influx.NewMetricHandler() | 	m.handler = influx.NewMetricHandler() | ||||||
| 	m.parser = influx.NewParser(m.handler) | 	m.parser = influx.NewParser(m.handler) | ||||||
| 	m.parser.SetTimeFunc(DefaultTime) | 	m.parser.SetTimeFunc(DefaultTime) | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
|  | 	m.statsProcessedFiles = 0 | ||||||
|  | 	m.statsProcessedCommands = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | |||||||
|  |  | ||||||
| 			y := lp.FromInfluxMetric(c) | 			y := lp.FromInfluxMetric(c) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
|  | 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 				output <- y | 				output <- y | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		m.statsProcessedCommands++ | ||||||
|  | 		stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands) | ||||||
| 	} | 	} | ||||||
| 	for _, file := range m.files { | 	for _, file := range m.files { | ||||||
| 		buffer, err := ioutil.ReadFile(file) | 		buffer, err := ioutil.ReadFile(file) | ||||||
| @@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | |||||||
| 			} | 			} | ||||||
| 			y := lp.FromInfluxMetric(f) | 			y := lp.FromInfluxMetric(f) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
|  | 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 				output <- y | 				output <- y | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		m.statsProcessedFiles++ | ||||||
|  | 		stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| //	"log" | //	"log" | ||||||
| @@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct { | |||||||
|  |  | ||||||
| type DiskstatCollector struct { | type DiskstatCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	//matches map[string]int | 	config                DiskstatCollectorConfig | ||||||
| 	config IOstatCollectorConfig | 	statsProcessedMetrics int64 | ||||||
| 	//devices map[string]IOstatCollectorEntry |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *DiskstatCollector) Init(config json.RawMessage) error { | func (m *DiskstatCollector) Init(config json.RawMessage) error { | ||||||
| @@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	defer file.Close() | 	defer file.Close() | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | |||||||
| 		y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) | 		y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			y.AddMeta("unit", "GBytes") | 			y.AddMeta("unit", "GBytes") | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
|  | 			stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 			output <- y | 			output <- y | ||||||
| 		} | 		} | ||||||
| 		free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) | 		free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) | ||||||
| 		y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now()) | 		y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now()) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			y.AddMeta("unit", "GBytes") | 			y.AddMeta("unit", "GBytes") | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
|  | 			stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 			output <- y | 			output <- y | ||||||
| 		} | 		} | ||||||
| 		perc := (100 * (total - free)) / total | 		perc := (100 * (total - free)) / total | ||||||
| @@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | |||||||
| 	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) | 	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		y.AddMeta("unit", "percent") | 		y.AddMeta("unit", "percent") | ||||||
|  | 		m.statsProcessedMetrics++ | ||||||
|  | 		stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 		output <- y | 		output <- y | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -15,6 +15,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const DEFAULT_GPFS_CMD = "mmpmon" | const DEFAULT_GPFS_CMD = "mmpmon" | ||||||
| @@ -32,9 +33,10 @@ type GpfsCollector struct { | |||||||
| 		ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` | 		ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` | ||||||
| 		SendBandwidths    bool     `json:"send_bandwidths"` | 		SendBandwidths    bool     `json:"send_bandwidths"` | ||||||
| 	} | 	} | ||||||
| 	skipFS        map[string]struct{} | 	skipFS                map[string]struct{} | ||||||
| 	lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths | 	lastTimestamp         time.Time // Store time stamp of last tick to derive bandwidths | ||||||
| 	lastState     map[string]GpfsCollectorLastState | 	lastState             map[string]GpfsCollectorLastState | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *GpfsCollector) Init(config json.RawMessage) error { | func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||||
| @@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { | |||||||
| 		return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) | 		return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) | ||||||
| 	} | 	} | ||||||
| 	m.config.Mmpmon = p | 	m.config.Mmpmon = p | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
| 		if m.config.SendBandwidths { | 		if m.config.SendBandwidths { | ||||||
| 			if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { | 			if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { | ||||||
| 				bwRead := float64(bytesRead-lastBytesRead) / timeDiff | 				bwRead := float64(bytesRead-lastBytesRead) / timeDiff | ||||||
| 				if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { | 				if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
| 		if m.config.SendBandwidths { | 		if m.config.SendBandwidths { | ||||||
| 			if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 { | 			if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 { | ||||||
| 				bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff | 				bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff | ||||||
| 				if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { | 				if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// number of closes | 		// number of closes | ||||||
| @@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// number of reads | 		// number of reads | ||||||
| @@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// number of writes | 		// number of writes | ||||||
| @@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// number of read directories | 		// number of read directories | ||||||
| @@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// Number of inode updates | 		// Number of inode updates | ||||||
| @@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil { | 		if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil { | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 			output <- y | 			output <- y | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *GpfsCollector) Close() { | func (m *GpfsCollector) Close() { | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"golang.org/x/sys/unix" | 	"golang.org/x/sys/unix" | ||||||
|  |  | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| @@ -39,8 +40,9 @@ type InfinibandCollector struct { | |||||||
| 		SendAbsoluteValues bool     `json:"send_abs_values"`           // Send absolut values as read from sys filesystem | 		SendAbsoluteValues bool     `json:"send_abs_values"`           // Send absolut values as read from sys filesystem | ||||||
| 		SendDerivedValues  bool     `json:"send_derived_values"`       // Send derived values e.g. rates | 		SendDerivedValues  bool     `json:"send_derived_values"`       // Send derived values e.g. rates | ||||||
| 	} | 	} | ||||||
| 	info          []*InfinibandCollectorInfo | 	info                  []*InfinibandCollectorInfo | ||||||
| 	lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths | 	lastTimestamp         time.Time // Store time stamp of last tick to derive bandwidths | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // Init initializes the Infiniband collector by walking through files below IB_BASEPATH | // Init initializes the Infiniband collector by walking through files below IB_BASEPATH | ||||||
| @@ -149,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { | |||||||
| 	if len(m.info) == 0 { | 	if len(m.info) == 0 { | ||||||
| 		return fmt.Errorf("found no IB devices") | 		return fmt.Errorf("found no IB devices") | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -196,6 +198,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | |||||||
| 				if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { | 				if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { | ||||||
| 					y.AddMeta("unit", counterDef.unit) | 					y.AddMeta("unit", counterDef.unit) | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| @@ -206,6 +209,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | |||||||
| 					if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil { | 					if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil { | ||||||
| 						y.AddMeta("unit", counterDef.unit+"/sec") | 						y.AddMeta("unit", counterDef.unit+"/sec") | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				// Save current state | 				// Save current state | ||||||
| @@ -214,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *InfinibandCollector) Close() { | func (m *InfinibandCollector) Close() { | ||||||
|   | |||||||
| @@ -6,6 +6,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
|  |  | ||||||
| 	//	"log" | 	//	"log" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| @@ -29,9 +30,10 @@ type IOstatCollectorEntry struct { | |||||||
|  |  | ||||||
| type IOstatCollector struct { | type IOstatCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	matches map[string]int | 	matches               map[string]int | ||||||
| 	config  IOstatCollectorConfig | 	config                IOstatCollectorConfig | ||||||
| 	devices map[string]IOstatCollectorEntry | 	devices               map[string]IOstatCollectorEntry | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *IOstatCollector) Init(config json.RawMessage) error { | func (m *IOstatCollector) Init(config json.RawMessage) error { | ||||||
| @@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { | |||||||
| 			lastValues: values, | 			lastValues: values, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| @@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now()) | 					y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now()) | ||||||
| 					if err == nil { | 					if err == nil { | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				entry.lastValues[name] = x | 				entry.lastValues[name] = x | ||||||
| @@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 		} | 		} | ||||||
| 		m.devices[device] = entry | 		m.devices[device] = entry | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *IOstatCollector) Close() { | func (m *IOstatCollector) Close() { | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const IPMITOOL_PATH = `ipmitool` | const IPMITOOL_PATH = `ipmitool` | ||||||
| @@ -26,9 +27,10 @@ type IpmiCollector struct { | |||||||
| 	metricCollector | 	metricCollector | ||||||
| 	//tags        map[string]string | 	//tags        map[string]string | ||||||
| 	//matches     map[string]string | 	//matches     map[string]string | ||||||
| 	config      IpmiCollectorConfig | 	config                IpmiCollectorConfig | ||||||
| 	ipmitool    string | 	ipmitool              string | ||||||
| 	ipmisensors string | 	ipmisensors           string | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *IpmiCollector) Init(config json.RawMessage) error { | func (m *IpmiCollector) Init(config json.RawMessage) error { | ||||||
| @@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { | |||||||
| 	if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 { | 	if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 { | ||||||
| 		return errors.New("no IPMI reader found") | 		return errors.New("no IPMI reader found") | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { | |||||||
| 			if err == nil { | 			if err == nil { | ||||||
| 				y.AddMeta("unit", unit) | 				y.AddMeta("unit", unit) | ||||||
| 				output <- y | 				output <- y | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { | |||||||
| 						y.AddMeta("unit", lv[4]) | 						y.AddMeta("unit", lv[4]) | ||||||
| 					} | 					} | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 			m.readIpmiSensors(m.config.IpmisensorsPath, output) | 			m.readIpmiSensors(m.config.IpmisensorsPath, output) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *IpmiCollector) Close() { | func (m *IpmiCollector) Close() { | ||||||
|   | |||||||
| @@ -28,6 +28,7 @@ import ( | |||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
| 	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" | 	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" | ||||||
| 	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" | 	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"github.com/NVIDIA/go-nvml/pkg/dl" | 	"github.com/NVIDIA/go-nvml/pkg/dl" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -72,18 +73,21 @@ type LikwidCollectorConfig struct { | |||||||
|  |  | ||||||
| type LikwidCollector struct { | type LikwidCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	cpulist      []C.int | 	cpulist               []C.int | ||||||
| 	cpu2tid      map[int]int | 	cpu2tid               map[int]int | ||||||
| 	sock2tid     map[int]int | 	sock2tid              map[int]int | ||||||
| 	metrics      map[C.int]map[string]int | 	metrics               map[C.int]map[string]int | ||||||
| 	groups       []C.int | 	groups                []C.int | ||||||
| 	config       LikwidCollectorConfig | 	config                LikwidCollectorConfig | ||||||
| 	gmresults    map[int]map[string]float64 | 	gmresults             map[int]map[string]float64 | ||||||
| 	basefreq     float64 | 	basefreq              float64 | ||||||
| 	running      bool | 	running               bool | ||||||
| 	initialized  bool | 	initialized           bool | ||||||
| 	likwidGroups map[C.int]LikwidEventsetConfig | 	likwidGroups          map[C.int]LikwidEventsetConfig | ||||||
| 	lock         sync.Mutex | 	lock                  sync.Mutex | ||||||
|  | 	statsMeasurements     int64 | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
|  | 	statsPublishedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| type LikwidMetric struct { | type LikwidMetric struct { | ||||||
| @@ -267,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { | |||||||
| 		cclog.ComponentError(m.name, err.Error()) | 		cclog.ComponentError(m.name, err.Error()) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	m.statsMeasurements = 0 | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
|  | 	m.statsPublishedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -274,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { | |||||||
| // take a measurement for 'interval' seconds of event set index 'group' | // take a measurement for 'interval' seconds of event set index 'group' | ||||||
| func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) { | func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) { | ||||||
| 	var ret C.int | 	var ret C.int | ||||||
|  |  | ||||||
| 	m.lock.Lock() | 	m.lock.Lock() | ||||||
| 	if m.initialized { | 	if m.initialized { | ||||||
| 		ret = C.perfmon_setupCounters(evset.gid) | 		ret = C.perfmon_setupCounters(evset.gid) | ||||||
| @@ -317,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	m.lock.Unlock() | 	m.lock.Unlock() | ||||||
|  | 	m.statsMeasurements++ | ||||||
|  | 	stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements) | ||||||
| 	return false, nil | 	return false, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -357,6 +367,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv | |||||||
| 				if m.config.InvalidToZero && math.IsInf(value, 0) { | 				if m.config.InvalidToZero && math.IsInf(value, 0) { | ||||||
| 					value = 0.0 | 					value = 0.0 | ||||||
| 				} | 				} | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
|  | 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 				// Now we have the result, send it with the proper tags | 				// Now we have the result, send it with the proper tags | ||||||
| 				if !math.IsNaN(value) { | 				if !math.IsNaN(value) { | ||||||
| 					if metric.Publish { | 					if metric.Publish { | ||||||
| @@ -369,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv | |||||||
| 							if len(metric.Unit) > 0 { | 							if len(metric.Unit) > 0 { | ||||||
| 								y.AddMeta("unit", metric.Unit) | 								y.AddMeta("unit", metric.Unit) | ||||||
| 							} | 							} | ||||||
|  | 							m.statsPublishedMetrics++ | ||||||
|  | 							stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) | ||||||
| 							output <- y | 							output <- y | ||||||
| 						} | 						} | ||||||
| 					} | 					} | ||||||
| @@ -409,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan | |||||||
| 				if m.config.InvalidToZero && math.IsInf(value, 0) { | 				if m.config.InvalidToZero && math.IsInf(value, 0) { | ||||||
| 					value = 0.0 | 					value = 0.0 | ||||||
| 				} | 				} | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
|  | 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| 				// Now we have the result, send it with the proper tags | 				// Now we have the result, send it with the proper tags | ||||||
| 				if !math.IsNaN(value) { | 				if !math.IsNaN(value) { | ||||||
| 					if metric.Publish { | 					if metric.Publish { | ||||||
| @@ -422,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan | |||||||
| 							if len(metric.Unit) > 0 { | 							if len(metric.Unit) > 0 { | ||||||
| 								y.AddMeta("unit", metric.Unit) | 								y.AddMeta("unit", metric.Unit) | ||||||
| 							} | 							} | ||||||
|  | 							m.statsPublishedMetrics++ | ||||||
|  | 							stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) | ||||||
| 							output <- y | 							output <- y | ||||||
| 						} | 						} | ||||||
| 					} | 					} | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // | // | ||||||
| @@ -32,6 +33,7 @@ type LoadavgCollector struct { | |||||||
| 	config       struct { | 	config       struct { | ||||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||||
| 	} | 	} | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *LoadavgCollector) Init(config json.RawMessage) error { | func (m *LoadavgCollector) Init(config json.RawMessage) error { | ||||||
| @@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { | |||||||
| 	for i, name := range m.proc_matches { | 	for i, name := range m.proc_matches { | ||||||
| 		_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) | 		_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *LoadavgCollector) Close() { | func (m *LoadavgCollector) Close() { | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const LUSTRE_SYSFS = `/sys/fs/lustre` | const LUSTRE_SYSFS = `/sys/fs/lustre` | ||||||
| @@ -37,13 +38,14 @@ type LustreMetricDefinition struct { | |||||||
|  |  | ||||||
| type LustreCollector struct { | type LustreCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	tags          map[string]string | 	tags                  map[string]string | ||||||
| 	config        LustreCollectorConfig | 	config                LustreCollectorConfig | ||||||
| 	lctl          string | 	lctl                  string | ||||||
| 	sudoCmd       string | 	sudoCmd               string | ||||||
| 	lastTimestamp time.Time                   // Store time stamp of last tick to derive bandwidths | 	lastTimestamp         time.Time                   // Store time stamp of last tick to derive bandwidths | ||||||
| 	definitions   []LustreMetricDefinition    // Combined list without excluded metrics | 	definitions           []LustreMetricDefinition    // Combined list without excluded metrics | ||||||
| 	stats         map[string]map[string]int64 // Data for last value per device and metric | 	stats                 map[string]map[string]int64 // Data for last value per device and metric | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *LustreCollector) getDeviceDataCommand(device string) []string { | func (m *LustreCollector) getDeviceDataCommand(device string) []string { | ||||||
| @@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	m.lastTimestamp = time.Now() | 	m.lastTimestamp = time.Now() | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					y.AddMeta("unit", def.unit) | 					y.AddMeta("unit", def.unit) | ||||||
| 				} | 				} | ||||||
| 				output <- y | 				output <- y | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 			} | 			} | ||||||
| 			devData[def.name] = use_x | 			devData[def.name] = use_x | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	m.lastTimestamp = now | 	m.lastTimestamp = now | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *LustreCollector) Close() { | func (m *LustreCollector) Close() { | ||||||
|   | |||||||
| @@ -14,6 +14,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const MEMSTATFILE = "/proc/meminfo" | const MEMSTATFILE = "/proc/meminfo" | ||||||
| @@ -32,12 +33,13 @@ type MemstatCollectorNode struct { | |||||||
|  |  | ||||||
| type MemstatCollector struct { | type MemstatCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	stats       map[string]int64 | 	stats                 map[string]int64 | ||||||
| 	tags        map[string]string | 	tags                  map[string]string | ||||||
| 	matches     map[string]string | 	matches               map[string]string | ||||||
| 	config      MemstatCollectorConfig | 	config                MemstatCollectorConfig | ||||||
| 	nodefiles   map[int]MemstatCollectorNode | 	nodefiles             map[int]MemstatCollectorNode | ||||||
| 	sendMemUsed bool | 	sendMemUsed           bool | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| type MemstatStats struct { | type MemstatStats struct { | ||||||
| @@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| @@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if len(unit) > 0 { | 				if len(unit) > 0 { | ||||||
| 					y.AddMeta("unit", unit) | 					y.AddMeta("unit", unit) | ||||||
| 				} | 				} | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 				output <- y | 				output <- y | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if len(unit) > 0 { | 				if len(unit) > 0 { | ||||||
| 					y.AddMeta("unit", unit) | 					y.AddMeta("unit", unit) | ||||||
| 				} | 				} | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 				output <- y | 				output <- y | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 			sendStats(stats, nodeConf.tags) | 			sendStats(stats, nodeConf.tags) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *MemstatCollector) Close() { | func (m *MemstatCollector) Close() { | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const NETSTATFILE = "/proc/net/dev" | const NETSTATFILE = "/proc/net/dev" | ||||||
| @@ -32,9 +33,10 @@ type NetstatCollectorMetric struct { | |||||||
|  |  | ||||||
| type NetstatCollector struct { | type NetstatCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	config        NetstatCollectorConfig | 	config                NetstatCollectorConfig | ||||||
| 	matches       map[string][]NetstatCollectorMetric | 	matches               map[string][]NetstatCollectorMetric | ||||||
| 	lastTimestamp time.Time | 	lastTimestamp         time.Time | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NetstatCollector) Init(config json.RawMessage) error { | func (m *NetstatCollector) Init(config json.RawMessage) error { | ||||||
| @@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { | |||||||
| 	if len(m.matches) == 0 { | 	if len(m.matches) == 0 { | ||||||
| 		return errors.New("no devices to collector metrics found") | 		return errors.New("no devices to collector metrics found") | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if m.config.SendAbsoluteValues { | 				if m.config.SendAbsoluteValues { | ||||||
| 					if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil { | 					if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil { | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				if m.config.SendDerivedValues { | 				if m.config.SendDerivedValues { | ||||||
| @@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 						rate := float64(v-metric.lastValue) / timeDiff | 						rate := float64(v-metric.lastValue) / timeDiff | ||||||
| 						if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil { | 						if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil { | ||||||
| 							output <- y | 							output <- y | ||||||
|  | 							m.statsProcessedMetrics++ | ||||||
| 						} | 						} | ||||||
| 					} | 					} | ||||||
| 					metric.lastValue = v | 					metric.lastValue = v | ||||||
| @@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NetstatCollector) Close() { | func (m *NetstatCollector) Close() { | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // First part contains the code for the general NfsCollector. | // First part contains the code for the general NfsCollector. | ||||||
| @@ -32,7 +33,8 @@ type nfsCollector struct { | |||||||
| 		Nfsstats       string   `json:"nfsstat"` | 		Nfsstats       string   `json:"nfsstat"` | ||||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||||
| 	} | 	} | ||||||
| 	data map[string]NfsCollectorData | 	data                  map[string]NfsCollectorData | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *nfsCollector) initStats() error { | func (m *nfsCollector) initStats() error { | ||||||
| @@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { | |||||||
| 	} | 	} | ||||||
| 	m.data = make(map[string]NfsCollectorData) | 	m.data = make(map[string]NfsCollectorData) | ||||||
| 	m.initStats() | 	m.initStats() | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			y.AddMeta("version", m.version) | 			y.AddMeta("version", m.version) | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *nfsCollector) Close() { | func (m *nfsCollector) Close() { | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // | // | ||||||
| @@ -44,7 +45,8 @@ type NUMAStatsCollectorTopolgy struct { | |||||||
|  |  | ||||||
| type NUMAStatsCollector struct { | type NUMAStatsCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	topology []NUMAStatsCollectorTopolgy | 	topology              []NUMAStatsCollectorTopolgy | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | ||||||
| @@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | |||||||
| 				tagSet: map[string]string{"memoryDomain": node}, | 				tagSet: map[string]string{"memoryDomain": node}, | ||||||
| 			}) | 			}) | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri | |||||||
| 			) | 			) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
| 				output <- y | 				output <- y | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		file.Close() | 		file.Close() | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NUMAStatsCollector) Close() { | func (m *NUMAStatsCollector) Close() { | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"github.com/NVIDIA/go-nvml/pkg/nvml" | 	"github.com/NVIDIA/go-nvml/pkg/nvml" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -26,9 +27,10 @@ type NvidiaCollectorDevice struct { | |||||||
|  |  | ||||||
| type NvidiaCollector struct { | type NvidiaCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	num_gpus int | 	num_gpus              int | ||||||
| 	config   NvidiaCollectorConfig | 	config                NvidiaCollectorConfig | ||||||
| 	gpus     []NvidiaCollectorDevice | 	gpus                  []NvidiaCollectorDevice | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NvidiaCollector) CatchPanic() { | func (m *NvidiaCollector) CatchPanic() { | ||||||
| @@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { | |||||||
| 				pciInfo.Device) | 				pciInfo.Device) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					if err == nil { | 					if err == nil { | ||||||
| 						y.AddMeta("unit", "%") | 						y.AddMeta("unit", "%") | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				if !device.excludeMetrics["nv_mem_util"] { | 				if !device.excludeMetrics["nv_mem_util"] { | ||||||
| @@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					if err == nil { | 					if err == nil { | ||||||
| 						y.AddMeta("unit", "%") | 						y.AddMeta("unit", "%") | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| @@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					if err == nil { | 					if err == nil { | ||||||
| 						y.AddMeta("unit", "MByte") | 						y.AddMeta("unit", "MByte") | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| @@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 					if err == nil { | 					if err == nil { | ||||||
| 						y.AddMeta("unit", "MByte") | 						y.AddMeta("unit", "MByte") | ||||||
| 						output <- y | 						output <- y | ||||||
|  | 						m.statsProcessedMetrics++ | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| @@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "degC") | 					y.AddMeta("unit", "degC") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "%") | 					y.AddMeta("unit", "%") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				} | 				} | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} else if ret == nvml.ERROR_NOT_SUPPORTED { | 			} else if ret == nvml.ERROR_NOT_SUPPORTED { | ||||||
| 				y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now()) | 				y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) | 				y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "watts") | 					y.AddMeta("unit", "watts") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "MHz") | 					y.AddMeta("unit", "MHz") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) | 				y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) | 				y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "watts") | 					y.AddMeta("unit", "watts") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "%") | 					y.AddMeta("unit", "%") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					y.AddMeta("unit", "%") | 					y.AddMeta("unit", "%") | ||||||
| 					output <- y | 					output <- y | ||||||
|  | 					m.statsProcessedMetrics++ | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *NvidiaCollector) Close() { | func (m *NvidiaCollector) Close() { | ||||||
|   | |||||||
| @@ -6,6 +6,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // These are the fields we read from the JSON configuration | // These are the fields we read from the JSON configuration | ||||||
| @@ -17,9 +18,10 @@ type SampleCollectorConfig struct { | |||||||
| // defined by metricCollector (name, init, ...) | // defined by metricCollector (name, init, ...) | ||||||
| type SampleCollector struct { | type SampleCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	config SampleTimerCollectorConfig // the configuration structure | 	config     SampleTimerCollectorConfig // the configuration structure | ||||||
| 	meta   map[string]string          // default meta information | 	meta       map[string]string          // default meta information | ||||||
| 	tags   map[string]string          // default tags | 	tags       map[string]string          // default tags | ||||||
|  | 	statsCount int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // Functions to implement MetricCollector interface | // Functions to implement MetricCollector interface | ||||||
| @@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error { | |||||||
| 	// for all topological entities (sockets, NUMA domains, ...) | 	// for all topological entities (sockets, NUMA domains, ...) | ||||||
| 	// Return some useful error message in case of any failures | 	// Return some useful error message in case of any failures | ||||||
|  |  | ||||||
|  | 	// Initialize counts for statistics | ||||||
|  | 	m.statsCount = 0 | ||||||
|  |  | ||||||
| 	// Set this flag only if everything is initialized properly, all required files exist, ... | 	// Set this flag only if everything is initialized properly, all required files exist, ... | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return err | 	return err | ||||||
| @@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) | |||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		// Send it to output channel | 		// Send it to output channel | ||||||
| 		output <- y | 		output <- y | ||||||
|  | 		// increment count for each sent metric or any other operation | ||||||
|  | 		m.statsCount++ | ||||||
| 	} | 	} | ||||||
|  | 	// Set stats for the component | ||||||
|  | 	stats.ComponentStatInt(m.name, "count", m.statsCount) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Close metric collector: close network connection, close files, close libraries, ... | // Close metric collector: close network connection, close files, close libraries, ... | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html | // See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html | ||||||
| @@ -40,7 +41,8 @@ type TempCollector struct { | |||||||
| 		ReportMaxTemp      bool                         `json:"report_max_temperature"` | 		ReportMaxTemp      bool                         `json:"report_max_temperature"` | ||||||
| 		ReportCriticalTemp bool                         `json:"report_critical_temperature"` | 		ReportCriticalTemp bool                         `json:"report_critical_temperature"` | ||||||
| 	} | 	} | ||||||
| 	sensors []*TempCollectorSensor | 	sensors               []*TempCollectorSensor | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *TempCollector) Init(config json.RawMessage) error { | func (m *TempCollector) Init(config json.RawMessage) error { | ||||||
| @@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Finished initialization | 	// Finished initialization | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 		) | 		) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// max temperature | 		// max temperature | ||||||
| @@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 			) | 			) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
| 				output <- y | 				output <- y | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| @@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | |||||||
| 			) | 			) | ||||||
| 			if err == nil { | 			if err == nil { | ||||||
| 				output <- y | 				output <- y | ||||||
|  | 				m.statsProcessedMetrics++ | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *TempCollector) Close() { | func (m *TempCollector) Close() { | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const MAX_NUM_PROCS = 10 | const MAX_NUM_PROCS = 10 | ||||||
| @@ -21,8 +22,9 @@ type TopProcsCollectorConfig struct { | |||||||
|  |  | ||||||
| type TopProcsCollector struct { | type TopProcsCollector struct { | ||||||
| 	metricCollector | 	metricCollector | ||||||
| 	tags   map[string]string | 	tags                  map[string]string | ||||||
| 	config TopProcsCollectorConfig | 	config                TopProcsCollectorConfig | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *TopProcsCollector) Init(config json.RawMessage) error { | func (m *TopProcsCollector) Init(config json.RawMessage) error { | ||||||
| @@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.New("failed to execute command") | 		return errors.New("failed to execute command") | ||||||
| 	} | 	} | ||||||
|  | 	m.statsProcessedMetrics = 0 | ||||||
| 	m.init = true | 	m.init = true | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric | |||||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) | 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			output <- y | 			output <- y | ||||||
|  | 			m.statsProcessedMetrics++ | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *TopProcsCollector) Close() { | func (m *TopProcsCollector) Close() { | ||||||
|   | |||||||
							
								
								
									
										17
									
								
								internal/metricRouter/StatsApi.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								internal/metricRouter/StatsApi.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,17 @@ | |||||||
|  | # Stats API | ||||||
|  |  | ||||||
|  | The Stats API can be used for debugging. It publishes counts at an HTTP endpoint as JSON from different componenets of the CC Metric Collector. | ||||||
|  |  | ||||||
|  | # Configuration | ||||||
|  |  | ||||||
|  | The Stats API has an own configuration file to specify the listen host and port. The defaults are `localhost` and `8080`. | ||||||
|  |  | ||||||
|  | ```json | ||||||
|  | { | ||||||
|  |   "bindhost" : "", | ||||||
|  |   "port" : "8080", | ||||||
|  |   "publish_collectorstate" : true | ||||||
|  | } | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | The `bindhost` and `port` can be used to specify the listen host and port. The `publish_collectorstate` needs to be `true`, otherwise nothing is presented. This option is for future use if we need to publish more infos using different domains. | ||||||
							
								
								
									
										232
									
								
								internal/metricRouter/metricApi.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										232
									
								
								internal/metricRouter/metricApi.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,232 @@ | |||||||
|  | package metricRouter | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"os" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
|  | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" | ||||||
|  | 	"github.com/gorilla/mux" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type statsApiConfig struct { | ||||||
|  | 	PublishCollectorState bool   `json:"publish_collectorstate"` | ||||||
|  | 	Host                  string `json:"bindhost"` | ||||||
|  | 	Port                  string `json:"port"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Metric cache data structure | ||||||
|  | type statsApi struct { | ||||||
|  | 	name     string | ||||||
|  | 	input    chan lp.CCMetric | ||||||
|  | 	indone   chan bool | ||||||
|  | 	outdone  chan bool | ||||||
|  | 	config   statsApiConfig | ||||||
|  | 	wg       *sync.WaitGroup | ||||||
|  | 	statsWg  sync.WaitGroup | ||||||
|  | 	ticker   mct.MultiChanTicker | ||||||
|  | 	tickchan chan time.Time | ||||||
|  | 	server   *http.Server | ||||||
|  | 	router   *mux.Router | ||||||
|  | 	lock     sync.Mutex | ||||||
|  | 	baseurl  string | ||||||
|  | 	stats    map[string]map[string]int64 | ||||||
|  | 	outStats map[string]map[string]int64 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type StatsApi interface { | ||||||
|  | 	Start() | ||||||
|  | 	Close() | ||||||
|  | 	StatsFunc(w http.ResponseWriter, r *http.Request) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var statsApiServer *statsApi = nil | ||||||
|  |  | ||||||
|  | func (a *statsApi) updateStats(point lp.CCMetric) { | ||||||
|  | 	switch point.Name() { | ||||||
|  | 	case "_stats": | ||||||
|  | 		if name, nok := point.GetMeta("source"); nok { | ||||||
|  | 			var compStats map[string]int64 | ||||||
|  | 			var ok bool | ||||||
|  |  | ||||||
|  | 			if compStats, ok = a.stats[name]; !ok { | ||||||
|  | 				a.stats[name] = make(map[string]int64) | ||||||
|  | 				compStats = a.stats[name] | ||||||
|  | 			} | ||||||
|  | 			for k, v := range point.Fields() { | ||||||
|  | 				switch value := v.(type) { | ||||||
|  | 				case int: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				case uint: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				case int32: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				case uint32: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				case int64: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				case uint64: | ||||||
|  | 					compStats[k] = int64(value) | ||||||
|  | 				default: | ||||||
|  | 					cclog.ComponentDebug(a.name, "Unusable stats for", k, ". Values should be int64") | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			a.stats[name] = compStats | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (a *statsApi) Start() { | ||||||
|  | 	a.ticker.AddChannel(a.tickchan) | ||||||
|  | 	a.wg.Add(1) | ||||||
|  | 	a.statsWg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		a.stats = make(map[string]map[string]int64) | ||||||
|  | 		defer a.statsWg.Done() | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-a.indone: | ||||||
|  | 				cclog.ComponentDebug(a.name, "INPUT DONE") | ||||||
|  | 				close(a.indone) | ||||||
|  | 				return | ||||||
|  | 			case p := <-a.input: | ||||||
|  | 				a.lock.Lock() | ||||||
|  | 				a.updateStats(p) | ||||||
|  | 				a.lock.Unlock() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 	a.statsWg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		a.outStats = make(map[string]map[string]int64) | ||||||
|  | 		defer a.statsWg.Done() | ||||||
|  | 		a.lock.Lock() | ||||||
|  | 		for comp, compData := range a.stats { | ||||||
|  | 			var outData map[string]int64 | ||||||
|  | 			var ok bool | ||||||
|  | 			if outData, ok = a.outStats[comp]; !ok { | ||||||
|  | 				outData = make(map[string]int64) | ||||||
|  | 			} | ||||||
|  | 			for k, v := range compData { | ||||||
|  | 				outData[k] = v | ||||||
|  | 			} | ||||||
|  | 			a.outStats[comp] = outData | ||||||
|  | 		} | ||||||
|  | 		a.lock.Unlock() | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-a.outdone: | ||||||
|  | 				cclog.ComponentDebug(a.name, "OUTPUT DONE") | ||||||
|  | 				close(a.outdone) | ||||||
|  | 				return | ||||||
|  | 			case <-a.tickchan: | ||||||
|  | 				a.lock.Lock() | ||||||
|  | 				for comp, compData := range a.stats { | ||||||
|  | 					var outData map[string]int64 | ||||||
|  | 					var ok bool | ||||||
|  | 					if outData, ok = a.outStats[comp]; !ok { | ||||||
|  | 						outData = make(map[string]int64) | ||||||
|  | 					} | ||||||
|  | 					for k, v := range compData { | ||||||
|  | 						outData[k] = v | ||||||
|  | 					} | ||||||
|  | 					a.outStats[comp] = outData | ||||||
|  | 				} | ||||||
|  | 				a.lock.Unlock() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 	a.statsWg.Add(1) | ||||||
|  | 	go func() { | ||||||
|  | 		defer a.statsWg.Done() | ||||||
|  | 		err := a.server.ListenAndServe() | ||||||
|  | 		if err != nil && err.Error() != "http: Server closed" { | ||||||
|  | 			cclog.ComponentError(a.name, err.Error()) | ||||||
|  | 		} | ||||||
|  | 		cclog.ComponentDebug(a.name, "SERVER DONE") | ||||||
|  | 	}() | ||||||
|  | 	cclog.ComponentDebug(a.name, "STARTED") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (a *statsApi) StatsFunc(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 	data, err := json.Marshal(a.outStats) | ||||||
|  | 	if err == nil { | ||||||
|  | 		w.Header().Set("Content-Type", "application/json") | ||||||
|  | 		io.WriteString(w, string(data)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Close finishes / stops the metric cache | ||||||
|  | func (a *statsApi) Close() { | ||||||
|  | 	cclog.ComponentDebug(a.name, "CLOSE") | ||||||
|  | 	a.indone <- true | ||||||
|  | 	a.outdone <- true | ||||||
|  | 	a.server.Shutdown(context.Background()) | ||||||
|  | 	// wait for close of channel r.done | ||||||
|  | 	<-a.indone | ||||||
|  | 	<-a.outdone | ||||||
|  | 	a.statsWg.Wait() | ||||||
|  | 	a.wg.Done() | ||||||
|  |  | ||||||
|  | 	//a.wg.Wait() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewStatsApi(ticker mct.MultiChanTicker, wg *sync.WaitGroup, statsApiConfigfile string) (StatsApi, error) { | ||||||
|  | 	a := new(statsApi) | ||||||
|  | 	a.name = "StatsApi" | ||||||
|  | 	a.config.Host = "localhost" | ||||||
|  | 	a.config.Port = "8080" | ||||||
|  | 	configFile, err := os.Open(statsApiConfigfile) | ||||||
|  | 	if err != nil { | ||||||
|  | 		cclog.ComponentError(a.name, err.Error()) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	defer configFile.Close() | ||||||
|  | 	jsonParser := json.NewDecoder(configFile) | ||||||
|  | 	err = jsonParser.Decode(&a.config) | ||||||
|  | 	if err != nil { | ||||||
|  | 		cclog.ComponentError(a.name, err.Error()) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	a.input = make(chan lp.CCMetric) | ||||||
|  | 	a.ticker = ticker | ||||||
|  | 	a.tickchan = make(chan time.Time) | ||||||
|  | 	a.wg = wg | ||||||
|  | 	a.indone = make(chan bool) | ||||||
|  | 	a.outdone = make(chan bool) | ||||||
|  | 	a.router = mux.NewRouter() | ||||||
|  | 	a.baseurl = fmt.Sprintf("%s:%s", a.config.Host, a.config.Port) | ||||||
|  | 	a.server = &http.Server{Addr: a.baseurl, Handler: a.router} | ||||||
|  | 	if a.config.PublishCollectorState { | ||||||
|  | 		a.router.HandleFunc("/", a.StatsFunc) | ||||||
|  | 	} | ||||||
|  | 	statsApiServer = a | ||||||
|  | 	return a, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ComponentStatInt(component string, key string, value int64) { | ||||||
|  | 	if statsApiServer == nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now()) | ||||||
|  | 	if err == nil { | ||||||
|  | 		statsApiServer.input <- y | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ComponentStatString(component string, key string, value int64) { | ||||||
|  | 	if statsApiServer == nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now()) | ||||||
|  | 	if err == nil { | ||||||
|  | 		statsApiServer.input <- y | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -40,20 +40,26 @@ type metricRouterConfig struct { | |||||||
|  |  | ||||||
| // Metric router data structure | // Metric router data structure | ||||||
| type metricRouter struct { | type metricRouter struct { | ||||||
| 	hostname    string              // Hostname used in tags | 	hostname          string              // Hostname used in tags | ||||||
| 	coll_input  chan lp.CCMetric    // Input channel from CollectorManager | 	coll_input        chan lp.CCMetric    // Input channel from CollectorManager | ||||||
| 	recv_input  chan lp.CCMetric    // Input channel from ReceiveManager | 	recv_input        chan lp.CCMetric    // Input channel from ReceiveManager | ||||||
| 	cache_input chan lp.CCMetric    // Input channel from MetricCache | 	cache_input       chan lp.CCMetric    // Input channel from MetricCache | ||||||
| 	outputs     []chan lp.CCMetric  // List of all output channels | 	outputs           []chan lp.CCMetric  // List of all output channels | ||||||
| 	done        chan bool           // channel to finish / stop metric router | 	done              chan bool           // channel to finish / stop metric router | ||||||
| 	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector | 	wg                *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector | ||||||
| 	timestamp   time.Time           // timestamp periodically updated by ticker each interval | 	timestamp         time.Time           // timestamp periodically updated by ticker each interval | ||||||
| 	timerdone   chan bool           // channel to finish / stop timestamp updater | 	timerdone         chan bool           // channel to finish / stop timestamp updater | ||||||
| 	ticker      mct.MultiChanTicker // periodically ticking once each interval | 	ticker            mct.MultiChanTicker // periodically ticking once each interval | ||||||
| 	config      metricRouterConfig  // json encoded config for metric router | 	config            metricRouterConfig  // json encoded config for metric router | ||||||
| 	cache       MetricCache         // pointer to MetricCache | 	cache             MetricCache         // pointer to MetricCache | ||||||
| 	cachewg     sync.WaitGroup      // wait group for MetricCache | 	cachewg           sync.WaitGroup      // wait group for MetricCache | ||||||
| 	maxForward  int                 // number of metrics to forward maximally in one iteration | 	maxForward        int                 // number of metrics to forward maximally in one iteration | ||||||
|  | 	statsCollForward  int64 | ||||||
|  | 	statsRecvForward  int64 | ||||||
|  | 	statsCacheForward int64 | ||||||
|  | 	statsTotalForward int64 | ||||||
|  | 	statsDropped      int64 | ||||||
|  | 	statsRenamed      int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // MetricRouter access functions | // MetricRouter access functions | ||||||
| @@ -121,6 +127,12 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout | |||||||
| 	for _, mname := range r.config.DropMetrics { | 	for _, mname := range r.config.DropMetrics { | ||||||
| 		r.config.dropMetrics[mname] = true | 		r.config.dropMetrics[mname] = true | ||||||
| 	} | 	} | ||||||
|  | 	r.statsCollForward = 0 | ||||||
|  | 	r.statsRecvForward = 0 | ||||||
|  | 	r.statsCacheForward = 0 | ||||||
|  | 	r.statsTotalForward = 0 | ||||||
|  | 	r.statsDropped = 0 | ||||||
|  | 	r.statsRenamed = 0 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -140,6 +152,7 @@ func (r *metricRouter) StartTimer() { | |||||||
| 				cclog.ComponentDebug("MetricRouter", "TIMER DONE") | 				cclog.ComponentDebug("MetricRouter", "TIMER DONE") | ||||||
| 				return | 				return | ||||||
| 			case t := <-m: | 			case t := <-m: | ||||||
|  | 				cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", t.Unix()) | ||||||
| 				r.timestamp = t | 				r.timestamp = t | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -253,6 +266,8 @@ func (r *metricRouter) Start() { | |||||||
| 		r.DoDelTags(point) | 		r.DoDelTags(point) | ||||||
| 		name := point.Name() | 		name := point.Name() | ||||||
| 		if new, ok := r.config.RenameMetrics[name]; ok { | 		if new, ok := r.config.RenameMetrics[name]; ok { | ||||||
|  | 			r.statsRenamed++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "renamed", r.statsRenamed) | ||||||
| 			point.SetName(new) | 			point.SetName(new) | ||||||
| 			point.AddMeta("oldname", name) | 			point.AddMeta("oldname", name) | ||||||
| 		} | 		} | ||||||
| @@ -272,7 +287,14 @@ func (r *metricRouter) Start() { | |||||||
| 			p.SetTime(r.timestamp) | 			p.SetTime(r.timestamp) | ||||||
| 		} | 		} | ||||||
| 		if !r.dropMetric(p) { | 		if !r.dropMetric(p) { | ||||||
|  | 			r.statsCollForward++ | ||||||
|  | 			r.statsTotalForward++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward) | ||||||
|  | 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||||
| 			forward(p) | 			forward(p) | ||||||
|  | 		} else { | ||||||
|  | 			r.statsDropped++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||||
| 		} | 		} | ||||||
| 		// even if the metric is dropped, it is stored in the cache for | 		// even if the metric is dropped, it is stored in the cache for | ||||||
| 		// aggregations | 		// aggregations | ||||||
| @@ -288,7 +310,14 @@ func (r *metricRouter) Start() { | |||||||
| 			p.SetTime(r.timestamp) | 			p.SetTime(r.timestamp) | ||||||
| 		} | 		} | ||||||
| 		if !r.dropMetric(p) { | 		if !r.dropMetric(p) { | ||||||
|  | 			r.statsRecvForward++ | ||||||
|  | 			r.statsTotalForward++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward) | ||||||
|  | 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||||
| 			forward(p) | 			forward(p) | ||||||
|  | 		} else { | ||||||
|  | 			r.statsDropped++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -297,7 +326,14 @@ func (r *metricRouter) Start() { | |||||||
| 		// receive from metric collector | 		// receive from metric collector | ||||||
| 		if !r.dropMetric(p) { | 		if !r.dropMetric(p) { | ||||||
| 			p.AddTag(r.config.HostnameTagName, r.hostname) | 			p.AddTag(r.config.HostnameTagName, r.hostname) | ||||||
|  | 			r.statsCacheForward++ | ||||||
|  | 			r.statsTotalForward++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward) | ||||||
|  | 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||||
| 			forward(p) | 			forward(p) | ||||||
|  | 		} else { | ||||||
|  | 			r.statsDropped++ | ||||||
|  | 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const GMETRIC_EXEC = `gmetric` | const GMETRIC_EXEC = `gmetric` | ||||||
| @@ -29,9 +30,10 @@ type GangliaSinkConfig struct { | |||||||
|  |  | ||||||
| type GangliaSink struct { | type GangliaSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	gmetric_path   string | 	gmetric_path     string | ||||||
| 	gmetric_config string | 	gmetric_config   string | ||||||
| 	config         GangliaSinkConfig | 	config           GangliaSinkConfig | ||||||
|  | 	statsSentMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *GangliaSink) Write(point lp.CCMetric) error { | func (s *GangliaSink) Write(point lp.CCMetric) error { | ||||||
| @@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { | |||||||
| 	command := exec.Command(s.gmetric_path, argstr...) | 	command := exec.Command(s.gmetric_path, argstr...) | ||||||
| 	command.Wait() | 	command.Wait() | ||||||
| 	_, err = command.Output() | 	_, err = command.Output() | ||||||
|  | 	s.statsSentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	if len(s.config.GmetricConfig) > 0 { | 	if len(s.config.GmetricConfig) > 0 { | ||||||
| 		s.gmetric_config = s.config.GmetricConfig | 		s.gmetric_config = s.config.GmetricConfig | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics = 0 | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	influx "github.com/influxdata/line-protocol" | 	influx "github.com/influxdata/line-protocol" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -36,6 +37,8 @@ type HttpSink struct { | |||||||
| 	idleConnTimeout time.Duration | 	idleConnTimeout time.Duration | ||||||
| 	timeout         time.Duration | 	timeout         time.Duration | ||||||
| 	flushDelay      time.Duration | 	flushDelay      time.Duration | ||||||
|  | 	statsProcessed  int64 | ||||||
|  | 	statsFlushes    int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Write(m lp.CCMetric) error { | func (s *HttpSink) Write(m lp.CCMetric) error { | ||||||
| @@ -63,6 +66,8 @@ func (s *HttpSink) Write(m lp.CCMetric) error { | |||||||
| 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) | 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	s.statsProcessed++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed) | ||||||
|  |  | ||||||
| 	// Flush synchronously if "flush_delay" is zero | 	// Flush synchronously if "flush_delay" is zero | ||||||
| 	if s.flushDelay == 0 { | 	if s.flushDelay == 0 { | ||||||
| @@ -112,6 +117,8 @@ func (s *HttpSink) Flush() error { | |||||||
| 		cclog.ComponentError(s.name, "application error:", err.Error()) | 		cclog.ComponentError(s.name, "application error:", err.Error()) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	s.statsFlushes++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -177,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	s.buffer = &bytes.Buffer{} | 	s.buffer = &bytes.Buffer{} | ||||||
| 	s.encoder = influx.NewEncoder(s.buffer) | 	s.encoder = influx.NewEncoder(s.buffer) | ||||||
| 	s.encoder.SetPrecision(time.Second) | 	s.encoder.SetPrecision(time.Second) | ||||||
|  | 	s.statsFlushes = 0 | ||||||
|  | 	s.statsProcessed = 0 | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||||
| 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||||
| ) | ) | ||||||
| @@ -42,6 +43,9 @@ type InfluxAsyncSink struct { | |||||||
| 	config              InfluxAsyncSinkConfig | 	config              InfluxAsyncSinkConfig | ||||||
| 	influxRetryInterval uint | 	influxRetryInterval uint | ||||||
| 	influxMaxRetryTime  uint | 	influxMaxRetryTime  uint | ||||||
|  | 	sentMetrics         int64 | ||||||
|  | 	statsFlushes        int64 | ||||||
|  | 	statsErrors         int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *InfluxAsyncSink) connect() error { | func (s *InfluxAsyncSink) connect() error { | ||||||
| @@ -105,11 +109,15 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { | |||||||
| 	s.writeApi.WritePoint( | 	s.writeApi.WritePoint( | ||||||
| 		m.ToPoint(s.meta_as_tags), | 		m.ToPoint(s.meta_as_tags), | ||||||
| 	) | 	) | ||||||
|  | 	s.sentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *InfluxAsyncSink) Flush() error { | func (s *InfluxAsyncSink) Flush() error { | ||||||
| 	s.writeApi.Flush() | 	s.writeApi.Flush() | ||||||
|  | 	s.statsFlushes++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -189,12 +197,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Start background: Read from error channel | 	// Start background: Read from error channel | ||||||
|  | 	s.statsErrors = 0 | ||||||
| 	s.errors = s.writeApi.Errors() | 	s.errors = s.writeApi.Errors() | ||||||
| 	go func() { | 	go func() { | ||||||
| 		for err := range s.errors { | 		for err := range s.errors { | ||||||
|  | 			s.statsErrors++ | ||||||
|  | 			stats.ComponentStatInt(s.name, "errors", s.statsErrors) | ||||||
| 			cclog.ComponentError(s.name, err.Error()) | 			cclog.ComponentError(s.name, err.Error()) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
|  | 	s.sentMetrics = 0 | ||||||
|  | 	s.statsFlushes = 0 | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||||
| 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||||
| 	"github.com/influxdata/influxdb-client-go/v2/api/write" | 	"github.com/influxdata/influxdb-client-go/v2/api/write" | ||||||
| @@ -37,15 +38,17 @@ type InfluxSinkConfig struct { | |||||||
|  |  | ||||||
| type InfluxSink struct { | type InfluxSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	client              influxdb2.Client | 	client                influxdb2.Client | ||||||
| 	writeApi            influxdb2Api.WriteAPIBlocking | 	writeApi              influxdb2Api.WriteAPIBlocking | ||||||
| 	config              InfluxSinkConfig | 	config                InfluxSinkConfig | ||||||
| 	influxRetryInterval uint | 	influxRetryInterval   uint | ||||||
| 	influxMaxRetryTime  uint | 	influxMaxRetryTime    uint | ||||||
| 	batch               []*write.Point | 	batch                 []*write.Point | ||||||
| 	flushTimer          *time.Timer | 	flushTimer            *time.Timer | ||||||
| 	flushDelay          time.Duration | 	flushDelay            time.Duration | ||||||
| 	lock                sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | 	lock                  sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | ||||||
|  | 	statsSentMetrics      int64 | ||||||
|  | 	statsProcessedMetrics int64 | ||||||
| 	//influxMaxRetryDelay uint | 	//influxMaxRetryDelay uint | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -123,8 +126,10 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { | |||||||
| 	} | 	} | ||||||
| 	p := m.ToPoint(s.meta_as_tags) | 	p := m.ToPoint(s.meta_as_tags) | ||||||
| 	s.lock.Lock() | 	s.lock.Lock() | ||||||
|  | 	s.statsProcessedMetrics++ | ||||||
| 	s.batch = append(s.batch, p) | 	s.batch = append(s.batch, p) | ||||||
| 	s.lock.Unlock() | 	s.lock.Unlock() | ||||||
|  | 	stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics) | ||||||
|  |  | ||||||
| 	// Flush synchronously if "flush_delay" is zero | 	// Flush synchronously if "flush_delay" is zero | ||||||
| 	if s.flushDelay == 0 { | 	if s.flushDelay == 0 { | ||||||
| @@ -145,6 +150,8 @@ func (s *InfluxSink) Flush() error { | |||||||
| 		cclog.ComponentError(s.name, "flush failed:", err.Error()) | 		cclog.ComponentError(s.name, "flush failed:", err.Error()) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics += int64(len(s.batch)) | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	s.batch = s.batch[:0] | 	s.batch = s.batch[:0] | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -211,5 +218,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	if err := s.connect(); err != nil { | 	if err := s.connect(); err != nil { | ||||||
| 		return nil, fmt.Errorf("unable to connect: %v", err) | 		return nil, fmt.Errorf("unable to connect: %v", err) | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics = 0 | ||||||
|  | 	s.statsProcessedMetrics = 0 | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -73,6 +73,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"github.com/NVIDIA/go-nvml/pkg/dl" | 	"github.com/NVIDIA/go-nvml/pkg/dl" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -102,11 +103,12 @@ type LibgangliaSinkConfig struct { | |||||||
|  |  | ||||||
| type LibgangliaSink struct { | type LibgangliaSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	config         LibgangliaSinkConfig | 	config           LibgangliaSinkConfig | ||||||
| 	global_context C.Ganglia_pool | 	global_context   C.Ganglia_pool | ||||||
| 	gmond_config   C.Ganglia_gmond_config | 	gmond_config     C.Ganglia_gmond_config | ||||||
| 	send_channels  C.Ganglia_udp_send_channels | 	send_channels    C.Ganglia_udp_send_channels | ||||||
| 	cstrCache      map[string]*C.char | 	cstrCache        map[string]*C.char | ||||||
|  | 	statsSentMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *LibgangliaSink) Write(point lp.CCMetric) error { | func (s *LibgangliaSink) Write(point lp.CCMetric) error { | ||||||
| @@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error { | |||||||
| 	C.Ganglia_metric_destroy(gmetric) | 	C.Ganglia_metric_destroy(gmetric) | ||||||
| 	// Free the value C string, the only one not stored in the cache | 	// Free the value C string, the only one not stored in the cache | ||||||
| 	C.free(unsafe.Pointer(c_value)) | 	C.free(unsafe.Pointer(c_value)) | ||||||
|  | 	s.statsSentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) | 		return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics = 0 | ||||||
| 	// Set up cache for the C strings | 	// Set up cache for the C strings | ||||||
| 	s.cstrCache = make(map[string]*C.char) | 	s.cstrCache = make(map[string]*C.char) | ||||||
| 	// s.cstrCache["globals"] = C.CString("globals") | 	// s.cstrCache["globals"] = C.CString("globals") | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| 	"github.com/gorilla/mux" | 	"github.com/gorilla/mux" | ||||||
| 	"github.com/prometheus/client_golang/prometheus" | 	"github.com/prometheus/client_golang/prometheus" | ||||||
| 	"github.com/prometheus/client_golang/prometheus/promhttp" | 	"github.com/prometheus/client_golang/prometheus/promhttp" | ||||||
| @@ -29,11 +30,12 @@ type PrometheusSinkConfig struct { | |||||||
|  |  | ||||||
| type PrometheusSink struct { | type PrometheusSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	config       PrometheusSinkConfig | 	config           PrometheusSinkConfig | ||||||
| 	labelMetrics map[string]*prometheus.GaugeVec | 	labelMetrics     map[string]*prometheus.GaugeVec | ||||||
| 	nodeMetrics  map[string]prometheus.Gauge | 	nodeMetrics      map[string]prometheus.Gauge | ||||||
| 	promWg       sync.WaitGroup | 	promWg           sync.WaitGroup | ||||||
| 	promServer   *http.Server | 	promServer       *http.Server | ||||||
|  | 	statsSentMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func intToFloat64(input interface{}) (float64, error) { | func intToFloat64(input interface{}) (float64, error) { | ||||||
| @@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error { | |||||||
| 		s.nodeMetrics[name] = new | 		s.nodeMetrics[name] = new | ||||||
| 		prometheus.Register(new) | 		prometheus.Register(new) | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error { | |||||||
| 		} | 		} | ||||||
| 		s.nodeMetrics[name].Set(value) | 		s.nodeMetrics[name].Set(value) | ||||||
| 	} | 	} | ||||||
|  | 	s.statsSentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ import ( | |||||||
|  |  | ||||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type SampleSinkConfig struct { | type SampleSinkConfig struct { | ||||||
| @@ -14,12 +15,15 @@ type SampleSinkConfig struct { | |||||||
| 	// See: metricSink.go | 	// See: metricSink.go | ||||||
| 	defaultSinkConfig | 	defaultSinkConfig | ||||||
| 	// Additional config options, for SampleSink | 	// Additional config options, for SampleSink | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| type SampleSink struct { | type SampleSink struct { | ||||||
| 	// declares elements 	'name' and 'meta_as_tags' (string to bool map!) | 	// declares elements 	'name' and 'meta_as_tags' (string to bool map!) | ||||||
| 	sink | 	sink | ||||||
| 	config SampleSinkConfig // entry point to the SampleSinkConfig | 	config SampleSinkConfig // entry point to the SampleSinkConfig | ||||||
|  | 	// Stats counters | ||||||
|  | 	statsSentMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // Implement functions required for Sink interface | // Implement functions required for Sink interface | ||||||
| @@ -30,6 +34,8 @@ type SampleSink struct { | |||||||
| func (s *SampleSink) Write(point lp.CCMetric) error { | func (s *SampleSink) Write(point lp.CCMetric) error { | ||||||
| 	// based on s.meta_as_tags use meta infos as tags | 	// based on s.meta_as_tags use meta infos as tags | ||||||
| 	log.Print(point) | 	log.Print(point) | ||||||
|  | 	s.statsSentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Initalize stats counters | ||||||
|  | 	s.statsSentMetrics = 0 | ||||||
|  |  | ||||||
| 	// Create lookup map to use meta infos as tags in the output metric | 	// Create lookup map to use meta infos as tags in the output metric | ||||||
| 	s.meta_as_tags = make(map[string]bool) | 	s.meta_as_tags = make(map[string]bool) | ||||||
| 	for _, k := range s.config.MetaAsTags { | 	for _, k := range s.config.MetaAsTags { | ||||||
|   | |||||||
| @@ -102,13 +102,19 @@ func (sm *sinkManager) Start() { | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		toTheSinks := func(p lp.CCMetric) { | 		toTheSinks := func(p lp.CCMetric) { | ||||||
|  | 			var wg sync.WaitGroup | ||||||
| 			// Send received metric to all outputs | 			// Send received metric to all outputs | ||||||
| 			cclog.ComponentDebug("SinkManager", "WRITE", p) | 			cclog.ComponentDebug("SinkManager", "WRITE", p) | ||||||
| 			for _, s := range sm.sinks { | 			for _, s := range sm.sinks { | ||||||
| 				if err := s.Write(p); err != nil { | 				wg.Add(1) | ||||||
| 					cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) | 				go func(s Sink) { | ||||||
| 				} | 					if err := s.Write(p); err != nil { | ||||||
|  | 						cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) | ||||||
|  | 					} | ||||||
|  | 					wg.Done() | ||||||
|  | 				}(s) | ||||||
| 			} | 			} | ||||||
|  | 			wg.Wait() | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		for { | 		for { | ||||||
|   | |||||||
| @@ -8,6 +8,7 @@ import ( | |||||||
|  |  | ||||||
| 	//	"time" | 	//	"time" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type StdoutSink struct { | type StdoutSink struct { | ||||||
| @@ -17,6 +18,7 @@ type StdoutSink struct { | |||||||
| 		defaultSinkConfig | 		defaultSinkConfig | ||||||
| 		Output string `json:"output_file,omitempty"` | 		Output string `json:"output_file,omitempty"` | ||||||
| 	} | 	} | ||||||
|  | 	sentMetrics int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *StdoutSink) Write(m lp.CCMetric) error { | func (s *StdoutSink) Write(m lp.CCMetric) error { | ||||||
| @@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error { | |||||||
| 		s.output, | 		s.output, | ||||||
| 		m.ToLineProtocol(s.meta_as_tags), | 		m.ToLineProtocol(s.meta_as_tags), | ||||||
| 	) | 	) | ||||||
|  | 	s.sentMetrics++ | ||||||
|  | 	stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	for _, k := range s.config.MetaAsTags { | 	for _, k := range s.config.MetaAsTags { | ||||||
| 		s.meta_as_tags[k] = true | 		s.meta_as_tags[k] = true | ||||||
| 	} | 	} | ||||||
|  | 	s.sentMetrics = 0 | ||||||
|  |  | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user