mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-20 12:55:08 +02:00 
			
		
		
		
	Compare commits
	
		
			5 Commits
		
	
	
		
			numastats_
			...
			http_stats
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 9dd6ff1a76 | ||
|  | 257b4a64b5 | ||
|  | 5eeb097136 | ||
|  | 4a4992877c | ||
|  | 9447685a69 | 
| @@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c | ||||
|   "collectors" : "collectors.json", | ||||
|   "receivers" : "receivers.json", | ||||
|   "router" : "router.json", | ||||
|   "stats_api" : "api.json", | ||||
|   "interval": 10, | ||||
|   "duration": 1 | ||||
| } | ||||
| @@ -32,6 +33,7 @@ See the component READMEs for their configuration: | ||||
| * [`sinks`](./sinks/README.md) | ||||
| * [`receivers`](./receivers/README.md) | ||||
| * [`router`](./internal/metricRouter/README.md) | ||||
| * [`stats_api`](./internal/metricRouter/StatsApi.md) | ||||
|  | ||||
|  | ||||
| # Installation | ||||
|   | ||||
| @@ -28,6 +28,7 @@ type CentralConfigFile struct { | ||||
| 	RouterConfigFile    string `json:"router"` | ||||
| 	SinkConfigFile      string `json:"sinks"` | ||||
| 	ReceiverConfigFile  string `json:"receivers,omitempty"` | ||||
| 	StatsApiConfigFile  string `json:"stats_api,omitempty"` | ||||
| } | ||||
|  | ||||
| func LoadCentralConfiguration(file string, config *CentralConfigFile) error { | ||||
| @@ -52,6 +53,7 @@ type RuntimeConfig struct { | ||||
| 	CollectManager  collectors.CollectorManager | ||||
| 	SinkManager     sinks.SinkManager | ||||
| 	ReceiveManager  receivers.ReceiveManager | ||||
| 	StatsApi        mr.StatsApi | ||||
| 	MultiChanTicker mct.MultiChanTicker | ||||
|  | ||||
| 	Channels []chan lp.CCMetric | ||||
| @@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) { | ||||
| 		cclog.Debug("Shutdown SinkManager...") | ||||
| 		config.SinkManager.Close() | ||||
| 	} | ||||
| 	if config.StatsApi != nil { | ||||
| 		cclog.Debug("Shutdown StatsApi...") | ||||
| 		config.StatsApi.Close() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func mainFunc() int { | ||||
| 	var err error | ||||
| 	use_recv := false | ||||
| 	use_api := false | ||||
|  | ||||
| 	// Initialize runtime configuration | ||||
| 	rcfg := RuntimeConfig{ | ||||
| @@ -164,6 +171,7 @@ func mainFunc() int { | ||||
| 		CollectManager: nil, | ||||
| 		SinkManager:    nil, | ||||
| 		ReceiveManager: nil, | ||||
| 		StatsApi:       nil, | ||||
| 		CliArgs:        ReadCli(), | ||||
| 	} | ||||
|  | ||||
| @@ -253,6 +261,16 @@ func mainFunc() int { | ||||
| 		use_recv = true | ||||
| 	} | ||||
|  | ||||
| 	// Create new statistics API manager | ||||
| 	if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 { | ||||
| 		rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile) | ||||
| 		if err != nil { | ||||
| 			cclog.Error(err.Error()) | ||||
| 			return 1 | ||||
| 		} | ||||
| 		use_api = true | ||||
| 	} | ||||
|  | ||||
| 	// Create shutdown handler | ||||
| 	shutdownSignal := make(chan os.Signal, 1) | ||||
| 	signal.Notify(shutdownSignal, os.Interrupt) | ||||
| @@ -260,6 +278,11 @@ func mainFunc() int { | ||||
| 	rcfg.Sync.Add(1) | ||||
| 	go shutdownHandler(&rcfg, shutdownSignal) | ||||
|  | ||||
| 	// Start the stats api early to be prepared for init settings | ||||
| 	if use_api { | ||||
| 		rcfg.StatsApi.Start() | ||||
| 	} | ||||
|  | ||||
| 	// Start the managers | ||||
| 	rcfg.MetricRouter.Start() | ||||
| 	rcfg.SinkManager.Start() | ||||
|   | ||||
| @@ -16,6 +16,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const DEFAULT_BEEGFS_CMD = "beegfs-ctl" | ||||
| @@ -33,6 +34,7 @@ type BeegfsMetaCollector struct { | ||||
| 	matches               map[string]string | ||||
| 	config                BeegfsMetaCollectorConfig | ||||
| 	skipFS                map[string]struct{} | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | ||||
| @@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *BeegfsMetaCollector) Close() { | ||||
|   | ||||
| @@ -16,6 +16,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // Struct for the collector-specific JSON config | ||||
| @@ -31,6 +32,7 @@ type BeegfsStorageCollector struct { | ||||
| 	matches               map[string]string | ||||
| 	config                BeegfsStorageCollectorConfig | ||||
| 	skipFS                map[string]struct{} | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | ||||
| @@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM | ||||
| 				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *BeegfsStorageCollector) Close() { | ||||
|   | ||||
| @@ -12,6 +12,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // | ||||
| @@ -37,6 +38,7 @@ type CPUFreqCpuInfoCollectorTopology struct { | ||||
| type CPUFreqCpuInfoCollector struct { | ||||
| 	metricCollector | ||||
| 	topology              []*CPUFreqCpuInfoCollectorTopology | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | ||||
| @@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { | ||||
| 			"package_id": t.physicalPackageID, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC | ||||
| 						return | ||||
| 					} | ||||
| 					if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil { | ||||
| 						m.statsProcessedMetrics++ | ||||
| 						output <- y | ||||
| 					} | ||||
| 				} | ||||
| @@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *CPUFreqCpuInfoCollector) Close() { | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"golang.org/x/sys/unix" | ||||
| ) | ||||
|  | ||||
| @@ -40,6 +41,7 @@ type CPUFreqCollectorTopology struct { | ||||
| type CPUFreqCollector struct { | ||||
| 	metricCollector | ||||
| 	topology              []CPUFreqCollectorTopology | ||||
| 	statsProcessedMetrics int64 | ||||
| 	config                struct { | ||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||
| 	} | ||||
| @@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { | ||||
| 			"package_id": t.physicalPackageID, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 		} | ||||
|  | ||||
| 		if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil { | ||||
| 			m.statsProcessedMetrics++ | ||||
| 			output <- y | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *CPUFreqCollector) Close() { | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const CPUSTATFILE = `/proc/stat` | ||||
| @@ -25,6 +26,7 @@ type CpustatCollector struct { | ||||
| 	matches               map[string]int | ||||
| 	cputags               map[string]map[string]string | ||||
| 	nodetags              map[string]string | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *CpustatCollector) Init(config json.RawMessage) error { | ||||
| @@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { | ||||
| 			num_cpus++ | ||||
| 		} | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st | ||||
| 	for name, value := range values { | ||||
| 		y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) | ||||
| 		if err == nil { | ||||
| 			m.statsProcessedMetrics++ | ||||
| 			output <- y | ||||
| 		} | ||||
| 	} | ||||
| @@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 		time.Now(), | ||||
| 	) | ||||
| 	if err == nil { | ||||
| 		m.statsProcessedMetrics++ | ||||
| 		output <- num_cpus_metric | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *CpustatCollector) Close() { | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	influx "github.com/influxdata/line-protocol" | ||||
| ) | ||||
|  | ||||
| @@ -28,6 +29,9 @@ type CustomCmdCollector struct { | ||||
| 	config                 CustomCmdCollectorConfig | ||||
| 	commands               []string | ||||
| 	files                  []string | ||||
| 	statsProcessedMetrics  int64 | ||||
| 	statsProcessedCommands int64 | ||||
| 	statsProcessedFiles    int64 | ||||
| } | ||||
|  | ||||
| func (m *CustomCmdCollector) Init(config json.RawMessage) error { | ||||
| @@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { | ||||
| 	m.handler = influx.NewMetricHandler() | ||||
| 	m.parser = influx.NewParser(m.handler) | ||||
| 	m.parser.SetTimeFunc(DefaultTime) | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.statsProcessedFiles = 0 | ||||
| 	m.statsProcessedCommands = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | ||||
|  | ||||
| 			y := lp.FromInfluxMetric(c) | ||||
| 			if err == nil { | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		m.statsProcessedCommands++ | ||||
| 		stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands) | ||||
| 	} | ||||
| 	for _, file := range m.files { | ||||
| 		buffer, err := ioutil.ReadFile(file) | ||||
| @@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | ||||
| 			} | ||||
| 			y := lp.FromInfluxMetric(f) | ||||
| 			if err == nil { | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 		m.statsProcessedFiles++ | ||||
| 		stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles) | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| //	"log" | ||||
| @@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct { | ||||
|  | ||||
| type DiskstatCollector struct { | ||||
| 	metricCollector | ||||
| 	//matches map[string]int | ||||
| 	config IOstatCollectorConfig | ||||
| 	//devices map[string]IOstatCollectorEntry | ||||
| 	config                DiskstatCollectorConfig | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *DiskstatCollector) Init(config json.RawMessage) error { | ||||
| @@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer file.Close() | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | ||||
| 		y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) | ||||
| 		if err == nil { | ||||
| 			y.AddMeta("unit", "GBytes") | ||||
| 			m.statsProcessedMetrics++ | ||||
| 			stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 			output <- y | ||||
| 		} | ||||
| 		free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) | ||||
| 		y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now()) | ||||
| 		if err == nil { | ||||
| 			y.AddMeta("unit", "GBytes") | ||||
| 			m.statsProcessedMetrics++ | ||||
| 			stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 			output <- y | ||||
| 		} | ||||
| 		perc := (100 * (total - free)) / total | ||||
| @@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric | ||||
| 	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) | ||||
| 	if err == nil { | ||||
| 		y.AddMeta("unit", "percent") | ||||
| 		m.statsProcessedMetrics++ | ||||
| 		stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 		output <- y | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -15,6 +15,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const DEFAULT_GPFS_CMD = "mmpmon" | ||||
| @@ -35,6 +36,7 @@ type GpfsCollector struct { | ||||
| 	skipFS                map[string]struct{} | ||||
| 	lastTimestamp         time.Time // Store time stamp of last tick to derive bandwidths | ||||
| 	lastState             map[string]GpfsCollectorLastState | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||
| @@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||
| 		return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) | ||||
| 	} | ||||
| 	m.config.Mmpmon = p | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
| 		if m.config.SendBandwidths { | ||||
| 			if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { | ||||
| 				bwRead := float64(bytesRead-lastBytesRead) / timeDiff | ||||
| 				if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
| 		if m.config.SendBandwidths { | ||||
| 			if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 { | ||||
| 				bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff | ||||
| 				if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// number of closes | ||||
| @@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// number of reads | ||||
| @@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// number of writes | ||||
| @@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// number of read directories | ||||
| @@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// Number of inode updates | ||||
| @@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			continue | ||||
| 		} | ||||
| 		if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil { | ||||
| 			m.statsProcessedMetrics++ | ||||
| 			output <- y | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *GpfsCollector) Close() { | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"golang.org/x/sys/unix" | ||||
|  | ||||
| 	"encoding/json" | ||||
| @@ -41,6 +42,7 @@ type InfinibandCollector struct { | ||||
| 	} | ||||
| 	info                  []*InfinibandCollectorInfo | ||||
| 	lastTimestamp         time.Time // Store time stamp of last tick to derive bandwidths | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| // Init initializes the Infiniband collector by walking through files below IB_BASEPATH | ||||
| @@ -149,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { | ||||
| 	if len(m.info) == 0 { | ||||
| 		return fmt.Errorf("found no IB devices") | ||||
| 	} | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -196,6 +198,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 				if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { | ||||
| 					y.AddMeta("unit", counterDef.unit) | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| @@ -206,6 +209,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 					if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil { | ||||
| 						y.AddMeta("unit", counterDef.unit+"/sec") | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 				// Save current state | ||||
| @@ -214,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *InfinibandCollector) Close() { | ||||
|   | ||||
| @@ -6,6 +6,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
|  | ||||
| 	//	"log" | ||||
| 	"encoding/json" | ||||
| @@ -32,6 +33,7 @@ type IOstatCollector struct { | ||||
| 	matches               map[string]int | ||||
| 	config                IOstatCollectorConfig | ||||
| 	devices               map[string]IOstatCollectorEntry | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *IOstatCollector) Init(config json.RawMessage) error { | ||||
| @@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { | ||||
| 			lastValues: values, | ||||
| 		} | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return err | ||||
| } | ||||
| @@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now()) | ||||
| 					if err == nil { | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 				entry.lastValues[name] = x | ||||
| @@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 		} | ||||
| 		m.devices[device] = entry | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *IOstatCollector) Close() { | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const IPMITOOL_PATH = `ipmitool` | ||||
| @@ -29,6 +30,7 @@ type IpmiCollector struct { | ||||
| 	config                IpmiCollectorConfig | ||||
| 	ipmitool              string | ||||
| 	ipmisensors           string | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *IpmiCollector) Init(config json.RawMessage) error { | ||||
| @@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { | ||||
| 	if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 { | ||||
| 		return errors.New("no IPMI reader found") | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { | ||||
| 			if err == nil { | ||||
| 				y.AddMeta("unit", unit) | ||||
| 				output <- y | ||||
| 				m.statsProcessedMetrics++ | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| @@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { | ||||
| 						y.AddMeta("unit", lv[4]) | ||||
| 					} | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			m.readIpmiSensors(m.config.IpmisensorsPath, output) | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *IpmiCollector) Close() { | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" | ||||
| 	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"github.com/NVIDIA/go-nvml/pkg/dl" | ||||
| ) | ||||
|  | ||||
| @@ -84,6 +85,9 @@ type LikwidCollector struct { | ||||
| 	initialized           bool | ||||
| 	likwidGroups          map[C.int]LikwidEventsetConfig | ||||
| 	lock                  sync.Mutex | ||||
| 	statsMeasurements     int64 | ||||
| 	statsProcessedMetrics int64 | ||||
| 	statsPublishedMetrics int64 | ||||
| } | ||||
|  | ||||
| type LikwidMetric struct { | ||||
| @@ -267,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { | ||||
| 		cclog.ComponentError(m.name, err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	m.statsMeasurements = 0 | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.statsPublishedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -274,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { | ||||
| // take a measurement for 'interval' seconds of event set index 'group' | ||||
| func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) { | ||||
| 	var ret C.int | ||||
|  | ||||
| 	m.lock.Lock() | ||||
| 	if m.initialized { | ||||
| 		ret = C.perfmon_setupCounters(evset.gid) | ||||
| @@ -317,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t | ||||
| 		} | ||||
| 	} | ||||
| 	m.lock.Unlock() | ||||
| 	m.statsMeasurements++ | ||||
| 	stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements) | ||||
| 	return false, nil | ||||
| } | ||||
|  | ||||
| @@ -357,6 +367,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv | ||||
| 				if m.config.InvalidToZero && math.IsInf(value, 0) { | ||||
| 					value = 0.0 | ||||
| 				} | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 				// Now we have the result, send it with the proper tags | ||||
| 				if !math.IsNaN(value) { | ||||
| 					if metric.Publish { | ||||
| @@ -369,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv | ||||
| 							if len(metric.Unit) > 0 { | ||||
| 								y.AddMeta("unit", metric.Unit) | ||||
| 							} | ||||
| 							m.statsPublishedMetrics++ | ||||
| 							stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) | ||||
| 							output <- y | ||||
| 						} | ||||
| 					} | ||||
| @@ -409,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan | ||||
| 				if m.config.InvalidToZero && math.IsInf(value, 0) { | ||||
| 					value = 0.0 | ||||
| 				} | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| 				// Now we have the result, send it with the proper tags | ||||
| 				if !math.IsNaN(value) { | ||||
| 					if metric.Publish { | ||||
| @@ -422,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan | ||||
| 							if len(metric.Unit) > 0 { | ||||
| 								y.AddMeta("unit", metric.Unit) | ||||
| 							} | ||||
| 							m.statsPublishedMetrics++ | ||||
| 							stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) | ||||
| 							output <- y | ||||
| 						} | ||||
| 					} | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // | ||||
| @@ -32,6 +33,7 @@ type LoadavgCollector struct { | ||||
| 	config       struct { | ||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||
| 	} | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *LoadavgCollector) Init(config json.RawMessage) error { | ||||
| @@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { | ||||
| 	for i, name := range m.proc_matches { | ||||
| 		_, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | ||||
| 		if err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) | ||||
| 		if err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *LoadavgCollector) Close() { | ||||
|   | ||||
| @@ -12,6 +12,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const LUSTRE_SYSFS = `/sys/fs/lustre` | ||||
| @@ -44,6 +45,7 @@ type LustreCollector struct { | ||||
| 	lastTimestamp         time.Time                   // Store time stamp of last tick to derive bandwidths | ||||
| 	definitions           []LustreMetricDefinition    // Combined list without excluded metrics | ||||
| 	stats                 map[string]map[string]int64 // Data for last value per device and metric | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *LustreCollector) getDeviceDataCommand(device string) []string { | ||||
| @@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { | ||||
| 		} | ||||
| 	} | ||||
| 	m.lastTimestamp = time.Now() | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					y.AddMeta("unit", def.unit) | ||||
| 				} | ||||
| 				output <- y | ||||
| 				m.statsProcessedMetrics++ | ||||
| 			} | ||||
| 			devData[def.name] = use_x | ||||
| 		} | ||||
| 	} | ||||
| 	m.lastTimestamp = now | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *LustreCollector) Close() { | ||||
|   | ||||
| @@ -14,6 +14,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const MEMSTATFILE = "/proc/meminfo" | ||||
| @@ -38,6 +39,7 @@ type MemstatCollector struct { | ||||
| 	config                MemstatCollectorConfig | ||||
| 	nodefiles             map[int]MemstatCollectorNode | ||||
| 	sendMemUsed           bool | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| type MemstatStats struct { | ||||
| @@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return err | ||||
| } | ||||
| @@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if len(unit) > 0 { | ||||
| 					y.AddMeta("unit", unit) | ||||
| 				} | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| @@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if len(unit) > 0 { | ||||
| 					y.AddMeta("unit", unit) | ||||
| 				} | ||||
| 				m.statsProcessedMetrics++ | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| @@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 			sendStats(stats, nodeConf.tags) | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *MemstatCollector) Close() { | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const NETSTATFILE = "/proc/net/dev" | ||||
| @@ -35,6 +36,7 @@ type NetstatCollector struct { | ||||
| 	config                NetstatCollectorConfig | ||||
| 	matches               map[string][]NetstatCollectorMetric | ||||
| 	lastTimestamp         time.Time | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *NetstatCollector) Init(config json.RawMessage) error { | ||||
| @@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { | ||||
| 	if len(m.matches) == 0 { | ||||
| 		return errors.New("no devices to collector metrics found") | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if m.config.SendAbsoluteValues { | ||||
| 					if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil { | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 				if m.config.SendDerivedValues { | ||||
| @@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 						rate := float64(v-metric.lastValue) / timeDiff | ||||
| 						if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil { | ||||
| 							output <- y | ||||
| 							m.statsProcessedMetrics++ | ||||
| 						} | ||||
| 					} | ||||
| 					metric.lastValue = v | ||||
| @@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *NetstatCollector) Close() { | ||||
|   | ||||
| @@ -12,6 +12,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // First part contains the code for the general NfsCollector. | ||||
| @@ -33,6 +34,7 @@ type nfsCollector struct { | ||||
| 		ExcludeMetrics []string `json:"exclude_metrics,omitempty"` | ||||
| 	} | ||||
| 	data                  map[string]NfsCollectorData | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *nfsCollector) initStats() error { | ||||
| @@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { | ||||
| 	} | ||||
| 	m.data = make(map[string]NfsCollectorData) | ||||
| 	m.initStats() | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		if err == nil { | ||||
| 			y.AddMeta("version", m.version) | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *nfsCollector) Close() { | ||||
|   | ||||
| @@ -12,6 +12,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // | ||||
| @@ -45,6 +46,7 @@ type NUMAStatsCollectorTopolgy struct { | ||||
| type NUMAStatsCollector struct { | ||||
| 	metricCollector | ||||
| 	topology              []NUMAStatsCollectorTopolgy | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | ||||
| @@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { | ||||
| 				tagSet: map[string]string{"memoryDomain": node}, | ||||
| 			}) | ||||
| 	} | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri | ||||
| 			) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 				m.statsProcessedMetrics++ | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		file.Close() | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *NUMAStatsCollector) Close() { | ||||
|   | ||||
| @@ -9,6 +9,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"github.com/NVIDIA/go-nvml/pkg/nvml" | ||||
| ) | ||||
|  | ||||
| @@ -29,6 +30,7 @@ type NvidiaCollector struct { | ||||
| 	num_gpus              int | ||||
| 	config                NvidiaCollectorConfig | ||||
| 	gpus                  []NvidiaCollectorDevice | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *NvidiaCollector) CatchPanic() { | ||||
| @@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { | ||||
| 				pciInfo.Device) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					if err == nil { | ||||
| 						y.AddMeta("unit", "%") | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 				if !device.excludeMetrics["nv_mem_util"] { | ||||
| @@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					if err == nil { | ||||
| 						y.AddMeta("unit", "%") | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| @@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					if err == nil { | ||||
| 						y.AddMeta("unit", "MByte") | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
|  | ||||
| @@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 					if err == nil { | ||||
| 						y.AddMeta("unit", "MByte") | ||||
| 						output <- y | ||||
| 						m.statsProcessedMetrics++ | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| @@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "degC") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "%") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				} | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} else if ret == nvml.ERROR_NOT_SUPPORTED { | ||||
| 				y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "watts") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "MHz") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) | ||||
| 				if err == nil { | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "watts") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "%") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| @@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 				if err == nil { | ||||
| 					y.AddMeta("unit", "%") | ||||
| 					output <- y | ||||
| 					m.statsProcessedMetrics++ | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *NvidiaCollector) Close() { | ||||
|   | ||||
| @@ -6,6 +6,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // These are the fields we read from the JSON configuration | ||||
| @@ -20,6 +21,7 @@ type SampleCollector struct { | ||||
| 	config     SampleTimerCollectorConfig // the configuration structure | ||||
| 	meta       map[string]string          // default meta information | ||||
| 	tags       map[string]string          // default tags | ||||
| 	statsCount int64 | ||||
| } | ||||
|  | ||||
| // Functions to implement MetricCollector interface | ||||
| @@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error { | ||||
| 	// for all topological entities (sockets, NUMA domains, ...) | ||||
| 	// Return some useful error message in case of any failures | ||||
|  | ||||
| 	// Initialize counts for statistics | ||||
| 	m.statsCount = 0 | ||||
|  | ||||
| 	// Set this flag only if everything is initialized properly, all required files exist, ... | ||||
| 	m.init = true | ||||
| 	return err | ||||
| @@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) | ||||
| 	if err == nil { | ||||
| 		// Send it to output channel | ||||
| 		output <- y | ||||
| 		// increment count for each sent metric or any other operation | ||||
| 		m.statsCount++ | ||||
| 	} | ||||
|  | ||||
| 	// Set stats for the component | ||||
| 	stats.ComponentStatInt(m.name, "count", m.statsCount) | ||||
| } | ||||
|  | ||||
| // Close metric collector: close network connection, close files, close libraries, ... | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| // See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html | ||||
| @@ -41,6 +42,7 @@ type TempCollector struct { | ||||
| 		ReportCriticalTemp bool                         `json:"report_critical_temperature"` | ||||
| 	} | ||||
| 	sensors               []*TempCollectorSensor | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *TempCollector) Init(config json.RawMessage) error { | ||||
| @@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { | ||||
| 	} | ||||
|  | ||||
| 	// Finished initialization | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		) | ||||
| 		if err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
|  | ||||
| 		// max temperature | ||||
| @@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 				m.statsProcessedMetrics++ | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| @@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 				m.statsProcessedMetrics++ | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *TempCollector) Close() { | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const MAX_NUM_PROCS = 10 | ||||
| @@ -23,6 +24,7 @@ type TopProcsCollector struct { | ||||
| 	metricCollector | ||||
| 	tags                  map[string]string | ||||
| 	config                TopProcsCollectorConfig | ||||
| 	statsProcessedMetrics int64 | ||||
| } | ||||
|  | ||||
| func (m *TopProcsCollector) Init(config json.RawMessage) error { | ||||
| @@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error { | ||||
| 	if err != nil { | ||||
| 		return errors.New("failed to execute command") | ||||
| 	} | ||||
| 	m.statsProcessedMetrics = 0 | ||||
| 	m.init = true | ||||
| 	return nil | ||||
| } | ||||
| @@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric | ||||
| 		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) | ||||
| 		if err == nil { | ||||
| 			output <- y | ||||
| 			m.statsProcessedMetrics++ | ||||
| 		} | ||||
| 	} | ||||
| 	stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) | ||||
| } | ||||
|  | ||||
| func (m *TopProcsCollector) Close() { | ||||
|   | ||||
							
								
								
									
										17
									
								
								internal/metricRouter/StatsApi.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								internal/metricRouter/StatsApi.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,17 @@ | ||||
| # Stats API | ||||
|  | ||||
| The Stats API can be used for debugging. It publishes counts at an HTTP endpoint as JSON from different componenets of the CC Metric Collector. | ||||
|  | ||||
| # Configuration | ||||
|  | ||||
| The Stats API has an own configuration file to specify the listen host and port. The defaults are `localhost` and `8080`. | ||||
|  | ||||
| ```json | ||||
| { | ||||
|   "bindhost" : "", | ||||
|   "port" : "8080", | ||||
|   "publish_collectorstate" : true | ||||
| } | ||||
| ``` | ||||
|  | ||||
| The `bindhost` and `port` can be used to specify the listen host and port. The `publish_collectorstate` needs to be `true`, otherwise nothing is presented. This option is for future use if we need to publish more infos using different domains. | ||||
							
								
								
									
										232
									
								
								internal/metricRouter/metricApi.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										232
									
								
								internal/metricRouter/metricApi.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,232 @@ | ||||
| package metricRouter | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" | ||||
| 	"github.com/gorilla/mux" | ||||
| ) | ||||
|  | ||||
| type statsApiConfig struct { | ||||
| 	PublishCollectorState bool   `json:"publish_collectorstate"` | ||||
| 	Host                  string `json:"bindhost"` | ||||
| 	Port                  string `json:"port"` | ||||
| } | ||||
|  | ||||
| // Metric cache data structure | ||||
| type statsApi struct { | ||||
| 	name     string | ||||
| 	input    chan lp.CCMetric | ||||
| 	indone   chan bool | ||||
| 	outdone  chan bool | ||||
| 	config   statsApiConfig | ||||
| 	wg       *sync.WaitGroup | ||||
| 	statsWg  sync.WaitGroup | ||||
| 	ticker   mct.MultiChanTicker | ||||
| 	tickchan chan time.Time | ||||
| 	server   *http.Server | ||||
| 	router   *mux.Router | ||||
| 	lock     sync.Mutex | ||||
| 	baseurl  string | ||||
| 	stats    map[string]map[string]int64 | ||||
| 	outStats map[string]map[string]int64 | ||||
| } | ||||
|  | ||||
| type StatsApi interface { | ||||
| 	Start() | ||||
| 	Close() | ||||
| 	StatsFunc(w http.ResponseWriter, r *http.Request) | ||||
| } | ||||
|  | ||||
| var statsApiServer *statsApi = nil | ||||
|  | ||||
| func (a *statsApi) updateStats(point lp.CCMetric) { | ||||
| 	switch point.Name() { | ||||
| 	case "_stats": | ||||
| 		if name, nok := point.GetMeta("source"); nok { | ||||
| 			var compStats map[string]int64 | ||||
| 			var ok bool | ||||
|  | ||||
| 			if compStats, ok = a.stats[name]; !ok { | ||||
| 				a.stats[name] = make(map[string]int64) | ||||
| 				compStats = a.stats[name] | ||||
| 			} | ||||
| 			for k, v := range point.Fields() { | ||||
| 				switch value := v.(type) { | ||||
| 				case int: | ||||
| 					compStats[k] = int64(value) | ||||
| 				case uint: | ||||
| 					compStats[k] = int64(value) | ||||
| 				case int32: | ||||
| 					compStats[k] = int64(value) | ||||
| 				case uint32: | ||||
| 					compStats[k] = int64(value) | ||||
| 				case int64: | ||||
| 					compStats[k] = int64(value) | ||||
| 				case uint64: | ||||
| 					compStats[k] = int64(value) | ||||
| 				default: | ||||
| 					cclog.ComponentDebug(a.name, "Unusable stats for", k, ". Values should be int64") | ||||
| 				} | ||||
| 			} | ||||
| 			a.stats[name] = compStats | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (a *statsApi) Start() { | ||||
| 	a.ticker.AddChannel(a.tickchan) | ||||
| 	a.wg.Add(1) | ||||
| 	a.statsWg.Add(1) | ||||
| 	go func() { | ||||
| 		a.stats = make(map[string]map[string]int64) | ||||
| 		defer a.statsWg.Done() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-a.indone: | ||||
| 				cclog.ComponentDebug(a.name, "INPUT DONE") | ||||
| 				close(a.indone) | ||||
| 				return | ||||
| 			case p := <-a.input: | ||||
| 				a.lock.Lock() | ||||
| 				a.updateStats(p) | ||||
| 				a.lock.Unlock() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	a.statsWg.Add(1) | ||||
| 	go func() { | ||||
| 		a.outStats = make(map[string]map[string]int64) | ||||
| 		defer a.statsWg.Done() | ||||
| 		a.lock.Lock() | ||||
| 		for comp, compData := range a.stats { | ||||
| 			var outData map[string]int64 | ||||
| 			var ok bool | ||||
| 			if outData, ok = a.outStats[comp]; !ok { | ||||
| 				outData = make(map[string]int64) | ||||
| 			} | ||||
| 			for k, v := range compData { | ||||
| 				outData[k] = v | ||||
| 			} | ||||
| 			a.outStats[comp] = outData | ||||
| 		} | ||||
| 		a.lock.Unlock() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-a.outdone: | ||||
| 				cclog.ComponentDebug(a.name, "OUTPUT DONE") | ||||
| 				close(a.outdone) | ||||
| 				return | ||||
| 			case <-a.tickchan: | ||||
| 				a.lock.Lock() | ||||
| 				for comp, compData := range a.stats { | ||||
| 					var outData map[string]int64 | ||||
| 					var ok bool | ||||
| 					if outData, ok = a.outStats[comp]; !ok { | ||||
| 						outData = make(map[string]int64) | ||||
| 					} | ||||
| 					for k, v := range compData { | ||||
| 						outData[k] = v | ||||
| 					} | ||||
| 					a.outStats[comp] = outData | ||||
| 				} | ||||
| 				a.lock.Unlock() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	a.statsWg.Add(1) | ||||
| 	go func() { | ||||
| 		defer a.statsWg.Done() | ||||
| 		err := a.server.ListenAndServe() | ||||
| 		if err != nil && err.Error() != "http: Server closed" { | ||||
| 			cclog.ComponentError(a.name, err.Error()) | ||||
| 		} | ||||
| 		cclog.ComponentDebug(a.name, "SERVER DONE") | ||||
| 	}() | ||||
| 	cclog.ComponentDebug(a.name, "STARTED") | ||||
| } | ||||
|  | ||||
| func (a *statsApi) StatsFunc(w http.ResponseWriter, r *http.Request) { | ||||
| 	data, err := json.Marshal(a.outStats) | ||||
| 	if err == nil { | ||||
| 		w.Header().Set("Content-Type", "application/json") | ||||
| 		io.WriteString(w, string(data)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Close finishes / stops the metric cache | ||||
| func (a *statsApi) Close() { | ||||
| 	cclog.ComponentDebug(a.name, "CLOSE") | ||||
| 	a.indone <- true | ||||
| 	a.outdone <- true | ||||
| 	a.server.Shutdown(context.Background()) | ||||
| 	// wait for close of channel r.done | ||||
| 	<-a.indone | ||||
| 	<-a.outdone | ||||
| 	a.statsWg.Wait() | ||||
| 	a.wg.Done() | ||||
|  | ||||
| 	//a.wg.Wait() | ||||
| } | ||||
|  | ||||
| func NewStatsApi(ticker mct.MultiChanTicker, wg *sync.WaitGroup, statsApiConfigfile string) (StatsApi, error) { | ||||
| 	a := new(statsApi) | ||||
| 	a.name = "StatsApi" | ||||
| 	a.config.Host = "localhost" | ||||
| 	a.config.Port = "8080" | ||||
| 	configFile, err := os.Open(statsApiConfigfile) | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(a.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer configFile.Close() | ||||
| 	jsonParser := json.NewDecoder(configFile) | ||||
| 	err = jsonParser.Decode(&a.config) | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(a.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	a.input = make(chan lp.CCMetric) | ||||
| 	a.ticker = ticker | ||||
| 	a.tickchan = make(chan time.Time) | ||||
| 	a.wg = wg | ||||
| 	a.indone = make(chan bool) | ||||
| 	a.outdone = make(chan bool) | ||||
| 	a.router = mux.NewRouter() | ||||
| 	a.baseurl = fmt.Sprintf("%s:%s", a.config.Host, a.config.Port) | ||||
| 	a.server = &http.Server{Addr: a.baseurl, Handler: a.router} | ||||
| 	if a.config.PublishCollectorState { | ||||
| 		a.router.HandleFunc("/", a.StatsFunc) | ||||
| 	} | ||||
| 	statsApiServer = a | ||||
| 	return a, nil | ||||
| } | ||||
|  | ||||
| func ComponentStatInt(component string, key string, value int64) { | ||||
| 	if statsApiServer == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now()) | ||||
| 	if err == nil { | ||||
| 		statsApiServer.input <- y | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func ComponentStatString(component string, key string, value int64) { | ||||
| 	if statsApiServer == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	y, err := lp.New("_stats", map[string]string{}, map[string]string{"source": component}, map[string]interface{}{key: value}, time.Now()) | ||||
| 	if err == nil { | ||||
| 		statsApiServer.input <- y | ||||
| 	} | ||||
| } | ||||
| @@ -54,6 +54,12 @@ type metricRouter struct { | ||||
| 	cache             MetricCache         // pointer to MetricCache | ||||
| 	cachewg           sync.WaitGroup      // wait group for MetricCache | ||||
| 	maxForward        int                 // number of metrics to forward maximally in one iteration | ||||
| 	statsCollForward  int64 | ||||
| 	statsRecvForward  int64 | ||||
| 	statsCacheForward int64 | ||||
| 	statsTotalForward int64 | ||||
| 	statsDropped      int64 | ||||
| 	statsRenamed      int64 | ||||
| } | ||||
|  | ||||
| // MetricRouter access functions | ||||
| @@ -121,6 +127,12 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout | ||||
| 	for _, mname := range r.config.DropMetrics { | ||||
| 		r.config.dropMetrics[mname] = true | ||||
| 	} | ||||
| 	r.statsCollForward = 0 | ||||
| 	r.statsRecvForward = 0 | ||||
| 	r.statsCacheForward = 0 | ||||
| 	r.statsTotalForward = 0 | ||||
| 	r.statsDropped = 0 | ||||
| 	r.statsRenamed = 0 | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -140,6 +152,7 @@ func (r *metricRouter) StartTimer() { | ||||
| 				cclog.ComponentDebug("MetricRouter", "TIMER DONE") | ||||
| 				return | ||||
| 			case t := <-m: | ||||
| 				cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", t.Unix()) | ||||
| 				r.timestamp = t | ||||
| 			} | ||||
| 		} | ||||
| @@ -253,6 +266,8 @@ func (r *metricRouter) Start() { | ||||
| 		r.DoDelTags(point) | ||||
| 		name := point.Name() | ||||
| 		if new, ok := r.config.RenameMetrics[name]; ok { | ||||
| 			r.statsRenamed++ | ||||
| 			ComponentStatInt("MetricRouter", "renamed", r.statsRenamed) | ||||
| 			point.SetName(new) | ||||
| 			point.AddMeta("oldname", name) | ||||
| 		} | ||||
| @@ -272,7 +287,14 @@ func (r *metricRouter) Start() { | ||||
| 			p.SetTime(r.timestamp) | ||||
| 		} | ||||
| 		if !r.dropMetric(p) { | ||||
| 			r.statsCollForward++ | ||||
| 			r.statsTotalForward++ | ||||
| 			ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward) | ||||
| 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||
| 			forward(p) | ||||
| 		} else { | ||||
| 			r.statsDropped++ | ||||
| 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||
| 		} | ||||
| 		// even if the metric is dropped, it is stored in the cache for | ||||
| 		// aggregations | ||||
| @@ -288,7 +310,14 @@ func (r *metricRouter) Start() { | ||||
| 			p.SetTime(r.timestamp) | ||||
| 		} | ||||
| 		if !r.dropMetric(p) { | ||||
| 			r.statsRecvForward++ | ||||
| 			r.statsTotalForward++ | ||||
| 			ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward) | ||||
| 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||
| 			forward(p) | ||||
| 		} else { | ||||
| 			r.statsDropped++ | ||||
| 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -297,7 +326,14 @@ func (r *metricRouter) Start() { | ||||
| 		// receive from metric collector | ||||
| 		if !r.dropMetric(p) { | ||||
| 			p.AddTag(r.config.HostnameTagName, r.hostname) | ||||
| 			r.statsCacheForward++ | ||||
| 			r.statsTotalForward++ | ||||
| 			ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward) | ||||
| 			ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward) | ||||
| 			forward(p) | ||||
| 		} else { | ||||
| 			r.statsDropped++ | ||||
| 			ComponentStatInt("MetricRouter", "dropped", r.statsDropped) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| const GMETRIC_EXEC = `gmetric` | ||||
| @@ -32,6 +33,7 @@ type GangliaSink struct { | ||||
| 	gmetric_path     string | ||||
| 	gmetric_config   string | ||||
| 	config           GangliaSinkConfig | ||||
| 	statsSentMetrics int64 | ||||
| } | ||||
|  | ||||
| func (s *GangliaSink) Write(point lp.CCMetric) error { | ||||
| @@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { | ||||
| 	command := exec.Command(s.gmetric_path, argstr...) | ||||
| 	command.Wait() | ||||
| 	_, err = command.Output() | ||||
| 	s.statsSentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| @@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	if len(s.config.GmetricConfig) > 0 { | ||||
| 		s.gmetric_config = s.config.GmetricConfig | ||||
| 	} | ||||
| 	s.statsSentMetrics = 0 | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	influx "github.com/influxdata/line-protocol" | ||||
| ) | ||||
|  | ||||
| @@ -36,6 +37,8 @@ type HttpSink struct { | ||||
| 	idleConnTimeout time.Duration | ||||
| 	timeout         time.Duration | ||||
| 	flushDelay      time.Duration | ||||
| 	statsProcessed  int64 | ||||
| 	statsFlushes    int64 | ||||
| } | ||||
|  | ||||
| func (s *HttpSink) Write(m lp.CCMetric) error { | ||||
| @@ -63,6 +66,8 @@ func (s *HttpSink) Write(m lp.CCMetric) error { | ||||
| 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	s.statsProcessed++ | ||||
| 	stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed) | ||||
|  | ||||
| 	// Flush synchronously if "flush_delay" is zero | ||||
| 	if s.flushDelay == 0 { | ||||
| @@ -112,6 +117,8 @@ func (s *HttpSink) Flush() error { | ||||
| 		cclog.ComponentError(s.name, "application error:", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	s.statsFlushes++ | ||||
| 	stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -177,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	s.buffer = &bytes.Buffer{} | ||||
| 	s.encoder = influx.NewEncoder(s.buffer) | ||||
| 	s.encoder.SetPrecision(time.Second) | ||||
| 	s.statsFlushes = 0 | ||||
| 	s.statsProcessed = 0 | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||
| 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||
| ) | ||||
| @@ -42,6 +43,9 @@ type InfluxAsyncSink struct { | ||||
| 	config              InfluxAsyncSinkConfig | ||||
| 	influxRetryInterval uint | ||||
| 	influxMaxRetryTime  uint | ||||
| 	sentMetrics         int64 | ||||
| 	statsFlushes        int64 | ||||
| 	statsErrors         int64 | ||||
| } | ||||
|  | ||||
| func (s *InfluxAsyncSink) connect() error { | ||||
| @@ -105,11 +109,15 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { | ||||
| 	s.writeApi.WritePoint( | ||||
| 		m.ToPoint(s.meta_as_tags), | ||||
| 	) | ||||
| 	s.sentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *InfluxAsyncSink) Flush() error { | ||||
| 	s.writeApi.Flush() | ||||
| 	s.statsFlushes++ | ||||
| 	stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -189,12 +197,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	} | ||||
|  | ||||
| 	// Start background: Read from error channel | ||||
| 	s.statsErrors = 0 | ||||
| 	s.errors = s.writeApi.Errors() | ||||
| 	go func() { | ||||
| 		for err := range s.errors { | ||||
| 			s.statsErrors++ | ||||
| 			stats.ComponentStatInt(s.name, "errors", s.statsErrors) | ||||
| 			cclog.ComponentError(s.name, err.Error()) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	s.sentMetrics = 0 | ||||
| 	s.statsFlushes = 0 | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||
| 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||
| 	"github.com/influxdata/influxdb-client-go/v2/api/write" | ||||
| @@ -46,6 +47,8 @@ type InfluxSink struct { | ||||
| 	flushTimer            *time.Timer | ||||
| 	flushDelay            time.Duration | ||||
| 	lock                  sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | ||||
| 	statsSentMetrics      int64 | ||||
| 	statsProcessedMetrics int64 | ||||
| 	//influxMaxRetryDelay uint | ||||
| } | ||||
|  | ||||
| @@ -123,8 +126,10 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { | ||||
| 	} | ||||
| 	p := m.ToPoint(s.meta_as_tags) | ||||
| 	s.lock.Lock() | ||||
| 	s.statsProcessedMetrics++ | ||||
| 	s.batch = append(s.batch, p) | ||||
| 	s.lock.Unlock() | ||||
| 	stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics) | ||||
|  | ||||
| 	// Flush synchronously if "flush_delay" is zero | ||||
| 	if s.flushDelay == 0 { | ||||
| @@ -145,6 +150,8 @@ func (s *InfluxSink) Flush() error { | ||||
| 		cclog.ComponentError(s.name, "flush failed:", err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	s.statsSentMetrics += int64(len(s.batch)) | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	s.batch = s.batch[:0] | ||||
| 	return nil | ||||
| } | ||||
| @@ -211,5 +218,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	if err := s.connect(); err != nil { | ||||
| 		return nil, fmt.Errorf("unable to connect: %v", err) | ||||
| 	} | ||||
| 	s.statsSentMetrics = 0 | ||||
| 	s.statsProcessedMetrics = 0 | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
| @@ -73,6 +73,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"github.com/NVIDIA/go-nvml/pkg/dl" | ||||
| ) | ||||
|  | ||||
| @@ -107,6 +108,7 @@ type LibgangliaSink struct { | ||||
| 	gmond_config     C.Ganglia_gmond_config | ||||
| 	send_channels    C.Ganglia_udp_send_channels | ||||
| 	cstrCache        map[string]*C.char | ||||
| 	statsSentMetrics int64 | ||||
| } | ||||
|  | ||||
| func (s *LibgangliaSink) Write(point lp.CCMetric) error { | ||||
| @@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error { | ||||
| 	C.Ganglia_metric_destroy(gmetric) | ||||
| 	// Free the value C string, the only one not stored in the cache | ||||
| 	C.free(unsafe.Pointer(c_value)) | ||||
| 	s.statsSentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| @@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) | ||||
| 	} | ||||
|  | ||||
| 	s.statsSentMetrics = 0 | ||||
| 	// Set up cache for the C strings | ||||
| 	s.cstrCache = make(map[string]*C.char) | ||||
| 	// s.cstrCache["globals"] = C.CString("globals") | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| 	"github.com/gorilla/mux" | ||||
| 	"github.com/prometheus/client_golang/prometheus" | ||||
| 	"github.com/prometheus/client_golang/prometheus/promhttp" | ||||
| @@ -34,6 +35,7 @@ type PrometheusSink struct { | ||||
| 	nodeMetrics      map[string]prometheus.Gauge | ||||
| 	promWg           sync.WaitGroup | ||||
| 	promServer       *http.Server | ||||
| 	statsSentMetrics int64 | ||||
| } | ||||
|  | ||||
| func intToFloat64(input interface{}) (float64, error) { | ||||
| @@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error { | ||||
| 		s.nodeMetrics[name] = new | ||||
| 		prometheus.Register(new) | ||||
| 	} | ||||
| 	s.statsSentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error { | ||||
| 		} | ||||
| 		s.nodeMetrics[name].Set(value) | ||||
| 	} | ||||
| 	s.statsSentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| type SampleSinkConfig struct { | ||||
| @@ -14,12 +15,15 @@ type SampleSinkConfig struct { | ||||
| 	// See: metricSink.go | ||||
| 	defaultSinkConfig | ||||
| 	// Additional config options, for SampleSink | ||||
|  | ||||
| } | ||||
|  | ||||
| type SampleSink struct { | ||||
| 	// declares elements 	'name' and 'meta_as_tags' (string to bool map!) | ||||
| 	sink | ||||
| 	config SampleSinkConfig // entry point to the SampleSinkConfig | ||||
| 	// Stats counters | ||||
| 	statsSentMetrics int64 | ||||
| } | ||||
|  | ||||
| // Implement functions required for Sink interface | ||||
| @@ -30,6 +34,8 @@ type SampleSink struct { | ||||
| func (s *SampleSink) Write(point lp.CCMetric) error { | ||||
| 	// based on s.meta_as_tags use meta infos as tags | ||||
| 	log.Print(point) | ||||
| 	s.statsSentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Initalize stats counters | ||||
| 	s.statsSentMetrics = 0 | ||||
|  | ||||
| 	// Create lookup map to use meta infos as tags in the output metric | ||||
| 	s.meta_as_tags = make(map[string]bool) | ||||
| 	for _, k := range s.config.MetaAsTags { | ||||
|   | ||||
| @@ -102,13 +102,19 @@ func (sm *sinkManager) Start() { | ||||
| 		} | ||||
|  | ||||
| 		toTheSinks := func(p lp.CCMetric) { | ||||
| 			var wg sync.WaitGroup | ||||
| 			// Send received metric to all outputs | ||||
| 			cclog.ComponentDebug("SinkManager", "WRITE", p) | ||||
| 			for _, s := range sm.sinks { | ||||
| 				wg.Add(1) | ||||
| 				go func(s Sink) { | ||||
| 					if err := s.Write(p); err != nil { | ||||
| 						cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) | ||||
| 					} | ||||
| 					wg.Done() | ||||
| 				}(s) | ||||
| 			} | ||||
| 			wg.Wait() | ||||
| 		} | ||||
|  | ||||
| 		for { | ||||
|   | ||||
| @@ -8,6 +8,7 @@ import ( | ||||
|  | ||||
| 	//	"time" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| 	stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" | ||||
| ) | ||||
|  | ||||
| type StdoutSink struct { | ||||
| @@ -17,6 +18,7 @@ type StdoutSink struct { | ||||
| 		defaultSinkConfig | ||||
| 		Output string `json:"output_file,omitempty"` | ||||
| 	} | ||||
| 	sentMetrics int64 | ||||
| } | ||||
|  | ||||
| func (s *StdoutSink) Write(m lp.CCMetric) error { | ||||
| @@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error { | ||||
| 		s.output, | ||||
| 		m.ToLineProtocol(s.meta_as_tags), | ||||
| 	) | ||||
| 	s.sentMetrics++ | ||||
| 	stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	for _, k := range s.config.MetaAsTags { | ||||
| 		s.meta_as_tags[k] = true | ||||
| 	} | ||||
| 	s.sentMetrics = 0 | ||||
|  | ||||
| 	return s, nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user