mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 10:45:06 +01:00 
			
		
		
		
	Make maxForward configurable, save old name in meta in rename metrics and make the hostname tag key configurable
This commit is contained in:
		@@ -25,6 +25,7 @@ type metricRouterTagConfig struct {
 | 
			
		||||
 | 
			
		||||
// Metric router configuration
 | 
			
		||||
type metricRouterConfig struct {
 | 
			
		||||
	HostnameTagName   string                               `json:"hostname_tag"`        // Key name used when adding the hostname to a metric (default 'hostname')
 | 
			
		||||
	AddTags           []metricRouterTagConfig              `json:"add_tags"`            // List of tags that are added when the condition is met
 | 
			
		||||
	DelTags           []metricRouterTagConfig              `json:"delete_tags"`         // List of tags that are removed when the condition is met
 | 
			
		||||
	IntervalAgg       []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
 | 
			
		||||
@@ -33,6 +34,7 @@ type metricRouterConfig struct {
 | 
			
		||||
	RenameMetrics     map[string]string                    `json:"rename_metrics"`      // Map to rename metric name from key to value
 | 
			
		||||
	IntervalStamp     bool                                 `json:"interval_timestamp"`  // Update timestamp periodically by ticker each interval?
 | 
			
		||||
	NumCacheIntervals int                                  `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
 | 
			
		||||
	MaxForward        int                                  `json:"max_forward"`         // Number of maximal forwarded metrics at one select
 | 
			
		||||
	dropMetrics       map[string]bool                      // Internal map for O(1) lookup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -76,7 +78,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 | 
			
		||||
	r.cache_input = make(chan lp.CCMetric)
 | 
			
		||||
	r.wg = wg
 | 
			
		||||
	r.ticker = ticker
 | 
			
		||||
	r.maxForward = ROUTER_MAX_FORWARD
 | 
			
		||||
	r.config.MaxForward = ROUTER_MAX_FORWARD
 | 
			
		||||
	r.config.HostnameTagName = "hostname"
 | 
			
		||||
 | 
			
		||||
	// Set hostname
 | 
			
		||||
	hostname, err := os.Hostname()
 | 
			
		||||
@@ -100,6 +103,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 | 
			
		||||
		cclog.ComponentError("MetricRouter", err.Error())
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	r.maxForward = r.config.MaxForward
 | 
			
		||||
	if r.config.NumCacheIntervals > 0 {
 | 
			
		||||
		r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -244,8 +248,10 @@ func (r *metricRouter) Start() {
 | 
			
		||||
		cclog.ComponentDebug("MetricRouter", "FORWARD", point)
 | 
			
		||||
		r.DoAddTags(point)
 | 
			
		||||
		r.DoDelTags(point)
 | 
			
		||||
		if new, ok := r.config.RenameMetrics[point.Name()]; ok {
 | 
			
		||||
		name := point.Name()
 | 
			
		||||
		if new, ok := r.config.RenameMetrics[name]; ok {
 | 
			
		||||
			point.SetName(new)
 | 
			
		||||
			point.AddMeta("oldname", name)
 | 
			
		||||
		}
 | 
			
		||||
		r.DoAddTags(point)
 | 
			
		||||
		r.DoDelTags(point)
 | 
			
		||||
@@ -258,7 +264,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
	// Foward message received from collector channel
 | 
			
		||||
	coll_forward := func(p lp.CCMetric) {
 | 
			
		||||
		// receive from metric collector
 | 
			
		||||
		p.AddTag("hostname", r.hostname)
 | 
			
		||||
		p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
		if r.config.IntervalStamp {
 | 
			
		||||
			p.SetTime(r.timestamp)
 | 
			
		||||
		}
 | 
			
		||||
@@ -287,7 +293,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
	cache_forward := func(p lp.CCMetric) {
 | 
			
		||||
		// receive from metric collector
 | 
			
		||||
		if !r.dropMetric(p) {
 | 
			
		||||
			p.AddTag("hostname", r.hostname)
 | 
			
		||||
			p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
			forward(p)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -309,19 +315,19 @@ func (r *metricRouter) Start() {
 | 
			
		||||
 | 
			
		||||
			case p := <-r.coll_input:
 | 
			
		||||
				coll_forward(p)
 | 
			
		||||
				for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ {
 | 
			
		||||
				for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
 | 
			
		||||
					coll_forward(<-r.coll_input)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
			case p := <-r.recv_input:
 | 
			
		||||
				recv_forward(p)
 | 
			
		||||
				for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ {
 | 
			
		||||
				for i := 0; len(r.recv_input) > 0 && i < (r.maxForward-1); i++ {
 | 
			
		||||
					recv_forward(<-r.recv_input)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
			case p := <-r.cache_input:
 | 
			
		||||
				cache_forward(p)
 | 
			
		||||
				for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ {
 | 
			
		||||
				for i := 0; len(r.cache_input) > 0 && i < (r.maxForward-1); i++ {
 | 
			
		||||
					cache_forward(<-r.cache_input)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user