mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 02:35:07 +01:00 
			
		
		
		
	Ccmessage migration (#119)
* Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * Switch to CCMessage for all files. --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
This commit is contained in:
		@@ -11,7 +11,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
 | 
			
		||||
 | 
			
		||||
	"github.com/PaesslerAG/gval"
 | 
			
		||||
@@ -31,14 +31,14 @@ type metricAggregator struct {
 | 
			
		||||
	functions []*MetricAggregatorIntervalConfig
 | 
			
		||||
	constants map[string]interface{}
 | 
			
		||||
	language  gval.Language
 | 
			
		||||
	output    chan lp.CCMetric
 | 
			
		||||
	output    chan lp.CCMessage
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MetricAggregator interface {
 | 
			
		||||
	AddAggregation(name, function, condition string, tags, meta map[string]string) error
 | 
			
		||||
	DeleteAggregation(name string) error
 | 
			
		||||
	Init(output chan lp.CCMetric) error
 | 
			
		||||
	Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric)
 | 
			
		||||
	Init(output chan lp.CCMessage) error
 | 
			
		||||
	Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var metricCacheLanguage = gval.NewLanguage(
 | 
			
		||||
@@ -74,7 +74,7 @@ var evaluables = struct {
 | 
			
		||||
	mapping: make(map[string]gval.Evaluable),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *metricAggregator) Init(output chan lp.CCMetric) error {
 | 
			
		||||
func (c *metricAggregator) Init(output chan lp.CCMessage) error {
 | 
			
		||||
	c.output = output
 | 
			
		||||
	c.functions = make([]*MetricAggregatorIntervalConfig, 0)
 | 
			
		||||
	c.constants = make(map[string]interface{})
 | 
			
		||||
@@ -112,7 +112,7 @@ func (c *metricAggregator) Init(output chan lp.CCMetric) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) {
 | 
			
		||||
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMessage) {
 | 
			
		||||
	vars := make(map[string]interface{})
 | 
			
		||||
	for k, v := range c.constants {
 | 
			
		||||
		vars[k] = v
 | 
			
		||||
@@ -127,7 +127,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
 | 
			
		||||
		var valuesInt32 []int32
 | 
			
		||||
		var valuesInt64 []int64
 | 
			
		||||
		var valuesBool []bool
 | 
			
		||||
		matches := make([]lp.CCMetric, 0)
 | 
			
		||||
		matches := make([]lp.CCMessage, 0)
 | 
			
		||||
		for _, m := range metrics {
 | 
			
		||||
			vars["metric"] = m
 | 
			
		||||
			//value, err := gval.Evaluate(f.Condition, vars, c.language)
 | 
			
		||||
@@ -216,7 +216,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string {
 | 
			
		||||
			copy_tags := func(tags map[string]string, metrics []lp.CCMessage) map[string]string {
 | 
			
		||||
				out := make(map[string]string)
 | 
			
		||||
				for key, value := range tags {
 | 
			
		||||
					switch value {
 | 
			
		||||
@@ -233,7 +233,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
 | 
			
		||||
				}
 | 
			
		||||
				return out
 | 
			
		||||
			}
 | 
			
		||||
			copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string {
 | 
			
		||||
			copy_meta := func(meta map[string]string, metrics []lp.CCMessage) map[string]string {
 | 
			
		||||
				out := make(map[string]string)
 | 
			
		||||
				for key, value := range meta {
 | 
			
		||||
					switch value {
 | 
			
		||||
@@ -253,18 +253,18 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
 | 
			
		||||
			tags := copy_tags(f.Tags, matches)
 | 
			
		||||
			meta := copy_meta(f.Meta, matches)
 | 
			
		||||
 | 
			
		||||
			var m lp.CCMetric
 | 
			
		||||
			var m lp.CCMessage
 | 
			
		||||
			switch t := value.(type) {
 | 
			
		||||
			case float64:
 | 
			
		||||
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
				m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
			case float32:
 | 
			
		||||
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
				m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
			case int:
 | 
			
		||||
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
				m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
			case int64:
 | 
			
		||||
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
				m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
			case string:
 | 
			
		||||
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
				m, err = lp.NewMessage(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
 | 
			
		||||
			default:
 | 
			
		||||
				cclog.ComponentError("MetricCache", "Gval returned invalid type", t, "skipping metric", f.Name)
 | 
			
		||||
			}
 | 
			
		||||
@@ -389,7 +389,7 @@ func EvalFloat64Condition(condition string, params map[string]float64) (float64,
 | 
			
		||||
	return value, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) {
 | 
			
		||||
func NewAggregator(output chan lp.CCMessage) (MetricAggregator, error) {
 | 
			
		||||
	a := new(metricAggregator)
 | 
			
		||||
	err := a.Init(output)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,7 @@ import (
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
 | 
			
		||||
	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -16,7 +16,7 @@ type metricCachePeriod struct {
 | 
			
		||||
	stopstamp   time.Time
 | 
			
		||||
	numMetrics  int
 | 
			
		||||
	sizeMetrics int
 | 
			
		||||
	metrics     []lp.CCMetric
 | 
			
		||||
	metrics     []lp.CCMessage
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Metric cache data structure
 | 
			
		||||
@@ -29,21 +29,21 @@ type metricCache struct {
 | 
			
		||||
	ticker     mct.MultiChanTicker
 | 
			
		||||
	tickchan   chan time.Time
 | 
			
		||||
	done       chan bool
 | 
			
		||||
	output     chan lp.CCMetric
 | 
			
		||||
	output     chan lp.CCMessage
 | 
			
		||||
	aggEngine  agg.MetricAggregator
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MetricCache interface {
 | 
			
		||||
	Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
 | 
			
		||||
	Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
 | 
			
		||||
	Start()
 | 
			
		||||
	Add(metric lp.CCMetric)
 | 
			
		||||
	GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
 | 
			
		||||
	Add(metric lp.CCMessage)
 | 
			
		||||
	GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage)
 | 
			
		||||
	AddAggregation(name, function, condition string, tags, meta map[string]string) error
 | 
			
		||||
	DeleteAggregation(name string) error
 | 
			
		||||
	Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
 | 
			
		||||
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
 | 
			
		||||
	var err error = nil
 | 
			
		||||
	c.done = make(chan bool)
 | 
			
		||||
	c.wg = wg
 | 
			
		||||
@@ -55,7 +55,7 @@ func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker,
 | 
			
		||||
		p := new(metricCachePeriod)
 | 
			
		||||
		p.numMetrics = 0
 | 
			
		||||
		p.sizeMetrics = 0
 | 
			
		||||
		p.metrics = make([]lp.CCMetric, 0)
 | 
			
		||||
		p.metrics = make([]lp.CCMessage, 0)
 | 
			
		||||
		c.intervals = append(c.intervals, p)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -124,7 +124,7 @@ func (c *metricCache) Start() {
 | 
			
		||||
// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start())
 | 
			
		||||
// The intervals list is used as round-robin buffer and the metric list grows dynamically and
 | 
			
		||||
// to avoid reallocations
 | 
			
		||||
func (c *metricCache) Add(metric lp.CCMetric) {
 | 
			
		||||
func (c *metricCache) Add(metric lp.CCMessage) {
 | 
			
		||||
	if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
 | 
			
		||||
		c.lock.Lock()
 | 
			
		||||
		p := c.intervals[c.curPeriod]
 | 
			
		||||
@@ -153,10 +153,10 @@ func (c *metricCache) DeleteAggregation(name string) error {
 | 
			
		||||
// Get all metrics of a interval. The index is the difference to the current interval, so index=0
 | 
			
		||||
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
 | 
			
		||||
// is given (negative index, index larger than configured number of total intervals, ...)
 | 
			
		||||
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
 | 
			
		||||
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
 | 
			
		||||
	var start time.Time = time.Now()
 | 
			
		||||
	var stop time.Time = time.Now()
 | 
			
		||||
	var metrics []lp.CCMetric
 | 
			
		||||
	var metrics []lp.CCMessage
 | 
			
		||||
	if index >= 0 && index < c.numPeriods {
 | 
			
		||||
		pindex := c.curPeriod - index
 | 
			
		||||
		if pindex < 0 {
 | 
			
		||||
@@ -168,10 +168,10 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
 | 
			
		||||
			metrics = c.intervals[pindex].metrics
 | 
			
		||||
			//return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
 | 
			
		||||
		} else {
 | 
			
		||||
			metrics = make([]lp.CCMetric, 0)
 | 
			
		||||
			metrics = make([]lp.CCMessage, 0)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		metrics = make([]lp.CCMetric, 0)
 | 
			
		||||
		metrics = make([]lp.CCMessage, 0)
 | 
			
		||||
	}
 | 
			
		||||
	return start, stop, metrics
 | 
			
		||||
}
 | 
			
		||||
@@ -182,7 +182,7 @@ func (c *metricCache) Close() {
 | 
			
		||||
	c.done <- true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
 | 
			
		||||
func NewCache(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
 | 
			
		||||
	c := new(metricCache)
 | 
			
		||||
	err := c.Init(output, ticker, wg, numPeriods)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -10,7 +10,7 @@ import (
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
 | 
			
		||||
	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
 | 
			
		||||
	units "github.com/ClusterCockpit/cc-units"
 | 
			
		||||
)
 | 
			
		||||
@@ -44,10 +44,10 @@ type metricRouterConfig struct {
 | 
			
		||||
// Metric router data structure
 | 
			
		||||
type metricRouter struct {
 | 
			
		||||
	hostname    string              // Hostname used in tags
 | 
			
		||||
	coll_input  chan lp.CCMetric    // Input channel from CollectorManager
 | 
			
		||||
	recv_input  chan lp.CCMetric    // Input channel from ReceiveManager
 | 
			
		||||
	cache_input chan lp.CCMetric    // Input channel from MetricCache
 | 
			
		||||
	outputs     []chan lp.CCMetric  // List of all output channels
 | 
			
		||||
	coll_input  chan lp.CCMessage    // Input channel from CollectorManager
 | 
			
		||||
	recv_input  chan lp.CCMessage    // Input channel from ReceiveManager
 | 
			
		||||
	cache_input chan lp.CCMessage    // Input channel from MetricCache
 | 
			
		||||
	outputs     []chan lp.CCMessage  // List of all output channels
 | 
			
		||||
	done        chan bool           // channel to finish / stop metric router
 | 
			
		||||
	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
 | 
			
		||||
	timestamp   time.Time           // timestamp periodically updated by ticker each interval
 | 
			
		||||
@@ -61,9 +61,9 @@ type metricRouter struct {
 | 
			
		||||
// MetricRouter access functions
 | 
			
		||||
type MetricRouter interface {
 | 
			
		||||
	Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
 | 
			
		||||
	AddCollectorInput(input chan lp.CCMetric)
 | 
			
		||||
	AddReceiverInput(input chan lp.CCMetric)
 | 
			
		||||
	AddOutput(output chan lp.CCMetric)
 | 
			
		||||
	AddCollectorInput(input chan lp.CCMessage)
 | 
			
		||||
	AddReceiverInput(input chan lp.CCMessage)
 | 
			
		||||
	AddOutput(output chan lp.CCMessage)
 | 
			
		||||
	Start()
 | 
			
		||||
	Close()
 | 
			
		||||
}
 | 
			
		||||
@@ -75,9 +75,9 @@ type MetricRouter interface {
 | 
			
		||||
// * ticker (from variable ticker)
 | 
			
		||||
// * configuration (read from config file in variable routerConfigFile)
 | 
			
		||||
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
 | 
			
		||||
	r.outputs = make([]chan lp.CCMetric, 0)
 | 
			
		||||
	r.outputs = make([]chan lp.CCMessage, 0)
 | 
			
		||||
	r.done = make(chan bool)
 | 
			
		||||
	r.cache_input = make(chan lp.CCMetric)
 | 
			
		||||
	r.cache_input = make(chan lp.CCMessage)
 | 
			
		||||
	r.wg = wg
 | 
			
		||||
	r.ticker = ticker
 | 
			
		||||
	r.config.MaxForward = ROUTER_MAX_FORWARD
 | 
			
		||||
@@ -126,7 +126,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getParamMap(point lp.CCMetric) map[string]interface{} {
 | 
			
		||||
func getParamMap(point lp.CCMessage) map[string]interface{} {
 | 
			
		||||
	params := make(map[string]interface{})
 | 
			
		||||
	params["metric"] = point
 | 
			
		||||
	params["name"] = point.Name()
 | 
			
		||||
@@ -144,7 +144,7 @@ func getParamMap(point lp.CCMetric) map[string]interface{} {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DoAddTags adds a tag when condition is fullfiled
 | 
			
		||||
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
 | 
			
		||||
func (r *metricRouter) DoAddTags(point lp.CCMessage) {
 | 
			
		||||
	var conditionMatches bool
 | 
			
		||||
	for _, m := range r.config.AddTags {
 | 
			
		||||
		if m.Condition == "*" {
 | 
			
		||||
@@ -166,7 +166,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DoDelTags removes a tag when condition is fullfiled
 | 
			
		||||
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
 | 
			
		||||
func (r *metricRouter) DoDelTags(point lp.CCMessage) {
 | 
			
		||||
	var conditionMatches bool
 | 
			
		||||
	for _, m := range r.config.DelTags {
 | 
			
		||||
		if m.Condition == "*" {
 | 
			
		||||
@@ -188,7 +188,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Conditional test whether a metric should be dropped
 | 
			
		||||
func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
 | 
			
		||||
func (r *metricRouter) dropMetric(point lp.CCMessage) bool {
 | 
			
		||||
	// Simple drop check
 | 
			
		||||
	if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok {
 | 
			
		||||
		return conditionMatches
 | 
			
		||||
@@ -210,7 +210,7 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *metricRouter) prepareUnit(point lp.CCMetric) bool {
 | 
			
		||||
func (r *metricRouter) prepareUnit(point lp.CCMessage) bool {
 | 
			
		||||
	if r.config.NormalizeUnits {
 | 
			
		||||
		if in_unit, ok := point.GetMeta("unit"); ok {
 | 
			
		||||
			u := units.NewUnit(in_unit)
 | 
			
		||||
@@ -259,7 +259,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
 | 
			
		||||
	// Forward takes a received metric, adds or deletes tags
 | 
			
		||||
	// and forwards it to the output channels
 | 
			
		||||
	forward := func(point lp.CCMetric) {
 | 
			
		||||
	forward := func(point lp.CCMessage) {
 | 
			
		||||
		cclog.ComponentDebug("MetricRouter", "FORWARD", point)
 | 
			
		||||
		r.DoAddTags(point)
 | 
			
		||||
		r.DoDelTags(point)
 | 
			
		||||
@@ -279,7 +279,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Foward message received from collector channel
 | 
			
		||||
	coll_forward := func(p lp.CCMetric) {
 | 
			
		||||
	coll_forward := func(p lp.CCMessage) {
 | 
			
		||||
		// receive from metric collector
 | 
			
		||||
		p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
		if r.config.IntervalStamp {
 | 
			
		||||
@@ -296,7 +296,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Forward message received from receivers channel
 | 
			
		||||
	recv_forward := func(p lp.CCMetric) {
 | 
			
		||||
	recv_forward := func(p lp.CCMessage) {
 | 
			
		||||
		// receive from receive manager
 | 
			
		||||
		if r.config.IntervalStamp {
 | 
			
		||||
			p.SetTime(r.timestamp)
 | 
			
		||||
@@ -307,7 +307,7 @@ func (r *metricRouter) Start() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Forward message received from cache channel
 | 
			
		||||
	cache_forward := func(p lp.CCMetric) {
 | 
			
		||||
	cache_forward := func(p lp.CCMessage) {
 | 
			
		||||
		// receive from metric collector
 | 
			
		||||
		if !r.dropMetric(p) {
 | 
			
		||||
			p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
@@ -358,17 +358,17 @@ func (r *metricRouter) Start() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddCollectorInput adds a channel between metric collector and metric router
 | 
			
		||||
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
 | 
			
		||||
func (r *metricRouter) AddCollectorInput(input chan lp.CCMessage) {
 | 
			
		||||
	r.coll_input = input
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddReceiverInput adds a channel between metric receiver and metric router
 | 
			
		||||
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
 | 
			
		||||
func (r *metricRouter) AddReceiverInput(input chan lp.CCMessage) {
 | 
			
		||||
	r.recv_input = input
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddOutput adds a output channel to the metric router
 | 
			
		||||
func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
 | 
			
		||||
func (r *metricRouter) AddOutput(output chan lp.CCMessage) {
 | 
			
		||||
	r.outputs = append(r.outputs, output)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user