mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2024-11-10 04:27:25 +01:00
New message processor to check whether a message should be dropped or manipulate it in flight
This commit is contained in:
parent
b69efdc2a4
commit
6c9c14ed54
84
pkg/messageProcessor/README.md
Normal file
84
pkg/messageProcessor/README.md
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
# Message Processor Component
|
||||||
|
|
||||||
|
Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages.
|
||||||
|
The main CC application using it is `cc-metric-collector`. The processing part there was originally in the metric router, the central
|
||||||
|
hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the
|
||||||
|
lack of flexibility caused some trouble:
|
||||||
|
|
||||||
|
> The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with
|
||||||
|
> a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different
|
||||||
|
> resolution for some metrics. The issue was basically the `mem_used` metric showing the currently used memory of the node. Ganglia wants it in `kByte` as provided
|
||||||
|
> by the Linux operating system but CC wanted it in `GByte`.
|
||||||
|
|
||||||
|
## For developers
|
||||||
|
|
||||||
|
Whenever you receive or are about to send a message out, you should provide some processing.
|
||||||
|
|
||||||
|
### Configuration of component
|
||||||
|
|
||||||
|
New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file
|
||||||
|
or some fields in a configuration file for the processing.
|
||||||
|
|
||||||
|
The message processor uses the following configuration
|
||||||
|
```golang
|
||||||
|
type messageProcessorConfig struct {
|
||||||
|
DropMessages []string `json:"drop_messages"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||||
|
DropMessagesIf []string `json:"drop_messages_if"` // List of evaluatable terms to drop messages
|
||||||
|
RenameMessages map[string]string `json:"rename_messages"` // Map to rename metric name from key to value
|
||||||
|
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
||||||
|
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the messages
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
In order to load the configuration from a `json.RawMessage`:
|
||||||
|
```golang
|
||||||
|
mp, _ := NewMessageProcessor()
|
||||||
|
|
||||||
|
mp.FromConfigJSON(configJson)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Using the component
|
||||||
|
After initialization and adding the different operations, the `ProcessMessage()` function applies all operations and returns whether the message should be dropped.
|
||||||
|
|
||||||
|
```golang
|
||||||
|
m := lp.CCMetric{}
|
||||||
|
|
||||||
|
drop, err := mp.ProcessMessage(m)
|
||||||
|
if !drop {
|
||||||
|
// process further
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Overhead
|
||||||
|
|
||||||
|
The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message
|
||||||
|
increases.
|
||||||
|
|
||||||
|
## For users
|
||||||
|
|
||||||
|
### Syntax for evaluatable terms
|
||||||
|
|
||||||
|
The message processor uses `gval` for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations.
|
||||||
|
|
||||||
|
Accessible for operations are
|
||||||
|
- `name` of the message
|
||||||
|
- `timestamp` of the message
|
||||||
|
- `type`, `type-id` of the message (also `tag_type` and `tag_type-id`)
|
||||||
|
- `stype`, `stype-id` of the message (if message has theses tags, also `tag_stype` and `tag_stype-id`)
|
||||||
|
- `value` for a CCMetric message (also `field_value`)
|
||||||
|
- `event` for a CCEvent message (also `field_event`)
|
||||||
|
- `control` for a CCControl message (also `field_control`)
|
||||||
|
- `log` for a CCLog message (also `field_log`)
|
||||||
|
|
||||||
|
Generally, all tags are accessible with `tag_<tagkey>`, all meta information with `meta_<metakey>` and fields with `field_<fieldkey>`.
|
||||||
|
|
||||||
|
- Comparing strings: `==`, `!=`, `match(str, regex)` (use `%` instead of `\`!)
|
||||||
|
- Combining conditions: `&&`, `||`
|
||||||
|
- Comparing numbers: `==`, `!=`, `<`, `>`, `<=`, `>=`
|
||||||
|
- Test lists: `<value> in <list>`
|
||||||
|
- Topological tests: `tag_type-id in getCpuListOfType("socket", "1")` (test if the metric belongs to socket 1 in local node topology)
|
||||||
|
|
||||||
|
Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that:
|
||||||
|
- use `''` instead of `""` for strings
|
||||||
|
- for the regexes, use `%` instead of `\`
|
||||||
|
|
985
pkg/messageProcessor/messageProcessor.go
Normal file
985
pkg/messageProcessor/messageProcessor.go
Normal file
@ -0,0 +1,985 @@
|
|||||||
|
package messageprocessor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
|
"github.com/expr-lang/expr"
|
||||||
|
"github.com/expr-lang/expr/vm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Message processor add/delete tag/meta configuration
|
||||||
|
type messageProcessorTagConfig struct {
|
||||||
|
Key string `json:"key"` // Tag name
|
||||||
|
Value string `json:"value"` // Tag value
|
||||||
|
Condition string `json:"if"` // Condition for adding or removing corresponding tag
|
||||||
|
}
|
||||||
|
|
||||||
|
type messageProcessorConfig struct {
|
||||||
|
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
||||||
|
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||||
|
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
||||||
|
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
||||||
|
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
||||||
|
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
||||||
|
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
||||||
|
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met
|
||||||
|
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met
|
||||||
|
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met
|
||||||
|
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met
|
||||||
|
AddFieldIf []messageProcessorTagConfig `json:"add_fields_if"` // List of fields that are added when the condition is met
|
||||||
|
DelFieldIf []messageProcessorTagConfig `json:"delete_fields_if"` // List of fields that are removed when the condition is met
|
||||||
|
DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped
|
||||||
|
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
|
||||||
|
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"`
|
||||||
|
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"`
|
||||||
|
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"`
|
||||||
|
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"`
|
||||||
|
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"`
|
||||||
|
AddBaseEnv map[string]interface{} `json:"add_base_env"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type messageProcessor struct {
|
||||||
|
|
||||||
|
// For thread-safety
|
||||||
|
mutex sync.RWMutex
|
||||||
|
|
||||||
|
// mapping contains all evalables as strings to gval.Evaluable
|
||||||
|
// because it is not possible to get the original string out of
|
||||||
|
// a gval.Evaluable
|
||||||
|
mapping map[string]*vm.Program
|
||||||
|
|
||||||
|
stages []string // order of stage execution
|
||||||
|
dropMessages map[string]struct{} // internal lookup map
|
||||||
|
dropTypes map[string]struct{} // internal lookup map
|
||||||
|
dropMessagesIf map[*vm.Program]struct{} // pre-processed dropMessagesIf
|
||||||
|
renameMessages map[string]string // internal lookup map
|
||||||
|
renameMessagesIf map[*vm.Program]string // pre-processed RenameMessagesIf
|
||||||
|
changeUnitPrefix map[*vm.Program]string // pre-processed ChangeUnitPrefix
|
||||||
|
normalizeUnits bool
|
||||||
|
addTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddTagsIf
|
||||||
|
deleteTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelTagsIf
|
||||||
|
addMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddMetaIf
|
||||||
|
deleteMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelMetaIf
|
||||||
|
addFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddFieldIf
|
||||||
|
deleteFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelFieldIf
|
||||||
|
moveTagToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToMeta
|
||||||
|
moveTagToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToField
|
||||||
|
moveMetaToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToTag
|
||||||
|
moveMetaToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToField
|
||||||
|
moveFieldToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToTag
|
||||||
|
moveFieldToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
type MessageProcessor interface {
|
||||||
|
// Functions to set the execution order of the processing stages
|
||||||
|
SetStages([]string) error
|
||||||
|
DefaultStages() []string
|
||||||
|
// Function to add variables to the base evaluation environment
|
||||||
|
AddBaseEnv(env map[string]interface{}) error
|
||||||
|
// Functions to add and remove rules
|
||||||
|
AddDropMessagesByName(name string) error
|
||||||
|
RemoveDropMessagesByName(name string)
|
||||||
|
AddDropMessagesByCondition(condition string) error
|
||||||
|
RemoveDropMessagesByCondition(condition string)
|
||||||
|
AddRenameMetricByCondition(condition string, name string) error
|
||||||
|
RemoveRenameMetricByCondition(condition string)
|
||||||
|
AddRenameMetricByName(from, to string) error
|
||||||
|
RemoveRenameMetricByName(from string)
|
||||||
|
SetNormalizeUnits(settings bool)
|
||||||
|
AddChangeUnitPrefix(condition string, prefix string) error
|
||||||
|
RemoveChangeUnitPrefix(condition string)
|
||||||
|
AddAddTagsByCondition(condition, key, value string) error
|
||||||
|
RemoveAddTagsByCondition(condition string)
|
||||||
|
AddDeleteTagsByCondition(condition, key, value string) error
|
||||||
|
RemoveDeleteTagsByCondition(condition string)
|
||||||
|
AddAddMetaByCondition(condition, key, value string) error
|
||||||
|
RemoveAddMetaByCondition(condition string)
|
||||||
|
AddDeleteMetaByCondition(condition, key, value string) error
|
||||||
|
RemoveDeleteMetaByCondition(condition string)
|
||||||
|
AddMoveTagToMeta(condition, key, value string) error
|
||||||
|
RemoveMoveTagToMeta(condition string)
|
||||||
|
AddMoveTagToFields(condition, key, value string) error
|
||||||
|
RemoveMoveTagToFields(condition string)
|
||||||
|
AddMoveMetaToTags(condition, key, value string) error
|
||||||
|
RemoveMoveMetaToTags(condition string)
|
||||||
|
AddMoveMetaToFields(condition, key, value string) error
|
||||||
|
RemoveMoveMetaToFields(condition string)
|
||||||
|
AddMoveFieldToTags(condition, key, value string) error
|
||||||
|
RemoveMoveFieldToTags(condition string)
|
||||||
|
AddMoveFieldToMeta(condition, key, value string) error
|
||||||
|
RemoveMoveFieldToMeta(condition string)
|
||||||
|
// Read in a JSON configuration
|
||||||
|
FromConfigJSON(config json.RawMessage) error
|
||||||
|
// Processing functions for legacy CCMetric and current CCMessage
|
||||||
|
ProcessMetric(m lp.CCMetric) (bool, error)
|
||||||
|
ProcessMessage(m lp2.CCMessage) (bool, error)
|
||||||
|
//EvalToBool(condition string, parameters map[string]interface{}) (bool, error)
|
||||||
|
//EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error)
|
||||||
|
//EvalToString(condition string, parameters map[string]interface{}) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
STAGENAME_DROP_BY_NAME string = "drop_by_name"
|
||||||
|
STAGENAME_DROP_BY_TYPE string = "drop_by_type"
|
||||||
|
STAGENAME_DROP_IF string = "drop_if"
|
||||||
|
STAGENAME_ADD_TAG string = "add_tag"
|
||||||
|
STAGENAME_DELETE_TAG string = "delete_tag"
|
||||||
|
STAGENAME_MOVE_TAG_META string = "move_tag_to_meta"
|
||||||
|
STAGENAME_MOVE_TAG_FIELD string = "move_tag_to_fields"
|
||||||
|
STAGENAME_ADD_META string = "add_meta"
|
||||||
|
STAGENAME_DELETE_META string = "delete_meta"
|
||||||
|
STAGENAME_MOVE_META_TAG string = "move_meta_to_tags"
|
||||||
|
STAGENAME_MOVE_META_FIELD string = "move_meta_to_fields"
|
||||||
|
STAGENAME_ADD_FIELD string = "add_field"
|
||||||
|
STAGENAME_DELETE_FIELD string = "delete_field"
|
||||||
|
STAGENAME_MOVE_FIELD_TAG string = "move_field_to_tags"
|
||||||
|
STAGENAME_MOVE_FIELD_META string = "move_field_to_meta"
|
||||||
|
STAGENAME_RENAME_BY_NAME string = "rename"
|
||||||
|
STAGENAME_RENAME_IF string = "rename_if"
|
||||||
|
STAGENAME_CHANGE_UNIT_PREFIX string = "change_unit_prefix"
|
||||||
|
STAGENAME_NORMALIZE_UNIT string = "normalize_unit"
|
||||||
|
)
|
||||||
|
|
||||||
|
var StageNames = []string{
|
||||||
|
STAGENAME_DROP_BY_NAME,
|
||||||
|
STAGENAME_DROP_BY_TYPE,
|
||||||
|
STAGENAME_DROP_IF,
|
||||||
|
STAGENAME_ADD_TAG,
|
||||||
|
STAGENAME_DELETE_TAG,
|
||||||
|
STAGENAME_MOVE_TAG_META,
|
||||||
|
STAGENAME_MOVE_TAG_FIELD,
|
||||||
|
STAGENAME_ADD_META,
|
||||||
|
STAGENAME_DELETE_META,
|
||||||
|
STAGENAME_MOVE_META_TAG,
|
||||||
|
STAGENAME_MOVE_META_FIELD,
|
||||||
|
STAGENAME_ADD_FIELD,
|
||||||
|
STAGENAME_DELETE_FIELD,
|
||||||
|
STAGENAME_MOVE_FIELD_TAG,
|
||||||
|
STAGENAME_MOVE_FIELD_META,
|
||||||
|
STAGENAME_RENAME_BY_NAME,
|
||||||
|
STAGENAME_RENAME_IF,
|
||||||
|
STAGENAME_CHANGE_UNIT_PREFIX,
|
||||||
|
STAGENAME_NORMALIZE_UNIT,
|
||||||
|
}
|
||||||
|
|
||||||
|
var paramMapPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return make(map[string]interface{})
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func sanitizeExprString(key string) string {
|
||||||
|
return strings.ReplaceAll(key, "type-id", "typeid")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getParamMap(point lp.CCMetric) map[string]interface{} {
|
||||||
|
params := paramMapPool.Get().(map[string]interface{})
|
||||||
|
params["message"] = point
|
||||||
|
params["msg"] = point
|
||||||
|
params["name"] = point.Name()
|
||||||
|
params["timestamp"] = point.Time().Unix()
|
||||||
|
params["time"] = params["timestamp"]
|
||||||
|
|
||||||
|
fields := paramMapPool.Get().(map[string]interface{})
|
||||||
|
for key, value := range point.Fields() {
|
||||||
|
fields[key] = value
|
||||||
|
switch key {
|
||||||
|
case "value":
|
||||||
|
params["messagetype"] = "metric"
|
||||||
|
params["value"] = value
|
||||||
|
params["metric"] = value
|
||||||
|
case "event":
|
||||||
|
params["messagetype"] = "event"
|
||||||
|
params["event"] = value
|
||||||
|
case "control":
|
||||||
|
params["messagetype"] = "control"
|
||||||
|
params["control"] = value
|
||||||
|
case "log":
|
||||||
|
params["messagetype"] = "log"
|
||||||
|
params["log"] = value
|
||||||
|
default:
|
||||||
|
params["messagetype"] = "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
params["msgtype"] = params["messagetype"]
|
||||||
|
params["fields"] = fields
|
||||||
|
params["field"] = fields
|
||||||
|
tags := paramMapPool.Get().(map[string]interface{})
|
||||||
|
for key, value := range point.Tags() {
|
||||||
|
tags[sanitizeExprString(key)] = value
|
||||||
|
}
|
||||||
|
params["tags"] = tags
|
||||||
|
params["tag"] = tags
|
||||||
|
meta := paramMapPool.Get().(map[string]interface{})
|
||||||
|
for key, value := range point.Meta() {
|
||||||
|
meta[sanitizeExprString(key)] = value
|
||||||
|
}
|
||||||
|
params["meta"] = meta
|
||||||
|
return params
|
||||||
|
}
|
||||||
|
|
||||||
|
var baseenv = map[string]interface{}{
|
||||||
|
"name": "",
|
||||||
|
"messagetype": "unknown",
|
||||||
|
"msgtype": "unknown",
|
||||||
|
"tag": map[string]interface{}{
|
||||||
|
"type": "unknown",
|
||||||
|
"typeid": "0",
|
||||||
|
"stype": "unknown",
|
||||||
|
"stypeid": "0",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"cluster": "nocluster",
|
||||||
|
},
|
||||||
|
"tags": map[string]interface{}{
|
||||||
|
"type": "unknown",
|
||||||
|
"typeid": "0",
|
||||||
|
"stype": "unknown",
|
||||||
|
"stypeid": "0",
|
||||||
|
"hostname": "localhost",
|
||||||
|
"cluster": "nocluster",
|
||||||
|
},
|
||||||
|
"meta": map[string]interface{}{
|
||||||
|
"unit": "invalid",
|
||||||
|
"source": "unknown",
|
||||||
|
},
|
||||||
|
"fields": map[string]interface{}{
|
||||||
|
"value": 0,
|
||||||
|
"event": "",
|
||||||
|
"control": "",
|
||||||
|
"log": "",
|
||||||
|
},
|
||||||
|
"field": map[string]interface{}{
|
||||||
|
"value": 0,
|
||||||
|
"event": "",
|
||||||
|
"control": "",
|
||||||
|
"log": "",
|
||||||
|
},
|
||||||
|
"timestamp": 1234567890,
|
||||||
|
"msg": lp2.EmptyMessage(),
|
||||||
|
"message": lp2.EmptyMessage(),
|
||||||
|
}
|
||||||
|
|
||||||
|
func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} {
|
||||||
|
out := make(map[string]interface{})
|
||||||
|
for k, v := range values {
|
||||||
|
switch value := v.(type) {
|
||||||
|
case int, int32, int64, uint, uint32, uint64, string, float32, float64:
|
||||||
|
out[k] = value
|
||||||
|
case map[string]interface{}:
|
||||||
|
if _, ok := baseenv[k]; !ok {
|
||||||
|
out[k] = addBaseEnvWalker(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddBaseEnv(env map[string]interface{}) error {
|
||||||
|
for k, v := range env {
|
||||||
|
switch value := v.(type) {
|
||||||
|
case int, int32, int64, uint, uint32, uint64, string, float32, float64:
|
||||||
|
baseenv[k] = value
|
||||||
|
case map[string]interface{}:
|
||||||
|
if _, ok := baseenv[k]; !ok {
|
||||||
|
baseenv[k] = addBaseEnvWalker(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) init() error {
|
||||||
|
mp.stages = make([]string, 0)
|
||||||
|
mp.mapping = make(map[string]*vm.Program)
|
||||||
|
mp.dropMessages = make(map[string]struct{})
|
||||||
|
mp.dropTypes = make(map[string]struct{})
|
||||||
|
mp.dropMessagesIf = make(map[*vm.Program]struct{})
|
||||||
|
mp.renameMessages = make(map[string]string)
|
||||||
|
mp.renameMessagesIf = make(map[*vm.Program]string)
|
||||||
|
mp.changeUnitPrefix = make(map[*vm.Program]string)
|
||||||
|
mp.addTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.addMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.addFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.deleteTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.deleteMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.deleteFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveFieldToMeta = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveFieldToTag = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveMetaToField = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveMetaToTag = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveTagToField = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.moveTagToMeta = make(map[*vm.Program]messageProcessorTagConfig)
|
||||||
|
mp.normalizeUnits = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDropMessagesByName(name string) error {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.dropMessages[name]; !ok {
|
||||||
|
mp.dropMessages[name] = struct{}{}
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDropMessagesByName(name string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
delete(mp.dropMessages, name)
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDropMessagesByType(typestring string) error {
|
||||||
|
valid := []string{"metric", "event", "control", "log"}
|
||||||
|
isValid := false
|
||||||
|
for _, t := range valid {
|
||||||
|
if t == typestring {
|
||||||
|
isValid = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if isValid {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.dropTypes[typestring]; !ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding type", typestring, "for dropping")
|
||||||
|
mp.dropTypes[typestring] = struct{}{}
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("invalid message type %s", typestring)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDropMessagesByType(typestring string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
delete(mp.dropTypes, typestring)
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) addTagConfig(condition, key, value string, config *map[*vm.Program]messageProcessorTagConfig) error {
|
||||||
|
var err error
|
||||||
|
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||||
|
}
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := (*config)[evaluable]; !ok {
|
||||||
|
mp.mapping[condition] = evaluable
|
||||||
|
(*config)[evaluable] = messageProcessorTagConfig{
|
||||||
|
Condition: condition,
|
||||||
|
Key: key,
|
||||||
|
Value: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) removeTagConfig(condition string, config *map[*vm.Program]messageProcessorTagConfig) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if e, ok := mp.mapping[condition]; ok {
|
||||||
|
delete(mp.mapping, condition)
|
||||||
|
delete(*config, e)
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddAddTagsByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.addTagsIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveAddTagsByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.addTagsIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDeleteTagsByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.deleteTagsIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDeleteTagsByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.deleteTagsIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddAddMetaByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.addMetaIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveAddMetaByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.addMetaIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDeleteMetaByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.deleteMetaIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDeleteMetaByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.deleteMetaIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddAddFieldByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.addFieldIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveAddFieldByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.addFieldIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDeleteFieldByCondition(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.deleteFieldIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDeleteFieldByCondition(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.deleteFieldIf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddDropMessagesByCondition(condition string) error {
|
||||||
|
|
||||||
|
var err error
|
||||||
|
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||||
|
}
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.dropMessagesIf[evaluable]; !ok {
|
||||||
|
mp.mapping[condition] = evaluable
|
||||||
|
mp.dropMessagesIf[evaluable] = struct{}{}
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveDropMessagesByCondition(condition string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if e, ok := mp.mapping[condition]; ok {
|
||||||
|
delete(mp.mapping, condition)
|
||||||
|
delete(mp.dropMessagesIf, e)
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddRenameMetricByCondition(condition string, name string) error {
|
||||||
|
|
||||||
|
var err error
|
||||||
|
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||||
|
}
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.renameMessagesIf[evaluable]; !ok {
|
||||||
|
mp.mapping[condition] = evaluable
|
||||||
|
mp.renameMessagesIf[evaluable] = name
|
||||||
|
} else {
|
||||||
|
mp.renameMessagesIf[evaluable] = name
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveRenameMetricByCondition(condition string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if e, ok := mp.mapping[condition]; ok {
|
||||||
|
delete(mp.mapping, condition)
|
||||||
|
delete(mp.renameMessagesIf, e)
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) SetNormalizeUnits(setting bool) {
|
||||||
|
mp.normalizeUnits = setting
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddChangeUnitPrefix(condition string, prefix string) error {
|
||||||
|
|
||||||
|
var err error
|
||||||
|
evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
|
||||||
|
}
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.changeUnitPrefix[evaluable]; !ok {
|
||||||
|
mp.mapping[condition] = evaluable
|
||||||
|
mp.changeUnitPrefix[evaluable] = prefix
|
||||||
|
} else {
|
||||||
|
mp.changeUnitPrefix[evaluable] = prefix
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveChangeUnitPrefix(condition string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if e, ok := mp.mapping[condition]; ok {
|
||||||
|
delete(mp.mapping, condition)
|
||||||
|
delete(mp.changeUnitPrefix, e)
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddRenameMetricByName(from, to string) error {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
if _, ok := mp.renameMessages[from]; !ok {
|
||||||
|
mp.renameMessages[from] = to
|
||||||
|
}
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveRenameMetricByName(from string) {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
delete(mp.renameMessages, from)
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveTagToMeta(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveTagToMeta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveTagToMeta(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveTagToMeta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveTagToFields(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveTagToField)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveTagToFields(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveTagToField)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveMetaToTags(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveMetaToTag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveMetaToTags(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveMetaToTag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveMetaToFields(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveMetaToField)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveMetaToFields(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveMetaToField)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveFieldToTags(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveFieldToTag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveFieldToTags(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveFieldToTag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) AddMoveFieldToMeta(condition, key, value string) error {
|
||||||
|
return mp.addTagConfig(condition, key, value, &mp.moveFieldToMeta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) RemoveMoveFieldToMeta(condition string) {
|
||||||
|
mp.removeTagConfig(condition, &mp.moveFieldToMeta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) SetStages(stages []string) error {
|
||||||
|
newstages := make([]string, 0)
|
||||||
|
if len(stages) == 0 {
|
||||||
|
mp.mutex.Lock()
|
||||||
|
mp.stages = newstages
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for i, s := range stages {
|
||||||
|
valid := false
|
||||||
|
for _, v := range StageNames {
|
||||||
|
if s == v {
|
||||||
|
valid = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if valid {
|
||||||
|
newstages = append(newstages, s)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("invalid stage %s at index %d", s, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mp.mutex.Lock()
|
||||||
|
mp.stages = newstages
|
||||||
|
mp.mutex.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) DefaultStages() []string {
|
||||||
|
return StageNames
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error {
|
||||||
|
var c messageProcessorConfig
|
||||||
|
|
||||||
|
err := json.Unmarshal(config, &c)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(c.StageOrder) > 0 {
|
||||||
|
err = mp.SetStages(c.StageOrder)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = mp.SetStages(mp.DefaultStages())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range c.DropMessages {
|
||||||
|
err = mp.AddDropMessagesByName(m)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, m := range c.DropByType {
|
||||||
|
err = mp.AddDropMessagesByType(m)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, m := range c.DropMessagesIf {
|
||||||
|
err = mp.AddDropMessagesByCondition(m)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range c.RenameMessagesIf {
|
||||||
|
err = mp.AddRenameMetricByCondition(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range c.RenameMessages {
|
||||||
|
err = mp.AddRenameMetricByName(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range c.ChangeUnitPrefix {
|
||||||
|
err = mp.AddChangeUnitPrefix(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.AddTagsIf {
|
||||||
|
err = mp.AddAddTagsByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.AddMetaIf {
|
||||||
|
err = mp.AddAddMetaByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.AddFieldIf {
|
||||||
|
err = mp.AddAddFieldByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.DelTagsIf {
|
||||||
|
err = mp.AddDeleteTagsByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.DelMetaIf {
|
||||||
|
err = mp.AddDeleteMetaByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.DelFieldIf {
|
||||||
|
err = mp.AddDeleteFieldByCondition(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveTagToMeta {
|
||||||
|
err = mp.AddMoveTagToMeta(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveTagToField {
|
||||||
|
err = mp.AddMoveTagToFields(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveMetaToTag {
|
||||||
|
err = mp.AddMoveMetaToTags(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveMetaToField {
|
||||||
|
err = mp.AddMoveMetaToFields(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveFieldToTag {
|
||||||
|
err = mp.AddMoveFieldToTags(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range c.MoveFieldToMeta {
|
||||||
|
err = mp.AddMoveFieldToMeta(c.Condition, c.Key, c.Value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, m := range c.DropByType {
|
||||||
|
err = mp.AddDropMessagesByType(m)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(c.AddBaseEnv) > 0 {
|
||||||
|
err = mp.AddBaseEnv(c.AddBaseEnv)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to process config JSON: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mp.SetNormalizeUnits(c.NormalizeUnits)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) {
|
||||||
|
m, err := lp2.NewMessage(
|
||||||
|
metric.Name(),
|
||||||
|
metric.Tags(),
|
||||||
|
metric.Meta(),
|
||||||
|
metric.Fields(),
|
||||||
|
metric.Time(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("failed to parse metric to message: %v", err.Error())
|
||||||
|
}
|
||||||
|
return mp.ProcessMessage(m)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {
|
||||||
|
var drop bool = false
|
||||||
|
var err error = nil
|
||||||
|
name := m.Name()
|
||||||
|
|
||||||
|
if len(mp.stages) == 0 {
|
||||||
|
mp.SetStages(mp.DefaultStages())
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.mutex.RLock()
|
||||||
|
defer mp.mutex.RUnlock()
|
||||||
|
|
||||||
|
params := getParamMap(m)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
params["field"] = nil
|
||||||
|
params["tag"] = nil
|
||||||
|
paramMapPool.Put(params["fields"])
|
||||||
|
paramMapPool.Put(params["tags"])
|
||||||
|
paramMapPool.Put(params["meta"])
|
||||||
|
paramMapPool.Put(params)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, s := range mp.stages {
|
||||||
|
switch s {
|
||||||
|
case STAGENAME_DROP_BY_NAME:
|
||||||
|
if len(mp.dropMessages) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
|
||||||
|
if _, ok := mp.dropMessages[name]; ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_DROP_BY_TYPE:
|
||||||
|
if len(mp.dropTypes) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
|
||||||
|
if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_DROP_IF:
|
||||||
|
if len(mp.dropMessagesIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
|
||||||
|
drop, err = dropMessagesIf(¶ms, &mp.dropMessagesIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if drop {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Drop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_RENAME_BY_NAME:
|
||||||
|
if len(mp.renameMessages) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
|
||||||
|
if newname, ok := mp.renameMessages[name]; ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
|
||||||
|
m.SetName(newname)
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
|
||||||
|
m.AddMeta("oldname", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_RENAME_IF:
|
||||||
|
if len(mp.renameMessagesIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
|
||||||
|
_, err := renameMessagesIf(m, ¶ms, &mp.renameMessagesIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_ADD_TAG:
|
||||||
|
if len(mp.addTagsIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding tags")
|
||||||
|
_, err = addTagIf(m, ¶ms, &mp.addTagsIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_DELETE_TAG:
|
||||||
|
if len(mp.deleteTagsIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Delete tags")
|
||||||
|
_, err = deleteTagIf(m, ¶ms, &mp.deleteTagsIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_ADD_META:
|
||||||
|
if len(mp.addMetaIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding meta information")
|
||||||
|
_, err = addMetaIf(m, ¶ms, &mp.addMetaIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_DELETE_META:
|
||||||
|
if len(mp.deleteMetaIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Delete meta information")
|
||||||
|
_, err = deleteMetaIf(m, ¶ms, &mp.deleteMetaIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_ADD_FIELD:
|
||||||
|
if len(mp.addFieldIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding fields")
|
||||||
|
_, err = addFieldIf(m, ¶ms, &mp.addFieldIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_DELETE_FIELD:
|
||||||
|
if len(mp.deleteFieldIf) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Delete fields")
|
||||||
|
_, err = deleteFieldIf(m, ¶ms, &mp.deleteFieldIf)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_TAG_META:
|
||||||
|
if len(mp.moveTagToMeta) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
|
||||||
|
_, err := moveTagToMeta(m, ¶ms, &mp.moveTagToMeta)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_TAG_FIELD:
|
||||||
|
if len(mp.moveTagToField) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
|
||||||
|
_, err := moveTagToField(m, ¶ms, &mp.moveTagToField)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_META_TAG:
|
||||||
|
if len(mp.moveMetaToTag) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
|
||||||
|
_, err := moveMetaToTag(m, ¶ms, &mp.moveMetaToTag)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_META_FIELD:
|
||||||
|
if len(mp.moveMetaToField) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
|
||||||
|
_, err := moveMetaToField(m, ¶ms, &mp.moveMetaToField)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_FIELD_META:
|
||||||
|
if len(mp.moveFieldToMeta) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move field to meta")
|
||||||
|
_, err := moveFieldToMeta(m, ¶ms, &mp.moveFieldToMeta)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_MOVE_FIELD_TAG:
|
||||||
|
if len(mp.moveFieldToTag) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move field to tags")
|
||||||
|
_, err := moveFieldToTag(m, ¶ms, &mp.moveFieldToTag)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case STAGENAME_NORMALIZE_UNIT:
|
||||||
|
if mp.normalizeUnits {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Normalize units")
|
||||||
|
if lp2.IsMetric(m) {
|
||||||
|
_, err := normalizeUnits(m)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case STAGENAME_CHANGE_UNIT_PREFIX:
|
||||||
|
if len(mp.changeUnitPrefix) > 0 {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
|
||||||
|
if lp2.IsMetric(m) {
|
||||||
|
_, err := changeUnitPrefix(m, ¶ms, &mp.changeUnitPrefix)
|
||||||
|
if err != nil {
|
||||||
|
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return drop, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a new instace of a message processor.
|
||||||
|
func NewMessageProcessor() (MessageProcessor, error) {
|
||||||
|
mp := new(messageProcessor)
|
||||||
|
err := mp.init()
|
||||||
|
if err != nil {
|
||||||
|
err := fmt.Errorf("failed to create MessageProcessor: %v", err.Error())
|
||||||
|
cclog.ComponentError("MessageProcessor", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return mp, nil
|
||||||
|
}
|
265
pkg/messageProcessor/messageProcessorFuncs.go
Normal file
265
pkg/messageProcessor/messageProcessorFuncs.go
Normal file
@ -0,0 +1,265 @@
|
|||||||
|
package messageprocessor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
units "github.com/ClusterCockpit/cc-units"
|
||||||
|
"github.com/expr-lang/expr"
|
||||||
|
"github.com/expr-lang/expr/vm"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MessageLocation int
|
||||||
|
|
||||||
|
const (
|
||||||
|
MESSAGE_LOCATION_TAGS MessageLocation = iota
|
||||||
|
MESSAGE_LOCATION_META
|
||||||
|
MESSAGE_LOCATION_FIELDS
|
||||||
|
)
|
||||||
|
|
||||||
|
// Abstract function to move entries from one location to another
|
||||||
|
func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, from, to MessageLocation) (bool, error) {
|
||||||
|
for d, data := range *checks {
|
||||||
|
value, err := expr.Run(d, *params)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
|
||||||
|
if value.(bool) {
|
||||||
|
var v string
|
||||||
|
var ok bool = false
|
||||||
|
switch from {
|
||||||
|
case MESSAGE_LOCATION_TAGS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
|
||||||
|
v, ok = message.GetTag(data.Key)
|
||||||
|
case MESSAGE_LOCATION_META:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
|
||||||
|
cclog.ComponentDebug("MessageProcessor", message.Meta())
|
||||||
|
v, ok = message.GetMeta(data.Key)
|
||||||
|
case MESSAGE_LOCATION_FIELDS:
|
||||||
|
var x interface{}
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
|
||||||
|
x, ok = message.GetField(data.Key)
|
||||||
|
v = fmt.Sprintf("%v", x)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
switch from {
|
||||||
|
case MESSAGE_LOCATION_TAGS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
|
||||||
|
message.RemoveTag(data.Key)
|
||||||
|
case MESSAGE_LOCATION_META:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
|
||||||
|
message.RemoveMeta(data.Key)
|
||||||
|
case MESSAGE_LOCATION_FIELDS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
|
||||||
|
message.RemoveField(data.Key)
|
||||||
|
}
|
||||||
|
switch to {
|
||||||
|
case MESSAGE_LOCATION_TAGS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
|
||||||
|
message.AddTag(data.Value, v)
|
||||||
|
case MESSAGE_LOCATION_META:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
|
||||||
|
message.AddMeta(data.Value, v)
|
||||||
|
case MESSAGE_LOCATION_FIELDS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
|
||||||
|
message.AddField(data.Value, v)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false, fmt.Errorf("failed to get message entry: %s", data.Key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
||||||
|
for d, data := range *checks {
|
||||||
|
value, err := expr.Run(d, *params)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if value.(bool) {
|
||||||
|
switch location {
|
||||||
|
case MESSAGE_LOCATION_FIELDS:
|
||||||
|
switch data.Key {
|
||||||
|
case "value", "event", "log", "control":
|
||||||
|
return false, errors.New("cannot delete protected fields")
|
||||||
|
default:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
|
||||||
|
message.RemoveField(data.Key)
|
||||||
|
}
|
||||||
|
case MESSAGE_LOCATION_TAGS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
|
||||||
|
message.RemoveTag(data.Key)
|
||||||
|
case MESSAGE_LOCATION_META:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
|
||||||
|
message.RemoveMeta(data.Key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
||||||
|
for d, data := range *checks {
|
||||||
|
value, err := expr.Run(d, *params)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if value.(bool) {
|
||||||
|
switch location {
|
||||||
|
case MESSAGE_LOCATION_FIELDS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
|
||||||
|
message.AddField(data.Key, data.Value)
|
||||||
|
case MESSAGE_LOCATION_TAGS:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
|
||||||
|
message.AddTag(data.Key, data.Value)
|
||||||
|
case MESSAGE_LOCATION_META:
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
|
||||||
|
message.AddMeta(data.Key, data.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func addTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return addIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveTagToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_META)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveTagToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_FIELDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_META)
|
||||||
|
}
|
||||||
|
|
||||||
|
func addMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return addIf(message, params, checks, MESSAGE_LOCATION_META)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveMetaToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_TAGS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveMetaToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_FIELDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func addFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return addIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveFieldToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_TAGS)
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveFieldToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
||||||
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_META)
|
||||||
|
}
|
||||||
|
|
||||||
|
func dropMessagesIf(params *map[string]interface{}, checks *map[*vm.Program]struct{}) (bool, error) {
|
||||||
|
for d := range *checks {
|
||||||
|
value, err := expr.Run(d, *params)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if value.(bool) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeUnits(message lp2.CCMessage) (bool, error) {
|
||||||
|
if in_unit, ok := message.GetMeta("unit"); ok {
|
||||||
|
u := units.NewUnit(in_unit)
|
||||||
|
if u.Valid() {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
||||||
|
message.AddMeta("unit", u.Short())
|
||||||
|
}
|
||||||
|
} else if in_unit, ok := message.GetTag("unit"); ok {
|
||||||
|
u := units.NewUnit(in_unit)
|
||||||
|
if u.Valid() {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
||||||
|
message.AddTag("unit", u.Short())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
||||||
|
for r, n := range *checks {
|
||||||
|
value, err := expr.Run(r, *params)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if value.(bool) {
|
||||||
|
newPrefix := units.NewPrefix(n)
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
|
||||||
|
if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
|
||||||
|
u := units.NewUnit(in_unit)
|
||||||
|
if u.Valid() {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
||||||
|
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
||||||
|
if conv != nil && out_unit.Valid() {
|
||||||
|
if val, ok := message.GetField("value"); ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
||||||
|
message.AddField("value", conv(val))
|
||||||
|
message.AddMeta("unit", out_unit.Short())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix {
|
||||||
|
u := units.NewUnit(in_unit)
|
||||||
|
if u.Valid() {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
||||||
|
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
||||||
|
if conv != nil && out_unit.Valid() {
|
||||||
|
if val, ok := message.GetField("value"); ok {
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
||||||
|
message.AddField("value", conv(val))
|
||||||
|
message.AddTag("unit", out_unit.Short())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
||||||
|
for d, n := range *checks {
|
||||||
|
value, err := expr.Run(d, *params)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
||||||
|
}
|
||||||
|
if value.(bool) {
|
||||||
|
old := message.Name()
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Rename to", n)
|
||||||
|
message.SetName(n)
|
||||||
|
cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
|
||||||
|
message.AddMeta("oldname", old)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
387
pkg/messageProcessor/messageProcessor_test.go
Normal file
387
pkg/messageProcessor/messageProcessor_test.go
Normal file
@ -0,0 +1,387 @@
|
|||||||
|
package messageprocessor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func generate_message_lists(num_lists, num_entries int) ([][]lp.CCMessage, error) {
|
||||||
|
mlist := make([][]lp.CCMessage, 0)
|
||||||
|
for j := 0; j < num_lists; j++ {
|
||||||
|
out := make([]lp.CCMessage, 0)
|
||||||
|
for i := 0; i < num_entries; i++ {
|
||||||
|
var x lp.CCMessage
|
||||||
|
var err error = nil
|
||||||
|
switch {
|
||||||
|
case i%4 == 0:
|
||||||
|
x, err = lp.NewEvent("myevent", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "nothing happend", time.Now())
|
||||||
|
case i%4 == 1:
|
||||||
|
x, err = lp.NewMetric("mymetric", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{"unit": "kByte"}, 12.145, time.Now())
|
||||||
|
case i%4 == 2:
|
||||||
|
x, err = lp.NewLog("mylog", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "disk status: OK", time.Now())
|
||||||
|
case i%4 == 3:
|
||||||
|
x, err = lp.NewGetControl("mycontrol", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, time.Now())
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
x.AddTag("hostname", "myhost")
|
||||||
|
out = append(out, x)
|
||||||
|
} else {
|
||||||
|
return nil, errors.New("failed to create message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mlist = append(mlist, out)
|
||||||
|
}
|
||||||
|
return mlist, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewMessageProcessor(t *testing.T) {
|
||||||
|
_, err := NewMessageProcessor()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Configs struct {
|
||||||
|
name string
|
||||||
|
config json.RawMessage
|
||||||
|
drop bool
|
||||||
|
errors bool
|
||||||
|
pre func(msg lp.CCMessage) error
|
||||||
|
check func(msg lp.CCMessage) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var test_configs = []Configs{
|
||||||
|
{
|
||||||
|
name: "single_dropif_nomatch",
|
||||||
|
config: json.RawMessage(`{"drop_messages_if": [ "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "drop_by_name",
|
||||||
|
config: json.RawMessage(`{"drop_messages": [ "net_bytes_in"]}`),
|
||||||
|
drop: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "drop_by_type_match",
|
||||||
|
config: json.RawMessage(`{"drop_by_message_type": [ "metric"]}`),
|
||||||
|
drop: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "drop_by_type_nomatch",
|
||||||
|
config: json.RawMessage(`{"drop_by_message_type": [ "event"]}`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single_dropif_match",
|
||||||
|
config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'"]}`),
|
||||||
|
drop: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "double_dropif_match_nomatch",
|
||||||
|
config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'", "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
|
||||||
|
drop: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "rename_simple",
|
||||||
|
config: json.RawMessage(`{"rename_messages": { "net_bytes_in" : "net_bytes_out", "rapl_power": "cpu_power"}}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.Name() != "net_bytes_out" {
|
||||||
|
return errors.New("expected name net_bytes_out but still have net_bytes_in")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "rename_match",
|
||||||
|
config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_in'" : "net_bytes_out", "name == 'rapl_power'": "cpu_power"}}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.Name() != "net_bytes_out" {
|
||||||
|
return errors.New("expected name net_bytes_out but still have net_bytes_in")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "rename_nomatch",
|
||||||
|
config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_out'" : "net_bytes_in", "name == 'rapl_power'": "cpu_power"}}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.Name() != "net_bytes_in" {
|
||||||
|
return errors.New("expected name net_bytes_in but still have net_bytes_out")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "add_tag",
|
||||||
|
config: json.RawMessage(`{"add_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "cluster", "value" : "mycluster"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if !msg.HasTag("cluster") {
|
||||||
|
return errors.New("expected new tag 'cluster' but not present")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "del_tag",
|
||||||
|
config: json.RawMessage(`{"delete_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "type"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasTag("type") {
|
||||||
|
return errors.New("expected to have no 'type' but still present")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "add_meta",
|
||||||
|
config: json.RawMessage(`{"add_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "source", "value" : "example"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if !msg.HasMeta("source") {
|
||||||
|
return errors.New("expected new tag 'source' but not present")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "del_meta",
|
||||||
|
config: json.RawMessage(`{"delete_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "unit"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasMeta("unit") {
|
||||||
|
return errors.New("expected to have no 'unit' but still present")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "add_field",
|
||||||
|
config: json.RawMessage(`{"add_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value" : "example"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if !msg.HasField("myfield") {
|
||||||
|
return errors.New("expected new tag 'source' but not present")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete_fields_if_protected",
|
||||||
|
config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "value"}]}`),
|
||||||
|
errors: true,
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if !msg.HasField("value") {
|
||||||
|
return errors.New("expected to still have 'value' field because it is a protected field key")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete_fields_if_unprotected",
|
||||||
|
config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "testfield"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasField("testfield") {
|
||||||
|
return errors.New("expected to still have 'testfield' field but should be deleted")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
pre: func(msg lp.CCMessage) error {
|
||||||
|
msg.AddField("testfield", 4.123)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single_change_prefix_match",
|
||||||
|
config: json.RawMessage(`{"change_unit_prefix": {"name == 'net_bytes_in' && tags.type == 'node'": "M"}}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if u, ok := msg.GetMeta("unit"); ok {
|
||||||
|
if u != "MB" {
|
||||||
|
return fmt.Errorf("expected unit MB but have %s", u)
|
||||||
|
}
|
||||||
|
} else if u, ok := msg.GetTag("unit"); ok {
|
||||||
|
if u != "MB" {
|
||||||
|
return fmt.Errorf("expected unit MB but have %s", u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "normalize_units",
|
||||||
|
config: json.RawMessage(`{"normalize_units": true}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if u, ok := msg.GetMeta("unit"); ok {
|
||||||
|
if u != "B" {
|
||||||
|
return fmt.Errorf("expected unit B but have %s", u)
|
||||||
|
}
|
||||||
|
} else if u, ok := msg.GetTag("unit"); ok {
|
||||||
|
if u != "B" {
|
||||||
|
return fmt.Errorf("expected unit B but have %s", u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_tag_to_meta",
|
||||||
|
config: json.RawMessage(`{"move_tag_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasTag("type-id") || !msg.HasMeta("typeid") {
|
||||||
|
return errors.New("moving tag 'type-id' to meta 'typeid' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
pre: func(msg lp.CCMessage) error {
|
||||||
|
msg.AddTag("type-id", "0")
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_tag_to_field",
|
||||||
|
config: json.RawMessage(`{"move_tag_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasTag("type-id") || !msg.HasField("typeid") {
|
||||||
|
return errors.New("moving tag 'type-id' to field 'typeid' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
pre: func(msg lp.CCMessage) error {
|
||||||
|
msg.AddTag("type-id", "0")
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_meta_to_tag",
|
||||||
|
config: json.RawMessage(`{"move_meta_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasMeta("unit") || !msg.HasTag("unit") {
|
||||||
|
return errors.New("moving meta 'unit' to tag 'unit' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_meta_to_field",
|
||||||
|
config: json.RawMessage(`{"move_meta_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasMeta("unit") || !msg.HasField("unit") {
|
||||||
|
return errors.New("moving meta 'unit' to field 'unit' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_field_to_tag",
|
||||||
|
config: json.RawMessage(`{"move_field_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasField("myfield") || !msg.HasTag("field") {
|
||||||
|
return errors.New("moving meta 'myfield' to tag 'field' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
pre: func(msg lp.CCMessage) error {
|
||||||
|
msg.AddField("myfield", 12)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "move_field_to_meta",
|
||||||
|
config: json.RawMessage(`{"move_field_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
|
||||||
|
check: func(msg lp.CCMessage) error {
|
||||||
|
if msg.HasField("myfield") || !msg.HasMeta("field") {
|
||||||
|
return errors.New("moving meta 'myfield' to meta 'field' failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
pre: func(msg lp.CCMessage) error {
|
||||||
|
msg.AddField("myfield", 12)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConfigList(t *testing.T) {
|
||||||
|
for _, c := range test_configs {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
m, err := lp.NewMetric("net_bytes_in", map[string]string{"type": "node", "type-id": "0"}, map[string]string{"unit": "Byte"}, float64(1024.0), time.Now())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if c.pre != nil {
|
||||||
|
if err = c.pre(m); err != nil {
|
||||||
|
t.Errorf("error running pre-test function: %v", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mp, err := NewMessageProcessor()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = mp.FromConfigJSON(c.config)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//t.Log(m.ToLineProtocol(nil))
|
||||||
|
drop, err := mp.ProcessMessage(m)
|
||||||
|
if err != nil && !c.errors {
|
||||||
|
cclog.SetDebug()
|
||||||
|
mp.ProcessMessage(m)
|
||||||
|
t.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if drop != c.drop {
|
||||||
|
if c.drop {
|
||||||
|
t.Error("fail, message should be dropped but processor signalled NO dropping")
|
||||||
|
} else {
|
||||||
|
t.Error("fail, message should NOT be dropped but processor signalled dropping")
|
||||||
|
}
|
||||||
|
cclog.SetDebug()
|
||||||
|
mp.ProcessMessage(m)
|
||||||
|
}
|
||||||
|
if c.check != nil {
|
||||||
|
if err := c.check(m); err != nil {
|
||||||
|
t.Errorf("check failed with %v", err.Error())
|
||||||
|
t.Log("Rerun with debugging")
|
||||||
|
cclog.SetDebug()
|
||||||
|
mp.ProcessMessage(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkProcessing(b *testing.B) {
|
||||||
|
|
||||||
|
mlist, err := generate_message_lists(b.N, 1000)
|
||||||
|
if err != nil {
|
||||||
|
b.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mp, err := NewMessageProcessor()
|
||||||
|
if err != nil {
|
||||||
|
b.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = mp.FromConfigJSON(json.RawMessage(`{"move_meta_to_tag_if": [{"if" : "name == 'mymetric'", "key":"unit", "value":"unit"}]}`))
|
||||||
|
if err != nil {
|
||||||
|
b.Error(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b.StartTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
for _, m := range mlist[i] {
|
||||||
|
if _, err := mp.ProcessMessage(m); err != nil {
|
||||||
|
b.Errorf("failed processing message '%s': %v", m.ToLineProtocol(nil), err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
b.ReportMetric(float64(b.Elapsed())/float64(len(mlist)*b.N), "ns/message")
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user