From 14ca9256221cf8a2beb810b216a08baae4c068f3 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 11 Dec 2024 20:53:22 +0100 Subject: [PATCH] Use message processor in router, all sinks and all receivers --- internal/metricRouter/metricRouter.go | 263 ++++++++++++++++---------- receivers/httpReceiver.go | 135 +++++++------ receivers/ipmiReceiver.go | 53 +++++- receivers/metricReceiver.go | 11 +- receivers/natsReceiver.go | 126 ++++++------ receivers/redfishReceiver.go | 71 +++++-- receivers/sampleReceiver.go | 27 ++- sinks/gangliaSink.go | 97 ++++++---- sinks/httpSink.go | 124 +++++++----- sinks/influxAsyncSink.go | 28 ++- sinks/influxSink.go | 129 +++++++------ sinks/libgangliaSink.go | 176 +++++++++-------- sinks/metricSink.go | 19 +- sinks/natsSink.go | 40 ++-- sinks/prometheusSink.go | 23 ++- sinks/sampleSink.go | 30 ++- sinks/stdoutSink.go | 35 +++- sinks/stdoutSink.md | 6 +- 18 files changed, 880 insertions(+), 513 deletions(-) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 2ad26d9..a3b41c4 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -2,6 +2,7 @@ package metricRouter import ( "encoding/json" + "fmt" "os" "strings" "sync" @@ -9,10 +10,10 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker" - units "github.com/ClusterCockpit/cc-units" ) const ROUTER_MAX_FORWARD = 50 @@ -38,16 +39,17 @@ type metricRouterConfig struct { MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics - dropMetrics map[string]bool // Internal map for O(1) lookup + // dropMetrics map[string]bool // Internal map for O(1) lookup + MessageProcessor json.RawMessage `json:"process_message,omitempty"` } // Metric router data structure type metricRouter struct { hostname string // Hostname used in tags - coll_input chan lp.CCMessage // Input channel from CollectorManager - recv_input chan lp.CCMessage // Input channel from ReceiveManager - cache_input chan lp.CCMessage // Input channel from MetricCache - outputs []chan lp.CCMessage // List of all output channels + coll_input chan lp.CCMessage // Input channel from CollectorManager + recv_input chan lp.CCMessage // Input channel from ReceiveManager + cache_input chan lp.CCMessage // Input channel from MetricCache + outputs []chan lp.CCMessage // List of all output channels done chan bool // channel to finish / stop metric router wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector timestamp time.Time // timestamp periodically updated by ticker each interval @@ -56,6 +58,7 @@ type metricRouter struct { cache MetricCache // pointer to MetricCache cachewg sync.WaitGroup // wait group for MetricCache maxForward int // number of metrics to forward maximally in one iteration + mp mp.MessageProcessor } // MetricRouter access functions @@ -119,10 +122,52 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) } } - r.config.dropMetrics = make(map[string]bool) - for _, mname := range r.config.DropMetrics { - r.config.dropMetrics[mname] = true + p, err := mp.NewMessageProcessor() + if err != nil { + return fmt.Errorf("initialization of message processor failed: %v", err.Error()) } + r.mp = p + + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + for _, mname := range r.config.DropMetrics { + r.mp.AddDropMessagesByName(mname) + } + for _, cond := range r.config.DropMetricsIf { + r.mp.AddDropMessagesByCondition(cond) + } + for _, data := range r.config.AddTags { + cond := data.Condition + if cond == "*" { + cond = "true" + } + r.mp.AddAddTagsByCondition(cond, data.Key, data.Value) + } + for _, data := range r.config.DelTags { + cond := data.Condition + if cond == "*" { + cond = "true" + } + r.mp.AddDeleteTagsByCondition(cond, data.Key, data.Value) + } + for oldname, newname := range r.config.RenameMetrics { + r.mp.AddRenameMetricByName(oldname, newname) + } + for metricName, prefix := range r.config.ChangeUnitPrefix { + r.mp.AddChangeUnitPrefix(fmt.Sprintf("name == '%s'", metricName), prefix) + } + r.mp.SetNormalizeUnits(r.config.NormalizeUnits) + + r.mp.AddAddTagsByCondition("true", r.config.HostnameTagName, hostname) + + // r.config.dropMetrics = make(map[string]bool) + // for _, mname := range r.config.DropMetrics { + // r.config.dropMetrics[mname] = true + // } return nil } @@ -166,81 +211,81 @@ func (r *metricRouter) DoAddTags(point lp.CCMessage) { } // DoDelTags removes a tag when condition is fullfiled -func (r *metricRouter) DoDelTags(point lp.CCMessage) { - var conditionMatches bool - for _, m := range r.config.DelTags { - if m.Condition == "*" { - // Condition is always matched - conditionMatches = true - } else { - // Evaluate condition - var err error - conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point)) - if err != nil { - cclog.ComponentError("MetricRouter", err.Error()) - conditionMatches = false - } - } - if conditionMatches { - point.RemoveTag(m.Key) - } - } -} +// func (r *metricRouter) DoDelTags(point lp.CCMessage) { +// var conditionMatches bool +// for _, m := range r.config.DelTags { +// if m.Condition == "*" { +// // Condition is always matched +// conditionMatches = true +// } else { +// // Evaluate condition +// var err error +// conditionMatches, err = agg.EvalBoolCondition(m.Condition, getParamMap(point)) +// if err != nil { +// cclog.ComponentError("MetricRouter", err.Error()) +// conditionMatches = false +// } +// } +// if conditionMatches { +// point.RemoveTag(m.Key) +// } +// } +// } // Conditional test whether a metric should be dropped -func (r *metricRouter) dropMetric(point lp.CCMessage) bool { - // Simple drop check - if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok { - return conditionMatches - } +// func (r *metricRouter) dropMetric(point lp.CCMessage) bool { +// // Simple drop check +// if conditionMatches, ok := r.config.dropMetrics[point.Name()]; ok { +// return conditionMatches +// } - // Checking the dropping conditions - for _, m := range r.config.DropMetricsIf { - conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point)) - if err != nil { - cclog.ComponentError("MetricRouter", err.Error()) - conditionMatches = false - } - if conditionMatches { - return conditionMatches - } - } +// // Checking the dropping conditions +// for _, m := range r.config.DropMetricsIf { +// conditionMatches, err := agg.EvalBoolCondition(m, getParamMap(point)) +// if err != nil { +// cclog.ComponentError("MetricRouter", err.Error()) +// conditionMatches = false +// } +// if conditionMatches { +// return conditionMatches +// } +// } - // No dropping condition met - return false -} +// // No dropping condition met +// return false +// } -func (r *metricRouter) prepareUnit(point lp.CCMessage) bool { - if r.config.NormalizeUnits { - if in_unit, ok := point.GetMeta("unit"); ok { - u := units.NewUnit(in_unit) - if u.Valid() { - point.AddMeta("unit", u.Short()) - } - } - } - if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok { +// func (r *metricRouter) prepareUnit(point lp.CCMessage) bool { +// if r.config.NormalizeUnits { +// if in_unit, ok := point.GetMeta("unit"); ok { +// u := units.NewUnit(in_unit) +// if u.Valid() { +// point.AddMeta("unit", u.Short()) +// } +// } +// } +// if newP, ok := r.config.ChangeUnitPrefix[point.Name()]; ok { - newPrefix := units.NewPrefix(newP) +// newPrefix := units.NewPrefix(newP) - if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { - u := units.NewUnit(in_unit) - if u.Valid() { - cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name()) - conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) - if conv != nil && out_unit.Valid() { - if val, ok := point.GetField("value"); ok { - point.AddField("value", conv(val)) - point.AddMeta("unit", out_unit.Short()) - } - } - } +// if in_unit, ok := point.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { +// u := units.NewUnit(in_unit) +// if u.Valid() { +// cclog.ComponentDebug("MetricRouter", "Change prefix to", newP, "for metric", point.Name()) +// conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) +// if conv != nil && out_unit.Valid() { +// if val, ok := point.GetField("value"); ok { +// point.AddField("value", conv(val)) +// point.AddMeta("unit", out_unit.Short()) +// } +// } +// } - } - } +// } +// } - return true -} +// return true +// } // Start starts the metric router func (r *metricRouter) Start() { @@ -259,39 +304,47 @@ func (r *metricRouter) Start() { // Forward takes a received metric, adds or deletes tags // and forwards it to the output channels - forward := func(point lp.CCMessage) { - cclog.ComponentDebug("MetricRouter", "FORWARD", point) - r.DoAddTags(point) - r.DoDelTags(point) - name := point.Name() - if new, ok := r.config.RenameMetrics[name]; ok { - point.SetName(new) - point.AddMeta("oldname", name) - r.DoAddTags(point) - r.DoDelTags(point) - } + // forward := func(point lp.CCMessage) { + // cclog.ComponentDebug("MetricRouter", "FORWARD", point) + // r.DoAddTags(point) + // r.DoDelTags(point) + // name := point.Name() + // if new, ok := r.config.RenameMetrics[name]; ok { + // point.SetName(new) + // point.AddMeta("oldname", name) + // r.DoAddTags(point) + // r.DoDelTags(point) + // } - r.prepareUnit(point) + // r.prepareUnit(point) - for _, o := range r.outputs { - o <- point - } - } + // for _, o := range r.outputs { + // o <- point + // } + // } // Foward message received from collector channel coll_forward := func(p lp.CCMessage) { // receive from metric collector - p.AddTag(r.config.HostnameTagName, r.hostname) + //p.AddTag(r.config.HostnameTagName, r.hostname) if r.config.IntervalStamp { p.SetTime(r.timestamp) } - if !r.dropMetric(p) { - forward(p) + m, err := r.mp.ProcessMessage(p) + if err == nil && m != nil { + for _, o := range r.outputs { + o <- m + } } + // if !r.dropMetric(p) { + // for _, o := range r.outputs { + // o <- point + // } + // } // even if the metric is dropped, it is stored in the cache for // aggregations if r.config.NumCacheIntervals > 0 { - r.cache.Add(p) + r.cache.Add(m) } } @@ -301,17 +354,25 @@ func (r *metricRouter) Start() { if r.config.IntervalStamp { p.SetTime(r.timestamp) } - if !r.dropMetric(p) { - forward(p) + m, err := r.mp.ProcessMessage(p) + if err == nil && m != nil { + for _, o := range r.outputs { + o <- m + } } + // if !r.dropMetric(p) { + // forward(p) + // } } // Forward message received from cache channel cache_forward := func(p lp.CCMessage) { // receive from metric collector - if !r.dropMetric(p) { - p.AddTag(r.config.HostnameTagName, r.hostname) - forward(p) + m, err := r.mp.ProcessMessage(p) + if err == nil && m != nil { + for _, o := range r.outputs { + o <- m + } } } diff --git a/receivers/httpReceiver.go b/receivers/httpReceiver.go index e8251d7..d7965c6 100644 --- a/receivers/httpReceiver.go +++ b/receivers/httpReceiver.go @@ -10,15 +10,16 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influx "github.com/influxdata/line-protocol/v2/lineprotocol" ) const HTTP_RECEIVER_PORT = "8080" type HttpReceiverConfig struct { - Type string `json:"type"` + defaultReceiverConfig Addr string `json:"address"` Port string `json:"port"` Path string `json:"path"` @@ -39,7 +40,7 @@ type HttpReceiverConfig struct { type HttpReceiver struct { receiver - meta map[string]string + //meta map[string]string config HttpReceiverConfig server *http.Server wg sync.WaitGroup @@ -85,8 +86,20 @@ func (r *HttpReceiver) Init(name string, config json.RawMessage) error { if r.config.useBasicAuth && len(r.config.Password) == 0 { return errors.New("basic authentication requires password") } + msgp, err := mp.NewMessageProcessor() + if err != nil { + return fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + r.mp = msgp + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + return fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + r.mp.AddAddMetaByCondition("true", "source", r.name) - r.meta = map[string]string{"source": r.name} + //r.meta = map[string]string{"source": r.name} p := r.config.Path if !strings.HasPrefix(p, "/") { p = "/" + p @@ -137,80 +150,82 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { return } } + if r.sink != nil { + d := influx.NewDecoder(req.Body) + for d.Next() { - d := influx.NewDecoder(req.Body) - for d.Next() { - - // Decode measurement name - measurement, err := d.Measurement() - if err != nil { - msg := "ServerHttp: Failed to decode measurement: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() + // Decode measurement name + measurement, err := d.Measurement() if err != nil { - msg := "ServerHttp: Failed to decode tag: " + err.Error() + msg := "ServerHttp: Failed to decode measurement: " + err.Error() cclog.ComponentError(r.name, msg) http.Error(w, msg, http.StatusInternalServerError) return } - if key == nil { - break - } - tags[string(key)] = string(value) - } - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() + // Decode tags + tags := make(map[string]string) + for { + key, value, err := d.NextTag() + if err != nil { + msg := "ServerHttp: Failed to decode tag: " + err.Error() + cclog.ComponentError(r.name, msg) + http.Error(w, msg, http.StatusInternalServerError) + return + } + if key == nil { + break + } + tags[string(key)] = string(value) + } + + // Decode fields + fields := make(map[string]interface{}) + for { + key, value, err := d.NextField() + if err != nil { + msg := "ServerHttp: Failed to decode field: " + err.Error() + cclog.ComponentError(r.name, msg) + http.Error(w, msg, http.StatusInternalServerError) + return + } + if key == nil { + break + } + fields[string(key)] = value.Interface() + } + + // Decode time stamp + t, err := d.Time(influx.Nanosecond, time.Time{}) if err != nil { - msg := "ServerHttp: Failed to decode field: " + err.Error() + msg := "ServerHttp: Failed to decode time stamp: " + err.Error() cclog.ComponentError(r.name, msg) http.Error(w, msg, http.StatusInternalServerError) return } - if key == nil { - break - } - fields[string(key)] = value.Interface() - } - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) + y, _ := lp.NewMessage( + string(measurement), + tags, + nil, + fields, + t, + ) + + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil { + r.sink <- m + } + + } + // Check for IO errors + err := d.Err() if err != nil { - msg := "ServerHttp: Failed to decode time stamp: " + err.Error() + msg := "ServerHttp: Failed to decode: " + err.Error() cclog.ComponentError(r.name, msg) http.Error(w, msg, http.StatusInternalServerError) return } - - y, _ := lp.NewMessage( - string(measurement), - tags, - r.meta, - fields, - t, - ) - - if r.sink != nil { - r.sink <- y - } - } - - // Check for IO errors - err := d.Err() - if err != nil { - msg := "ServerHttp: Failed to decode: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return } w.WriteHeader(http.StatusOK) diff --git a/receivers/ipmiReceiver.go b/receivers/ipmiReceiver.go index d9598ba..9f045ee 100644 --- a/receivers/ipmiReceiver.go +++ b/receivers/ipmiReceiver.go @@ -13,9 +13,10 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" "github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) type IPMIReceiverClientConfig struct { @@ -31,11 +32,13 @@ type IPMIReceiverClientConfig struct { Password string // Password to use for authentication CLIOptions []string // Additional command line options for ipmi-sensors isExcluded map[string]bool // is metric excluded + mp mp.MessageProcessor } type IPMIReceiver struct { receiver config struct { + defaultReceiverConfig Interval time.Duration // Client config for each IPMI hosts @@ -43,10 +46,11 @@ type IPMIReceiver struct { } // Storage for static information - meta map[string]string + //meta map[string]string done chan bool // channel to finish / stop IPMI receiver wg sync.WaitGroup // wait group for IPMI receiver + mp mp.MessageProcessor } // doReadMetrics reads metrics from all configure IPMI hosts. @@ -230,7 +234,14 @@ func (r *IPMIReceiver) doReadMetric() { }, time.Now()) if err == nil { - r.sink <- y + mc, err := clientConfig.mp.ProcessMessage(y) + if err == nil && mc != nil { + m, err := r.mp.ProcessMessage(mc) + if err == nil && m != nil { + r.sink <- m + } + } + } } @@ -296,11 +307,12 @@ func (r *IPMIReceiver) Close() { // NewIPMIReceiver creates a new instance of the redfish receiver // Initialize the receiver by giving it a name and reading in the config JSON func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { + var err error r := new(IPMIReceiver) // Config options from config file configJSON := struct { - Type string `json:"type"` + defaultReceiverConfig // How often the IPMI sensor metrics should be read and send to the sink (default: 30 s) IntervalString string `json:"interval,omitempty"` @@ -331,7 +343,8 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { ExcludeMetrics []string `json:"exclude_metrics,omitempty"` // Additional command line options for ipmi-sensors - CLIOptions []string `json:"cli_options,omitempty"` + CLIOptions []string `json:"cli_options,omitempty"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` } `json:"client_config"` }{ // Set defaults values @@ -347,8 +360,15 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { // Create done channel r.done = make(chan bool) + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + r.mp = p + // Set static information - r.meta = map[string]string{"source": r.name} + //r.meta = map[string]string{"source": r.name} + r.mp.AddAddMetaByCondition("true", "source", r.name) // Read the IPMI receiver specific JSON config if len(config) > 0 { @@ -360,12 +380,18 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { } } + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } // Convert interval string representation to duration - var err error + r.config.Interval, err = time.ParseDuration(configJSON.IntervalString) if err != nil { err := fmt.Errorf( - "Failed to parse duration string interval='%s': %w", + "failed to parse duration string interval='%s': %w", configJSON.IntervalString, err, ) @@ -506,6 +532,16 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { for _, key := range configJSON.ExcludeMetrics { isExcluded[key] = true } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + if len(clientConfigJSON.MessageProcessor) > 0 { + err = p.FromConfigJSON(clientConfigJSON.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } r.config.ClientConfigs = append( r.config.ClientConfigs, @@ -520,6 +556,7 @@ func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error) { Password: password, CLIOptions: cliOptions, isExcluded: isExcluded, + mp: p, }) } diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index d7e6592..609eab7 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -1,11 +1,15 @@ package receivers import ( + "encoding/json" + lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) type defaultReceiverConfig struct { - Type string `json:"type"` + Type string `json:"type"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` } // Receiver configuration: Listen address, port @@ -20,12 +24,13 @@ type ReceiverConfig struct { type receiver struct { name string sink chan lp.CCMessage + mp mp.MessageProcessor } type Receiver interface { Start() - Close() // Close / finish metric receiver - Name() string // Name of the metric receiver + Close() // Close / finish metric receiver + Name() string // Name of the metric receiver SetSink(sink chan lp.CCMessage) // Set sink channel } diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index abf7030..15254f5 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -6,14 +6,15 @@ import ( "fmt" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influx "github.com/influxdata/line-protocol/v2/lineprotocol" nats "github.com/nats-io/nats.go" ) type NatsReceiverConfig struct { - Type string `json:"type"` + defaultReceiverConfig Addr string `json:"address"` Port string `json:"port"` Subject string `json:"subject"` @@ -21,8 +22,8 @@ type NatsReceiverConfig struct { type NatsReceiver struct { receiver - nc *nats.Conn - meta map[string]string + nc *nats.Conn + //meta map[string]string config NatsReceiverConfig } @@ -36,65 +37,68 @@ func (r *NatsReceiver) Start() { // _NatsReceive receives subscribed messages from the NATS server func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { - d := influx.NewDecoderWithBytes(m.Data) - for d.Next() { + if r.sink != nil { + d := influx.NewDecoderWithBytes(m.Data) + for d.Next() { - // Decode measurement name - measurement, err := d.Measurement() - if err != nil { - msg := "_NatsReceive: Failed to decode measurement: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() + // Decode measurement name + measurement, err := d.Measurement() if err != nil { - msg := "_NatsReceive: Failed to decode tag: " + err.Error() + msg := "_NatsReceive: Failed to decode measurement: " + err.Error() cclog.ComponentError(r.name, msg) return } - if key == nil { - break - } - tags[string(key)] = string(value) - } - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() + // Decode tags + tags := make(map[string]string) + for { + key, value, err := d.NextTag() + if err != nil { + msg := "_NatsReceive: Failed to decode tag: " + err.Error() + cclog.ComponentError(r.name, msg) + return + } + if key == nil { + break + } + tags[string(key)] = string(value) + } + + // Decode fields + fields := make(map[string]interface{}) + for { + key, value, err := d.NextField() + if err != nil { + msg := "_NatsReceive: Failed to decode field: " + err.Error() + cclog.ComponentError(r.name, msg) + return + } + if key == nil { + break + } + fields[string(key)] = value.Interface() + } + + // Decode time stamp + t, err := d.Time(influx.Nanosecond, time.Time{}) if err != nil { - msg := "_NatsReceive: Failed to decode field: " + err.Error() + msg := "_NatsReceive: Failed to decode time: " + err.Error() cclog.ComponentError(r.name, msg) return } - if key == nil { - break + + y, _ := lp.NewMessage( + string(measurement), + tags, + nil, + fields, + t, + ) + + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil { + r.sink <- m } - fields[string(key)] = value.Interface() - } - - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) - if err != nil { - msg := "_NatsReceive: Failed to decode time: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - y, _ := lp.NewMessage( - string(measurement), - tags, - r.meta, - fields, - t, - ) - - if r.sink != nil { - r.sink <- y } } } @@ -127,11 +131,23 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { len(r.config.Subject) == 0 { return nil, errors.New("not all configuration variables set required by NatsReceiver") } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + r.mp = p + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } // Set metadata - r.meta = map[string]string{ - "source": r.name, - } + // r.meta = map[string]string{ + // "source": r.name, + // } + r.mp.AddAddMetaByCondition("true", "source", r.name) // Connect to NATS server url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port) diff --git a/receivers/redfishReceiver.go b/receivers/redfishReceiver.go index 96c873c..b237231 100644 --- a/receivers/redfishReceiver.go +++ b/receivers/redfishReceiver.go @@ -13,9 +13,10 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" "github.com/ClusterCockpit/cc-metric-collector/pkg/hostlist" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" // See: https://pkg.go.dev/github.com/stmcginnis/gofish "github.com/stmcginnis/gofish" @@ -42,6 +43,8 @@ type RedfishReceiverClientConfig struct { readSensorURLs map[string][]string gofish gofish.ClientConfig + + mp mp.MessageProcessor } // RedfishReceiver configuration: @@ -49,6 +52,7 @@ type RedfishReceiver struct { receiver config struct { + defaultReceiverConfig fanout int Interval time.Duration HttpTimeout time.Duration @@ -79,13 +83,19 @@ func setMetricValue(value any) map[string]interface{} { } // sendMetric sends the metric through the sink channel -func (r *RedfishReceiver) sendMetric(name string, tags map[string]string, meta map[string]string, value any, timestamp time.Time) { +func (r *RedfishReceiver) sendMetric(mp mp.MessageProcessor, name string, tags map[string]string, meta map[string]string, value any, timestamp time.Time) { deleteEmptyTags(tags) deleteEmptyTags(meta) y, err := lp.NewMessage(name, tags, meta, setMetricValue(value), timestamp) if err == nil { - r.sink <- y + mc, err := mp.ProcessMessage(y) + if err == nil && mc != nil { + m, err := r.mp.ProcessMessage(mc) + if err == nil && m != nil { + r.sink <- m + } + } } } @@ -119,7 +129,7 @@ func (r *RedfishReceiver) readSensors( "unit": "degC", } - r.sendMetric("temperature", tags, meta, sensor.Reading, time.Now()) + r.sendMetric(clientConfig.mp, "temperature", tags, meta, sensor.Reading, time.Now()) } writeFanSpeedSensor := func(sensor *redfish.Sensor) { @@ -145,7 +155,7 @@ func (r *RedfishReceiver) readSensors( "unit": string(sensor.ReadingUnits), } - r.sendMetric("fan_speed", tags, meta, sensor.Reading, time.Now()) + r.sendMetric(clientConfig.mp, "fan_speed", tags, meta, sensor.Reading, time.Now()) } writePowerSensor := func(sensor *redfish.Sensor) { @@ -172,7 +182,7 @@ func (r *RedfishReceiver) readSensors( "unit": "watts", } - r.sendMetric("power", tags, meta, sensor.Reading, time.Now()) + r.sendMetric(clientConfig.mp, "power", tags, meta, sensor.Reading, time.Now()) } if _, ok := clientConfig.readSensorURLs[chassis.ID]; !ok { @@ -340,7 +350,7 @@ func (r *RedfishReceiver) readThermalMetrics( // ReadingCelsius shall be the current value of the temperature sensor's reading. value := temperature.ReadingCelsius - r.sendMetric("temperature", tags, meta, value, timestamp) + r.sendMetric(clientConfig.mp, "temperature", tags, meta, value, timestamp) } for _, fan := range thermal.Fans { @@ -381,7 +391,7 @@ func (r *RedfishReceiver) readThermalMetrics( "unit": string(fan.ReadingUnits), } - r.sendMetric("fan_speed", tags, meta, fan.Reading, timestamp) + r.sendMetric(clientConfig.mp, "fan_speed", tags, meta, fan.Reading, timestamp) } return nil @@ -479,7 +489,7 @@ func (r *RedfishReceiver) readPowerMetrics( } for name, value := range metrics { - r.sendMetric(name, tags, meta, value, timestamp) + r.sendMetric(clientConfig.mp, name, tags, meta, value, timestamp) } } @@ -561,7 +571,7 @@ func (r *RedfishReceiver) readProcessorMetrics( if !clientConfig.isExcluded[namePower] && // Some servers return "ConsumedPowerWatt":65535 instead of "ConsumedPowerWatt":null processorMetrics.ConsumedPowerWatt != 65535 { - r.sendMetric(namePower, tags, metaPower, processorMetrics.ConsumedPowerWatt, timestamp) + r.sendMetric(clientConfig.mp, namePower, tags, metaPower, processorMetrics.ConsumedPowerWatt, timestamp) } // Set meta data tags metaThermal := map[string]string{ @@ -573,7 +583,7 @@ func (r *RedfishReceiver) readProcessorMetrics( nameThermal := "temperature" if !clientConfig.isExcluded[nameThermal] { - r.sendMetric(nameThermal, tags, metaThermal, processorMetrics.TemperatureCelsius, timestamp) + r.sendMetric(clientConfig.mp, nameThermal, tags, metaThermal, processorMetrics.TemperatureCelsius, timestamp) } return nil } @@ -776,11 +786,13 @@ func (r *RedfishReceiver) Close() { // NewRedfishReceiver creates a new instance of the redfish receiver // Initialize the receiver by giving it a name and reading in the config JSON func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { + var err error r := new(RedfishReceiver) // Config options from config file configJSON := struct { - Type string `json:"type"` + Type string `json:"type"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` // Maximum number of simultaneous redfish connections (default: 64) Fanout int `json:"fanout,omitempty"` @@ -820,7 +832,8 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { DisableThermalMetrics bool `json:"disable_thermal_metrics"` // Per client excluded metrics - ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` } `json:"client_config"` }{ // Set defaults values @@ -846,13 +859,24 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { return nil, err } } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + r.mp = p + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } // Convert interval string representation to duration - var err error + r.config.Interval, err = time.ParseDuration(configJSON.IntervalString) if err != nil { err := fmt.Errorf( - "Failed to parse duration string interval='%s': %w", + "failed to parse duration string interval='%s': %w", configJSON.IntervalString, err, ) @@ -864,7 +888,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { r.config.HttpTimeout, err = time.ParseDuration(configJSON.HttpTimeoutString) if err != nil { err := fmt.Errorf( - "Failed to parse duration string http_timeout='%s': %w", + "failed to parse duration string http_timeout='%s': %w", configJSON.HttpTimeoutString, err, ) @@ -948,6 +972,18 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { for _, key := range configJSON.ExcludeMetrics { isExcluded[key] = true } + p, err = mp.NewMessageProcessor() + if err != nil { + cclog.ComponentError(r.name, err.Error()) + return nil, err + } + if len(clientConfigJSON.MessageProcessor) > 0 { + err = p.FromConfigJSON(clientConfigJSON.MessageProcessor) + if err != nil { + cclog.ComponentError(r.name, err.Error()) + return nil, err + } + } hostList, err := hostlist.Expand(clientConfigJSON.HostList) if err != nil { @@ -978,6 +1014,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { Endpoint: endpoint, HTTPClient: httpClient, }, + mp: p, }) } @@ -1002,7 +1039,7 @@ func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error) { for i := range r.config.ClientConfigs { host := r.config.ClientConfigs[i].Hostname if isDuplicate[host] { - err := fmt.Errorf("Found duplicate client config for host %s", host) + err := fmt.Errorf("found duplicate client config for host %s", host) cclog.ComponentError(r.name, err) return nil, err } diff --git a/receivers/sampleReceiver.go b/receivers/sampleReceiver.go index 86e68cd..8fe7e1c 100644 --- a/receivers/sampleReceiver.go +++ b/receivers/sampleReceiver.go @@ -5,11 +5,13 @@ import ( "fmt" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) // SampleReceiver configuration: receiver type, listen address, port +// The defaultReceiverConfig contains the keys 'type' and 'process_messages' type SampleReceiverConfig struct { - Type string `json:"type"` + defaultReceiverConfig Addr string `json:"address"` Port string `json:"port"` } @@ -19,7 +21,6 @@ type SampleReceiver struct { config SampleReceiverConfig // Storage for static information - meta map[string]string // Use in case of own go routine // done chan bool // wg sync.WaitGroup @@ -79,8 +80,19 @@ func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) { // The name should be chosen in such a way that different instances of SampleReceiver can be distinguished r.name = fmt.Sprintf("SampleReceiver(%s)", name) + // create new message processor + p, err := mp.NewMessageProcessor() + if err != nil { + cclog.ComponentError(r.name, "Initialization of message processor failed:", err.Error()) + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + r.mp = p // Set static information - r.meta = map[string]string{"source": r.name} + err = r.mp.AddAddMetaByCondition("true", "source", r.name) + if err != nil { + cclog.ComponentError(r.name, fmt.Sprintf("Failed to add static information source=%s:", r.name), err.Error()) + return nil, fmt.Errorf("failed to add static information source=%s: %v", r.name, err.Error()) + } // Set defaults in r.config // Allow overwriting these defaults by reading config JSON @@ -94,6 +106,15 @@ func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) { } } + // Add message processor config + if len(r.config.MessageProcessor) > 0 { + err = r.mp.FromConfigJSON(r.config.MessageProcessor) + if err != nil { + cclog.ComponentError(r.name, "Failed parsing JSON for message processor:", err.Error()) + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + // Check that all required fields in the configuration are set // Use 'if len(r.config.Option) > 0' for strings diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index a49bc30..e716ae4 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -10,8 +10,9 @@ import ( // "time" "os/exec" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) const GMETRIC_EXEC = `gmetric` @@ -35,50 +36,53 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Write(point lp.CCMessage) error { +func (s *GangliaSink) Write(msg lp.CCMessage) error { var err error = nil //var tagsstr []string var argstr []string - // Get metric config (type, value, ... in suitable format) - conf := GetCommonGangliaConfig(point) - if len(conf.Type) == 0 { - conf = GetGangliaConfig(point) - } - if len(conf.Type) == 0 { - return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name) - } + point, err := s.mp.ProcessMessage(msg) + if err == nil && point != nil { + // Get metric config (type, value, ... in suitable format) + conf := GetCommonGangliaConfig(point) + if len(conf.Type) == 0 { + conf = GetGangliaConfig(point) + } + if len(conf.Type) == 0 { + return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name) + } - if s.config.AddGangliaGroup { - argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group)) - } - if s.config.AddUnits && len(conf.Unit) > 0 { - argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit)) - } + if s.config.AddGangliaGroup { + argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group)) + } + if s.config.AddUnits && len(conf.Unit) > 0 { + argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit)) + } - if len(s.config.ClusterName) > 0 { - argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName)) - } - // if s.config.AddTagsAsDesc && len(tagsstr) > 0 { - // argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) - // } - if len(s.gmetric_config) > 0 { - argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config)) - } - if s.config.AddTypeToName { - argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point))) - } else { - argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name)) - } - argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope)) - argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value)) - argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type)) - argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax)) + if len(s.config.ClusterName) > 0 { + argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName)) + } + // if s.config.AddTagsAsDesc && len(tagsstr) > 0 { + // argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) + // } + if len(s.gmetric_config) > 0 { + argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config)) + } + if s.config.AddTypeToName { + argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point))) + } else { + argstr = append(argstr, fmt.Sprintf("--name=%s", conf.Name)) + } + argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope)) + argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value)) + argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type)) + argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax)) - cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " ")) - command := exec.Command(s.gmetric_path, argstr...) - command.Wait() - _, err = command.Output() + cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " ")) + command := exec.Command(s.gmetric_path, argstr...) + command.Wait() + _, err = command.Output() + } return err } @@ -104,6 +108,13 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { } s.gmetric_path = "" s.gmetric_config = "" + + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p + if len(s.config.GmetricPath) > 0 { p, err := exec.LookPath(s.config.GmetricPath) if err == nil { @@ -122,5 +133,15 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.GmetricConfig) > 0 { s.gmetric_config = s.config.GmetricConfig } + if len(s.config.MessageProcessor) > 0 { + err = s.mp.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + for _, k := range s.config.MetaAsTags { + s.mp.AddMoveMetaToTags("true", k, k) + } + return s, nil } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 6a94b8d..b846cdc 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -9,8 +9,9 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influx "github.com/influxdata/line-protocol/v2/lineprotocol" "golang.org/x/exp/slices" ) @@ -75,28 +76,20 @@ type HttpSink struct { } // Write sends metric m as http message -func (s *HttpSink) Write(m lp.CCMessage) error { +func (s *HttpSink) Write(msg lp.CCMessage) error { - // Lock for encoder usage - s.encoderLock.Lock() + // submit m only after applying processing/dropping rules + m, err := s.mp.ProcessMessage(msg) + if err == nil && m != nil { + // Lock for encoder usage + s.encoderLock.Lock() - // Encode measurement name - s.encoder.StartLine(m.Name()) + // Encode measurement name + s.encoder.StartLine(m.Name()) - // copy tags and meta data which should be used as tags - s.extended_tag_list = s.extended_tag_list[:0] - for key, value := range m.Tags() { - s.extended_tag_list = - append( - s.extended_tag_list, - key_value_pair{ - key: key, - value: value, - }, - ) - } - for _, key := range s.config.MetaAsTags { - if value, ok := m.GetMeta(key); ok { + // copy tags and meta data which should be used as tags + s.extended_tag_list = s.extended_tag_list[:0] + for key, value := range m.Tags() { s.extended_tag_list = append( s.extended_tag_list, @@ -106,45 +99,57 @@ func (s *HttpSink) Write(m lp.CCMessage) error { }, ) } - } + // for _, key := range s.config.MetaAsTags { + // if value, ok := m.GetMeta(key); ok { + // s.extended_tag_list = + // append( + // s.extended_tag_list, + // key_value_pair{ + // key: key, + // value: value, + // }, + // ) + // } + // } - // Encode tags (they musts be in lexical order) - slices.SortFunc( - s.extended_tag_list, - func(a key_value_pair, b key_value_pair) int { - if a.key < b.key { - return -1 - } - if a.key > b.key { - return +1 - } - return 0 - }, - ) - for i := range s.extended_tag_list { - s.encoder.AddTag( - s.extended_tag_list[i].key, - s.extended_tag_list[i].value, + // Encode tags (they musts be in lexical order) + slices.SortFunc( + s.extended_tag_list, + func(a key_value_pair, b key_value_pair) int { + if a.key < b.key { + return -1 + } + if a.key > b.key { + return +1 + } + return 0 + }, ) - } + for i := range s.extended_tag_list { + s.encoder.AddTag( + s.extended_tag_list[i].key, + s.extended_tag_list[i].value, + ) + } - // Encode fields - for key, value := range m.Fields() { - s.encoder.AddField(key, influx.MustNewValue(value)) - } + // Encode fields + for key, value := range m.Fields() { + s.encoder.AddField(key, influx.MustNewValue(value)) + } - // Encode time stamp - s.encoder.EndLine(m.Time()) + // Encode time stamp + s.encoder.EndLine(m.Time()) - // Check for encoder errors - err := s.encoder.Err() + // Check for encoder errors + err := s.encoder.Err() - // Unlock encoder usage - s.encoderLock.Unlock() + // Unlock encoder usage + s.encoderLock.Unlock() - // Check that encoding worked - if err != nil { - return fmt.Errorf("encoding failed: %v", err) + // Check that encoding worked + if err != nil { + return fmt.Errorf("encoding failed: %v", err) + } } if s.config.flushDelay == 0 { @@ -297,6 +302,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { if s.config.useBasicAuth && len(s.config.Password) == 0 { return nil, errors.New("basic authentication requires password") } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p if len(s.config.IdleConnTimeout) > 0 { t, err := time.ParseDuration(s.config.IdleConnTimeout) @@ -319,6 +329,16 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { cclog.ComponentDebug(s.name, "Init(): flushDelay", t) } } + if len(s.config.MessageProcessor) > 0 { + err = p.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + for _, k := range s.config.MetaAsTags { + s.mp.AddMoveMetaToTags("true", k, k) + } + precision := influx.Nanosecond if len(s.config.Precision) > 0 { switch s.config.Precision { diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 9f0892e..2f705c8 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -10,8 +10,9 @@ import ( "strings" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http" @@ -121,9 +122,10 @@ func (s *InfluxAsyncSink) Write(m lp.CCMessage) error { } }) } - s.writeApi.WritePoint( - m.ToPoint(s.meta_as_tags), - ) + msg, err := s.mp.ProcessMessage(m) + if err == nil && msg != nil { + s.writeApi.WritePoint(msg.ToPoint(nil)) + } return nil } @@ -200,10 +202,24 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.Password) == 0 { return nil, errors.New("missing password configuration required by InfluxSink") } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p + if len(s.config.MessageProcessor) > 0 { + err = s.mp.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) + // s.meta_as_tags = make(map[string]bool) + // for _, k := range s.config.MetaAsTags { + // s.meta_as_tags[k] = true + // } for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true + s.mp.AddMoveMetaToTags("true", k, k) } toUint := func(duration string, def uint) uint { diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 6dc8388..4fac48e 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -10,8 +10,9 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influx "github.com/influxdata/line-protocol/v2/lineprotocol" @@ -224,28 +225,19 @@ func (s *InfluxSink) connect() error { } // Write sends metric m in influxDB line protocol -func (s *InfluxSink) Write(m lp.CCMessage) error { +func (s *InfluxSink) Write(msg lp.CCMessage) error { - // Lock for encoder usage - s.encoderLock.Lock() + m, err := s.mp.ProcessMessage(msg) + if err == nil && m != nil { + // Lock for encoder usage + s.encoderLock.Lock() - // Encode measurement name - s.encoder.StartLine(m.Name()) + // Encode measurement name + s.encoder.StartLine(m.Name()) - // copy tags and meta data which should be used as tags - s.extended_tag_list = s.extended_tag_list[:0] - for key, value := range m.Tags() { - s.extended_tag_list = - append( - s.extended_tag_list, - key_value_pair{ - key: key, - value: value, - }, - ) - } - for _, key := range s.config.MetaAsTags { - if value, ok := m.GetMeta(key); ok { + // copy tags and meta data which should be used as tags + s.extended_tag_list = s.extended_tag_list[:0] + for key, value := range m.Tags() { s.extended_tag_list = append( s.extended_tag_list, @@ -255,45 +247,57 @@ func (s *InfluxSink) Write(m lp.CCMessage) error { }, ) } - } + // for _, key := range s.config.MetaAsTags { + // if value, ok := m.GetMeta(key); ok { + // s.extended_tag_list = + // append( + // s.extended_tag_list, + // key_value_pair{ + // key: key, + // value: value, + // }, + // ) + // } + // } - // Encode tags (they musts be in lexical order) - slices.SortFunc( - s.extended_tag_list, - func(a key_value_pair, b key_value_pair) int { - if a.key < b.key { - return -1 - } - if a.key > b.key { - return +1 - } - return 0 - }, - ) - for i := range s.extended_tag_list { - s.encoder.AddTag( - s.extended_tag_list[i].key, - s.extended_tag_list[i].value, + // Encode tags (they musts be in lexical order) + slices.SortFunc( + s.extended_tag_list, + func(a key_value_pair, b key_value_pair) int { + if a.key < b.key { + return -1 + } + if a.key > b.key { + return +1 + } + return 0 + }, ) + for i := range s.extended_tag_list { + s.encoder.AddTag( + s.extended_tag_list[i].key, + s.extended_tag_list[i].value, + ) + } + + // Encode fields + for key, value := range m.Fields() { + s.encoder.AddField(key, influx.MustNewValue(value)) + } + + // Encode time stamp + s.encoder.EndLine(m.Time()) + + // Check for encoder errors + if err := s.encoder.Err(); err != nil { + // Unlock encoder usage + s.encoderLock.Unlock() + + return fmt.Errorf("encoding failed: %v", err) + } + s.numRecordsInEncoder++ } - // Encode fields - for key, value := range m.Fields() { - s.encoder.AddField(key, influx.MustNewValue(value)) - } - - // Encode time stamp - s.encoder.EndLine(m.Time()) - - // Check for encoder errors - if err := s.encoder.Err(); err != nil { - // Unlock encoder usage - s.encoderLock.Unlock() - - return fmt.Errorf("Encoding failed: %v", err) - } - s.numRecordsInEncoder++ - if s.config.flushDelay == 0 { // Unlock encoder usage s.encoderLock.Unlock() @@ -443,11 +447,20 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.Password) == 0 { return s, errors.New("missing password configuration required by InfluxSink") } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p - // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) + if len(s.config.MessageProcessor) > 0 { + err = p.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true + s.mp.AddMoveMetaToTags("true", k, k) } // Configure flush delay duration diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 9b87de7..4700eee 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -72,8 +72,9 @@ import ( "fmt" "unsafe" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" "github.com/NVIDIA/go-nvml/pkg/dl" ) @@ -110,99 +111,102 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Write(point lp.CCMessage) error { +func (s *LibgangliaSink) Write(msg lp.CCMessage) error { var err error = nil var c_name *C.char var c_value *C.char var c_type *C.char var c_unit *C.char - // helper function for looking up C strings in the cache - lookup := func(key string) *C.char { - if _, exist := s.cstrCache[key]; !exist { - s.cstrCache[key] = C.CString(key) + point, err := s.mp.ProcessMessage(msg) + if err == nil && point != nil { + // helper function for looking up C strings in the cache + lookup := func(key string) *C.char { + if _, exist := s.cstrCache[key]; !exist { + s.cstrCache[key] = C.CString(key) + } + return s.cstrCache[key] } - return s.cstrCache[key] - } - conf := GetCommonGangliaConfig(point) - if len(conf.Type) == 0 { - conf = GetGangliaConfig(point) - } - if len(conf.Type) == 0 { - return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name) - } + conf := GetCommonGangliaConfig(point) + if len(conf.Type) == 0 { + conf = GetGangliaConfig(point) + } + if len(conf.Type) == 0 { + return fmt.Errorf("metric %q (Ganglia name %q) has no 'value' field", point.Name(), conf.Name) + } - if s.config.AddTypeToName { - conf.Name = GangliaMetricName(point) - } + if s.config.AddTypeToName { + conf.Name = GangliaMetricName(point) + } - c_value = C.CString(conf.Value) - c_type = lookup(conf.Type) - c_name = lookup(conf.Name) + c_value = C.CString(conf.Value) + c_type = lookup(conf.Type) + c_name = lookup(conf.Name) - // Add unit - unit := "" - if s.config.AddUnits { - unit = conf.Unit - } - c_unit = lookup(unit) + // Add unit + unit := "" + if s.config.AddUnits { + unit = conf.Unit + } + c_unit = lookup(unit) - // Determine the slope of the metric. Ganglia's own collector mostly use - // 'both' but the mem and swap total uses 'zero'. - slope_type := C.GANGLIA_SLOPE_BOTH - switch conf.Slope { - case "zero": - slope_type = C.GANGLIA_SLOPE_ZERO - case "both": - slope_type = C.GANGLIA_SLOPE_BOTH - } + // Determine the slope of the metric. Ganglia's own collector mostly use + // 'both' but the mem and swap total uses 'zero'. + slope_type := C.GANGLIA_SLOPE_BOTH + switch conf.Slope { + case "zero": + slope_type = C.GANGLIA_SLOPE_ZERO + case "both": + slope_type = C.GANGLIA_SLOPE_BOTH + } - // Create a new Ganglia metric - gmetric := C.Ganglia_metric_create(s.global_context) - // Set name, value, type and unit in the Ganglia metric - // The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant. - // The 'tmax' value is by default 300. - rval := C.int(0) - rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0) - switch rval { - case 1: + // Create a new Ganglia metric + gmetric := C.Ganglia_metric_create(s.global_context) + // Set name, value, type and unit in the Ganglia metric + // The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant. + // The 'tmax' value is by default 300. + rval := C.int(0) + rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0) + switch rval { + case 1: + C.free(unsafe.Pointer(c_value)) + return errors.New("invalid parameters") + case 2: + C.free(unsafe.Pointer(c_value)) + return errors.New("one of your parameters has an invalid character '\"'") + case 3: + C.free(unsafe.Pointer(c_value)) + return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type) + case 4: + C.free(unsafe.Pointer(c_value)) + return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value) + default: + } + + // Set the cluster name, otherwise it takes it from the configuration file + if len(s.config.ClusterName) > 0 { + C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName)) + } + // Set the group metadata in the Ganglia metric if configured + if s.config.AddGangliaGroup { + c_group := lookup(conf.Group) + C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group) + } + + // Now we send the metric + // gmetric does provide some more options like description and other options + // but they are not provided by the collectors + rval = C.Ganglia_metric_send(gmetric, s.send_channels) + if rval != 0 { + err = fmt.Errorf("there was an error sending metric %s to %d of the send channels ", point.Name(), rval) + // fall throuph to use Ganglia_metric_destroy from common cleanup + } + // Cleanup Ganglia metric + C.Ganglia_metric_destroy(gmetric) + // Free the value C string, the only one not stored in the cache C.free(unsafe.Pointer(c_value)) - return errors.New("invalid parameters") - case 2: - C.free(unsafe.Pointer(c_value)) - return errors.New("one of your parameters has an invalid character '\"'") - case 3: - C.free(unsafe.Pointer(c_value)) - return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type) - case 4: - C.free(unsafe.Pointer(c_value)) - return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value) - default: } - - // Set the cluster name, otherwise it takes it from the configuration file - if len(s.config.ClusterName) > 0 { - C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName)) - } - // Set the group metadata in the Ganglia metric if configured - if s.config.AddGangliaGroup { - c_group := lookup(conf.Group) - C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group) - } - - // Now we send the metric - // gmetric does provide some more options like description and other options - // but they are not provided by the collectors - rval = C.Ganglia_metric_send(gmetric, s.send_channels) - if rval != 0 { - err = fmt.Errorf("there was an error sending metric %s to %d of the send channels ", point.Name(), rval) - // fall throuph to use Ganglia_metric_destroy from common cleanup - } - // Cleanup Ganglia metric - C.Ganglia_metric_destroy(gmetric) - // Free the value C string, the only one not stored in the cache - C.free(unsafe.Pointer(c_value)) return err } @@ -241,6 +245,20 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p + if len(s.config.MessageProcessor) > 0 { + err = s.mp.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + for _, k := range s.config.MetaAsTags { + s.mp.AddMoveMetaToTags("true", k, k) + } lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS) if lib == nil { return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib) diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 6e0c1e3..4cac04b 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,24 +1,29 @@ package sinks import ( + "encoding/json" + lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) type defaultSinkConfig struct { - MetaAsTags []string `json:"meta_as_tags,omitempty"` - Type string `json:"type"` + MetaAsTags []string `json:"meta_as_tags,omitempty"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` + Type string `json:"type"` } type sink struct { - meta_as_tags map[string]bool // Use meta data tags as tags - name string // Name of the sink + meta_as_tags map[string]bool // Use meta data tags as tags + mp mp.MessageProcessor // message processor for the sink + name string // Name of the sink } type Sink interface { Write(point lp.CCMessage) error // Write metric to the sink - Flush() error // Flush buffered metrics - Close() // Close / finish metric sink - Name() string // Name of the metric sink + Flush() error // Flush buffered metrics + Close() // Close / finish metric sink + Name() string // Name of the metric sink } // Name returns the name of the metric sink diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 8a21510..04a0765 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -8,8 +8,9 @@ import ( "sync" "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" ) @@ -60,12 +61,15 @@ func (s *NatsSink) connect() error { } func (s *NatsSink) Write(m lp.CCMessage) error { - s.lock.Lock() - _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) - s.lock.Unlock() - if err != nil { - cclog.ComponentError(s.name, "Write:", err.Error()) - return err + msg, err := s.mp.ProcessMessage(m) + if err == nil && msg != nil { + s.lock.Lock() + _, err := s.encoder.Encode(msg.ToPoint(nil)) + s.lock.Unlock() + if err != nil { + cclog.ComponentError(s.name, "Write:", err.Error()) + return err + } } if s.flushDelay == 0 { @@ -120,11 +124,25 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { len(s.config.Subject) == 0 { return nil, errors.New("not all configuration variables set required by NatsSink") } - // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) - for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) } + s.mp = p + if len(s.config.MessageProcessor) > 0 { + err = s.mp.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + // Create lookup map to use meta infos as tags in the output metric + for _, k := range s.config.MetaAsTags { + s.mp.AddMoveMetaToTags("true", k, k) + } + // s.meta_as_tags = make(map[string]bool) + // for _, k := range s.config.MetaAsTags { + // s.meta_as_tags[k] = true + // } // Setup Influx line protocol s.buffer = &bytes.Buffer{} s.buffer.Grow(1025) diff --git a/sinks/prometheusSink.go b/sinks/prometheusSink.go index ab8cbfd..677cfd6 100644 --- a/sinks/prometheusSink.go +++ b/sinks/prometheusSink.go @@ -10,8 +10,9 @@ import ( "strings" "sync" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -153,7 +154,11 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMessage) error { } func (s *PrometheusSink) Write(m lp.CCMessage) error { - return s.updateMetric(m) + msg, err := s.mp.ProcessMessage(m) + if err == nil && msg != nil { + err = s.updateMetric(m) + } + return err } func (s *PrometheusSink) Flush() error { @@ -182,6 +187,20 @@ func NewPrometheusSink(name string, config json.RawMessage) (Sink, error) { cclog.ComponentError(s.name, err.Error()) return nil, err } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p + if len(s.config.MessageProcessor) > 0 { + err = p.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + for _, k := range s.config.MetaAsTags { + s.mp.AddMoveMetaToTags("true", k, k) + } s.labelMetrics = make(map[string]*prometheus.GaugeVec) s.nodeMetrics = make(map[string]prometheus.Gauge) s.promWg.Add(1) diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go index da617da..24e2911 100644 --- a/sinks/sampleSink.go +++ b/sinks/sampleSink.go @@ -6,8 +6,9 @@ import ( "fmt" "log" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) type SampleSinkConfig struct { @@ -30,7 +31,12 @@ type SampleSink struct { // Code to submit a single CCMetric to the sink func (s *SampleSink) Write(point lp.CCMessage) error { // based on s.meta_as_tags use meta infos as tags - log.Print(point) + // moreover, submit the point to the message processor + // to apply drop/modify rules + msg, err := s.mp.ProcessMessage(point) + if err == nil && msg != nil { + log.Print(msg) + } return nil } @@ -66,10 +72,24 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { } } - // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) + // Initialize and configure the message processor + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p + + // Add message processor configuration + if len(s.config.MessageProcessor) > 0 { + err = p.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } + // Add rules to move meta information to tag space + // Replacing the legacy 'meta_as_tags' configuration for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true + s.mp.AddMoveMetaToTags("true", k, k) } // Check if all required fields in the config are set diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index c9cda7e..a95e866 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -8,8 +8,9 @@ import ( "strings" // "time" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" ) type StdoutSink struct { @@ -22,10 +23,13 @@ type StdoutSink struct { } func (s *StdoutSink) Write(m lp.CCMessage) error { - fmt.Fprint( - s.output, - m.ToLineProtocol(s.meta_as_tags), - ) + msg, err := s.mp.ProcessMessage(m) + if err == nil && msg != nil { + fmt.Fprint( + s.output, + msg.ToLineProtocol(s.meta_as_tags), + ) + } return nil } @@ -41,6 +45,7 @@ func (s *StdoutSink) Close() { } func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { + s := new(StdoutSink) s.name = fmt.Sprintf("StdoutSink(%s)", name) if len(config) > 0 { @@ -51,6 +56,11 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } + p, err := mp.NewMessageProcessor() + if err != nil { + return nil, fmt.Errorf("initialization of message processor failed: %v", err.Error()) + } + s.mp = p s.output = os.Stdout if len(s.config.Output) > 0 { @@ -67,10 +77,21 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { s.output = f } } + + // Add message processor configuration + if len(s.config.MessageProcessor) > 0 { + err = s.mp.FromConfigJSON(s.config.MessageProcessor) + if err != nil { + return nil, fmt.Errorf("failed parsing JSON for message processor: %v", err.Error()) + } + } // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) + // s.meta_as_tags = make(map[string]bool) + // for _, k := range s.config.MetaAsTags { + // s.meta_as_tags[k] = true + // } for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true + s.mp.AddMoveMetaToTags("true", k, k) } return s, nil diff --git a/sinks/stdoutSink.md b/sinks/stdoutSink.md index 3fe3308..14ef4bd 100644 --- a/sinks/stdoutSink.md +++ b/sinks/stdoutSink.md @@ -10,7 +10,10 @@ The `stdout` sink is the most simple sink provided by cc-metric-collector. It wr "": { "type": "stdout", "meta_as_tags" : [], - "output_file" : "mylogfile.log" + "output_file" : "mylogfile.log", + "process_messages" : { + "see" : "docs of message processor for valid fields" + } } } ``` @@ -18,5 +21,6 @@ The `stdout` sink is the most simple sink provided by cc-metric-collector. It wr - `type`: makes the sink an `stdout` sink - `meta_as_tags`: print meta information as tags in the output (optional) - `output_file`: Write all data to the selected file (optional). There are two 'special' files: `stdout` and `stderr`. If this option is not provided, the default value is `stdout` +- `process_messages`: Process messages with given rules before progressing or dropping