Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git

Commit: Use message processor in router, all sinks and all receivers
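Every component touched by this commit follows the same pattern: create a messageProcessor, feed it the component's optional JSON rule block ("process_message" / "process_messages"), and pass each lp.CCMessage through ProcessMessage before forwarding it. A minimal sketch of that pattern — the import paths and processor/message functions are the ones used in this diff; the config content, metric values and the sink channel are hypothetical:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

func main() {
	// Create a message processor; router, receivers and sinks each own one after this commit.
	p, err := mp.NewMessageProcessor()
	if err != nil {
		fmt.Println("initialization of message processor failed:", err)
		return
	}

	// Optionally load rules from a component's raw JSON block.
	cfg := json.RawMessage(`{}`) // hypothetical, empty rule set
	if len(cfg) > 0 {
		if err := p.FromConfigJSON(cfg); err != nil {
			fmt.Println("failed parsing JSON for message processor:", err)
			return
		}
	}

	// Rules can also be registered programmatically, as the router and receivers do.
	p.AddAddMetaByCondition("true", "source", "example")

	// Process a message and forward it only if the processor did not drop it.
	sink := make(chan lp.CCMessage, 1) // hypothetical output channel
	y, _ := lp.NewMessage("cpu_load", map[string]string{"hostname": "node01"},
		nil, map[string]interface{}{"value": 1.5}, time.Now())
	if m, err := p.ProcessMessage(y); err == nil && m != nil {
		sink <- m
	}
}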
@@ -2,6 +2,7 @@ package metricRouter

import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"sync"
@@ -9,10 +10,10 @@ import (

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"

	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
	units "github.com/ClusterCockpit/cc-units"
)

const ROUTER_MAX_FORWARD = 50
@@ -38,16 +39,17 @@ type metricRouterConfig struct {
	MaxForward        int                                  `json:"max_forward"`         // Number of maximal forwarded metrics at one select
	NormalizeUnits    bool                                 `json:"normalize_units"`     // Check unit meta flag and normalize it using cc-units
	ChangeUnitPrefix  map[string]string                    `json:"change_unit_prefix"`  // Add prefix that should be applied to the metrics
	dropMetrics       map[string]bool                      // Internal map for O(1) lookup
	// dropMetrics       map[string]bool                      // Internal map for O(1) lookup
	MessageProcessor json.RawMessage `json:"process_message,omitempty"`
}

// Metric router data structure
type metricRouter struct {
	hostname    string              // Hostname used in tags
	coll_input  chan lp.CCMessage    // Input channel from CollectorManager
	recv_input  chan lp.CCMessage    // Input channel from ReceiveManager
	cache_input chan lp.CCMessage    // Input channel from MetricCache
	outputs     []chan lp.CCMessage  // List of all output channels
	coll_input  chan lp.CCMessage   // Input channel from CollectorManager
	recv_input  chan lp.CCMessage   // Input channel from ReceiveManager
	cache_input chan lp.CCMessage   // Input channel from MetricCache
	outputs     []chan lp.CCMessage // List of all output channels
	done        chan bool           // channel to finish / stop metric router
	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
	timestamp   time.Time           // timestamp periodically updated by ticker each interval
@@ -56,6 +58,7 @@ type metricRouter struct {
	cache       MetricCache         // pointer to MetricCache
	cachewg     sync.WaitGroup      // wait group for MetricCache
	maxForward  int                 // number of metrics to forward maximally in one iteration
	mp          mp.MessageProcessor
}

// MetricRouter access functions
@@ -119,10 +122,52 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
			r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
		}
	}
	r.config.dropMetrics = make(map[string]bool)
	for _, mname := range r.config.DropMetrics {
		r.config.dropMetrics[mname] = true
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	r.mp = p

	if len(r.config.MessageProcessor) > 0 {
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
		if err != nil {
			return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, mname := range r.config.DropMetrics {
		r.mp.AddDropMessagesByName(mname)
	}
	for _, cond := range r.config.DropMetricsIf {
		r.mp.AddDropMessagesByCondition(cond)
	}
	for _, data := range r.config.AddTags {
		cond := data.Condition
		if cond == "*" {
			cond = "true"
		}
		r.mp.AddAddTagsByCondition(cond, data.Key, data.Value)
	}
	for _, data := range r.config.DelTags {
		cond := data.Condition
		if cond == "*" {
			cond = "true"
		}
		r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value)
	}
	for oldname, newname := range r.config.RenameMetrics {
		r.mp.AddRenameMetricByName(oldname, newname)
	}
	for metricName, prefix := range r.config.ChangeUnitPrefix {
		r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix)
	}
	r.mp.SetNormalizeUnits(r.config.NormalizeUnits)

	r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, hostname)

	// r.config.dropMetrics = make(map[string]bool)
	// for _, mname := range r.config.DropMetrics {
	// 	r.config.dropMetrics[mname] = true
	// }
	return nil
}
 | 
			
		||||
 | 
			
		||||
@@ -166,81 +211,81 @@ func (r *metricRouter) DoAddTags(point lp.CCMessage) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DoDelTags removes a tag when the condition is fulfilled
 | 
			
		||||
func (r *metricRouter) DoDelTags(point lp.CCMessage) {
 | 
			
		||||
	var conditionMatches bool
 | 
			
		||||
	for _, m := range r.config.DelTags {
 | 
			
		||||
		if m.Condition == "*" {
 | 
			
		||||
			// Condition is always matched
 | 
			
		||||
			conditionMatches = true
 | 
			
		||||
		} else {
 | 
			
		||||
			// Evaluate condition
 | 
			
		||||
			var err error
 | 
			
		||||
			conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				cclog.ComponentError("MetricRouter", err.Error())
 | 
			
		||||
				conditionMatches = false
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if conditionMatches {
 | 
			
		||||
			point.RemoveTag(m.Key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
// func (r *metricRouter) DoDelTags(point lp.CCMessage) {
 | 
			
		||||
// 	var conditionMatches bool
 | 
			
		||||
// 	for _, m := range r.config.DelTags {
 | 
			
		||||
// 		if m.Condition == "*" {
 | 
			
		||||
// 			// Condition is always matched
 | 
			
		||||
// 			conditionMatches = true
 | 
			
		||||
// 		} else {
 | 
			
		||||
// 			// Evaluate condition
 | 
			
		||||
// 			var err error
 | 
			
		||||
// 			conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point))
 | 
			
		||||
// 			if err != nil {
 | 
			
		||||
// 				cclog.ComponentError("MetricRouter", err.Error())
 | 
			
		||||
// 				conditionMatches = false
 | 
			
		||||
// 			}
 | 
			
		||||
// 		}
 | 
			
		||||
// 		if conditionMatches {
 | 
			
		||||
// 			point.RemoveTag(m.Key)
 | 
			
		||||
// 		}
 | 
			
		||||
// 	}
 | 
			
		||||
// }
 | 
			
		||||
 | 
			
		||||
// Conditional test whether a metric should be dropped
 | 
			
		||||
func (r *metricRouter) dropMetric(point lp.CCMessage) bool {
 | 
			
		||||
	// Simple drop check
 | 
			
		||||
	if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok {
 | 
			
		||||
		return conditionMatches
 | 
			
		||||
	}
 | 
			
		||||
// func (r *metricRouter) dropMetric(point lp.CCMessage) bool {
 | 
			
		||||
// 	// Simple drop check
 | 
			
		||||
// 	if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok {
 | 
			
		||||
// 		return conditionMatches
 | 
			
		||||
// 	}
 | 
			
		||||
 | 
			
		||||
	// Checking the dropping conditions
 | 
			
		||||
	for _, m := range r.config.DropMetricsIf {
 | 
			
		||||
		conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			cclog.ComponentError("MetricRouter", err.Error())
 | 
			
		||||
			conditionMatches = false
 | 
			
		||||
		}
 | 
			
		||||
		if conditionMatches {
 | 
			
		||||
			return conditionMatches
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
// 	// Checking the dropping conditions
 | 
			
		||||
// 	for _, m := range r.config.DropMetricsIf {
 | 
			
		||||
// 		conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point))
 | 
			
		||||
// 		if err != nil {
 | 
			
		||||
// 			cclog.ComponentError("MetricRouter", err.Error())
 | 
			
		||||
// 			conditionMatches = false
 | 
			
		||||
// 		}
 | 
			
		||||
// 		if conditionMatches {
 | 
			
		||||
// 			return conditionMatches
 | 
			
		||||
// 		}
 | 
			
		||||
// 	}
 | 
			
		||||
 | 
			
		||||
	// No dropping condition met
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
// 	// No dropping condition met
 | 
			
		||||
// 	return false
 | 
			
		||||
// }
 | 
			
		||||
 | 
			
		||||
func (r *metricRouter) prepareUnit(point lp.CCMessage) bool {
 | 
			
		||||
	if r.config.NormalizeUnits {
 | 
			
		||||
		if in_unit, ok := point.GetMeta("unit"); ok {
 | 
			
		||||
			u := units.NewUnit(in_unit)
 | 
			
		||||
			if u.Valid() {
 | 
			
		||||
				point.AddMeta("unit", u.Short())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok {
 | 
			
		||||
// func (r *metricRouter) prepareUnit(point lp.CCMessage) bool {
 | 
			
		||||
// 	if r.config.NormalizeUnits {
 | 
			
		||||
// 		if in_unit, ok := point.GetMeta("unit"); ok {
 | 
			
		||||
// 			u := units.NewUnit(in_unit)
 | 
			
		||||
// 			if u.Valid() {
 | 
			
		||||
// 				point.AddMeta("unit", u.Short())
 | 
			
		||||
// 			}
 | 
			
		||||
// 		}
 | 
			
		||||
// 	}
 | 
			
		||||
// 	if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok {
 | 
			
		||||
 | 
			
		||||
		newPrefix := units.NewPrefix(newP)
 | 
			
		||||
// 		newPrefix := units.NewPrefix(newP)
 | 
			
		||||
 | 
			
		||||
		if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
 | 
			
		||||
			u := units.NewUnit(in_unit)
 | 
			
		||||
			if u.Valid() {
 | 
			
		||||
				cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name())
 | 
			
		||||
				conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
 | 
			
		||||
				if conv != nil && out_unit.Valid() {
 | 
			
		||||
					if val, ok := point.GetField("value"); ok {
 | 
			
		||||
						point.AddField("value", conv(val))
 | 
			
		||||
						point.AddMeta("unit", out_unit.Short())
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
// 		if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
 | 
			
		||||
// 			u := units.NewUnit(in_unit)
 | 
			
		||||
// 			if u.Valid() {
 | 
			
		||||
// 				cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name())
 | 
			
		||||
// 				conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
 | 
			
		||||
// 				if conv != nil && out_unit.Valid() {
 | 
			
		||||
// 					if val, ok := point.GetField("value"); ok {
 | 
			
		||||
// 						point.AddField("value", conv(val))
 | 
			
		||||
// 						point.AddMeta("unit", out_unit.Short())
 | 
			
		||||
// 					}
 | 
			
		||||
// 				}
 | 
			
		||||
// 			}
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
// 		}
 | 
			
		||||
// 	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
// 	return true
 | 
			
		||||
// }
 | 
			
		||||
 | 
			
		||||
// Start starts the metric router
 | 
			
		||||
func (r *metricRouter) Start() {
 | 
			
		||||
@@ -259,39 +304,47 @@ func (r *metricRouter) Start() {
 | 
			
		||||
 | 
			
		||||
	// Forward takes a received metric, adds or deletes tags
 | 
			
		||||
	// and forwards it to the output channels
 | 
			
		||||
	forward := func(point lp.CCMessage) {
 | 
			
		||||
		cclog.ComponentDebug("MetricRouter", "FORWARD", point)
 | 
			
		||||
		r.DoAddTags(point)
 | 
			
		||||
		r.DoDelTags(point)
 | 
			
		||||
		name := point.Name()
 | 
			
		||||
		if new, ok := r.config.RenameMetrics[name]; ok {
 | 
			
		||||
			point.SetName(new)
 | 
			
		||||
			point.AddMeta("oldname", name)
 | 
			
		||||
			r.DoAddTags(point)
 | 
			
		||||
			r.DoDelTags(point)
 | 
			
		||||
		}
 | 
			
		||||
	// forward := func(point lp.CCMessage) {
 | 
			
		||||
	// 	cclog.ComponentDebug("MetricRouter", "FORWARD", point)
 | 
			
		||||
	// 	r.DoAddTags(point)
 | 
			
		||||
	// 	r.DoDelTags(point)
 | 
			
		||||
	// 	name := point.Name()
 | 
			
		||||
	// 	if new, ok := r.config.RenameMetrics[name]; ok {
 | 
			
		||||
	// 		point.SetName(new)
 | 
			
		||||
	// 		point.AddMeta("oldname", name)
 | 
			
		||||
	// 		r.DoAddTags(point)
 | 
			
		||||
	// 		r.DoDelTags(point)
 | 
			
		||||
	// 	}
 | 
			
		||||
 | 
			
		||||
		r.prepareUnit(point)
 | 
			
		||||
	// 	r.prepareUnit(point)
 | 
			
		||||
 | 
			
		||||
		for _, o := range r.outputs {
 | 
			
		||||
			o <- point
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// 	for _, o := range r.outputs {
 | 
			
		||||
	// 		o <- point
 | 
			
		||||
	// 	}
 | 
			
		||||
	// }
 | 
			
		||||
 | 
			
		||||
	// Forward message received from collector channel
 | 
			
		||||
	coll_forward := func(p lp.CCMessage) {
 | 
			
		||||
		// receive from metric collector
 | 
			
		||||
		p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
		//p.AddTag(r.config.HostnameTagName, r.hostname)
 | 
			
		||||
		if r.config.IntervalStamp {
 | 
			
		||||
			p.SetTime(r.timestamp)
 | 
			
		||||
		}
 | 
			
		||||
		if !r.dropMetric(p) {
 | 
			
		||||
			forward(p)
 | 
			
		||||
		m, err := r.mp.ProcessMessage(p)
 | 
			
		||||
		if err == nil && m != nil {
 | 
			
		||||
			for _, o := range r.outputs {
 | 
			
		||||
				o <- m
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// if !r.dropMetric(p) {
 | 
			
		||||
		// 	for _, o := range r.outputs {
 | 
			
		||||
		// 		o <- point
 | 
			
		||||
		// 	}
 | 
			
		||||
		// }
 | 
			
		||||
		// even if the metric is dropped, it is stored in the cache for
 | 
			
		||||
		// aggregations
 | 
			
		||||
		if r.config.NumCacheIntervals > 0 {
 | 
			
		||||
			r.cache.Add(p)
 | 
			
		||||
			r.cache.Add(m)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -301,17 +354,25 @@ func (r *metricRouter) Start() {
 | 
			
		||||
		if r.config.IntervalStamp {
 | 
			
		||||
			p.SetTime(r.timestamp)
 | 
			
		||||
		}
 | 
			
		||||
		if !r.dropMetric(p) {
 | 
			
		||||
			forward(p)
 | 
			
		||||
		m, err := r.mp.ProcessMessage(p)
 | 
			
		||||
		if err == nil && m != nil {
 | 
			
		||||
			for _, o := range r.outputs {
 | 
			
		||||
				o <- m
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// if !r.dropMetric(p) {
 | 
			
		||||
		// 	forward(p)
 | 
			
		||||
		// }
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Forward message received from cache channel
	cache_forward := func(p lp.CCMessage) {
		// receive from metric collector
		if !r.dropMetric(p) {
			p.AddTag(r.config.HostnameTagName, r.hostname)
			forward(p)
		m, err := r.mp.ProcessMessage(p)
		if err == nil && m != nil {
			for _, o := range r.outputs {
				o <- m
			}
		}
	}
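The same "process, then fan out" block now appears in coll_forward, recv_forward and cache_forward. A small helper capturing that shared step could look like the following sketch; it is not part of the commit, which keeps the logic inline in the three closures:

package metricRouter

import (
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

// processAndFanOut runs a message through a processor and, if the message
// survives (no error, not dropped), sends the result to every output channel.
func processAndFanOut(p mp.MessageProcessor, outputs []chan lp.CCMessage, msg lp.CCMessage) {
	out, err := p.ProcessMessage(msg)
	if err == nil && out != nil {
		for _, o := range outputs {
			o <- out
		}
	}
}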
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -10,15 +10,16 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
 | 
			
		||||
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const HTTP_RECEIVER_PORT = "8080"
 | 
			
		||||
 | 
			
		||||
type HttpReceiverConfig struct {
 | 
			
		||||
	Type string `json:"type"`
 | 
			
		||||
	defaultReceiverConfig
 | 
			
		||||
	Addr string `json:"address"`
 | 
			
		||||
	Port string `json:"port"`
 | 
			
		||||
	Path string `json:"path"`
 | 
			
		||||
@@ -39,7 +40,7 @@ type HttpReceiverConfig struct {
 | 
			
		||||
 | 
			
		||||
type HttpReceiver struct {
 | 
			
		||||
	receiver
 | 
			
		||||
	meta   map[string]string
 | 
			
		||||
	//meta   map[string]string
 | 
			
		||||
	config HttpReceiverConfig
 | 
			
		||||
	server *http.Server
 | 
			
		||||
	wg     sync.WaitGroup
 | 
			
		||||
@@ -85,8 +86,20 @@ func (r *HttpReceiver) Init(name string, config json.RawMessage) error {
 | 
			
		||||
	if r.config.useBasicAuth && len(r.config.Password) == 0 {
 | 
			
		||||
		return errors.New("basic authentication requires password")
 | 
			
		||||
	}
 | 
			
		||||
	msgp, err := mp.NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	r.mp = msgp
 | 
			
		||||
	if len(r.config.MessageProcessor) > 0 {
 | 
			
		||||
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	r.mp.AddAddMetaByCondition("true", "source", r.name)
 | 
			
		||||
 | 
			
		||||
	r.meta = map[string]string{"source": r.name}
 | 
			
		||||
	//r.meta = map[string]string{"source": r.name}
 | 
			
		||||
	p := r.config.Path
 | 
			
		||||
	if !strings.HasPrefix(p, "/") {
 | 
			
		||||
		p = "/" + p
 | 
			
		||||
@@ -137,80 +150,82 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if r.sink != nil {
 | 
			
		||||
		d := influx.NewDecoder(req.Body)
 | 
			
		||||
		for d.Next() {
 | 
			
		||||
 | 
			
		||||
	d := influx.NewDecoder(req.Body)
 | 
			
		||||
	for d.Next() {
 | 
			
		||||
 | 
			
		||||
		// Decode measurement name
 | 
			
		||||
		measurement, err := d.Measurement()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			msg := "ServerHttp: Failed to decode measurement: " + err.Error()
 | 
			
		||||
			cclog.ComponentError(r.name, msg)
 | 
			
		||||
			http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode tags
 | 
			
		||||
		tags := make(map[string]string)
 | 
			
		||||
		for {
 | 
			
		||||
			key, value, err := d.NextTag()
 | 
			
		||||
			// Decode measurement name
 | 
			
		||||
			measurement, err := d.Measurement()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				msg := "ServerHttp: Failed to decode tag: " + err.Error()
 | 
			
		||||
				msg := "ServerHttp: Failed to decode measurement: " + err.Error()
 | 
			
		||||
				cclog.ComponentError(r.name, msg)
 | 
			
		||||
				http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if key == nil {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			tags[string(key)] = string(value)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode fields
 | 
			
		||||
		fields := make(map[string]interface{})
 | 
			
		||||
		for {
 | 
			
		||||
			key, value, err := d.NextField()
 | 
			
		||||
			// Decode tags
 | 
			
		||||
			tags := make(map[string]string)
 | 
			
		||||
			for {
 | 
			
		||||
				key, value, err := d.NextTag()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					msg := "ServerHttp: Failed to decode tag: " + err.Error()
 | 
			
		||||
					cclog.ComponentError(r.name, msg)
 | 
			
		||||
					http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if key == nil {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				tags[string(key)] = string(value)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Decode fields
 | 
			
		||||
			fields := make(map[string]interface{})
 | 
			
		||||
			for {
 | 
			
		||||
				key, value, err := d.NextField()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					msg := "ServerHttp: Failed to decode field: " + err.Error()
 | 
			
		||||
					cclog.ComponentError(r.name, msg)
 | 
			
		||||
					http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if key == nil {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				fields[string(key)] = value.Interface()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Decode time stamp
 | 
			
		||||
			t, err := d.Time(influx.Nanosecond, time.Time{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				msg := "ServerHttp: Failed to decode field: " + err.Error()
 | 
			
		||||
				msg := "ServerHttp: Failed to decode time stamp: " + err.Error()
 | 
			
		||||
				cclog.ComponentError(r.name, msg)
 | 
			
		||||
				http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if key == nil {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			fields[string(key)] = value.Interface()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode time stamp
 | 
			
		||||
		t, err := d.Time(influx.Nanosecond, time.Time{})
 | 
			
		||||
			y, _ := lp.NewMessage(
 | 
			
		||||
				string(measurement),
 | 
			
		||||
				tags,
 | 
			
		||||
				nil,
 | 
			
		||||
				fields,
 | 
			
		||||
				t,
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
			m, err := r.mp.ProcessMessage(y)
 | 
			
		||||
			if err == nil && m != nil {
 | 
			
		||||
				r.sink <- m
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
		// Check for IO errors
 | 
			
		||||
		err := d.Err()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			msg := "ServerHttp: Failed to decode time stamp: " + err.Error()
 | 
			
		||||
			msg := "ServerHttp: Failed to decode: " + err.Error()
 | 
			
		||||
			cclog.ComponentError(r.name, msg)
 | 
			
		||||
			http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		y, _ := lp.NewMessage(
 | 
			
		||||
			string(measurement),
 | 
			
		||||
			tags,
 | 
			
		||||
			r.meta,
 | 
			
		||||
			fields,
 | 
			
		||||
			t,
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if r.sink != nil {
 | 
			
		||||
			r.sink <- y
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check for IO errors
 | 
			
		||||
	err := d.Err()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		msg := "ServerHttp: Failed to decode: " + err.Error()
 | 
			
		||||
		cclog.ComponentError(r.name, msg)
 | 
			
		||||
		http.Error(w, msg, http.StatusInternalServerError)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	w.WriteHeader(http.StatusOK)
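The HTTP and NATS receivers now build each lp.CCMessage with nil meta and let the processor attach "source" and any configured rules. A condensed, self-contained sketch of that receive path, using the same line-protocol decoder calls as the hunk above (the metric line, receiver name and printed output are hypothetical; real receivers report errors via cclog/http.Error and send to r.sink):

package main

import (
	"fmt"
	"time"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
)

func main() {
	p, _ := mp.NewMessageProcessor()
	p.AddAddMetaByCondition("true", "source", "HttpReceiver(example)") // replaces the old static meta map

	// One influx line-protocol record, as the HTTP and NATS receivers get it on the wire.
	data := []byte("cpu_load,hostname=node01 value=1.5 1700000000000000000\n")

	d := influx.NewDecoderWithBytes(data)
	for d.Next() {
		measurement, err := d.Measurement()
		if err != nil {
			fmt.Println("failed to decode measurement:", err)
			return
		}
		// Decode tags (break on decode error or end of tags; sketch only).
		tags := make(map[string]string)
		for {
			key, value, err := d.NextTag()
			if err != nil || key == nil {
				break
			}
			tags[string(key)] = string(value)
		}
		// Decode fields.
		fields := make(map[string]interface{})
		for {
			key, value, err := d.NextField()
			if err != nil || key == nil {
				break
			}
			fields[string(key)] = value.Interface()
		}
		// Decode time stamp.
		t, _ := d.Time(influx.Nanosecond, time.Time{})

		// Meta is nil here; the processor adds "source" and applies configured rules.
		y, _ := lp.NewMessage(string(measurement), tags, nil, fields, t)
		if m, err := p.ProcessMessage(y); err == nil && m != nil {
			fmt.Println(m) // a real receiver would do: r.sink <- m
		}
	}
}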
 | 
			
		||||
 
 | 
			
		||||
@@ -13,9 +13,10 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist"
 | 
			
		||||
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type IPMIReceiverClientConfig struct {
 | 
			
		||||
@@ -31,11 +32,13 @@ type IPMIReceiverClientConfig struct {
 | 
			
		||||
	Password         string            // Password to use for authentication
 | 
			
		||||
	CLIOptions       []string          // Additional command line options for ipmi-sensors
 | 
			
		||||
	isExcluded       map[string]bool   // is metric excluded
 | 
			
		||||
	mp               mp.MessageProcessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type IPMIReceiver struct {
 | 
			
		||||
	receiver
 | 
			
		||||
	config struct {
 | 
			
		||||
		defaultReceiverConfig
 | 
			
		||||
		Interval time.Duration
 | 
			
		||||
 | 
			
		||||
		// Client config for each IPMI hosts
 | 
			
		||||
@@ -43,10 +46,11 @@ type IPMIReceiver struct {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Storage for static information
 | 
			
		||||
	meta map[string]string
 | 
			
		||||
	//meta map[string]string
 | 
			
		||||
 | 
			
		||||
	done chan bool      // channel to finish / stop IPMI receiver
 | 
			
		||||
	wg   sync.WaitGroup // wait group for IPMI receiver
 | 
			
		||||
	mp   mp.MessageProcessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// doReadMetrics reads metrics from all configured IPMI hosts.
 | 
			
		||||
@@ -230,7 +234,14 @@ func (r *IPMIReceiver) doReadMetric() {
 | 
			
		||||
					},
 | 
			
		||||
					time.Now())
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					r.sink <- y
 | 
			
		||||
					mc, err := clientConfig.mp.ProcessMessage(y)
 | 
			
		||||
					if err == nil && mc != nil {
 | 
			
		||||
						m, err := r.mp.ProcessMessage(mc)
 | 
			
		||||
						if err == nil && m != nil {
 | 
			
		||||
							r.sink <- m
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
@@ -296,11 +307,12 @@ func (r *IPMIReceiver) Close() {
 | 
			
		||||
// NewIPMIReceiver creates a new instance of the IPMI receiver
 | 
			
		||||
// Initialize the receiver by giving it a name and reading in the config JSON
 | 
			
		||||
func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	r := new(IPMIReceiver)
 | 
			
		||||
 | 
			
		||||
	// Config options from config file
 | 
			
		||||
	configJSON := struct {
 | 
			
		||||
		Type string `json:"type"`
 | 
			
		||||
		defaultReceiverConfig
 | 
			
		||||
 | 
			
		||||
		// How often the IPMI sensor metrics should be read and send to the sink (default: 30 s)
 | 
			
		||||
		IntervalString string `json:"interval,omitempty"`
 | 
			
		||||
@@ -331,7 +343,8 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
			ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
 | 
			
		||||
 | 
			
		||||
			// Additional command line options for ipmi-sensors
 | 
			
		||||
			CLIOptions []string `json:"cli_options,omitempty"`
 | 
			
		||||
			CLIOptions       []string        `json:"cli_options,omitempty"`
 | 
			
		||||
			MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
 | 
			
		||||
		} `json:"client_config"`
 | 
			
		||||
	}{
 | 
			
		||||
		// Set defaults values
 | 
			
		||||
@@ -347,8 +360,15 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	// Create done channel
 | 
			
		||||
	r.done = make(chan bool)
 | 
			
		||||
 | 
			
		||||
	p, err := mp.NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	r.mp = p
 | 
			
		||||
 | 
			
		||||
	// Set static information
 | 
			
		||||
	r.meta = map[string]string{"source": r.name}
 | 
			
		||||
	//r.meta = map[string]string{"source": r.name}
 | 
			
		||||
	r.mp.AddAddMetaByCondition("true", "source", r.name)
 | 
			
		||||
 | 
			
		||||
	// Read the IPMI receiver specific JSON config
 | 
			
		||||
	if len(config) > 0 {
 | 
			
		||||
@@ -360,12 +380,18 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(r.config.MessageProcessor) > 0 {
 | 
			
		||||
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// Convert interval string representation to duration
 | 
			
		||||
	var err error
 | 
			
		||||
 | 
			
		||||
	r.config.Interval, err = time.ParseDuration(configJSON.IntervalString)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err := fmt.Errorf(
 | 
			
		||||
			"Failed to parse duration string interval='%s': %w",
 | 
			
		||||
			"failed to parse duration string interval='%s': %w",
 | 
			
		||||
			configJSON.IntervalString,
 | 
			
		||||
			err,
 | 
			
		||||
		)
 | 
			
		||||
@@ -506,6 +532,16 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
		for _, key := range configJSON.ExcludeMetrics {
 | 
			
		||||
			isExcluded[key] = true
 | 
			
		||||
		}
 | 
			
		||||
		p, err := mp.NewMessageProcessor()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if len(clientConfigJSON.MessageProcessor) > 0 {
 | 
			
		||||
			err = p.FromConfigJSON(clientConfigJSON.MessageProcessor)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		r.config.ClientConfigs = append(
 | 
			
		||||
			r.config.ClientConfigs,
 | 
			
		||||
@@ -520,6 +556,7 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
				Password:         password,
 | 
			
		||||
				CLIOptions:       cliOptions,
 | 
			
		||||
				isExcluded:       isExcluded,
 | 
			
		||||
				mp:               p,
 | 
			
		||||
			})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,11 +1,15 @@
package receivers

import (
	"encoding/json"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

type defaultReceiverConfig struct {
	Type string `json:"type"`
	Type             string          `json:"type"`
	MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
}

// Receiver configuration: Listen address, port
@@ -20,12 +24,13 @@ type ReceiverConfig struct {
type receiver struct {
	name string
	sink chan lp.CCMessage
	mp   mp.MessageProcessor
}

type Receiver interface {
	Start()
	Close()                        // Close / finish metric receiver
	Name() string                  // Name of the metric receiver
	Close()                         // Close / finish metric receiver
	Name() string                   // Name of the metric receiver
	SetSink(sink chan lp.CCMessage) // Set sink channel
}
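With process_messages now part of defaultReceiverConfig and mp part of the embedded receiver struct, a receiver's setup reduces to the following sketch. The concrete receiver and its config are hypothetical; the processor calls and error messages mirror the ones this commit adds to the HTTP, IPMI, NATS, Redfish and sample receivers:

package receivers

import (
	"encoding/json"
	"fmt"

	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

// ExampleReceiverConfig embeds defaultReceiverConfig and therefore inherits
// the "type" and "process_messages" keys (hypothetical receiver).
type ExampleReceiverConfig struct {
	defaultReceiverConfig
	Addr string `json:"address"`
}

// ExampleReceiver embeds receiver and therefore carries name, sink and mp.
type ExampleReceiver struct {
	receiver
	config ExampleReceiverConfig
}

func (r *ExampleReceiver) Init(name string, config json.RawMessage) error {
	r.name = fmt.Sprintf("ExampleReceiver(%s)", name)
	if len(config) > 0 {
		if err := json.Unmarshal(config, &r.config); err != nil {
			return err
		}
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	r.mp = p
	if len(r.config.MessageProcessor) > 0 {
		if err := r.mp.FromConfigJSON(r.config.MessageProcessor); err != nil {
			return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// Static meta information is now a processor rule instead of a meta map.
	return r.mp.AddAddMetaByCondition("true", "source", r.name)
}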
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -6,14 +6,15 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
 | 
			
		||||
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
 | 
			
		||||
	nats "github.com/nats-io/nats.go"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type NatsReceiverConfig struct {
 | 
			
		||||
	Type    string `json:"type"`
 | 
			
		||||
	defaultReceiverConfig
 | 
			
		||||
	Addr    string `json:"address"`
 | 
			
		||||
	Port    string `json:"port"`
 | 
			
		||||
	Subject string `json:"subject"`
 | 
			
		||||
@@ -21,8 +22,8 @@ type NatsReceiverConfig struct {
 | 
			
		||||
 | 
			
		||||
type NatsReceiver struct {
 | 
			
		||||
	receiver
 | 
			
		||||
	nc     *nats.Conn
 | 
			
		||||
	meta   map[string]string
 | 
			
		||||
	nc *nats.Conn
 | 
			
		||||
	//meta   map[string]string
 | 
			
		||||
	config NatsReceiverConfig
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -36,65 +37,68 @@ func (r *NatsReceiver) Start() {
 | 
			
		||||
// _NatsReceive receives subscribed messages from the NATS server
 | 
			
		||||
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
 | 
			
		||||
 | 
			
		||||
	d := influx.NewDecoderWithBytes(m.Data)
 | 
			
		||||
	for d.Next() {
 | 
			
		||||
	if r.sink != nil {
 | 
			
		||||
		d := influx.NewDecoderWithBytes(m.Data)
 | 
			
		||||
		for d.Next() {
 | 
			
		||||
 | 
			
		||||
		// Decode measurement name
 | 
			
		||||
		measurement, err := d.Measurement()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			msg := "_NatsReceive: Failed to decode measurement: " + err.Error()
 | 
			
		||||
			cclog.ComponentError(r.name, msg)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode tags
 | 
			
		||||
		tags := make(map[string]string)
 | 
			
		||||
		for {
 | 
			
		||||
			key, value, err := d.NextTag()
 | 
			
		||||
			// Decode measurement name
 | 
			
		||||
			measurement, err := d.Measurement()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				msg := "_NatsReceive: Failed to decode tag: " + err.Error()
 | 
			
		||||
				msg := "_NatsReceive: Failed to decode measurement: " + err.Error()
 | 
			
		||||
				cclog.ComponentError(r.name, msg)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if key == nil {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			tags[string(key)] = string(value)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode fields
 | 
			
		||||
		fields := make(map[string]interface{})
 | 
			
		||||
		for {
 | 
			
		||||
			key, value, err := d.NextField()
 | 
			
		||||
			// Decode tags
 | 
			
		||||
			tags := make(map[string]string)
 | 
			
		||||
			for {
 | 
			
		||||
				key, value, err := d.NextTag()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					msg := "_NatsReceive: Failed to decode tag: " + err.Error()
 | 
			
		||||
					cclog.ComponentError(r.name, msg)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if key == nil {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				tags[string(key)] = string(value)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Decode fields
 | 
			
		||||
			fields := make(map[string]interface{})
 | 
			
		||||
			for {
 | 
			
		||||
				key, value, err := d.NextField()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					msg := "_NatsReceive: Failed to decode field: " + err.Error()
 | 
			
		||||
					cclog.ComponentError(r.name, msg)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if key == nil {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				fields[string(key)] = value.Interface()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Decode time stamp
 | 
			
		||||
			t, err := d.Time(influx.Nanosecond, time.Time{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				msg := "_NatsReceive: Failed to decode field: " + err.Error()
 | 
			
		||||
				msg := "_NatsReceive: Failed to decode time: " + err.Error()
 | 
			
		||||
				cclog.ComponentError(r.name, msg)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if key == nil {
 | 
			
		||||
				break
 | 
			
		||||
 | 
			
		||||
			y, _ := lp.NewMessage(
 | 
			
		||||
				string(measurement),
 | 
			
		||||
				tags,
 | 
			
		||||
				nil,
 | 
			
		||||
				fields,
 | 
			
		||||
				t,
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
			m, err := r.mp.ProcessMessage(y)
 | 
			
		||||
			if err == nil && m != nil {
 | 
			
		||||
				r.sink <- m
 | 
			
		||||
			}
 | 
			
		||||
			fields[string(key)] = value.Interface()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Decode time stamp
 | 
			
		||||
		t, err := d.Time(influx.Nanosecond, time.Time{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			msg := "_NatsReceive: Failed to decode time: " + err.Error()
 | 
			
		||||
			cclog.ComponentError(r.name, msg)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		y, _ := lp.NewMessage(
 | 
			
		||||
			string(measurement),
 | 
			
		||||
			tags,
 | 
			
		||||
			r.meta,
 | 
			
		||||
			fields,
 | 
			
		||||
			t,
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if r.sink != nil {
 | 
			
		||||
			r.sink <- y
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -127,11 +131,23 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
		len(r.config.Subject) == 0 {
 | 
			
		||||
		return nil, errors.New("not all configuration variables set required by NatsReceiver")
 | 
			
		||||
	}
 | 
			
		||||
	p, err := mp.NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	r.mp = p
 | 
			
		||||
	if len(r.config.MessageProcessor) > 0 {
 | 
			
		||||
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set metadata
 | 
			
		||||
	r.meta = map[string]string{
 | 
			
		||||
		"source": r.name,
 | 
			
		||||
	}
 | 
			
		||||
	// r.meta = map[string]string{
 | 
			
		||||
	// 	"source": r.name,
 | 
			
		||||
	// }
 | 
			
		||||
	r.mp.AddAddMetaByCondition("true", "source", r.name)
 | 
			
		||||
 | 
			
		||||
	// Connect to NATS server
 | 
			
		||||
	url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port)
 | 
			
		||||
 
 | 
			
		||||
@@ -13,9 +13,10 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist"
 | 
			
		||||
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
 | 
			
		||||
 | 
			
		||||
	// See: https://pkg.go.dev/github.com/stmcginnis/gofish
 | 
			
		||||
	"github.com/stmcginnis/gofish"
 | 
			
		||||
@@ -42,6 +43,8 @@ type RedfishReceiverClientConfig struct {
 | 
			
		||||
	readSensorURLs map[string][]string
 | 
			
		||||
 | 
			
		||||
	gofish gofish.ClientConfig
 | 
			
		||||
 | 
			
		||||
	mp mp.MessageProcessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RedfishReceiver configuration:
 | 
			
		||||
@@ -49,6 +52,7 @@ type RedfishReceiver struct {
 | 
			
		||||
	receiver
 | 
			
		||||
 | 
			
		||||
	config struct {
 | 
			
		||||
		defaultReceiverConfig
 | 
			
		||||
		fanout      int
 | 
			
		||||
		Interval    time.Duration
 | 
			
		||||
		HttpTimeout time.Duration
 | 
			
		||||
@@ -79,13 +83,19 @@ func setMetricValue(value any) map[string]interface{} {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// sendMetric sends the metric through the sink channel
 | 
			
		||||
func (r *RedfishReceiver) sendMetric(name string, tags map[string]string, meta map[string]string, value any, timestamp time.Time) {
 | 
			
		||||
func (r *RedfishReceiver) sendMetric(mp mp.MessageProcessor, name string, tags map[string]string, meta map[string]string, value any, timestamp time.Time) {
 | 
			
		||||
 | 
			
		||||
	deleteEmptyTags(tags)
 | 
			
		||||
	deleteEmptyTags(meta)
 | 
			
		||||
	y, err := lp.NewMessage(name, tags, meta, setMetricValue(value), timestamp)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		r.sink <- y
 | 
			
		||||
		mc, err := mp.ProcessMessage(y)
 | 
			
		||||
		if err == nil && mc != nil {
 | 
			
		||||
			m, err := r.mp.ProcessMessage(mc)
 | 
			
		||||
			if err == nil && m != nil {
 | 
			
		||||
				r.sink <- m
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -119,7 +129,7 @@ func (r *RedfishReceiver) readSensors(
 | 
			
		||||
			"unit":   "degC",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		r.sendMetric("temperature", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
		r.sendMetric(clientConfig.mp, "temperature", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	writeFanSpeedSensor := func(sensor *redfish.Sensor) {
 | 
			
		||||
@@ -145,7 +155,7 @@ func (r *RedfishReceiver) readSensors(
 | 
			
		||||
			"unit":   string(sensor.ReadingUnits),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		r.sendMetric("fan_speed", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
		r.sendMetric(clientConfig.mp, "fan_speed", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	writePowerSensor := func(sensor *redfish.Sensor) {
 | 
			
		||||
@@ -172,7 +182,7 @@ func (r *RedfishReceiver) readSensors(
 | 
			
		||||
			"unit":   "watts",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		r.sendMetric("power", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
		r.sendMetric(clientConfig.mp, "power", tags, meta, sensor.Reading, time.Now())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if _, ok := clientConfig.readSensorURLs[chassis.ID]; !ok {
 | 
			
		||||
@@ -340,7 +350,7 @@ func (r *RedfishReceiver) readThermalMetrics(
 | 
			
		||||
		// ReadingCelsius shall be the current value of the temperature sensor's reading.
 | 
			
		||||
		value := temperature.ReadingCelsius
 | 
			
		||||
 | 
			
		||||
		r.sendMetric("temperature", tags, meta, value, timestamp)
 | 
			
		||||
		r.sendMetric(clientConfig.mp, "temperature", tags, meta, value, timestamp)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, fan := range thermal.Fans {
 | 
			
		||||
@@ -381,7 +391,7 @@ func (r *RedfishReceiver) readThermalMetrics(
 | 
			
		||||
			"unit":   string(fan.ReadingUnits),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		r.sendMetric("fan_speed", tags, meta, fan.Reading, timestamp)
 | 
			
		||||
		r.sendMetric(clientConfig.mp, "fan_speed", tags, meta, fan.Reading, timestamp)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
@@ -479,7 +489,7 @@ func (r *RedfishReceiver) readPowerMetrics(
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for name, value := range metrics {
 | 
			
		||||
			r.sendMetric(name, tags, meta, value, timestamp)
 | 
			
		||||
			r.sendMetric(clientConfig.mp, name, tags, meta, value, timestamp)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -561,7 +571,7 @@ func (r *RedfishReceiver) readProcessorMetrics(
 | 
			
		||||
	if !clientConfig.isExcluded[namePower] &&
 | 
			
		||||
		// Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null
 | 
			
		||||
		processorMetrics.ConsumedPowerWatt != 65535 {
 | 
			
		||||
		r.sendMetric(namePower, tags, metaPower, processorMetrics.ConsumedPowerWatt, timestamp)
 | 
			
		||||
		r.sendMetric(clientConfig.mp, namePower, tags, metaPower, processorMetrics.ConsumedPowerWatt, timestamp)
 | 
			
		||||
	}
 | 
			
		||||
	// Set meta data tags
 | 
			
		||||
	metaThermal := map[string]string{
 | 
			
		||||
@@ -573,7 +583,7 @@ func (r *RedfishReceiver) readProcessorMetrics(
 | 
			
		||||
	nameThermal := "temperature"
 | 
			
		||||
 | 
			
		||||
	if !clientConfig.isExcluded[nameThermal] {
 | 
			
		||||
		r.sendMetric(nameThermal, tags, metaThermal, processorMetrics.TemperatureCelsius, timestamp)
 | 
			
		||||
		r.sendMetric(clientConfig.mp, nameThermal, tags, metaThermal, processorMetrics.TemperatureCelsius, timestamp)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -776,11 +786,13 @@ func (r *RedfishReceiver) Close() {
 | 
			
		||||
// NewRedfishReceiver creates a new instance of the redfish receiver
 | 
			
		||||
// Initialize the receiver by giving it a name and reading in the config JSON
 | 
			
		||||
func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	var err error
 | 
			
		||||
	r := new(RedfishReceiver)
 | 
			
		||||
 | 
			
		||||
	// Config options from config file
 | 
			
		||||
	configJSON := struct {
 | 
			
		||||
		Type string `json:"type"`
 | 
			
		||||
		Type             string          `json:"type"`
 | 
			
		||||
		MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
 | 
			
		||||
 | 
			
		||||
		// Maximum number of simultaneous redfish connections (default: 64)
 | 
			
		||||
		Fanout int `json:"fanout,omitempty"`
 | 
			
		||||
@@ -820,7 +832,8 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
			DisableThermalMetrics   bool `json:"disable_thermal_metrics"`
 | 
			
		||||
 | 
			
		||||
			// Per client excluded metrics
 | 
			
		||||
			ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
 | 
			
		||||
			ExcludeMetrics   []string        `json:"exclude_metrics,omitempty"`
 | 
			
		||||
			MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
 | 
			
		||||
		} `json:"client_config"`
 | 
			
		||||
	}{
 | 
			
		||||
		// Set defaults values
 | 
			
		||||
@@ -846,13 +859,24 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	p, err := mp.NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	r.mp = p
 | 
			
		||||
	if len(r.config.MessageProcessor) > 0 {
 | 
			
		||||
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Convert interval string representation to duration
 | 
			
		||||
	var err error
 | 
			
		||||
 | 
			
		||||
	r.config.Interval, err = time.ParseDuration(configJSON.IntervalString)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err := fmt.Errorf(
 | 
			
		||||
			"Failed to parse duration string interval='%s': %w",
 | 
			
		||||
			"failed to parse duration string interval='%s': %w",
 | 
			
		||||
			configJSON.IntervalString,
 | 
			
		||||
			err,
 | 
			
		||||
		)
 | 
			
		||||
@@ -864,7 +888,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	r.config.HttpTimeout, err = time.ParseDuration(configJSON.HttpTimeoutString)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err := fmt.Errorf(
 | 
			
		||||
			"Failed to parse duration string http_timeout='%s': %w",
 | 
			
		||||
			"failed to parse duration string http_timeout='%s': %w",
 | 
			
		||||
			configJSON.HttpTimeoutString,
 | 
			
		||||
			err,
 | 
			
		||||
		)
 | 
			
		||||
@@ -948,6 +972,18 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
		for _, key := range configJSON.ExcludeMetrics {
 | 
			
		||||
			isExcluded[key] = true
 | 
			
		||||
		}
 | 
			
		||||
		p, err = mp.NewMessageProcessor()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			cclog.ComponentError(r.name, err.Error())
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if len(clientConfigJSON.MessageProcessor) > 0 {
 | 
			
		||||
			err = p.FromConfigJSON(clientConfigJSON.MessageProcessor)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				cclog.ComponentError(r.name, err.Error())
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		hostList, err := hostlist.Expand(clientConfigJSON.HostList)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -978,6 +1014,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
						Endpoint:   endpoint,
 | 
			
		||||
						HTTPClient: httpClient,
 | 
			
		||||
					},
 | 
			
		||||
					mp: p,
 | 
			
		||||
				})
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@@ -1002,7 +1039,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	for i := range r.config.ClientConfigs {
 | 
			
		||||
		host := r.config.ClientConfigs[i].Hostname
 | 
			
		||||
		if isDuplicate[host] {
 | 
			
		||||
			err := fmt.Errorf("Found duplicate client config for host %s", host)
 | 
			
		||||
			err := fmt.Errorf("found duplicate client config for host %s", host)
 | 
			
		||||
			cclog.ComponentError(r.name, err)
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -5,11 +5,13 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// SampleReceiver configuration: receiver type, listen address, port
 | 
			
		||||
// The defaultReceiverConfig contains the keys 'type' and 'process_messages'
 | 
			
		||||
type SampleReceiverConfig struct {
 | 
			
		||||
	Type string `json:"type"`
 | 
			
		||||
	defaultReceiverConfig
 | 
			
		||||
	Addr string `json:"address"`
 | 
			
		||||
	Port string `json:"port"`
 | 
			
		||||
}
 | 
			
		||||
@@ -19,7 +21,6 @@ type SampleReceiver struct {
 | 
			
		||||
	config SampleReceiverConfig
 | 
			
		||||
 | 
			
		||||
	// Storage for static information
 | 
			
		||||
	meta map[string]string
 | 
			
		||||
	// Use in case of own go routine
 | 
			
		||||
	// done chan bool
 | 
			
		||||
	// wg   sync.WaitGroup
 | 
			
		||||
@@ -79,8 +80,19 @@ func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
	// The name should be chosen in such a way that different instances of SampleReceiver can be distinguished
 | 
			
		||||
	r.name = fmt.Sprintf("SampleReceiver(%s)", name)
 | 
			
		||||
 | 
			
		||||
	// create new message processor
 | 
			
		||||
	p, err := mp.NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		cclog.ComponentError(r.name, "Initialization of message processor failed:", err.Error())
 | 
			
		||||
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	r.mp = p
 | 
			
		||||
	// Set static information
 | 
			
		||||
	r.meta = map[string]string{"source": r.name}
 | 
			
		||||
	err = r.mp.AddAddMetaByCondition("true", "source", r.name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		cclog.ComponentError(r.name, fmt.Sprintf("Failed to add static information source=%s:", r.name), err.Error())
 | 
			
		||||
		return nil, fmt.Errorf("failed to add static information source=%s: %v", r.name, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set defaults in r.config
 | 
			
		||||
	// Allow overwriting these defaults by reading config JSON
 | 
			
		||||
@@ -94,6 +106,15 @@ func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add message processor config
 | 
			
		||||
	if len(r.config.MessageProcessor) > 0 {
 | 
			
		||||
		err = r.mp.FromConfigJSON(r.config.MessageProcessor)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			cclog.ComponentError(r.name, "Failed parsing JSON for message processor:", err.Error())
 | 
			
		||||
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check that all required fields in the configuration are set
 | 
			
		||||
	// Use 'if len(r.config.Option) > 0' for strings
 | 
			
		||||
 | 
			
		||||
 
 | 
			
@@ -10,8 +10,9 @@ import (
	//	"time"
	"os/exec"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

const GMETRIC_EXEC = `gmetric`
@@ -35,50 +36,53 @@ type GangliaSink struct {
	config         GangliaSinkConfig
}

func (s *GangliaSink) Write(point lp.CCMessage) error {
func (s *GangliaSink) Write(msg lp.CCMessage) error {
	var err error = nil
	//var tagsstr []string
	var argstr []string

	// Get metric config (type, value, ... in suitable format)
	conf := GetCommonGangliaConfig(point)
	if len(conf.Type) == 0 {
		conf = GetGangliaConfig(point)
	}
	if len(conf.Type) == 0 {
		return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
	}
	point, err := s.mp.ProcessMessage(msg)
	if err == nil && point != nil {
		// Get metric config (type, value, ... in suitable format)
		conf := GetCommonGangliaConfig(point)
		if len(conf.Type) == 0 {
			conf = GetGangliaConfig(point)
		}
		if len(conf.Type) == 0 {
			return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
		}

	if s.config.AddGangliaGroup {
		argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group))
	}
	if s.config.AddUnits && len(conf.Unit) > 0 {
		argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit))
	}
		if s.config.AddGangliaGroup {
			argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group))
		}
		if s.config.AddUnits && len(conf.Unit) > 0 {
			argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit))
		}

	if len(s.config.ClusterName) > 0 {
		argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
	}
	// if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
	// 	argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
	// }
	if len(s.gmetric_config) > 0 {
		argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
	}
	if s.config.AddTypeToName {
		argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
	} else {
		argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name))
	}
	argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
	argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
	argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type))
	argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax))
		if len(s.config.ClusterName) > 0 {
			argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
		}
		// if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
		// 	argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
		// }
		if len(s.gmetric_config) > 0 {
			argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
		}
		if s.config.AddTypeToName {
			argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
		} else {
			argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name))
		}
		argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
		argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
		argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type))
		argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax))

	cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " "))
	command := exec.Command(s.gmetric_path, argstr...)
	command.Wait()
	_, err = command.Output()
		cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " "))
		command := exec.Command(s.gmetric_path, argstr...)
		command.Wait()
		_, err = command.Output()
	}
	return err
}

@@ -104,6 +108,13 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
	}
	s.gmetric_path = ""
	s.gmetric_config = ""

	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p

	if len(s.config.GmetricPath) > 0 {
		p, err := exec.LookPath(s.config.GmetricPath)
		if err == nil {
@@ -122,5 +133,15 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
	if len(s.config.GmetricConfig) > 0 {
		s.gmetric_config = s.config.GmetricConfig
	}
	if len(s.config.MessageProcessor) > 0 {
		err = s.mp.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	return s, nil
}

@@ -9,8 +9,9 @@ import (
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
	"golang.org/x/exp/slices"
)
@@ -75,28 +76,20 @@ type HttpSink struct {
}

// Write sends metric m as http message
func (s *HttpSink) Write(m lp.CCMessage) error {
func (s *HttpSink) Write(msg lp.CCMessage) error {

	// Lock for encoder usage
	s.encoderLock.Lock()
	// submit m only after applying processing/dropping rules
	m, err := s.mp.ProcessMessage(msg)
	if err == nil && m != nil {
		// Lock for encoder usage
		s.encoderLock.Lock()

	// Encode measurement name
	s.encoder.StartLine(m.Name())
		// Encode measurement name
		s.encoder.StartLine(m.Name())

	// copy tags and meta data which should be used as tags
	s.extended_tag_list = s.extended_tag_list[:0]
	for key, value := range m.Tags() {
		s.extended_tag_list =
			append(
				s.extended_tag_list,
				key_value_pair{
					key:   key,
					value: value,
				},
			)
	}
	for _, key := range s.config.MetaAsTags {
		if value, ok := m.GetMeta(key); ok {
		// copy tags and meta data which should be used as tags
		s.extended_tag_list = s.extended_tag_list[:0]
		for key, value := range m.Tags() {
			s.extended_tag_list =
				append(
					s.extended_tag_list,
@@ -106,45 +99,57 @@ func (s *HttpSink) Write(m lp.CCMessage) error {
					},
				)
		}
	}
		// for _, key := range s.config.MetaAsTags {
		// 	if value, ok := m.GetMeta(key); ok {
		// 		s.extended_tag_list =
		// 			append(
		// 				s.extended_tag_list,
		// 				key_value_pair{
		// 					key:   key,
		// 					value: value,
		// 				},
		// 			)
		// 	}
		// }

	// Encode tags (they must be in lexical order)
	slices.SortFunc(
		s.extended_tag_list,
		func(a key_value_pair, b key_value_pair) int {
			if a.key < b.key {
				return -1
			}
			if a.key > b.key {
				return +1
			}
			return 0
		},
	)
	for i := range s.extended_tag_list {
		s.encoder.AddTag(
			s.extended_tag_list[i].key,
			s.extended_tag_list[i].value,
		// Encode tags (they must be in lexical order)
		slices.SortFunc(
			s.extended_tag_list,
			func(a key_value_pair, b key_value_pair) int {
				if a.key < b.key {
					return -1
				}
				if a.key > b.key {
					return +1
				}
				return 0
			},
		)
	}
		for i := range s.extended_tag_list {
			s.encoder.AddTag(
				s.extended_tag_list[i].key,
				s.extended_tag_list[i].value,
			)
		}

	// Encode fields
	for key, value := range m.Fields() {
		s.encoder.AddField(key, influx.MustNewValue(value))
	}
		// Encode fields
		for key, value := range m.Fields() {
			s.encoder.AddField(key, influx.MustNewValue(value))
		}

	// Encode time stamp
	s.encoder.EndLine(m.Time())
		// Encode time stamp
		s.encoder.EndLine(m.Time())

	// Check for encoder errors
	err := s.encoder.Err()
		// Check for encoder errors
		err := s.encoder.Err()

	// Unlock encoder usage
	s.encoderLock.Unlock()
		// Unlock encoder usage
		s.encoderLock.Unlock()

	// Check that encoding worked
	if err != nil {
		return fmt.Errorf("encoding failed: %v", err)
		// Check that encoding worked
		if err != nil {
			return fmt.Errorf("encoding failed: %v", err)
		}
	}

	if s.config.flushDelay == 0 {
@@ -297,6 +302,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
	if s.config.useBasicAuth && len(s.config.Password) == 0 {
		return nil, errors.New("basic authentication requires password")
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p

	if len(s.config.IdleConnTimeout) > 0 {
		t, err := time.ParseDuration(s.config.IdleConnTimeout)
@@ -319,6 +329,16 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
			cclog.ComponentDebug(s.name, "Init(): flushDelay", t)
		}
	}
	if len(s.config.MessageProcessor) > 0 {
		err = p.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	precision := influx.Nanosecond
	if len(s.config.Precision) > 0 {
		switch s.config.Precision {

@@ -10,8 +10,9 @@ import (
	"strings"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
	influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
@@ -121,9 +122,10 @@ func (s *InfluxAsyncSink) Write(m lp.CCMessage) error {
			}
		})
	}
	s.writeApi.WritePoint(
		m.ToPoint(s.meta_as_tags),
	)
	msg, err := s.mp.ProcessMessage(m)
	if err == nil && msg != nil {
		s.writeApi.WritePoint(msg.ToPoint(nil))
	}
	return nil
}

@@ -200,10 +202,24 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
	if len(s.config.Password) == 0 {
		return nil, errors.New("missing password configuration required by InfluxSink")
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p
	if len(s.config.MessageProcessor) > 0 {
		err = s.mp.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	// s.meta_as_tags = make(map[string]bool)
	// for _, k := range s.config.MetaAsTags {
	// 	s.meta_as_tags[k] = true
	// }
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	toUint := func(duration string, def uint) uint {

@@ -10,8 +10,9 @@ import (
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
@@ -224,28 +225,19 @@ func (s *InfluxSink) connect() error {
}

// Write sends metric m in influxDB line protocol
func (s *InfluxSink) Write(m lp.CCMessage) error {
func (s *InfluxSink) Write(msg lp.CCMessage) error {

	// Lock for encoder usage
	s.encoderLock.Lock()
	m, err := s.mp.ProcessMessage(msg)
	if err == nil && m != nil {
		// Lock for encoder usage
		s.encoderLock.Lock()

	// Encode measurement name
	s.encoder.StartLine(m.Name())
		// Encode measurement name
		s.encoder.StartLine(m.Name())

	// copy tags and meta data which should be used as tags
	s.extended_tag_list = s.extended_tag_list[:0]
	for key, value := range m.Tags() {
		s.extended_tag_list =
			append(
				s.extended_tag_list,
				key_value_pair{
					key:   key,
					value: value,
				},
			)
	}
	for _, key := range s.config.MetaAsTags {
		if value, ok := m.GetMeta(key); ok {
		// copy tags and meta data which should be used as tags
		s.extended_tag_list = s.extended_tag_list[:0]
		for key, value := range m.Tags() {
			s.extended_tag_list =
				append(
					s.extended_tag_list,
@@ -255,45 +247,57 @@ func (s *InfluxSink) Write(m lp.CCMessage) error {
					},
				)
		}
	}
		// for _, key := range s.config.MetaAsTags {
		// 	if value, ok := m.GetMeta(key); ok {
		// 		s.extended_tag_list =
		// 			append(
		// 				s.extended_tag_list,
		// 				key_value_pair{
		// 					key:   key,
		// 					value: value,
		// 				},
		// 			)
		// 	}
		// }

	// Encode tags (they must be in lexical order)
	slices.SortFunc(
		s.extended_tag_list,
		func(a key_value_pair, b key_value_pair) int {
			if a.key < b.key {
				return -1
			}
			if a.key > b.key {
				return +1
			}
			return 0
		},
	)
	for i := range s.extended_tag_list {
		s.encoder.AddTag(
			s.extended_tag_list[i].key,
			s.extended_tag_list[i].value,
		// Encode tags (they must be in lexical order)
		slices.SortFunc(
			s.extended_tag_list,
			func(a key_value_pair, b key_value_pair) int {
				if a.key < b.key {
					return -1
				}
				if a.key > b.key {
					return +1
				}
				return 0
			},
		)
		for i := range s.extended_tag_list {
			s.encoder.AddTag(
				s.extended_tag_list[i].key,
				s.extended_tag_list[i].value,
			)
		}

		// Encode fields
		for key, value := range m.Fields() {
			s.encoder.AddField(key, influx.MustNewValue(value))
		}

		// Encode time stamp
		s.encoder.EndLine(m.Time())

		// Check for encoder errors
		if err := s.encoder.Err(); err != nil {
			// Unlock encoder usage
			s.encoderLock.Unlock()

			return fmt.Errorf("encoding failed: %v", err)
		}
		s.numRecordsInEncoder++
	}

	// Encode fields
	for key, value := range m.Fields() {
		s.encoder.AddField(key, influx.MustNewValue(value))
	}

	// Encode time stamp
	s.encoder.EndLine(m.Time())

	// Check for encoder errors
	if err := s.encoder.Err(); err != nil {
		// Unlock encoder usage
		s.encoderLock.Unlock()

		return fmt.Errorf("Encoding failed: %v", err)
	}
	s.numRecordsInEncoder++

	if s.config.flushDelay == 0 {
		// Unlock encoder usage
		s.encoderLock.Unlock()
@@ -443,11 +447,20 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
	if len(s.config.Password) == 0 {
		return s, errors.New("missing password configuration required by InfluxSink")
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p

	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	if len(s.config.MessageProcessor) > 0 {
		err = p.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	// Configure flush delay duration

@@ -72,8 +72,9 @@ import (
	"fmt"
	"unsafe"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	"github.com/NVIDIA/go-nvml/pkg/dl"
)

@@ -110,99 +111,102 @@ type LibgangliaSink struct {
	cstrCache      map[string]*C.char
}

func (s *LibgangliaSink) Write(point lp.CCMessage) error {
func (s *LibgangliaSink) Write(msg lp.CCMessage) error {
	var err error = nil
	var c_name *C.char
	var c_value *C.char
	var c_type *C.char
	var c_unit *C.char

	// helper function for looking up C strings in the cache
	lookup := func(key string) *C.char {
		if _, exist := s.cstrCache[key]; !exist {
			s.cstrCache[key] = C.CString(key)
	point, err := s.mp.ProcessMessage(msg)
	if err == nil && point != nil {
		// helper function for looking up C strings in the cache
		lookup := func(key string) *C.char {
			if _, exist := s.cstrCache[key]; !exist {
				s.cstrCache[key] = C.CString(key)
			}
			return s.cstrCache[key]
		}
		return s.cstrCache[key]
	}

	conf := GetCommonGangliaConfig(point)
	if len(conf.Type) == 0 {
		conf = GetGangliaConfig(point)
	}
	if len(conf.Type) == 0 {
		return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
	}
		conf := GetCommonGangliaConfig(point)
		if len(conf.Type) == 0 {
			conf = GetGangliaConfig(point)
		}
		if len(conf.Type) == 0 {
			return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name)
		}

	if s.config.AddTypeToName {
		conf.Name = GangliaMetricName(point)
	}
		if s.config.AddTypeToName {
			conf.Name = GangliaMetricName(point)
		}

	c_value = C.CString(conf.Value)
	c_type = lookup(conf.Type)
	c_name = lookup(conf.Name)
		c_value = C.CString(conf.Value)
		c_type = lookup(conf.Type)
		c_name = lookup(conf.Name)

	// Add unit
	unit := ""
	if s.config.AddUnits {
		unit = conf.Unit
	}
	c_unit = lookup(unit)
		// Add unit
		unit := ""
		if s.config.AddUnits {
			unit = conf.Unit
		}
		c_unit = lookup(unit)

	// Determine the slope of the metric. Ganglia's own collector mostly uses
	// 'both' but the mem and swap total uses 'zero'.
	slope_type := C.GANGLIA_SLOPE_BOTH
	switch conf.Slope {
	case "zero":
		slope_type = C.GANGLIA_SLOPE_ZERO
	case "both":
		slope_type = C.GANGLIA_SLOPE_BOTH
	}
		// Determine the slope of the metric. Ganglia's own collector mostly uses
		// 'both' but the mem and swap total uses 'zero'.
		slope_type := C.GANGLIA_SLOPE_BOTH
		switch conf.Slope {
		case "zero":
			slope_type = C.GANGLIA_SLOPE_ZERO
		case "both":
			slope_type = C.GANGLIA_SLOPE_BOTH
		}

	// Create a new Ganglia metric
	gmetric := C.Ganglia_metric_create(s.global_context)
	// Set name, value, type and unit in the Ganglia metric
	// The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant.
	// The 'tmax' value is by default 300.
	rval := C.int(0)
	rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0)
	switch rval {
	case 1:
		// Create a new Ganglia metric
		gmetric := C.Ganglia_metric_create(s.global_context)
		// Set name, value, type and unit in the Ganglia metric
		// The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant.
		// The 'tmax' value is by default 300.
		rval := C.int(0)
		rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0)
		switch rval {
		case 1:
			C.free(unsafe.Pointer(c_value))
			return errors.New("invalid parameters")
		case 2:
			C.free(unsafe.Pointer(c_value))
			return errors.New("one of your parameters has an invalid character '\"'")
		case 3:
			C.free(unsafe.Pointer(c_value))
			return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type)
		case 4:
			C.free(unsafe.Pointer(c_value))
			return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value)
		default:
		}

		// Set the cluster name, otherwise it takes it from the configuration file
		if len(s.config.ClusterName) > 0 {
			C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName))
		}
		// Set the group metadata in the Ganglia metric if configured
		if s.config.AddGangliaGroup {
			c_group := lookup(conf.Group)
			C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group)
		}

		// Now we send the metric
		// gmetric does provide some more options like description and other options
		// but they are not provided by the collectors
		rval = C.Ganglia_metric_send(gmetric, s.send_channels)
		if rval != 0 {
			err = fmt.Errorf("there was an error sending metric %s to %d of the send channels ", point.Name(), rval)
			// fall through to use Ganglia_metric_destroy from common cleanup
		}
		// Cleanup Ganglia metric
		C.Ganglia_metric_destroy(gmetric)
		// Free the value C string, the only one not stored in the cache
		C.free(unsafe.Pointer(c_value))
		return errors.New("invalid parameters")
	case 2:
		C.free(unsafe.Pointer(c_value))
		return errors.New("one of your parameters has an invalid character '\"'")
	case 3:
		C.free(unsafe.Pointer(c_value))
		return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type)
	case 4:
		C.free(unsafe.Pointer(c_value))
		return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value)
	default:
	}

	// Set the cluster name, otherwise it takes it from the configuration file
	if len(s.config.ClusterName) > 0 {
		C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName))
	}
	// Set the group metadata in the Ganglia metric if configured
	if s.config.AddGangliaGroup {
		c_group := lookup(conf.Group)
		C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group)
	}

	// Now we send the metric
	// gmetric does provide some more options like description and other options
	// but they are not provided by the collectors
	rval = C.Ganglia_metric_send(gmetric, s.send_channels)
	if rval != 0 {
		err = fmt.Errorf("there was an error sending metric %s to %d of the send channels ", point.Name(), rval)
		// fall through to use Ganglia_metric_destroy from common cleanup
	}
	// Cleanup Ganglia metric
	C.Ganglia_metric_destroy(gmetric)
	// Free the value C string, the only one not stored in the cache
	C.free(unsafe.Pointer(c_value))
	return err
}

@@ -241,6 +245,20 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
			return nil, err
		}
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p
	if len(s.config.MessageProcessor) > 0 {
		err = s.mp.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}
	lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
	if lib == nil {
		return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)

@@ -1,24 +1,29 @@
package sinks

import (
	"encoding/json"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

type defaultSinkConfig struct {
	MetaAsTags []string `json:"meta_as_tags,omitempty"`
	Type       string   `json:"type"`
	MetaAsTags       []string        `json:"meta_as_tags,omitempty"`
	MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
	Type             string          `json:"type"`
}

type sink struct {
	meta_as_tags map[string]bool // Use meta data tags as tags
	name         string          // Name of the sink
	meta_as_tags map[string]bool     // Use meta data tags as tags
	mp           mp.MessageProcessor // message processor for the sink
	name         string              // Name of the sink
}

type Sink interface {
	Write(point lp.CCMessage) error // Write metric to the sink
	Flush() error                  // Flush buffered metrics
	Close()                        // Close / finish metric sink
	Name() string                  // Name of the metric sink
	Flush() error                   // Flush buffered metrics
	Close()                         // Close / finish metric sink
	Name() string                   // Name of the metric sink
}

// Name returns the name of the metric sink

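The common sink definitions above show the shape of the change: `defaultSinkConfig` gains a `process_messages` block and every sink embeds an `mp.MessageProcessor`. As a condensed, hedged sketch (the `MySink` type and its `send` helper are hypothetical placeholders, not part of this commit), the wiring that each sink constructor and `Write` method now repeats looks roughly like this:

```go
// Sketch of the per-sink message-processor wiring; MySink and send() are
// placeholders used only for illustration.
package sinks

import (
	"encoding/json"
	"fmt"

	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

type MySink struct {
	name   string
	mp     mp.MessageProcessor
	config struct {
		MetaAsTags       []string        `json:"meta_as_tags,omitempty"`
		MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
	}
}

func NewMySink(name string, config json.RawMessage) (*MySink, error) {
	s := &MySink{name: name}
	if len(config) > 0 {
		if err := json.Unmarshal(config, &s.config); err != nil {
			return nil, err
		}
	}
	// Every sink owns one message processor ...
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p
	// ... configured from the optional "process_messages" block ...
	if len(s.config.MessageProcessor) > 0 {
		if err := s.mp.FromConfigJSON(s.config.MessageProcessor); err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// ... with the legacy "meta_as_tags" list mapped to move-meta-to-tags rules.
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}
	return s, nil
}

// Write gates every outgoing message on the processor: a nil result means a
// rule dropped the message and nothing is sent.
func (s *MySink) Write(msg lp.CCMessage) error {
	m, err := s.mp.ProcessMessage(msg)
	if err == nil && m != nil {
		return s.send(m) // hypothetical transport-specific delivery
	}
	return err
}

func (s *MySink) send(m lp.CCMessage) error {
	// transport-specific delivery would go here
	return nil
}
```

The same three steps (create the processor, feed it the optional `process_messages` JSON, translate `meta_as_tags` into move rules) recur in every sink touched below.
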
@@ -8,8 +8,9 @@ import (
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	influx "github.com/influxdata/line-protocol"
	nats "github.com/nats-io/nats.go"
)
@@ -60,12 +61,15 @@ func (s *NatsSink) connect() error {
}

func (s *NatsSink) Write(m lp.CCMessage) error {
	s.lock.Lock()
	_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
	s.lock.Unlock()
	if err != nil {
		cclog.ComponentError(s.name, "Write:", err.Error())
		return err
	msg, err := s.mp.ProcessMessage(m)
	if err == nil && msg != nil {
		s.lock.Lock()
		_, err := s.encoder.Encode(msg.ToPoint(nil))
		s.lock.Unlock()
		if err != nil {
			cclog.ComponentError(s.name, "Write:", err.Error())
			return err
		}
	}

	if s.flushDelay == 0 {
@@ -120,11 +124,25 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
		len(s.config.Subject) == 0 {
		return nil, errors.New("not all configuration variables set required by NatsSink")
	}
	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p
	if len(s.config.MessageProcessor) > 0 {
		err = s.mp.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// Create lookup map to use meta infos as tags in the output metric
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}
	// s.meta_as_tags = make(map[string]bool)
	// for _, k := range s.config.MetaAsTags {
	// 	s.meta_as_tags[k] = true
	// }
	// Setup Influx line protocol
	s.buffer = &bytes.Buffer{}
	s.buffer.Grow(1025)

@@ -10,8 +10,9 @@ import (
	"strings"
	"sync"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -153,7 +154,11 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMessage) error {
}

func (s *PrometheusSink) Write(m lp.CCMessage) error {
	return s.updateMetric(m)
	msg, err := s.mp.ProcessMessage(m)
	if err == nil && msg != nil {
		err = s.updateMetric(m)
	}
	return err
}

func (s *PrometheusSink) Flush() error {
@@ -182,6 +187,20 @@ func NewPrometheusSink(name string, config json.RawMessage) (Sink, error) {
		cclog.ComponentError(s.name, err.Error())
		return nil, err
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p
	if len(s.config.MessageProcessor) > 0 {
		err = p.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	for _, k := range s.config.MetaAsTags {
		s.mp.AddMoveMetaToTags("true", k, k)
	}
	s.labelMetrics = make(map[string]*prometheus.GaugeVec)
	s.nodeMetrics = make(map[string]prometheus.Gauge)
	s.promWg.Add(1)

@@ -6,8 +6,9 @@ import (
	"fmt"
	"log"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

type SampleSinkConfig struct {
@@ -30,7 +31,12 @@ type SampleSink struct {
// Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMessage) error {
	// based on s.meta_as_tags use meta infos as tags
	log.Print(point)
	// moreover, submit the point to the message processor
	// to apply drop/modify rules
	msg, err := s.mp.ProcessMessage(point)
	if err == nil && msg != nil {
		log.Print(msg)
	}
	return nil
}

@@ -66,10 +72,24 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
		}
	}

	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	// Initialize and configure the message processor
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p

	// Add message processor configuration
	if len(s.config.MessageProcessor) > 0 {
		err = p.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// Add rules to move meta information to tag space
	// Replacing the legacy 'meta_as_tags' configuration
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	// Check if all required fields in the config are set

@@ -8,8 +8,9 @@ import (
	"strings"

	//	"time"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
)

type StdoutSink struct {
@@ -22,10 +23,13 @@ type StdoutSink struct {
}

func (s *StdoutSink) Write(m lp.CCMessage) error {
	fmt.Fprint(
		s.output,
		m.ToLineProtocol(s.meta_as_tags),
	)
	msg, err := s.mp.ProcessMessage(m)
	if err == nil && msg != nil {
		fmt.Fprint(
			s.output,
			msg.ToLineProtocol(s.meta_as_tags),
		)
	}
	return nil
}

@@ -41,6 +45,7 @@ func (s *StdoutSink) Close() {
}

func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {

	s := new(StdoutSink)
	s.name = fmt.Sprintf("StdoutSink(%s)", name)
	if len(config) > 0 {
@@ -51,6 +56,11 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
			return nil, err
		}
	}
	p, err := mp.NewMessageProcessor()
	if err != nil {
		return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error())
	}
	s.mp = p

	s.output = os.Stdout
	if len(s.config.Output) > 0 {
@@ -67,10 +77,21 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
			s.output = f
		}
	}

	// Add message processor configuration
	if len(s.config.MessageProcessor) > 0 {
		err = s.mp.FromConfigJSON(s.config.MessageProcessor)
		if err != nil {
			return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error())
		}
	}
	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	// s.meta_as_tags = make(map[string]bool)
	// for _, k := range s.config.MetaAsTags {
	// 	s.meta_as_tags[k] = true
	// }
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
		s.mp.AddMoveMetaToTags("true", k, k)
	}

	return s, nil

@@ -10,7 +10,10 @@ The `stdout` sink is the most simple sink provided by cc-metric-collector. It wr
  "<name>": {
    "type": "stdout",
    "meta_as_tags" : [],
    "output_file" : "mylogfile.log"
    "output_file" : "mylogfile.log",
    "process_messages" : {
      "see" : "docs of message processor for valid fields"
    }
  }
}
```
@@ -18,5 +21,6 @@ The `stdout` sink is the most simple sink provided by cc-metric-collector. It wr
- `type`: makes the sink a `stdout` sink
- `meta_as_tags`: print meta information as tags in the output (optional)
- `output_file`: Write all data to the selected file (optional). There are two 'special' files: `stdout` and `stderr`. If this option is not provided, the default value is `stdout`
- `process_messages`: Process messages with the given rules before passing them on or dropping them

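As a minimal usage sketch built only from the options documented above (the sink name `console` is arbitrary, and the empty `process_messages` block is just a placeholder; its rule fields are defined in the message processor documentation):

```json
{
  "console": {
    "type": "stdout",
    "meta_as_tags": [],
    "output_file": "stderr",
    "process_messages": {}
  }
}
```
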