mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-23 05:11:40 +02:00
Merge development branch to main (#141)
* Remove go-toolkit as build requirement for RPM builds if run in CI * Remove condition around BuildRequires and use go-toolkit for RPM builds * use go-toolkit for RPM builds * Install go-toolkit to fulfill build requirements for RPM * Add golang-race for UBI9 and Alma9 * Fix wrongly named packages * Fix wrongly named packages * Fix Release part * Fix Release part * Fix documentation of RAPL collector * Mark all JSON config fields of message processor as omitempty * Generate HUGO inputs out of Markdown files * Check creation of CCMessage in NATS receiver * Use CCMessage FromBytes instead of Influx's decoder * Rename 'process_message' to 'process_messages' in metricRouter config This makes the behavior more consistent with the other modules, which have their MessageProcessor named 'process_messages'. This most likely was just a typo. * Add optional interface alias in netstat (#130) * Check creation of CCMessage in NATS receiver * add optional interface aliases for netstatMetric * small fix --------- Co-authored-by: Thomas Roehl <thomas.roehl@fau.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de> Co-authored-by: Thomas Gruber <Thomas.Roehl@googlemail.com> * Fix excluded metrics for diskstat and add exclude_mounts (#131) * Check creation of CCMessage in NATS receiver * fix excluded metrics and add optional mountpoint exclude --------- Co-authored-by: Thomas Roehl <thomas.roehl@fau.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de> Co-authored-by: Thomas Gruber <Thomas.Roehl@googlemail.com> * Add derived values for nfsiostat (#132) * Check creation of CCMessage in NATS receiver * add derived_values for nfsiostatMetric --------- Co-authored-by: Thomas Roehl <thomas.roehl@fau.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de> Co-authored-by: Thomas Gruber <Thomas.Roehl@googlemail.com> * Add exclude_devices to iostat (#133) * Check creation of CCMessage in NATS receiver * add exclude_device for iostatMetric * add md file --------- Co-authored-by: Thomas Roehl <thomas.roehl@fau.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de> Co-authored-by: Thomas Gruber <Thomas.Roehl@googlemail.com> * Add derived_values for numastats (#134) * Check creation of CCMessage in NATS receiver * add derived_values for numastats * change to ccMessage * remove vim command artefact --------- Co-authored-by: Thomas Roehl <thomas.roehl@fau.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de> Co-authored-by: Thomas Gruber <Thomas.Roehl@googlemail.com> * Fix artifacts of not done cc-lib switch * Fix artifacts in netstat collector of not done cc-lib switch * Change to cc-lib (#135) * Change to ccMessage from cc-lib * Remove local development path * Use receiver, sinks, ccLogger and ccConfig from cc-lib * Fix ccLogger import path * Update CI * Delete mountpoint when it vanishes, not just its data (#137) --------- Co-authored-by: Michael Panzlaff <michael.panzlaff@fau.de> Co-authored-by: brinkcoder <Robert.Externbrink@ruhr-uni-bochum.de> Co-authored-by: exterr2f <Robert.Externbrink@rub.de>
This commit is contained in:
@@ -1,113 +0,0 @@
|
||||
package cclogger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
var (
|
||||
globalDebug = false
|
||||
stdout = os.Stdout
|
||||
stderr = os.Stderr
|
||||
debugLog *log.Logger = nil
|
||||
infoLog *log.Logger = nil
|
||||
errorLog *log.Logger = nil
|
||||
warnLog *log.Logger = nil
|
||||
defaultLog *log.Logger = nil
|
||||
)
|
||||
|
||||
func initLogger() {
|
||||
if debugLog == nil {
|
||||
debugLog = log.New(stderr, "DEBUG ", log.LstdFlags)
|
||||
}
|
||||
if infoLog == nil {
|
||||
infoLog = log.New(stdout, "INFO ", log.LstdFlags)
|
||||
}
|
||||
if errorLog == nil {
|
||||
errorLog = log.New(stderr, "ERROR ", log.LstdFlags)
|
||||
}
|
||||
if warnLog == nil {
|
||||
warnLog = log.New(stderr, "WARN ", log.LstdFlags)
|
||||
}
|
||||
if defaultLog == nil {
|
||||
defaultLog = log.New(stdout, "", log.LstdFlags)
|
||||
}
|
||||
}
|
||||
|
||||
func Print(e ...interface{}) {
|
||||
initLogger()
|
||||
defaultLog.Print(e...)
|
||||
}
|
||||
|
||||
func ComponentPrint(component string, e ...interface{}) {
|
||||
initLogger()
|
||||
defaultLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||
}
|
||||
|
||||
func Info(e ...interface{}) {
|
||||
initLogger()
|
||||
infoLog.Print(e...)
|
||||
}
|
||||
|
||||
func ComponentInfo(component string, e ...interface{}) {
|
||||
initLogger()
|
||||
infoLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||
}
|
||||
|
||||
func Debug(e ...interface{}) {
|
||||
initLogger()
|
||||
if globalDebug {
|
||||
debugLog.Print(e...)
|
||||
}
|
||||
}
|
||||
|
||||
func ComponentDebug(component string, e ...interface{}) {
|
||||
initLogger()
|
||||
if globalDebug && debugLog != nil {
|
||||
//CCComponentPrint(debugLog, component, e)
|
||||
debugLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||
}
|
||||
}
|
||||
|
||||
func Error(e ...interface{}) {
|
||||
initLogger()
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
errorLog.Print(fmt.Sprintf("[%s:%d] ", fn, line), e)
|
||||
}
|
||||
|
||||
func ComponentError(component string, e ...interface{}) {
|
||||
initLogger()
|
||||
_, fn, line, _ := runtime.Caller(1)
|
||||
errorLog.Print(fmt.Sprintf("[%s|%s:%d] ", component, fn, line), e)
|
||||
}
|
||||
|
||||
func SetDebug() {
|
||||
globalDebug = true
|
||||
initLogger()
|
||||
}
|
||||
|
||||
func SetOutput(filename string) {
|
||||
if filename == "stderr" {
|
||||
if stderr != os.Stderr && stderr != os.Stdout {
|
||||
stderr.Close()
|
||||
}
|
||||
stderr = os.Stderr
|
||||
} else if filename == "stdout" {
|
||||
if stderr != os.Stderr && stderr != os.Stdout {
|
||||
stderr.Close()
|
||||
}
|
||||
stderr = os.Stdout
|
||||
} else {
|
||||
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
||||
if err == nil {
|
||||
defer file.Close()
|
||||
stderr = file
|
||||
}
|
||||
}
|
||||
debugLog = nil
|
||||
errorLog = nil
|
||||
warnLog = nil
|
||||
initLogger()
|
||||
}
|
@@ -1,57 +0,0 @@
|
||||
# ClusterCockpit metrics
|
||||
|
||||
As described in the [ClusterCockpit specifications](https://github.com/ClusterCockpit/cc-specifications), the whole ClusterCockpit stack uses metrics in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMetric.
|
||||
|
||||
It is basically a copy of the [InfluxDB line protocol](https://github.com/influxdata/line-protocol) `MutableMetric` interface with one extension. Besides the tags and fields, it contains a list of meta information (re-using the `Tag` structure of the original protocol):
|
||||
|
||||
```golang
|
||||
type ccMetric struct {
|
||||
name string // Measurement name
|
||||
meta map[string]string // map of meta data tags
|
||||
tags map[string]string // map of of tags
|
||||
fields map[string]interface{} // map of of fields
|
||||
tm time.Time // timestamp
|
||||
}
|
||||
|
||||
type CCMetric interface {
|
||||
ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric
|
||||
ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric
|
||||
String() string // Return line-protocol like string
|
||||
|
||||
Name() string // Get metric name
|
||||
SetName(name string) // Set metric name
|
||||
|
||||
Time() time.Time // Get timestamp
|
||||
SetTime(t time.Time) // Set timestamp
|
||||
|
||||
Tags() map[string]string // Map of tags
|
||||
AddTag(key, value string) // Add a tag
|
||||
GetTag(key string) (value string, ok bool) // Get a tag by its key
|
||||
HasTag(key string) (ok bool) // Check if a tag key is present
|
||||
RemoveTag(key string) // Remove a tag by its key
|
||||
|
||||
Meta() map[string]string // Map of meta data tags
|
||||
AddMeta(key, value string) // Add a meta data tag
|
||||
GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key
|
||||
HasMeta(key string) (ok bool) // Check if a meta data key is present
|
||||
RemoveMeta(key string) // Remove a meta data tag by its key
|
||||
|
||||
Fields() map[string]interface{} // Map of fields
|
||||
AddField(key string, value interface{}) // Add a field
|
||||
GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
|
||||
HasField(key string) (ok bool) // Check if a field key is present
|
||||
RemoveField(key string) // Remove a field addressed by its key
|
||||
}
|
||||
|
||||
func New(name string, tags map[string]string, meta map[string]string, fields map[string]interface{}, tm time.Time) (CCMetric, error)
|
||||
func FromMetric(other CCMetric) CCMetric
|
||||
func FromInfluxMetric(other lp.Metric) CCMetric
|
||||
```
|
||||
|
||||
The `CCMetric` interface provides the same functions as the `MutableMetric` like `{Add, Get, Remove, Has}{Tag, Field}` and additionally provides `{Add, Get, Remove, Has}Meta`.
|
||||
|
||||
The InfluxDB protocol creates a new metric with `influx.New(name, tags, fields, time)` while CCMetric uses `ccMetric.New(name, tags, meta, fields, time)` where `tags` and `meta` are both of type `map[string]string`.
|
||||
|
||||
You can copy a CCMetric with `FromMetric(other CCMetric) CCMetric`. If you get an `influx.Metric` from a function, like the line protocol parser, you can use `FromInfluxMetric(other influx.Metric) CCMetric` to get a CCMetric out of it (see `NatsReceiver` for an example).
|
||||
|
||||
Although the [cc-specifications](https://github.com/ClusterCockpit/cc-specifications/blob/master/interfaces/lineprotocol/README.md) defines that there is only a `value` field for the metric value, the CCMetric still can have multiple values similar to the InfluxDB line protocol.
|
@@ -1,353 +0,0 @@
|
||||
package ccmetric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
write "github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||
lp "github.com/influxdata/line-protocol" // MIT license
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
// Most functions are derived from github.com/influxdata/line-protocol/metric.go
|
||||
// The metric type is extended with an extra meta information list re-using the Tag
|
||||
// type.
|
||||
//
|
||||
// See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
|
||||
type ccMetric struct {
|
||||
name string // Measurement name
|
||||
meta map[string]string // map of meta data tags
|
||||
tags map[string]string // map of of tags
|
||||
fields map[string]interface{} // map of of fields
|
||||
tm time.Time // timestamp
|
||||
}
|
||||
|
||||
// ccMetric access functions
|
||||
type CCMetric interface {
|
||||
ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric
|
||||
ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric
|
||||
|
||||
Name() string // Get metric name
|
||||
SetName(name string) // Set metric name
|
||||
|
||||
Time() time.Time // Get timestamp
|
||||
SetTime(t time.Time) // Set timestamp
|
||||
|
||||
Tags() map[string]string // Map of tags
|
||||
AddTag(key, value string) // Add a tag
|
||||
GetTag(key string) (value string, ok bool) // Get a tag by its key
|
||||
HasTag(key string) (ok bool) // Check if a tag key is present
|
||||
RemoveTag(key string) // Remove a tag by its key
|
||||
|
||||
Meta() map[string]string // Map of meta data tags
|
||||
AddMeta(key, value string) // Add a meta data tag
|
||||
GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key
|
||||
HasMeta(key string) (ok bool) // Check if a meta data key is present
|
||||
RemoveMeta(key string) // Remove a meta data tag by its key
|
||||
|
||||
Fields() map[string]interface{} // Map of fields
|
||||
AddField(key string, value interface{}) // Add a field
|
||||
GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
|
||||
HasField(key string) (ok bool) // Check if a field key is present
|
||||
RemoveField(key string) // Remove a field addressed by its key
|
||||
String() string // Return line-protocol like string
|
||||
}
|
||||
|
||||
// String implements the stringer interface for data type ccMetric
|
||||
func (m *ccMetric) String() string {
|
||||
return fmt.Sprintf(
|
||||
"Name: %s, Tags: %+v, Meta: %+v, fields: %+v, Timestamp: %d",
|
||||
m.name, m.tags, m.meta, m.fields, m.tm.UnixNano(),
|
||||
)
|
||||
}
|
||||
|
||||
// ToLineProtocol generates influxDB line protocol for data type ccMetric
|
||||
func (m *ccMetric) ToPoint(metaAsTags map[string]bool) (p *write.Point) {
|
||||
p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm)
|
||||
for key, use_as_tag := range metaAsTags {
|
||||
if use_as_tag {
|
||||
if value, ok := m.GetMeta(key); ok {
|
||||
p.AddTag(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// ToLineProtocol generates influxDB line protocol for data type ccMetric
|
||||
func (m *ccMetric) ToLineProtocol(metaAsTags map[string]bool) string {
|
||||
|
||||
return write.PointToLineProtocol(
|
||||
m.ToPoint(metaAsTags),
|
||||
time.Nanosecond,
|
||||
)
|
||||
}
|
||||
|
||||
// Name returns the measurement name
|
||||
func (m *ccMetric) Name() string {
|
||||
return m.name
|
||||
}
|
||||
|
||||
// SetName sets the measurement name
|
||||
func (m *ccMetric) SetName(name string) {
|
||||
m.name = name
|
||||
}
|
||||
|
||||
// Time returns timestamp
|
||||
func (m *ccMetric) Time() time.Time {
|
||||
return m.tm
|
||||
}
|
||||
|
||||
// SetTime sets the timestamp
|
||||
func (m *ccMetric) SetTime(t time.Time) {
|
||||
m.tm = t
|
||||
}
|
||||
|
||||
// Tags returns the the list of tags as key-value-mapping
|
||||
func (m *ccMetric) Tags() map[string]string {
|
||||
return m.tags
|
||||
}
|
||||
|
||||
// AddTag adds a tag (consisting of key and value) to the map of tags
|
||||
func (m *ccMetric) AddTag(key, value string) {
|
||||
m.tags[key] = value
|
||||
}
|
||||
|
||||
// GetTag returns the tag with tag's key equal to <key>
|
||||
func (m *ccMetric) GetTag(key string) (string, bool) {
|
||||
value, ok := m.tags[key]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
// HasTag checks if a tag with key equal to <key> is present in the list of tags
|
||||
func (m *ccMetric) HasTag(key string) bool {
|
||||
_, ok := m.tags[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
// RemoveTag removes the tag with tag's key equal to <key>
|
||||
func (m *ccMetric) RemoveTag(key string) {
|
||||
delete(m.tags, key)
|
||||
}
|
||||
|
||||
// Meta returns the meta data tags as key-value mapping
|
||||
func (m *ccMetric) Meta() map[string]string {
|
||||
return m.meta
|
||||
}
|
||||
|
||||
// AddMeta adds a meta data tag (consisting of key and value) to the map of meta data tags
|
||||
func (m *ccMetric) AddMeta(key, value string) {
|
||||
m.meta[key] = value
|
||||
}
|
||||
|
||||
// GetMeta returns the meta data tag with meta data's key equal to <key>
|
||||
func (m *ccMetric) GetMeta(key string) (string, bool) {
|
||||
value, ok := m.meta[key]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
// HasMeta checks if a meta data tag with meta data's key equal to <key> is present in the map of meta data tags
|
||||
func (m *ccMetric) HasMeta(key string) bool {
|
||||
_, ok := m.meta[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
// RemoveMeta removes the meta data tag with tag's key equal to <key>
|
||||
func (m *ccMetric) RemoveMeta(key string) {
|
||||
delete(m.meta, key)
|
||||
}
|
||||
|
||||
// Fields returns the list of fields as key-value-mapping
|
||||
func (m *ccMetric) Fields() map[string]interface{} {
|
||||
return m.fields
|
||||
}
|
||||
|
||||
// AddField adds a field (consisting of key and value) to the map of fields
|
||||
func (m *ccMetric) AddField(key string, value interface{}) {
|
||||
m.fields[key] = value
|
||||
}
|
||||
|
||||
// GetField returns the field with field's key equal to <key>
|
||||
func (m *ccMetric) GetField(key string) (interface{}, bool) {
|
||||
v, ok := m.fields[key]
|
||||
return v, ok
|
||||
}
|
||||
|
||||
// HasField checks if a field with field's key equal to <key> is present in the map of fields
|
||||
func (m *ccMetric) HasField(key string) bool {
|
||||
_, ok := m.fields[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
// RemoveField removes the field with field's key equal to <key>
|
||||
// from the map of fields
|
||||
func (m *ccMetric) RemoveField(key string) {
|
||||
delete(m.fields, key)
|
||||
}
|
||||
|
||||
// New creates a new measurement point
|
||||
func New(
|
||||
name string,
|
||||
tags map[string]string,
|
||||
meta map[string]string,
|
||||
fields map[string]interface{},
|
||||
tm time.Time,
|
||||
) (CCMetric, error) {
|
||||
m := &ccMetric{
|
||||
name: name,
|
||||
tags: maps.Clone(tags),
|
||||
meta: maps.Clone(meta),
|
||||
fields: make(map[string]interface{}, len(fields)),
|
||||
tm: tm,
|
||||
}
|
||||
|
||||
// deep copy fields
|
||||
for k, v := range fields {
|
||||
v := convertField(v)
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
m.fields[k] = v
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FromMetric copies the metric <other>
|
||||
func FromMetric(other CCMetric) CCMetric {
|
||||
|
||||
return &ccMetric{
|
||||
name: other.Name(),
|
||||
tags: maps.Clone(other.Tags()),
|
||||
meta: maps.Clone(other.Meta()),
|
||||
fields: maps.Clone(other.Fields()),
|
||||
tm: other.Time(),
|
||||
}
|
||||
}
|
||||
|
||||
// FromInfluxMetric copies the influxDB line protocol metric <other>
|
||||
func FromInfluxMetric(other lp.Metric) CCMetric {
|
||||
m := &ccMetric{
|
||||
name: other.Name(),
|
||||
tags: make(map[string]string),
|
||||
meta: make(map[string]string),
|
||||
fields: make(map[string]interface{}),
|
||||
tm: other.Time(),
|
||||
}
|
||||
|
||||
// deep copy tags and fields
|
||||
for _, otherTag := range other.TagList() {
|
||||
m.tags[otherTag.Key] = otherTag.Value
|
||||
}
|
||||
for _, otherField := range other.FieldList() {
|
||||
m.fields[otherField.Key] = otherField.Value
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// convertField converts data types of fields by the following schemata:
|
||||
//
|
||||
// *float32, *float64, float32, float64 -> float64
|
||||
// *int, *int8, *int16, *int32, *int64, int, int8, int16, int32, int64 -> int64
|
||||
//
|
||||
// *uint, *uint8, *uint16, *uint32, *uint64, uint, uint8, uint16, uint32, uint64 -> uint64
|
||||
// *[]byte, *string, []byte, string -> string
|
||||
// *bool, bool -> bool
|
||||
func convertField(v interface{}) interface{} {
|
||||
switch v := v.(type) {
|
||||
case float64:
|
||||
return v
|
||||
case int64:
|
||||
return v
|
||||
case string:
|
||||
return v
|
||||
case bool:
|
||||
return v
|
||||
case int:
|
||||
return int64(v)
|
||||
case uint:
|
||||
return uint64(v)
|
||||
case uint64:
|
||||
return uint64(v)
|
||||
case []byte:
|
||||
return string(v)
|
||||
case int32:
|
||||
return int64(v)
|
||||
case int16:
|
||||
return int64(v)
|
||||
case int8:
|
||||
return int64(v)
|
||||
case uint32:
|
||||
return uint64(v)
|
||||
case uint16:
|
||||
return uint64(v)
|
||||
case uint8:
|
||||
return uint64(v)
|
||||
case float32:
|
||||
return float64(v)
|
||||
case *float64:
|
||||
if v != nil {
|
||||
return *v
|
||||
}
|
||||
case *int64:
|
||||
if v != nil {
|
||||
return *v
|
||||
}
|
||||
case *string:
|
||||
if v != nil {
|
||||
return *v
|
||||
}
|
||||
case *bool:
|
||||
if v != nil {
|
||||
return *v
|
||||
}
|
||||
case *int:
|
||||
if v != nil {
|
||||
return int64(*v)
|
||||
}
|
||||
case *uint:
|
||||
if v != nil {
|
||||
return uint64(*v)
|
||||
}
|
||||
case *uint64:
|
||||
if v != nil {
|
||||
return uint64(*v)
|
||||
}
|
||||
case *[]byte:
|
||||
if v != nil {
|
||||
return string(*v)
|
||||
}
|
||||
case *int32:
|
||||
if v != nil {
|
||||
return int64(*v)
|
||||
}
|
||||
case *int16:
|
||||
if v != nil {
|
||||
return int64(*v)
|
||||
}
|
||||
case *int8:
|
||||
if v != nil {
|
||||
return int64(*v)
|
||||
}
|
||||
case *uint32:
|
||||
if v != nil {
|
||||
return uint64(*v)
|
||||
}
|
||||
case *uint16:
|
||||
if v != nil {
|
||||
return uint64(*v)
|
||||
}
|
||||
case *uint8:
|
||||
if v != nil {
|
||||
return uint64(*v)
|
||||
}
|
||||
case *float32:
|
||||
if v != nil {
|
||||
return float64(*v)
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -9,7 +9,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
cclogger "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
cclogger "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
|
@@ -1,266 +0,0 @@
|
||||
# Message Processor Component
|
||||
|
||||
Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages.
|
||||
The main CC application using it is `cc-metric-collector`. The processing part there was originally in the metric router, the central
|
||||
hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the
|
||||
lack of flexibility caused some trouble:
|
||||
|
||||
> The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with
|
||||
> a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different
|
||||
> resolution for some metrics. The issue was basically the `mem_used` metric showing the currently used memory of the node. Ganglia wants it in `kByte` as provided
|
||||
> by the Linux operating system but CC wanted it in `GByte`.
|
||||
|
||||
With the message processor, the Ganglia sinks can apply the unit prefix changes individually and name the metrics as required by Ganglia.
|
||||
|
||||
## For developers
|
||||
|
||||
Whenever you receive or are about to send a message out, you should provide some processing.
|
||||
|
||||
### Configuration of component
|
||||
|
||||
New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file
|
||||
or some fields in a configuration file for the processing.
|
||||
|
||||
The message processor uses the following configuration
|
||||
|
||||
```json
|
||||
{
|
||||
"drop_messages": [
|
||||
"name_of_message_to_drop"
|
||||
],
|
||||
"drop_messages_if": [
|
||||
"condition_when_to_drop_message",
|
||||
"name == 'drop_this'",
|
||||
"tag.hostname == 'this_host'",
|
||||
"meta.unit != 'MB'"
|
||||
],
|
||||
"rename_messages" : {
|
||||
"old_message_name" : "new_message_name"
|
||||
},
|
||||
"rename_messages_if": {
|
||||
"condition_when_to_rename_message" : "new_name"
|
||||
},
|
||||
"add_tags_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_tag",
|
||||
"key": "name_for_new_tag",
|
||||
"value": "new_tag_value"
|
||||
}
|
||||
],
|
||||
"delete_tags_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_tag",
|
||||
"key": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"add_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_meta_info",
|
||||
"key": "name_for_new_meta_info",
|
||||
"value": "new_meta_info_value"
|
||||
}
|
||||
],
|
||||
"delete_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_meta_info",
|
||||
"key": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"add_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_field",
|
||||
"key": "name_for_new_field",
|
||||
"value": "new_field_value_but_only_string_at_the_moment"
|
||||
}
|
||||
],
|
||||
"delete_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_field",
|
||||
"key": "name_of_field"
|
||||
}
|
||||
],
|
||||
"move_tag_to_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_tag_to_meta_info_including_its_value",
|
||||
"key": "name_of_tag",
|
||||
"value": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"move_tag_to_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_tag_to_fields_including_its_value",
|
||||
"key": "name_of_tag",
|
||||
"value": "name_of_field"
|
||||
}
|
||||
],
|
||||
"move_meta_to_tag_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_meta_info_to_tags_including_its_value",
|
||||
"key": "name_of_meta_info",
|
||||
"value": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"move_meta_to_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_meta_info_to_fields_including_its_value",
|
||||
"key": "name_of_tag",
|
||||
"value": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"move_field_to_tag_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_field_to_tags_including_its_stringified_value",
|
||||
"key": "name_of_field",
|
||||
"value": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"move_field_to_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_field_to_meta_info_including_its_stringified_value",
|
||||
"key": "name_of_field",
|
||||
"value": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"drop_by_message_type": [
|
||||
"metric",
|
||||
"event",
|
||||
"log",
|
||||
"control"
|
||||
],
|
||||
"change_unit_prefix": {
|
||||
"name == 'metric_with_wrong_unit_prefix'" : "G",
|
||||
"only_if_messagetype == 'metric'": "T"
|
||||
},
|
||||
"normalize_units": true,
|
||||
"add_base_env": {
|
||||
"MY_CONSTANT_FOR_CUSTOM_CONDITIONS": 1.0,
|
||||
"output_value_for_test_metrics": 42.0,
|
||||
},
|
||||
"stage_order": [
|
||||
"rename_messages_if",
|
||||
"drop_messages"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The options `change_unit_prefix` and `normalize_units` are only applied to CCMetrics. It is not possible to delete the field related to each message type as defined in [cc-specification](https://github.com/ClusterCockpit/cc-specifications/tree/master/interfaces/lineprotocol). In short:
|
||||
- CCMetrics always have to have a field named `value`
|
||||
- CCEvents always have to have a field named `event`
|
||||
- CCLogs always have to have a field named `log`
|
||||
- CCControl messages always have to have a field named `control`
|
||||
|
||||
With `add_base_env`, one can specifiy mykey=myvalue pairs that can be used in conditions like `tag.type == mykey`.
|
||||
|
||||
The order in which each message is processed, can be specified with the `stage_order` option. The stage names are the keys in the JSON configuration, thus `change_unit_prefix`, `move_field_to_meta_if`, etc. Stages can be listed multiple times.
|
||||
|
||||
### Using the component
|
||||
In order to load the configuration from a `json.RawMessage`:
|
||||
```golang
|
||||
mp, err := NewMessageProcessor()
|
||||
if err != nil {
|
||||
log.Error("failed to create new message processor")
|
||||
}
|
||||
mp.FromConfigJSON(configJson)
|
||||
```
|
||||
|
||||
After initialization and adding the different operations, the `ProcessMessage()` function applies all operations and returns whether the message should be dropped.
|
||||
|
||||
```golang
|
||||
m := lp.CCMetric{}
|
||||
|
||||
x, err := mp.ProcessMessage(m)
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
if x != nil {
|
||||
// process x further
|
||||
} else {
|
||||
// this message got dropped
|
||||
}
|
||||
```
|
||||
|
||||
Single operations can be added and removed at runtime
|
||||
```golang
|
||||
type MessageProcessor interface {
|
||||
// Functions to set the execution order of the processing stages
|
||||
SetStages([]string) error
|
||||
DefaultStages() []string
|
||||
// Function to add variables to the base evaluation environment
|
||||
AddBaseEnv(env map[string]interface{}) error
|
||||
// Functions to add and remove rules
|
||||
AddDropMessagesByName(name string) error
|
||||
RemoveDropMessagesByName(name string)
|
||||
AddDropMessagesByCondition(condition string) error
|
||||
RemoveDropMessagesByCondition(condition string)
|
||||
AddRenameMetricByCondition(condition string, name string) error
|
||||
RemoveRenameMetricByCondition(condition string)
|
||||
AddRenameMetricByName(from, to string) error
|
||||
RemoveRenameMetricByName(from string)
|
||||
SetNormalizeUnits(settings bool)
|
||||
AddChangeUnitPrefix(condition string, prefix string) error
|
||||
RemoveChangeUnitPrefix(condition string)
|
||||
AddAddTagsByCondition(condition, key, value string) error
|
||||
RemoveAddTagsByCondition(condition string)
|
||||
AddDeleteTagsByCondition(condition, key, value string) error
|
||||
RemoveDeleteTagsByCondition(condition string)
|
||||
AddAddMetaByCondition(condition, key, value string) error
|
||||
RemoveAddMetaByCondition(condition string)
|
||||
AddDeleteMetaByCondition(condition, key, value string) error
|
||||
RemoveDeleteMetaByCondition(condition string)
|
||||
AddMoveTagToMeta(condition, key, value string) error
|
||||
RemoveMoveTagToMeta(condition string)
|
||||
AddMoveTagToFields(condition, key, value string) error
|
||||
RemoveMoveTagToFields(condition string)
|
||||
AddMoveMetaToTags(condition, key, value string) error
|
||||
RemoveMoveMetaToTags(condition string)
|
||||
AddMoveMetaToFields(condition, key, value string) error
|
||||
RemoveMoveMetaToFields(condition string)
|
||||
AddMoveFieldToTags(condition, key, value string) error
|
||||
RemoveMoveFieldToTags(condition string)
|
||||
AddMoveFieldToMeta(condition, key, value string) error
|
||||
RemoveMoveFieldToMeta(condition string)
|
||||
// Read in a JSON configuration
|
||||
FromConfigJSON(config json.RawMessage) error
|
||||
ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
|
||||
// Processing functions for legacy CCMetric and current CCMessage
|
||||
ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
### Syntax for evaluatable terms
|
||||
|
||||
The message processor uses `gval` for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations.
|
||||
|
||||
Accessible for operations are
|
||||
- `name` of the message
|
||||
- `timestamp` or `time` of the message
|
||||
- `type`, `type-id` of the message (also `tag_type`, `tag_type-id` and `tag_typeid`)
|
||||
- `stype`, `stype-id` of the message (if message has theses tags, also `tag_stype`, `tag_stype-id` and `tag_stypeid`)
|
||||
- `value` for a CCMetric message (also `field_value`)
|
||||
- `event` for a CCEvent message (also `field_event`)
|
||||
- `control` for a CCControl message (also `field_control`)
|
||||
- `log` for a CCLog message (also `field_log`)
|
||||
- `messagetype` or `msgtype`. Possible values `event`, `metric`, `log` and `control`.
|
||||
|
||||
Generally, all tags are accessible with `tag_<tagkey>`, `tags_<tagkey>` or `tags.<tagkey>`. Similarly for all fields with `field[s]?[_.]<fieldkey>`. For meta information `meta[_.]<metakey>` (there is no `metas[_.]<metakey>`).
|
||||
|
||||
The [syntax of `expr`](https://expr-lang.org/docs/language-definition) is accepted with some additions:
|
||||
- Comparing strings: `==`, `!=`, `str matches regex` (use `%` instead of `\`!)
|
||||
- Combining conditions: `&&`, `||`
|
||||
- Comparing numbers: `==`, `!=`, `<`, `>`, `<=`, `>=`
|
||||
- Test lists: `<value> in <list>`
|
||||
- Topological tests: `tag_type-id in getCpuListOfType("socket", "1")` (test if the metric belongs to socket 1 in local node topology)
|
||||
|
||||
Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that:
|
||||
- use `''` instead of `""` for strings
|
||||
- for the regexes, use `%` instead of `\`
|
||||
|
||||
|
||||
For operations that should be applied on all messages, use the condition `true`.
|
||||
|
||||
### Overhead
|
||||
|
||||
The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message
|
||||
increases. Moreover, the processing creates a copy of the message.
|
||||
|
@@ -1,988 +0,0 @@
|
||||
package messageprocessor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lplegacy "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
|
||||
"github.com/expr-lang/expr"
|
||||
"github.com/expr-lang/expr/vm"
|
||||
)
|
||||
|
||||
// Message processor add/delete tag/meta configuration
|
||||
type messageProcessorTagConfig struct {
|
||||
Key string `json:"key"` // Tag name
|
||||
Value string `json:"value,omitempty"` // Tag value
|
||||
Condition string `json:"if"` // Condition for adding or removing corresponding tag
|
||||
}
|
||||
|
||||
type messageProcessorConfig struct {
|
||||
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
||||
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
||||
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
||||
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
||||
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
||||
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met
|
||||
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met
|
||||
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met
|
||||
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met
|
||||
AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met
|
||||
DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met
|
||||
DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped
|
||||
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
|
||||
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"`
|
||||
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"`
|
||||
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"`
|
||||
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"`
|
||||
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"`
|
||||
AddBaseEnv map[string]interface{} `json:"add_base_env"`
|
||||
}
|
||||
|
||||
type messageProcessor struct {
|
||||
|
||||
// For thread-safety
|
||||
mutex sync.RWMutex
|
||||
|
||||
// mapping contains all evalables as strings to gval.Evaluable
|
||||
// because it is not possible to get the original string out of
|
||||
// a gval.Evaluable
|
||||
mapping map[string]*vm.Program
|
||||
|
||||
stages []string // order of stage execution
|
||||
dropMessages map[string]struct{} // internal lookup map
|
||||
dropTypes map[string]struct{} // internal lookup map
|
||||
dropMessagesIf map[*vm.Program]struct{} // pre-processed dropMessagesIf
|
||||
renameMessages map[string]string // internal lookup map
|
||||
renameMessagesIf map[*vm.Program]string // pre-processed RenameMessagesIf
|
||||
changeUnitPrefix map[*vm.Program]string // pre-processed ChangeUnitPrefix
|
||||
normalizeUnits bool
|
||||
addTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddTagsIf
|
||||
deleteTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelTagsIf
|
||||
addMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddMetaIf
|
||||
deleteMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelMetaIf
|
||||
addFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddFieldIf
|
||||
deleteFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelFieldIf
|
||||
moveTagToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToMeta
|
||||
moveTagToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToField
|
||||
moveMetaToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToTag
|
||||
moveMetaToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToField
|
||||
moveFieldToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToTag
|
||||
moveFieldToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToMeta
|
||||
}
|
||||
|
||||
type MessageProcessor interface {
|
||||
// Functions to set the execution order of the processing stages
|
||||
SetStages([]string) error
|
||||
DefaultStages() []string
|
||||
// Function to add variables to the base evaluation environment
|
||||
AddBaseEnv(env map[string]interface{}) error
|
||||
// Functions to add and remove rules
|
||||
AddDropMessagesByName(name string) error
|
||||
RemoveDropMessagesByName(name string)
|
||||
AddDropMessagesByCondition(condition string) error
|
||||
RemoveDropMessagesByCondition(condition string)
|
||||
AddRenameMetricByCondition(condition string, name string) error
|
||||
RemoveRenameMetricByCondition(condition string)
|
||||
AddRenameMetricByName(from, to string) error
|
||||
RemoveRenameMetricByName(from string)
|
||||
SetNormalizeUnits(settings bool)
|
||||
AddChangeUnitPrefix(condition string, prefix string) error
|
||||
RemoveChangeUnitPrefix(condition string)
|
||||
AddAddTagsByCondition(condition, key, value string) error
|
||||
RemoveAddTagsByCondition(condition string)
|
||||
AddDeleteTagsByCondition(condition, key, value string) error
|
||||
RemoveDeleteTagsByCondition(condition string)
|
||||
AddAddMetaByCondition(condition, key, value string) error
|
||||
RemoveAddMetaByCondition(condition string)
|
||||
AddDeleteMetaByCondition(condition, key, value string) error
|
||||
RemoveDeleteMetaByCondition(condition string)
|
||||
AddMoveTagToMeta(condition, key, value string) error
|
||||
RemoveMoveTagToMeta(condition string)
|
||||
AddMoveTagToFields(condition, key, value string) error
|
||||
RemoveMoveTagToFields(condition string)
|
||||
AddMoveMetaToTags(condition, key, value string) error
|
||||
RemoveMoveMetaToTags(condition string)
|
||||
AddMoveMetaToFields(condition, key, value string) error
|
||||
RemoveMoveMetaToFields(condition string)
|
||||
AddMoveFieldToTags(condition, key, value string) error
|
||||
RemoveMoveFieldToTags(condition string)
|
||||
AddMoveFieldToMeta(condition, key, value string) error
|
||||
RemoveMoveFieldToMeta(condition string)
|
||||
// Read in a JSON configuration
|
||||
FromConfigJSON(config json.RawMessage) error
|
||||
// Processing functions for legacy CCMetric and current CCMessage
|
||||
ProcessMetric(m lplegacy.CCMetric) (lp.CCMessage, error)
|
||||
ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
|
||||
//EvalToBool(condition string, parameters map[string]interface{}) (bool, error)
|
||||
//EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error)
|
||||
//EvalToString(condition string, parameters map[string]interface{}) (string, error)
|
||||
}
|
||||
|
||||
const (
|
||||
STAGENAME_DROP_BY_NAME string = "drop_by_name"
|
||||
STAGENAME_DROP_BY_TYPE string = "drop_by_type"
|
||||
STAGENAME_DROP_IF string = "drop_if"
|
||||
STAGENAME_ADD_TAG string = "add_tag"
|
||||
STAGENAME_DELETE_TAG string = "delete_tag"
|
||||
STAGENAME_MOVE_TAG_META string = "move_tag_to_meta"
|
||||
STAGENAME_MOVE_TAG_FIELD string = "move_tag_to_fields"
|
||||
STAGENAME_ADD_META string = "add_meta"
|
||||
STAGENAME_DELETE_META string = "delete_meta"
|
||||
STAGENAME_MOVE_META_TAG string = "move_meta_to_tags"
|
||||
STAGENAME_MOVE_META_FIELD string = "move_meta_to_fields"
|
||||
STAGENAME_ADD_FIELD string = "add_field"
|
||||
STAGENAME_DELETE_FIELD string = "delete_field"
|
||||
STAGENAME_MOVE_FIELD_TAG string = "move_field_to_tags"
|
||||
STAGENAME_MOVE_FIELD_META string = "move_field_to_meta"
|
||||
STAGENAME_RENAME_BY_NAME string = "rename"
|
||||
STAGENAME_RENAME_IF string = "rename_if"
|
||||
STAGENAME_CHANGE_UNIT_PREFIX string = "change_unit_prefix"
|
||||
STAGENAME_NORMALIZE_UNIT string = "normalize_unit"
|
||||
)
|
||||
|
||||
var StageNames = []string{
|
||||
STAGENAME_DROP_BY_NAME,
|
||||
STAGENAME_DROP_BY_TYPE,
|
||||
STAGENAME_DROP_IF,
|
||||
STAGENAME_ADD_TAG,
|
||||
STAGENAME_DELETE_TAG,
|
||||
STAGENAME_MOVE_TAG_META,
|
||||
STAGENAME_MOVE_TAG_FIELD,
|
||||
STAGENAME_ADD_META,
|
||||
STAGENAME_DELETE_META,
|
||||
STAGENAME_MOVE_META_TAG,
|
||||
STAGENAME_MOVE_META_FIELD,
|
||||
STAGENAME_ADD_FIELD,
|
||||
STAGENAME_DELETE_FIELD,
|
||||
STAGENAME_MOVE_FIELD_TAG,
|
||||
STAGENAME_MOVE_FIELD_META,
|
||||
STAGENAME_RENAME_BY_NAME,
|
||||
STAGENAME_RENAME_IF,
|
||||
STAGENAME_CHANGE_UNIT_PREFIX,
|
||||
STAGENAME_NORMALIZE_UNIT,
|
||||
}
|
||||
|
||||
var paramMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[string]interface{})
|
||||
},
|
||||
}
|
||||
|
||||
func sanitizeExprString(key string) string {
|
||||
return strings.ReplaceAll(key, "type-id", "typeid")
|
||||
}
|
||||
|
||||
func getParamMap(point lp.CCMetric) map[string]interface{} {
|
||||
params := paramMapPool.Get().(map[string]interface{})
|
||||
params["message"] = point
|
||||
params["msg"] = point
|
||||
params["name"] = point.Name()
|
||||
params["timestamp"] = point.Time().Unix()
|
||||
params["time"] = params["timestamp"]
|
||||
|
||||
fields := paramMapPool.Get().(map[string]interface{})
|
||||
for key, value := range point.Fields() {
|
||||
fields[key] = value
|
||||
switch key {
|
||||
case "value":
|
||||
params["messagetype"] = "metric"
|
||||
params["value"] = value
|
||||
params["metric"] = value
|
||||
case "event":
|
||||
params["messagetype"] = "event"
|
||||
params["event"] = value
|
||||
case "control":
|
||||
params["messagetype"] = "control"
|
||||
params["control"] = value
|
||||
case "log":
|
||||
params["messagetype"] = "log"
|
||||
params["log"] = value
|
||||
default:
|
||||
params["messagetype"] = "unknown"
|
||||
}
|
||||
}
|
||||
params["msgtype"] = params["messagetype"]
|
||||
params["fields"] = fields
|
||||
params["field"] = fields
|
||||
tags := paramMapPool.Get().(map[string]interface{})
|
||||
for key, value := range point.Tags() {
|
||||
tags[sanitizeExprString(key)] = value
|
||||
}
|
||||
params["tags"] = tags
|
||||
params["tag"] = tags
|
||||
meta := paramMapPool.Get().(map[string]interface{})
|
||||
for key, value := range point.Meta() {
|
||||
meta[sanitizeExprString(key)] = value
|
||||
}
|
||||
params["meta"] = meta
|
||||
return params
|
||||
}
|
||||
|
||||
var baseenv = map[string]interface{}{
|
||||
"name": "",
|
||||
"messagetype": "unknown",
|
||||
"msgtype": "unknown",
|
||||
"tag": map[string]interface{}{
|
||||
"type": "unknown",
|
||||
"typeid": "0",
|
||||
"stype": "unknown",
|
||||
"stypeid": "0",
|
||||
"hostname": "localhost",
|
||||
"cluster": "nocluster",
|
||||
},
|
||||
"tags": map[string]interface{}{
|
||||
"type": "unknown",
|
||||
"typeid": "0",
|
||||
"stype": "unknown",
|
||||
"stypeid": "0",
|
||||
"hostname": "localhost",
|
||||
"cluster": "nocluster",
|
||||
},
|
||||
"meta": map[string]interface{}{
|
||||
"unit": "invalid",
|
||||
"source": "unknown",
|
||||
},
|
||||
"fields": map[string]interface{}{
|
||||
"value": 0,
|
||||
"event": "",
|
||||
"control": "",
|
||||
"log": "",
|
||||
},
|
||||
"field": map[string]interface{}{
|
||||
"value": 0,
|
||||
"event": "",
|
||||
"control": "",
|
||||
"log": "",
|
||||
},
|
||||
"timestamp": 1234567890,
|
||||
"msg": lp.EmptyMessage(),
|
||||
"message": lp.EmptyMessage(),
|
||||
}
|
||||
|
||||
func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} {
|
||||
out := make(map[string]interface{})
|
||||
for k, v := range values {
|
||||
switch value := v.(type) {
|
||||
case int, int32, int64, uint, uint32, uint64, string, float32, float64:
|
||||
out[k] = value
|
||||
case map[string]interface{}:
|
||||
if _, ok := baseenv[k]; !ok {
|
||||
out[k] = addBaseEnvWalker(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddBaseEnv(env map[string]interface{}) error {
|
||||
for k, v := range env {
|
||||
switch value := v.(type) {
|
||||
case int, int32, int64, uint, uint32, uint64, string, float32, float64:
|
||||
baseenv[k] = value
|
||||
case map[string]interface{}:
|
||||
if _, ok := baseenv[k]; !ok {
|
||||
baseenv[k] = addBaseEnvWalker(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) init() error {
|
||||
mp.stages = make([]string, 0)
|
||||
mp.mapping = make(map[string]*vm.Program)
|
||||
mp.dropMessages = make(map[string]struct{})
|
||||
mp.dropTypes = make(map[string]struct{})
|
||||
mp.dropMessagesIf = make(map[*vm.Program]struct{})
|
||||
mp.renameMessages = make(map[string]string)
|
||||
mp.renameMessagesIf = make(map[*vm.Program]string)
|
||||
mp.changeUnitPrefix = make(map[*vm.Program]string)
|
||||
mp.addTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.addMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.addFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.deleteTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.deleteMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.deleteFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveFieldToMeta = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveFieldToTag = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveMetaToField = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveMetaToTag = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveTagToField = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.moveTagToMeta = make(map[*vm.Program]messageProcessorTagConfig)
|
||||
mp.normalizeUnits = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDropMessagesByName(name string) error {
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.dropMessages[name]; !ok {
|
||||
mp.dropMessages[name] = struct{}{}
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDropMessagesByName(name string) {
|
||||
mp.mutex.Lock()
|
||||
delete(mp.dropMessages, name)
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDropMessagesByType(typestring string) error {
|
||||
valid := []string{"metric", "event", "control", "log"}
|
||||
isValid := false
|
||||
for _, t := range valid {
|
||||
if t == typestring {
|
||||
isValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if isValid {
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.dropTypes[typestring]; !ok {
|
||||
cclog.ComponentDebug("MessageProcessor", "Adding type", typestring, "for dropping")
|
||||
mp.dropTypes[typestring] = struct{}{}
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
} else {
|
||||
return fmt.Errorf("invalid message type %s", typestring)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDropMessagesByType(typestring string) {
|
||||
mp.mutex.Lock()
|
||||
delete(mp.dropTypes, typestring)
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) addTagConfig(condition, key, value string, config *map[*vm.Program]messageProcessorTagConfig) error {
|
||||
var err error
|
||||
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||
}
|
||||
mp.mutex.Lock()
|
||||
if _, ok := (*config)[evaluable]; !ok {
|
||||
mp.mapping[condition] = evaluable
|
||||
(*config)[evaluable] = messageProcessorTagConfig{
|
||||
Condition: condition,
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) removeTagConfig(condition string, config *map[*vm.Program]messageProcessorTagConfig) {
|
||||
mp.mutex.Lock()
|
||||
if e, ok := mp.mapping[condition]; ok {
|
||||
delete(mp.mapping, condition)
|
||||
delete(*config, e)
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddAddTagsByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.addTagsIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveAddTagsByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.addTagsIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDeleteTagsByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.deleteTagsIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDeleteTagsByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.deleteTagsIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddAddMetaByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.addMetaIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveAddMetaByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.addMetaIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDeleteMetaByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.deleteMetaIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDeleteMetaByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.deleteMetaIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddAddFieldByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.addFieldIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveAddFieldByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.addFieldIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDeleteFieldByCondition(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.deleteFieldIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDeleteFieldByCondition(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.deleteFieldIf)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddDropMessagesByCondition(condition string) error {
|
||||
|
||||
var err error
|
||||
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||
}
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.dropMessagesIf[evaluable]; !ok {
|
||||
mp.mapping[condition] = evaluable
|
||||
mp.dropMessagesIf[evaluable] = struct{}{}
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveDropMessagesByCondition(condition string) {
|
||||
mp.mutex.Lock()
|
||||
if e, ok := mp.mapping[condition]; ok {
|
||||
delete(mp.mapping, condition)
|
||||
delete(mp.dropMessagesIf, e)
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddRenameMetricByCondition(condition string, name string) error {
|
||||
|
||||
var err error
|
||||
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||
}
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.renameMessagesIf[evaluable]; !ok {
|
||||
mp.mapping[condition] = evaluable
|
||||
mp.renameMessagesIf[evaluable] = name
|
||||
} else {
|
||||
mp.renameMessagesIf[evaluable] = name
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveRenameMetricByCondition(condition string) {
|
||||
mp.mutex.Lock()
|
||||
if e, ok := mp.mapping[condition]; ok {
|
||||
delete(mp.mapping, condition)
|
||||
delete(mp.renameMessagesIf, e)
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) SetNormalizeUnits(setting bool) {
|
||||
mp.normalizeUnits = setting
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddChangeUnitPrefix(condition string, prefix string) error {
|
||||
|
||||
var err error
|
||||
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||
}
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.changeUnitPrefix[evaluable]; !ok {
|
||||
mp.mapping[condition] = evaluable
|
||||
mp.changeUnitPrefix[evaluable] = prefix
|
||||
} else {
|
||||
mp.changeUnitPrefix[evaluable] = prefix
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveChangeUnitPrefix(condition string) {
|
||||
mp.mutex.Lock()
|
||||
if e, ok := mp.mapping[condition]; ok {
|
||||
delete(mp.mapping, condition)
|
||||
delete(mp.changeUnitPrefix, e)
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddRenameMetricByName(from, to string) error {
|
||||
mp.mutex.Lock()
|
||||
if _, ok := mp.renameMessages[from]; !ok {
|
||||
mp.renameMessages[from] = to
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveRenameMetricByName(from string) {
|
||||
mp.mutex.Lock()
|
||||
delete(mp.renameMessages, from)
|
||||
mp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveTagToMeta(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveTagToMeta)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveTagToMeta(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveTagToMeta)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveTagToFields(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveTagToField)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveTagToFields(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveTagToField)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveMetaToTags(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveMetaToTag)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveMetaToTags(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveMetaToTag)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveMetaToFields(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveMetaToField)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveMetaToFields(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveMetaToField)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveFieldToTags(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveFieldToTag)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveFieldToTags(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveFieldToTag)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) AddMoveFieldToMeta(condition, key, value string) error {
|
||||
return mp.addTagConfig(condition, key, value, &mp.moveFieldToMeta)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) RemoveMoveFieldToMeta(condition string) {
|
||||
mp.removeTagConfig(condition, &mp.moveFieldToMeta)
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) SetStages(stages []string) error {
|
||||
newstages := make([]string, 0)
|
||||
if len(stages) == 0 {
|
||||
mp.mutex.Lock()
|
||||
mp.stages = newstages
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
for i, s := range stages {
|
||||
valid := false
|
||||
for _, v := range StageNames {
|
||||
if s == v {
|
||||
valid = true
|
||||
}
|
||||
}
|
||||
if valid {
|
||||
newstages = append(newstages, s)
|
||||
} else {
|
||||
return fmt.Errorf("invalid stage %s at index %d", s, i)
|
||||
}
|
||||
}
|
||||
mp.mutex.Lock()
|
||||
mp.stages = newstages
|
||||
mp.mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) DefaultStages() []string {
|
||||
return StageNames
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error {
|
||||
var c messageProcessorConfig
|
||||
|
||||
err := json.Unmarshal(config, &c)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
|
||||
if len(c.StageOrder) > 0 {
|
||||
err = mp.SetStages(c.StageOrder)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
} else {
|
||||
err = mp.SetStages(mp.DefaultStages())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range c.DropMessages {
|
||||
err = mp.AddDropMessagesByName(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, m := range c.DropByType {
|
||||
err = mp.AddDropMessagesByType(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, m := range c.DropMessagesIf {
|
||||
err = mp.AddDropMessagesByCondition(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for k, v := range c.RenameMessagesIf {
|
||||
err = mp.AddRenameMetricByCondition(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for k, v := range c.RenameMessages {
|
||||
err = mp.AddRenameMetricByName(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for k, v := range c.ChangeUnitPrefix {
|
||||
err = mp.AddChangeUnitPrefix(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.AddTagsIf {
|
||||
err = mp.AddAddTagsByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.AddMetaIf {
|
||||
err = mp.AddAddMetaByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.AddFieldIf {
|
||||
err = mp.AddAddFieldByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.DelTagsIf {
|
||||
err = mp.AddDeleteTagsByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.DelMetaIf {
|
||||
err = mp.AddDeleteMetaByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.DelFieldIf {
|
||||
err = mp.AddDeleteFieldByCondition(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveTagToMeta {
|
||||
err = mp.AddMoveTagToMeta(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveTagToField {
|
||||
err = mp.AddMoveTagToFields(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveMetaToTag {
|
||||
err = mp.AddMoveMetaToTags(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveMetaToField {
|
||||
err = mp.AddMoveMetaToFields(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveFieldToTag {
|
||||
err = mp.AddMoveFieldToTags(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, c := range c.MoveFieldToMeta {
|
||||
err = mp.AddMoveFieldToMeta(c.Condition, c.Key, c.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
for _, m := range c.DropByType {
|
||||
err = mp.AddDropMessagesByType(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
if len(c.AddBaseEnv) > 0 {
|
||||
err = mp.AddBaseEnv(c.AddBaseEnv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||
}
|
||||
}
|
||||
mp.SetNormalizeUnits(c.NormalizeUnits)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) ProcessMetric(metric lplegacy.CCMetric) (lp.CCMessage, error) {
|
||||
m, err := lp.NewMessage(
|
||||
metric.Name(),
|
||||
metric.Tags(),
|
||||
metric.Meta(),
|
||||
metric.Fields(),
|
||||
metric.Time(),
|
||||
)
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("failed to parse metric to message: %v", err.Error())
|
||||
}
|
||||
return mp.ProcessMessage(m)
|
||||
|
||||
}
|
||||
|
||||
func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) {
|
||||
var err error = nil
|
||||
var out lp.CCMessage = lp.FromMessage(m)
|
||||
|
||||
name := out.Name()
|
||||
|
||||
if len(mp.stages) == 0 {
|
||||
mp.SetStages(mp.DefaultStages())
|
||||
}
|
||||
|
||||
mp.mutex.RLock()
|
||||
defer mp.mutex.RUnlock()
|
||||
|
||||
params := getParamMap(out)
|
||||
|
||||
defer func() {
|
||||
params["field"] = nil
|
||||
params["tag"] = nil
|
||||
paramMapPool.Put(params["fields"])
|
||||
paramMapPool.Put(params["tags"])
|
||||
paramMapPool.Put(params["meta"])
|
||||
paramMapPool.Put(params)
|
||||
}()
|
||||
|
||||
for _, s := range mp.stages {
|
||||
switch s {
|
||||
case STAGENAME_DROP_BY_NAME:
|
||||
if len(mp.dropMessages) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
|
||||
if _, ok := mp.dropMessages[name]; ok {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
case STAGENAME_DROP_BY_TYPE:
|
||||
if len(mp.dropTypes) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
|
||||
if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
case STAGENAME_DROP_IF:
|
||||
if len(mp.dropMessagesIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
|
||||
drop, err := dropMessagesIf(¶ms, &mp.dropMessagesIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if drop {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
case STAGENAME_RENAME_BY_NAME:
|
||||
if len(mp.renameMessages) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
|
||||
if newname, ok := mp.renameMessages[name]; ok {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
|
||||
out.SetName(newname)
|
||||
//cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
|
||||
out.AddMeta("oldname", name)
|
||||
}
|
||||
}
|
||||
case STAGENAME_RENAME_IF:
|
||||
if len(mp.renameMessagesIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
|
||||
_, err := renameMessagesIf(out, ¶ms, &mp.renameMessagesIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_ADD_TAG:
|
||||
if len(mp.addTagsIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding tags")
|
||||
_, err = addTagIf(out, ¶ms, &mp.addTagsIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_DELETE_TAG:
|
||||
if len(mp.deleteTagsIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Delete tags")
|
||||
_, err = deleteTagIf(out, ¶ms, &mp.deleteTagsIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_ADD_META:
|
||||
if len(mp.addMetaIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding meta information")
|
||||
_, err = addMetaIf(out, ¶ms, &mp.addMetaIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_DELETE_META:
|
||||
if len(mp.deleteMetaIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Delete meta information")
|
||||
_, err = deleteMetaIf(out, ¶ms, &mp.deleteMetaIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_ADD_FIELD:
|
||||
if len(mp.addFieldIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding fields")
|
||||
_, err = addFieldIf(out, ¶ms, &mp.addFieldIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_DELETE_FIELD:
|
||||
if len(mp.deleteFieldIf) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Delete fields")
|
||||
_, err = deleteFieldIf(out, ¶ms, &mp.deleteFieldIf)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_TAG_META:
|
||||
if len(mp.moveTagToMeta) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
|
||||
_, err := moveTagToMeta(out, ¶ms, &mp.moveTagToMeta)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_TAG_FIELD:
|
||||
if len(mp.moveTagToField) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
|
||||
_, err := moveTagToField(out, ¶ms, &mp.moveTagToField)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_META_TAG:
|
||||
if len(mp.moveMetaToTag) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
|
||||
_, err := moveMetaToTag(out, ¶ms, &mp.moveMetaToTag)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_META_FIELD:
|
||||
if len(mp.moveMetaToField) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
|
||||
_, err := moveMetaToField(out, ¶ms, &mp.moveMetaToField)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_FIELD_META:
|
||||
if len(mp.moveFieldToMeta) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move field to meta")
|
||||
_, err := moveFieldToMeta(out, ¶ms, &mp.moveFieldToMeta)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_MOVE_FIELD_TAG:
|
||||
if len(mp.moveFieldToTag) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move field to tags")
|
||||
_, err := moveFieldToTag(out, ¶ms, &mp.moveFieldToTag)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
}
|
||||
case STAGENAME_NORMALIZE_UNIT:
|
||||
if mp.normalizeUnits {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Normalize units")
|
||||
if lp.IsMetric(out) {
|
||||
_, err := normalizeUnits(out)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
} else {
|
||||
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
|
||||
}
|
||||
}
|
||||
|
||||
case STAGENAME_CHANGE_UNIT_PREFIX:
|
||||
if len(mp.changeUnitPrefix) > 0 {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
|
||||
if lp.IsMetric(out) {
|
||||
_, err := changeUnitPrefix(out, ¶ms, &mp.changeUnitPrefix)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
} else {
|
||||
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Get a new instace of a message processor.
|
||||
func NewMessageProcessor() (MessageProcessor, error) {
|
||||
mp := new(messageProcessor)
|
||||
err := mp.init()
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to create MessageProcessor: %v", err.Error())
|
||||
cclog.ComponentError("MessageProcessor", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return mp, nil
|
||||
}
|
@@ -1,262 +0,0 @@
|
||||
package messageprocessor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
units "github.com/ClusterCockpit/cc-units"
|
||||
"github.com/expr-lang/expr"
|
||||
"github.com/expr-lang/expr/vm"
|
||||
)
|
||||
|
||||
type MessageLocation int
|
||||
|
||||
const (
|
||||
MESSAGE_LOCATION_TAGS MessageLocation = iota
|
||||
MESSAGE_LOCATION_META
|
||||
MESSAGE_LOCATION_FIELDS
|
||||
)
|
||||
|
||||
// Abstract function to move entries from one location to another
|
||||
func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, from, to MessageLocation) (bool, error) {
|
||||
for d, data := range *checks {
|
||||
value, err := expr.Run(d, *params)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
//cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
|
||||
if value.(bool) {
|
||||
var v string
|
||||
var ok bool = false
|
||||
switch from {
|
||||
case MESSAGE_LOCATION_TAGS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
|
||||
v, ok = message.GetTag(data.Key)
|
||||
case MESSAGE_LOCATION_META:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
|
||||
//cclog.ComponentDebug("MessageProcessor", message.Meta())
|
||||
v, ok = message.GetMeta(data.Key)
|
||||
case MESSAGE_LOCATION_FIELDS:
|
||||
var x interface{}
|
||||
//cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
|
||||
x, ok = message.GetField(data.Key)
|
||||
v = fmt.Sprintf("%v", x)
|
||||
}
|
||||
if ok {
|
||||
switch from {
|
||||
case MESSAGE_LOCATION_TAGS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
|
||||
message.RemoveTag(data.Key)
|
||||
case MESSAGE_LOCATION_META:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
|
||||
message.RemoveMeta(data.Key)
|
||||
case MESSAGE_LOCATION_FIELDS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
|
||||
message.RemoveField(data.Key)
|
||||
}
|
||||
switch to {
|
||||
case MESSAGE_LOCATION_TAGS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
|
||||
message.AddTag(data.Value, v)
|
||||
case MESSAGE_LOCATION_META:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
|
||||
message.AddMeta(data.Value, v)
|
||||
case MESSAGE_LOCATION_FIELDS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
|
||||
message.AddField(data.Value, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
||||
for d, data := range *checks {
|
||||
value, err := expr.Run(d, *params)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if value.(bool) {
|
||||
switch location {
|
||||
case MESSAGE_LOCATION_FIELDS:
|
||||
switch data.Key {
|
||||
case "value", "event", "log", "control":
|
||||
return false, errors.New("cannot delete protected fields")
|
||||
default:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
|
||||
message.RemoveField(data.Key)
|
||||
}
|
||||
case MESSAGE_LOCATION_TAGS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
|
||||
message.RemoveTag(data.Key)
|
||||
case MESSAGE_LOCATION_META:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
|
||||
message.RemoveMeta(data.Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
||||
for d, data := range *checks {
|
||||
value, err := expr.Run(d, *params)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if value.(bool) {
|
||||
switch location {
|
||||
case MESSAGE_LOCATION_FIELDS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
|
||||
message.AddField(data.Key, data.Value)
|
||||
case MESSAGE_LOCATION_TAGS:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
|
||||
message.AddTag(data.Key, data.Value)
|
||||
case MESSAGE_LOCATION_META:
|
||||
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
|
||||
message.AddMeta(data.Key, data.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func deleteTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return deleteIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
||||
}
|
||||
|
||||
func addTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return addIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
||||
}
|
||||
|
||||
func moveTagToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_META)
|
||||
}
|
||||
|
||||
func moveTagToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_FIELDS)
|
||||
}
|
||||
|
||||
func deleteMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return deleteIf(message, params, checks, MESSAGE_LOCATION_META)
|
||||
}
|
||||
|
||||
func addMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return addIf(message, params, checks, MESSAGE_LOCATION_META)
|
||||
}
|
||||
|
||||
func moveMetaToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_TAGS)
|
||||
}
|
||||
|
||||
func moveMetaToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_FIELDS)
|
||||
}
|
||||
|
||||
func deleteFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return deleteIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
||||
}
|
||||
|
||||
func addFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return addIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
||||
}
|
||||
|
||||
func moveFieldToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_TAGS)
|
||||
}
|
||||
|
||||
func moveFieldToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_META)
|
||||
}
|
||||
|
||||
func dropMessagesIf(params *map[string]interface{}, checks *map[*vm.Program]struct{}) (bool, error) {
|
||||
for d := range *checks {
|
||||
value, err := expr.Run(d, *params)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if value.(bool) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func normalizeUnits(message lp2.CCMessage) (bool, error) {
|
||||
if in_unit, ok := message.GetMeta("unit"); ok {
|
||||
u := units.NewUnit(in_unit)
|
||||
if u.Valid() {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
||||
message.AddMeta("unit", u.Short())
|
||||
}
|
||||
} else if in_unit, ok := message.GetTag("unit"); ok {
|
||||
u := units.NewUnit(in_unit)
|
||||
if u.Valid() {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
||||
message.AddTag("unit", u.Short())
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
||||
for r, n := range *checks {
|
||||
value, err := expr.Run(r, *params)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if value.(bool) {
|
||||
newPrefix := units.NewPrefix(n)
|
||||
//cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
|
||||
if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
|
||||
u := units.NewUnit(in_unit)
|
||||
if u.Valid() {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
||||
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
||||
if conv != nil && out_unit.Valid() {
|
||||
if val, ok := message.GetField("value"); ok {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
||||
message.AddField("value", conv(val))
|
||||
message.AddMeta("unit", out_unit.Short())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix {
|
||||
u := units.NewUnit(in_unit)
|
||||
if u.Valid() {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
||||
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
||||
if conv != nil && out_unit.Valid() {
|
||||
if val, ok := message.GetField("value"); ok {
|
||||
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
||||
message.AddField("value", conv(val))
|
||||
message.AddTag("unit", out_unit.Short())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
||||
for d, n := range *checks {
|
||||
value, err := expr.Run(d, *params)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||
}
|
||||
if value.(bool) {
|
||||
old := message.Name()
|
||||
//cclog.ComponentDebug("MessageProcessor", "Rename to", n)
|
||||
message.SetName(n)
|
||||
//cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
|
||||
message.AddMeta("oldname", old)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
@@ -1,396 +0,0 @@
|
||||
package messageprocessor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
)
|
||||
|
||||
func generate_message_lists(num_lists, num_entries int) ([][]lp.CCMessage, error) {
|
||||
mlist := make([][]lp.CCMessage, 0)
|
||||
for j := 0; j < num_lists; j++ {
|
||||
out := make([]lp.CCMessage, 0)
|
||||
for i := 0; i < num_entries; i++ {
|
||||
var x lp.CCMessage
|
||||
var err error = nil
|
||||
switch {
|
||||
case i%4 == 0:
|
||||
x, err = lp.NewEvent("myevent", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "nothing happend", time.Now())
|
||||
case i%4 == 1:
|
||||
x, err = lp.NewMetric("mymetric", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{"unit": "kByte"}, 12.145, time.Now())
|
||||
case i%4 == 2:
|
||||
x, err = lp.NewLog("mylog", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "disk status: OK", time.Now())
|
||||
case i%4 == 3:
|
||||
x, err = lp.NewGetControl("mycontrol", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, time.Now())
|
||||
}
|
||||
if err == nil {
|
||||
x.AddTag("hostname", "myhost")
|
||||
out = append(out, x)
|
||||
} else {
|
||||
return nil, errors.New("failed to create message")
|
||||
}
|
||||
}
|
||||
mlist = append(mlist, out)
|
||||
}
|
||||
return mlist, nil
|
||||
}
|
||||
|
||||
func TestNewMessageProcessor(t *testing.T) {
|
||||
_, err := NewMessageProcessor()
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
type Configs struct {
|
||||
name string
|
||||
config json.RawMessage
|
||||
drop bool
|
||||
errors bool
|
||||
pre func(msg lp.CCMessage) error
|
||||
check func(msg lp.CCMessage) error
|
||||
}
|
||||
|
||||
var test_configs = []Configs{
|
||||
{
|
||||
name: "single_dropif_nomatch",
|
||||
config: json.RawMessage(`{"drop_messages_if": [ "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
|
||||
},
|
||||
{
|
||||
name: "drop_by_name",
|
||||
config: json.RawMessage(`{"drop_messages": [ "net_bytes_in"]}`),
|
||||
drop: true,
|
||||
},
|
||||
{
|
||||
name: "drop_by_type_match",
|
||||
config: json.RawMessage(`{"drop_by_message_type": [ "metric"]}`),
|
||||
drop: true,
|
||||
},
|
||||
{
|
||||
name: "drop_by_type_nomatch",
|
||||
config: json.RawMessage(`{"drop_by_message_type": [ "event"]}`),
|
||||
},
|
||||
{
|
||||
name: "single_dropif_match",
|
||||
config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'"]}`),
|
||||
drop: true,
|
||||
},
|
||||
{
|
||||
name: "double_dropif_match_nomatch",
|
||||
config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'", "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
|
||||
drop: true,
|
||||
},
|
||||
{
|
||||
name: "rename_simple",
|
||||
config: json.RawMessage(`{"rename_messages": { "net_bytes_in" : "net_bytes_out", "rapl_power": "cpu_power"}}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.Name() != "net_bytes_out" {
|
||||
return errors.New("expected name net_bytes_out but still have net_bytes_in")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "rename_match",
|
||||
config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_in'" : "net_bytes_out", "name == 'rapl_power'": "cpu_power"}}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.Name() != "net_bytes_out" {
|
||||
return errors.New("expected name net_bytes_out but still have net_bytes_in")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "rename_nomatch",
|
||||
config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_out'" : "net_bytes_in", "name == 'rapl_power'": "cpu_power"}}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.Name() != "net_bytes_in" {
|
||||
return errors.New("expected name net_bytes_in but still have net_bytes_out")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add_tag",
|
||||
config: json.RawMessage(`{"add_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "cluster", "value" : "mycluster"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if !msg.HasTag("cluster") {
|
||||
return errors.New("expected new tag 'cluster' but not present")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "del_tag",
|
||||
config: json.RawMessage(`{"delete_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "type"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasTag("type") {
|
||||
return errors.New("expected to have no 'type' but still present")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add_meta",
|
||||
config: json.RawMessage(`{"add_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "source", "value" : "example"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if !msg.HasMeta("source") {
|
||||
return errors.New("expected new tag 'source' but not present")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "del_meta",
|
||||
config: json.RawMessage(`{"delete_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "unit"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasMeta("unit") {
|
||||
return errors.New("expected to have no 'unit' but still present")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add_field",
|
||||
config: json.RawMessage(`{"add_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value" : "example"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if !msg.HasField("myfield") {
|
||||
return errors.New("expected new tag 'source' but not present")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "delete_fields_if_protected",
|
||||
config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "value"}]}`),
|
||||
errors: true,
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if !msg.HasField("value") {
|
||||
return errors.New("expected to still have 'value' field because it is a protected field key")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "delete_fields_if_unprotected",
|
||||
config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "testfield"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasField("testfield") {
|
||||
return errors.New("expected to still have 'testfield' field but should be deleted")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
pre: func(msg lp.CCMessage) error {
|
||||
msg.AddField("testfield", 4.123)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "single_change_prefix_match",
|
||||
config: json.RawMessage(`{"change_unit_prefix": {"name == 'net_bytes_in' && tags.type == 'node'": "M"}}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if u, ok := msg.GetMeta("unit"); ok {
|
||||
if u != "MB" {
|
||||
return fmt.Errorf("expected unit MB but have %s", u)
|
||||
}
|
||||
} else if u, ok := msg.GetTag("unit"); ok {
|
||||
if u != "MB" {
|
||||
return fmt.Errorf("expected unit MB but have %s", u)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "normalize_units",
|
||||
config: json.RawMessage(`{"normalize_units": true}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if u, ok := msg.GetMeta("unit"); ok {
|
||||
if u != "B" {
|
||||
return fmt.Errorf("expected unit B but have %s", u)
|
||||
}
|
||||
} else if u, ok := msg.GetTag("unit"); ok {
|
||||
if u != "B" {
|
||||
return fmt.Errorf("expected unit B but have %s", u)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_tag_to_meta",
|
||||
config: json.RawMessage(`{"move_tag_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasTag("type-id") || !msg.HasMeta("typeid") {
|
||||
return errors.New("moving tag 'type-id' to meta 'typeid' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
pre: func(msg lp.CCMessage) error {
|
||||
msg.AddTag("type-id", "0")
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_tag_to_field",
|
||||
config: json.RawMessage(`{"move_tag_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasTag("type-id") || !msg.HasField("typeid") {
|
||||
return errors.New("moving tag 'type-id' to field 'typeid' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
pre: func(msg lp.CCMessage) error {
|
||||
msg.AddTag("type-id", "0")
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_meta_to_tag",
|
||||
config: json.RawMessage(`{"move_meta_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasMeta("unit") || !msg.HasTag("unit") {
|
||||
return errors.New("moving meta 'unit' to tag 'unit' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_meta_to_field",
|
||||
config: json.RawMessage(`{"move_meta_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasMeta("unit") || !msg.HasField("unit") {
|
||||
return errors.New("moving meta 'unit' to field 'unit' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_field_to_tag",
|
||||
config: json.RawMessage(`{"move_field_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasField("myfield") || !msg.HasTag("field") {
|
||||
return errors.New("moving meta 'myfield' to tag 'field' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
pre: func(msg lp.CCMessage) error {
|
||||
msg.AddField("myfield", 12)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_field_to_meta",
|
||||
config: json.RawMessage(`{"move_field_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
|
||||
check: func(msg lp.CCMessage) error {
|
||||
if msg.HasField("myfield") || !msg.HasMeta("field") {
|
||||
return errors.New("moving meta 'myfield' to meta 'field' failed")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
pre: func(msg lp.CCMessage) error {
|
||||
msg.AddField("myfield", 12)
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestConfigList(t *testing.T) {
|
||||
for _, c := range test_configs {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m, err := lp.NewMetric("net_bytes_in", map[string]string{"type": "node", "type-id": "0"}, map[string]string{"unit": "Byte"}, float64(1024.0), time.Now())
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
return
|
||||
}
|
||||
if c.pre != nil {
|
||||
if err = c.pre(m); err != nil {
|
||||
t.Errorf("error running pre-test function: %v", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
mp, err := NewMessageProcessor()
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
return
|
||||
}
|
||||
err = mp.FromConfigJSON(c.config)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
return
|
||||
}
|
||||
//t.Log(m.ToLineProtocol(nil))
|
||||
out, err := mp.ProcessMessage(m)
|
||||
if err != nil && !c.errors {
|
||||
cclog.SetDebug()
|
||||
mp.ProcessMessage(m)
|
||||
t.Error(err.Error())
|
||||
return
|
||||
}
|
||||
if out == nil && !c.drop {
|
||||
t.Error("fail, message should NOT be dropped but processor signalled dropping")
|
||||
return
|
||||
} else if out != nil && c.drop {
|
||||
t.Error("fail, message should be dropped but processor signalled NO dropping")
|
||||
return
|
||||
}
|
||||
// {
|
||||
// if c.drop {
|
||||
// t.Error("fail, message should be dropped but processor signalled NO dropping")
|
||||
// } else {
|
||||
// t.Error("fail, message should NOT be dropped but processor signalled dropping")
|
||||
// }
|
||||
// cclog.SetDebug()
|
||||
// mp.ProcessMessage(m)
|
||||
// return
|
||||
// }
|
||||
if c.check != nil {
|
||||
if err := c.check(out); err != nil {
|
||||
t.Errorf("check failed with %v", err.Error())
|
||||
t.Log("Rerun with debugging")
|
||||
cclog.SetDebug()
|
||||
mp.ProcessMessage(m)
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkProcessing(b *testing.B) {
|
||||
|
||||
mlist, err := generate_message_lists(b.N, 1000)
|
||||
if err != nil {
|
||||
b.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
mp, err := NewMessageProcessor()
|
||||
if err != nil {
|
||||
b.Error(err.Error())
|
||||
return
|
||||
}
|
||||
err = mp.FromConfigJSON(json.RawMessage(`{"move_meta_to_tag_if": [{"if" : "name == 'mymetric'", "key":"unit", "value":"unit"}]}`))
|
||||
if err != nil {
|
||||
b.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, m := range mlist[i] {
|
||||
if _, err := mp.ProcessMessage(m); err != nil {
|
||||
b.Errorf("failed processing message '%s': %v", m.ToLineProtocol(nil), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
b.ReportMetric(float64(b.Elapsed())/float64(len(mlist)*b.N), "ns/message")
|
||||
}
|
@@ -3,7 +3,7 @@ package multiChanTicker
|
||||
import (
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
)
|
||||
|
||||
type multiChanTicker struct {
|
||||
|
Reference in New Issue
Block a user