mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 02:35:07 +01:00 
			
		
		
		
	New Message processor (#118)
* Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * New message processor to check whether a message should be dropped or manipulate it in flight * Create a copy of message before manipulation --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
This commit is contained in:
		
				
					committed by
					
						
						Thomas Roehl
					
				
			
			
				
	
			
			
			
						parent
						
							704d332082
						
					
				
				
					commit
					6d7604c74f
				
			
							
								
								
									
										84
									
								
								pkg/messageProcessor/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								pkg/messageProcessor/README.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,84 @@
 | 
			
		||||
# Message Processor Component
 | 
			
		||||
 | 
			
		||||
Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages.
 | 
			
		||||
The main CC application using it is `cc-metric-collector`. The processing part there was originally in the metric router, the central
 | 
			
		||||
hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the
 | 
			
		||||
lack of flexibility caused some trouble:
 | 
			
		||||
 | 
			
		||||
> The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with
 | 
			
		||||
> a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different
 | 
			
		||||
> resolution for some metrics. The issue was basically the `mem_used` metric showing the currently used memory of the node. Ganglia wants it in `kByte` as provided
 | 
			
		||||
> by the Linux operating system but CC wanted it in `GByte`.
 | 
			
		||||
 | 
			
		||||
## For developers
 | 
			
		||||
 | 
			
		||||
Whenever you receive or are about to send a message out, you should provide some processing.
 | 
			
		||||
 | 
			
		||||
### Configuration of component
 | 
			
		||||
 | 
			
		||||
New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file
 | 
			
		||||
or some fields in a configuration file for the processing.
 | 
			
		||||
 | 
			
		||||
The message processor uses the following configuration
 | 
			
		||||
```golang
 | 
			
		||||
type messageProcessorConfig struct {
 | 
			
		||||
	DropMessages     []string          `json:"drop_messages"`      // List of metric names to drop. For fine-grained dropping use drop_messages_if
 | 
			
		||||
	DropMessagesIf   []string          `json:"drop_messages_if"`   // List of evaluatable terms to drop messages
 | 
			
		||||
	RenameMessages   map[string]string `json:"rename_messages"`    // Map to rename metric name from key to value
 | 
			
		||||
	NormalizeUnits   bool              `json:"normalize_units"`    // Check unit meta flag and normalize it using cc-units
 | 
			
		||||
	ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the messages
 | 
			
		||||
}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
In order to load the configuration from a `json.RawMessage`:
 | 
			
		||||
```golang
 | 
			
		||||
mp, _ := NewMessageProcessor()
 | 
			
		||||
 | 
			
		||||
mp.FromConfigJSON(configJson)
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### Using the component
 | 
			
		||||
After initialization and adding the different operations, the `ProcessMessage()` function applies all operations and returns whether the message should be dropped.
 | 
			
		||||
 | 
			
		||||
```golang
 | 
			
		||||
m := lp.CCMetric{}
 | 
			
		||||
 | 
			
		||||
drop, err := mp.ProcessMessage(m)
 | 
			
		||||
if !drop {
 | 
			
		||||
    // process further
 | 
			
		||||
}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
#### Overhead
 | 
			
		||||
 | 
			
		||||
The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message
 | 
			
		||||
increases.
 | 
			
		||||
 | 
			
		||||
## For users
 | 
			
		||||
 | 
			
		||||
### Syntax for evaluatable terms
 | 
			
		||||
 | 
			
		||||
The message processor uses `gval` for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations.
 | 
			
		||||
 | 
			
		||||
Accessible for operations are
 | 
			
		||||
- `name` of the message
 | 
			
		||||
- `timestamp` of the message
 | 
			
		||||
- `type`, `type-id` of the message (also `tag_type` and `tag_type-id`)
 | 
			
		||||
- `stype`, `stype-id` of the message (if message has theses tags, also `tag_stype` and `tag_stype-id`)
 | 
			
		||||
- `value` for a CCMetric message (also `field_value`)
 | 
			
		||||
- `event` for a CCEvent message (also `field_event`)
 | 
			
		||||
- `control` for a CCControl message (also `field_control`)
 | 
			
		||||
- `log` for a CCLog message (also `field_log`)
 | 
			
		||||
 | 
			
		||||
Generally, all tags are accessible with `tag_<tagkey>`, all meta information with `meta_<metakey>` and fields with `field_<fieldkey>`.
 | 
			
		||||
 | 
			
		||||
- Comparing strings: `==`, `!=`, `match(str, regex)` (use `%` instead of `\`!)
 | 
			
		||||
- Combining conditions: `&&`, `||`
 | 
			
		||||
- Comparing numbers: `==`, `!=`, `<`, `>`, `<=`, `>=`
 | 
			
		||||
- Test lists: `<value> in <list>`
 | 
			
		||||
- Topological tests: `tag_type-id in getCpuListOfType("socket", "1")` (test if the metric belongs to socket 1 in local node topology)
 | 
			
		||||
 | 
			
		||||
Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that:
 | 
			
		||||
- use `''` instead of `""` for strings
 | 
			
		||||
- for the regexes, use `%` instead of `\`
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										987
									
								
								pkg/messageProcessor/messageProcessor.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										987
									
								
								pkg/messageProcessor/messageProcessor.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,987 @@
 | 
			
		||||
package messageprocessor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
 | 
			
		||||
	"github.com/expr-lang/expr"
 | 
			
		||||
	"github.com/expr-lang/expr/vm"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Message processor add/delete tag/meta configuration
 | 
			
		||||
type messageProcessorTagConfig struct {
 | 
			
		||||
	Key       string `json:"key"`   // Tag name
 | 
			
		||||
	Value     string `json:"value"` // Tag value
 | 
			
		||||
	Condition string `json:"if"`    // Condition for adding or removing corresponding tag
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type messageProcessorConfig struct {
 | 
			
		||||
	StageOrder       []string                    `json:"stage_order,omitempty"`        // List of stages to execute them in the specified order and to skip unrequired ones
 | 
			
		||||
	DropMessages     []string                    `json:"drop_messages,omitempty"`      // List of metric names to drop. For fine-grained dropping use drop_messages_if
 | 
			
		||||
	DropMessagesIf   []string                    `json:"drop_messages_if,omitempty"`   // List of evaluatable terms to drop messages
 | 
			
		||||
	RenameMessages   map[string]string           `json:"rename_messages,omitempty"`    // Map of metric names to rename
 | 
			
		||||
	RenameMessagesIf map[string]string           `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
 | 
			
		||||
	NormalizeUnits   bool                        `json:"normalize_units,omitempty"`    // Check unit meta flag and normalize it using cc-units
 | 
			
		||||
	ChangeUnitPrefix map[string]string           `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
 | 
			
		||||
	AddTagsIf        []messageProcessorTagConfig `json:"add_tags_if"`                  // List of tags that are added when the condition is met
 | 
			
		||||
	DelTagsIf        []messageProcessorTagConfig `json:"delete_tags_if"`               // List of tags that are removed when the condition is met
 | 
			
		||||
	AddMetaIf        []messageProcessorTagConfig `json:"add_meta_if"`                  // List of meta infos that are added when the condition is met
 | 
			
		||||
	DelMetaIf        []messageProcessorTagConfig `json:"delete_meta_if"`               // List of meta infos that are removed when the condition is met
 | 
			
		||||
	AddFieldIf       []messageProcessorTagConfig `json:"add_fields_if"`                // List of fields that are added when the condition is met
 | 
			
		||||
	DelFieldIf       []messageProcessorTagConfig `json:"delete_fields_if"`             // List of fields that are removed when the condition is met
 | 
			
		||||
	DropByType       []string                    `json:"drop_by_message_type"`         // List of message types that should be dropped
 | 
			
		||||
	MoveTagToMeta    []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
 | 
			
		||||
	MoveTagToField   []messageProcessorTagConfig `json:"move_tag_to_field_if"`
 | 
			
		||||
	MoveMetaToTag    []messageProcessorTagConfig `json:"move_meta_to_tag_if"`
 | 
			
		||||
	MoveMetaToField  []messageProcessorTagConfig `json:"move_meta_to_field_if"`
 | 
			
		||||
	MoveFieldToTag   []messageProcessorTagConfig `json:"move_field_to_tag_if"`
 | 
			
		||||
	MoveFieldToMeta  []messageProcessorTagConfig `json:"move_field_to_meta_if"`
 | 
			
		||||
	AddBaseEnv       map[string]interface{}      `json:"add_base_env"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type messageProcessor struct {
 | 
			
		||||
 | 
			
		||||
	// For thread-safety
 | 
			
		||||
	mutex sync.RWMutex
 | 
			
		||||
 | 
			
		||||
	// mapping contains all evalables as strings to gval.Evaluable
 | 
			
		||||
	// because it is not possible to get the original string out of
 | 
			
		||||
	// a gval.Evaluable
 | 
			
		||||
	mapping map[string]*vm.Program
 | 
			
		||||
 | 
			
		||||
	stages           []string                 // order of stage execution
 | 
			
		||||
	dropMessages     map[string]struct{}      // internal lookup map
 | 
			
		||||
	dropTypes        map[string]struct{}      // internal lookup map
 | 
			
		||||
	dropMessagesIf   map[*vm.Program]struct{} // pre-processed dropMessagesIf
 | 
			
		||||
	renameMessages   map[string]string        // internal lookup map
 | 
			
		||||
	renameMessagesIf map[*vm.Program]string   // pre-processed RenameMessagesIf
 | 
			
		||||
	changeUnitPrefix map[*vm.Program]string   // pre-processed ChangeUnitPrefix
 | 
			
		||||
	normalizeUnits   bool
 | 
			
		||||
	addTagsIf        map[*vm.Program]messageProcessorTagConfig // pre-processed AddTagsIf
 | 
			
		||||
	deleteTagsIf     map[*vm.Program]messageProcessorTagConfig // pre-processed DelTagsIf
 | 
			
		||||
	addMetaIf        map[*vm.Program]messageProcessorTagConfig // pre-processed AddMetaIf
 | 
			
		||||
	deleteMetaIf     map[*vm.Program]messageProcessorTagConfig // pre-processed DelMetaIf
 | 
			
		||||
	addFieldIf       map[*vm.Program]messageProcessorTagConfig // pre-processed AddFieldIf
 | 
			
		||||
	deleteFieldIf    map[*vm.Program]messageProcessorTagConfig // pre-processed DelFieldIf
 | 
			
		||||
	moveTagToMeta    map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToMeta
 | 
			
		||||
	moveTagToField   map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToField
 | 
			
		||||
	moveMetaToTag    map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToTag
 | 
			
		||||
	moveMetaToField  map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToField
 | 
			
		||||
	moveFieldToTag   map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToTag
 | 
			
		||||
	moveFieldToMeta  map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToMeta
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MessageProcessor interface {
 | 
			
		||||
	// Functions to set the execution order of the processing stages
 | 
			
		||||
	SetStages([]string) error
 | 
			
		||||
	DefaultStages() []string
 | 
			
		||||
	// Function to add variables to the base evaluation environment
 | 
			
		||||
	AddBaseEnv(env map[string]interface{}) error
 | 
			
		||||
	// Functions to add and remove rules
 | 
			
		||||
	AddDropMessagesByName(name string) error
 | 
			
		||||
	RemoveDropMessagesByName(name string)
 | 
			
		||||
	AddDropMessagesByCondition(condition string) error
 | 
			
		||||
	RemoveDropMessagesByCondition(condition string)
 | 
			
		||||
	AddRenameMetricByCondition(condition string, name string) error
 | 
			
		||||
	RemoveRenameMetricByCondition(condition string)
 | 
			
		||||
	AddRenameMetricByName(from, to string) error
 | 
			
		||||
	RemoveRenameMetricByName(from string)
 | 
			
		||||
	SetNormalizeUnits(settings bool)
 | 
			
		||||
	AddChangeUnitPrefix(condition string, prefix string) error
 | 
			
		||||
	RemoveChangeUnitPrefix(condition string)
 | 
			
		||||
	AddAddTagsByCondition(condition, key, value string) error
 | 
			
		||||
	RemoveAddTagsByCondition(condition string)
 | 
			
		||||
	AddDeleteTagsByCondition(condition, key, value string) error
 | 
			
		||||
	RemoveDeleteTagsByCondition(condition string)
 | 
			
		||||
	AddAddMetaByCondition(condition, key, value string) error
 | 
			
		||||
	RemoveAddMetaByCondition(condition string)
 | 
			
		||||
	AddDeleteMetaByCondition(condition, key, value string) error
 | 
			
		||||
	RemoveDeleteMetaByCondition(condition string)
 | 
			
		||||
	AddMoveTagToMeta(condition, key, value string) error
 | 
			
		||||
	RemoveMoveTagToMeta(condition string)
 | 
			
		||||
	AddMoveTagToFields(condition, key, value string) error
 | 
			
		||||
	RemoveMoveTagToFields(condition string)
 | 
			
		||||
	AddMoveMetaToTags(condition, key, value string) error
 | 
			
		||||
	RemoveMoveMetaToTags(condition string)
 | 
			
		||||
	AddMoveMetaToFields(condition, key, value string) error
 | 
			
		||||
	RemoveMoveMetaToFields(condition string)
 | 
			
		||||
	AddMoveFieldToTags(condition, key, value string) error
 | 
			
		||||
	RemoveMoveFieldToTags(condition string)
 | 
			
		||||
	AddMoveFieldToMeta(condition, key, value string) error
 | 
			
		||||
	RemoveMoveFieldToMeta(condition string)
 | 
			
		||||
	// Read in a JSON configuration
 | 
			
		||||
	FromConfigJSON(config json.RawMessage) error
 | 
			
		||||
	// Processing functions for legacy CCMetric and current CCMessage
 | 
			
		||||
	ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
 | 
			
		||||
	ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
 | 
			
		||||
	//EvalToBool(condition string, parameters map[string]interface{}) (bool, error)
 | 
			
		||||
	//EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error)
 | 
			
		||||
	//EvalToString(condition string, parameters map[string]interface{}) (string, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	STAGENAME_DROP_BY_NAME       string = "drop_by_name"
 | 
			
		||||
	STAGENAME_DROP_BY_TYPE       string = "drop_by_type"
 | 
			
		||||
	STAGENAME_DROP_IF            string = "drop_if"
 | 
			
		||||
	STAGENAME_ADD_TAG            string = "add_tag"
 | 
			
		||||
	STAGENAME_DELETE_TAG         string = "delete_tag"
 | 
			
		||||
	STAGENAME_MOVE_TAG_META      string = "move_tag_to_meta"
 | 
			
		||||
	STAGENAME_MOVE_TAG_FIELD     string = "move_tag_to_fields"
 | 
			
		||||
	STAGENAME_ADD_META           string = "add_meta"
 | 
			
		||||
	STAGENAME_DELETE_META        string = "delete_meta"
 | 
			
		||||
	STAGENAME_MOVE_META_TAG      string = "move_meta_to_tags"
 | 
			
		||||
	STAGENAME_MOVE_META_FIELD    string = "move_meta_to_fields"
 | 
			
		||||
	STAGENAME_ADD_FIELD          string = "add_field"
 | 
			
		||||
	STAGENAME_DELETE_FIELD       string = "delete_field"
 | 
			
		||||
	STAGENAME_MOVE_FIELD_TAG     string = "move_field_to_tags"
 | 
			
		||||
	STAGENAME_MOVE_FIELD_META    string = "move_field_to_meta"
 | 
			
		||||
	STAGENAME_RENAME_BY_NAME     string = "rename"
 | 
			
		||||
	STAGENAME_RENAME_IF          string = "rename_if"
 | 
			
		||||
	STAGENAME_CHANGE_UNIT_PREFIX string = "change_unit_prefix"
 | 
			
		||||
	STAGENAME_NORMALIZE_UNIT     string = "normalize_unit"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var StageNames = []string{
 | 
			
		||||
	STAGENAME_DROP_BY_NAME,
 | 
			
		||||
	STAGENAME_DROP_BY_TYPE,
 | 
			
		||||
	STAGENAME_DROP_IF,
 | 
			
		||||
	STAGENAME_ADD_TAG,
 | 
			
		||||
	STAGENAME_DELETE_TAG,
 | 
			
		||||
	STAGENAME_MOVE_TAG_META,
 | 
			
		||||
	STAGENAME_MOVE_TAG_FIELD,
 | 
			
		||||
	STAGENAME_ADD_META,
 | 
			
		||||
	STAGENAME_DELETE_META,
 | 
			
		||||
	STAGENAME_MOVE_META_TAG,
 | 
			
		||||
	STAGENAME_MOVE_META_FIELD,
 | 
			
		||||
	STAGENAME_ADD_FIELD,
 | 
			
		||||
	STAGENAME_DELETE_FIELD,
 | 
			
		||||
	STAGENAME_MOVE_FIELD_TAG,
 | 
			
		||||
	STAGENAME_MOVE_FIELD_META,
 | 
			
		||||
	STAGENAME_RENAME_BY_NAME,
 | 
			
		||||
	STAGENAME_RENAME_IF,
 | 
			
		||||
	STAGENAME_CHANGE_UNIT_PREFIX,
 | 
			
		||||
	STAGENAME_NORMALIZE_UNIT,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var paramMapPool = sync.Pool{
 | 
			
		||||
	New: func() any {
 | 
			
		||||
		return make(map[string]interface{})
 | 
			
		||||
	},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func sanitizeExprString(key string) string {
 | 
			
		||||
	return strings.ReplaceAll(key, "type-id", "typeid")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getParamMap(point lp.CCMetric) map[string]interface{} {
 | 
			
		||||
	params := paramMapPool.Get().(map[string]interface{})
 | 
			
		||||
	params["message"] = point
 | 
			
		||||
	params["msg"] = point
 | 
			
		||||
	params["name"] = point.Name()
 | 
			
		||||
	params["timestamp"] = point.Time().Unix()
 | 
			
		||||
	params["time"] = params["timestamp"]
 | 
			
		||||
 | 
			
		||||
	fields := paramMapPool.Get().(map[string]interface{})
 | 
			
		||||
	for key, value := range point.Fields() {
 | 
			
		||||
		fields[key] = value
 | 
			
		||||
		switch key {
 | 
			
		||||
		case "value":
 | 
			
		||||
			params["messagetype"] = "metric"
 | 
			
		||||
			params["value"] = value
 | 
			
		||||
			params["metric"] = value
 | 
			
		||||
		case "event":
 | 
			
		||||
			params["messagetype"] = "event"
 | 
			
		||||
			params["event"] = value
 | 
			
		||||
		case "control":
 | 
			
		||||
			params["messagetype"] = "control"
 | 
			
		||||
			params["control"] = value
 | 
			
		||||
		case "log":
 | 
			
		||||
			params["messagetype"] = "log"
 | 
			
		||||
			params["log"] = value
 | 
			
		||||
		default:
 | 
			
		||||
			params["messagetype"] = "unknown"
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	params["msgtype"] = params["messagetype"]
 | 
			
		||||
	params["fields"] = fields
 | 
			
		||||
	params["field"] = fields
 | 
			
		||||
	tags := paramMapPool.Get().(map[string]interface{})
 | 
			
		||||
	for key, value := range point.Tags() {
 | 
			
		||||
		tags[sanitizeExprString(key)] = value
 | 
			
		||||
	}
 | 
			
		||||
	params["tags"] = tags
 | 
			
		||||
	params["tag"] = tags
 | 
			
		||||
	meta := paramMapPool.Get().(map[string]interface{})
 | 
			
		||||
	for key, value := range point.Meta() {
 | 
			
		||||
		meta[sanitizeExprString(key)] = value
 | 
			
		||||
	}
 | 
			
		||||
	params["meta"] = meta
 | 
			
		||||
	return params
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var baseenv = map[string]interface{}{
 | 
			
		||||
	"name":        "",
 | 
			
		||||
	"messagetype": "unknown",
 | 
			
		||||
	"msgtype":     "unknown",
 | 
			
		||||
	"tag": map[string]interface{}{
 | 
			
		||||
		"type":     "unknown",
 | 
			
		||||
		"typeid":   "0",
 | 
			
		||||
		"stype":    "unknown",
 | 
			
		||||
		"stypeid":  "0",
 | 
			
		||||
		"hostname": "localhost",
 | 
			
		||||
		"cluster":  "nocluster",
 | 
			
		||||
	},
 | 
			
		||||
	"tags": map[string]interface{}{
 | 
			
		||||
		"type":     "unknown",
 | 
			
		||||
		"typeid":   "0",
 | 
			
		||||
		"stype":    "unknown",
 | 
			
		||||
		"stypeid":  "0",
 | 
			
		||||
		"hostname": "localhost",
 | 
			
		||||
		"cluster":  "nocluster",
 | 
			
		||||
	},
 | 
			
		||||
	"meta": map[string]interface{}{
 | 
			
		||||
		"unit":   "invalid",
 | 
			
		||||
		"source": "unknown",
 | 
			
		||||
	},
 | 
			
		||||
	"fields": map[string]interface{}{
 | 
			
		||||
		"value":   0,
 | 
			
		||||
		"event":   "",
 | 
			
		||||
		"control": "",
 | 
			
		||||
		"log":     "",
 | 
			
		||||
	},
 | 
			
		||||
	"field": map[string]interface{}{
 | 
			
		||||
		"value":   0,
 | 
			
		||||
		"event":   "",
 | 
			
		||||
		"control": "",
 | 
			
		||||
		"log":     "",
 | 
			
		||||
	},
 | 
			
		||||
	"timestamp": 1234567890,
 | 
			
		||||
	"msg":       lp2.EmptyMessage(),
 | 
			
		||||
	"message":   lp2.EmptyMessage(),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} {
 | 
			
		||||
	out := make(map[string]interface{})
 | 
			
		||||
	for k, v := range values {
 | 
			
		||||
		switch value := v.(type) {
 | 
			
		||||
		case int, int32, int64, uint, uint32, uint64, string, float32, float64:
 | 
			
		||||
			out[k] = value
 | 
			
		||||
		case map[string]interface{}:
 | 
			
		||||
			if _, ok := baseenv[k]; !ok {
 | 
			
		||||
				out[k] = addBaseEnvWalker(value)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddBaseEnv(env map[string]interface{}) error {
 | 
			
		||||
	for k, v := range env {
 | 
			
		||||
		switch value := v.(type) {
 | 
			
		||||
		case int, int32, int64, uint, uint32, uint64, string, float32, float64:
 | 
			
		||||
			baseenv[k] = value
 | 
			
		||||
		case map[string]interface{}:
 | 
			
		||||
			if _, ok := baseenv[k]; !ok {
 | 
			
		||||
				baseenv[k] = addBaseEnvWalker(value)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) init() error {
 | 
			
		||||
	mp.stages = make([]string, 0)
 | 
			
		||||
	mp.mapping = make(map[string]*vm.Program)
 | 
			
		||||
	mp.dropMessages = make(map[string]struct{})
 | 
			
		||||
	mp.dropTypes = make(map[string]struct{})
 | 
			
		||||
	mp.dropMessagesIf = make(map[*vm.Program]struct{})
 | 
			
		||||
	mp.renameMessages = make(map[string]string)
 | 
			
		||||
	mp.renameMessagesIf = make(map[*vm.Program]string)
 | 
			
		||||
	mp.changeUnitPrefix = make(map[*vm.Program]string)
 | 
			
		||||
	mp.addTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.addMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.addFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.deleteTagsIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.deleteMetaIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.deleteFieldIf = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveFieldToMeta = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveFieldToTag = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveMetaToField = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveMetaToTag = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveTagToField = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.moveTagToMeta = make(map[*vm.Program]messageProcessorTagConfig)
 | 
			
		||||
	mp.normalizeUnits = false
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDropMessagesByName(name string) error {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := mp.dropMessages[name]; !ok {
 | 
			
		||||
		mp.dropMessages[name] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDropMessagesByName(name string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	delete(mp.dropMessages, name)
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDropMessagesByType(typestring string) error {
 | 
			
		||||
	valid := []string{"metric", "event", "control", "log"}
 | 
			
		||||
	isValid := false
 | 
			
		||||
	for _, t := range valid {
 | 
			
		||||
		if t == typestring {
 | 
			
		||||
			isValid = true
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if isValid {
 | 
			
		||||
		mp.mutex.Lock()
 | 
			
		||||
		if _, ok := mp.dropTypes[typestring]; !ok {
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Adding type", typestring, "for dropping")
 | 
			
		||||
			mp.dropTypes[typestring] = struct{}{}
 | 
			
		||||
		}
 | 
			
		||||
		mp.mutex.Unlock()
 | 
			
		||||
	} else {
 | 
			
		||||
		return fmt.Errorf("invalid message type %s", typestring)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDropMessagesByType(typestring string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	delete(mp.dropTypes, typestring)
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) addTagConfig(condition, key, value string, config *map[*vm.Program]messageProcessorTagConfig) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := (*config)[evaluable]; !ok {
 | 
			
		||||
		mp.mapping[condition] = evaluable
 | 
			
		||||
		(*config)[evaluable] = messageProcessorTagConfig{
 | 
			
		||||
			Condition: condition,
 | 
			
		||||
			Key:       key,
 | 
			
		||||
			Value:     value,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) removeTagConfig(condition string, config *map[*vm.Program]messageProcessorTagConfig) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if e, ok := mp.mapping[condition]; ok {
 | 
			
		||||
		delete(mp.mapping, condition)
 | 
			
		||||
		delete(*config, e)
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddAddTagsByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.addTagsIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveAddTagsByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.addTagsIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDeleteTagsByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.deleteTagsIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDeleteTagsByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.deleteTagsIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddAddMetaByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.addMetaIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveAddMetaByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.addMetaIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDeleteMetaByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.deleteMetaIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDeleteMetaByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.deleteMetaIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddAddFieldByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.addFieldIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveAddFieldByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.addFieldIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDeleteFieldByCondition(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.deleteFieldIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDeleteFieldByCondition(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.deleteFieldIf)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddDropMessagesByCondition(condition string) error {
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := mp.dropMessagesIf[evaluable]; !ok {
 | 
			
		||||
		mp.mapping[condition] = evaluable
 | 
			
		||||
		mp.dropMessagesIf[evaluable] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveDropMessagesByCondition(condition string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if e, ok := mp.mapping[condition]; ok {
 | 
			
		||||
		delete(mp.mapping, condition)
 | 
			
		||||
		delete(mp.dropMessagesIf, e)
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddRenameMetricByCondition(condition string, name string) error {
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := mp.renameMessagesIf[evaluable]; !ok {
 | 
			
		||||
		mp.mapping[condition] = evaluable
 | 
			
		||||
		mp.renameMessagesIf[evaluable] = name
 | 
			
		||||
	} else {
 | 
			
		||||
		mp.renameMessagesIf[evaluable] = name
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveRenameMetricByCondition(condition string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if e, ok := mp.mapping[condition]; ok {
 | 
			
		||||
		delete(mp.mapping, condition)
 | 
			
		||||
		delete(mp.renameMessagesIf, e)
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) SetNormalizeUnits(setting bool) {
 | 
			
		||||
	mp.normalizeUnits = setting
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddChangeUnitPrefix(condition string, prefix string) error {
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := mp.changeUnitPrefix[evaluable]; !ok {
 | 
			
		||||
		mp.mapping[condition] = evaluable
 | 
			
		||||
		mp.changeUnitPrefix[evaluable] = prefix
 | 
			
		||||
	} else {
 | 
			
		||||
		mp.changeUnitPrefix[evaluable] = prefix
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveChangeUnitPrefix(condition string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if e, ok := mp.mapping[condition]; ok {
 | 
			
		||||
		delete(mp.mapping, condition)
 | 
			
		||||
		delete(mp.changeUnitPrefix, e)
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddRenameMetricByName(from, to string) error {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	if _, ok := mp.renameMessages[from]; !ok {
 | 
			
		||||
		mp.renameMessages[from] = to
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveRenameMetricByName(from string) {
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	delete(mp.renameMessages, from)
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveTagToMeta(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveTagToMeta)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveTagToMeta(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveTagToMeta)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveTagToFields(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveTagToField)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveTagToFields(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveTagToField)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveMetaToTags(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveMetaToTag)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveMetaToTags(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveMetaToTag)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveMetaToFields(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveMetaToField)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveMetaToFields(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveMetaToField)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveFieldToTags(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveFieldToTag)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveFieldToTags(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveFieldToTag)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) AddMoveFieldToMeta(condition, key, value string) error {
 | 
			
		||||
	return mp.addTagConfig(condition, key, value, &mp.moveFieldToMeta)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) RemoveMoveFieldToMeta(condition string) {
 | 
			
		||||
	mp.removeTagConfig(condition, &mp.moveFieldToMeta)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) SetStages(stages []string) error {
 | 
			
		||||
	newstages := make([]string, 0)
 | 
			
		||||
	if len(stages) == 0 {
 | 
			
		||||
		mp.mutex.Lock()
 | 
			
		||||
		mp.stages = newstages
 | 
			
		||||
		mp.mutex.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	for i, s := range stages {
 | 
			
		||||
		valid := false
 | 
			
		||||
		for _, v := range StageNames {
 | 
			
		||||
			if s == v {
 | 
			
		||||
				valid = true
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if valid {
 | 
			
		||||
			newstages = append(newstages, s)
 | 
			
		||||
		} else {
 | 
			
		||||
			return fmt.Errorf("invalid stage %s at index %d", s, i)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	mp.mutex.Lock()
 | 
			
		||||
	mp.stages = newstages
 | 
			
		||||
	mp.mutex.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) DefaultStages() []string {
 | 
			
		||||
	return StageNames
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error {
 | 
			
		||||
	var c messageProcessorConfig
 | 
			
		||||
 | 
			
		||||
	err := json.Unmarshal(config, &c)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(c.StageOrder) > 0 {
 | 
			
		||||
		err = mp.SetStages(c.StageOrder)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		err = mp.SetStages(mp.DefaultStages())
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, m := range c.DropMessages {
 | 
			
		||||
		err = mp.AddDropMessagesByName(m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, m := range c.DropByType {
 | 
			
		||||
		err = mp.AddDropMessagesByType(m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, m := range c.DropMessagesIf {
 | 
			
		||||
		err = mp.AddDropMessagesByCondition(m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for k, v := range c.RenameMessagesIf {
 | 
			
		||||
		err = mp.AddRenameMetricByCondition(k, v)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for k, v := range c.RenameMessages {
 | 
			
		||||
		err = mp.AddRenameMetricByName(k, v)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for k, v := range c.ChangeUnitPrefix {
 | 
			
		||||
		err = mp.AddChangeUnitPrefix(k, v)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.AddTagsIf {
 | 
			
		||||
		err = mp.AddAddTagsByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.AddMetaIf {
 | 
			
		||||
		err = mp.AddAddMetaByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.AddFieldIf {
 | 
			
		||||
		err = mp.AddAddFieldByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.DelTagsIf {
 | 
			
		||||
		err = mp.AddDeleteTagsByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.DelMetaIf {
 | 
			
		||||
		err = mp.AddDeleteMetaByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.DelFieldIf {
 | 
			
		||||
		err = mp.AddDeleteFieldByCondition(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveTagToMeta {
 | 
			
		||||
		err = mp.AddMoveTagToMeta(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveTagToField {
 | 
			
		||||
		err = mp.AddMoveTagToFields(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveMetaToTag {
 | 
			
		||||
		err = mp.AddMoveMetaToTags(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveMetaToField {
 | 
			
		||||
		err = mp.AddMoveMetaToFields(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveFieldToTag {
 | 
			
		||||
		err = mp.AddMoveFieldToTags(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range c.MoveFieldToMeta {
 | 
			
		||||
		err = mp.AddMoveFieldToMeta(c.Condition, c.Key, c.Value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, m := range c.DropByType {
 | 
			
		||||
		err = mp.AddDropMessagesByType(m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(c.AddBaseEnv) > 0 {
 | 
			
		||||
		err = mp.AddBaseEnv(c.AddBaseEnv)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to process config JSON: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	mp.SetNormalizeUnits(c.NormalizeUnits)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, error) {
 | 
			
		||||
	m, err := lp2.NewMessage(
 | 
			
		||||
		metric.Name(),
 | 
			
		||||
		metric.Tags(),
 | 
			
		||||
		metric.Meta(),
 | 
			
		||||
		metric.Fields(),
 | 
			
		||||
		metric.Time(),
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return m, fmt.Errorf("failed to parse metric to message: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	return mp.ProcessMessage(m)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) {
 | 
			
		||||
	var err error = nil
 | 
			
		||||
	var out lp2.CCMessage = lp2.FromMessage(m)
 | 
			
		||||
 | 
			
		||||
	name := out.Name()
 | 
			
		||||
 | 
			
		||||
	if len(mp.stages) == 0 {
 | 
			
		||||
		mp.SetStages(mp.DefaultStages())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp.mutex.RLock()
 | 
			
		||||
	defer mp.mutex.RUnlock()
 | 
			
		||||
 | 
			
		||||
	params := getParamMap(out)
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		params["field"] = nil
 | 
			
		||||
		params["tag"] = nil
 | 
			
		||||
		paramMapPool.Put(params["fields"])
 | 
			
		||||
		paramMapPool.Put(params["tags"])
 | 
			
		||||
		paramMapPool.Put(params["meta"])
 | 
			
		||||
		paramMapPool.Put(params)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for _, s := range mp.stages {
 | 
			
		||||
		switch s {
 | 
			
		||||
		case STAGENAME_DROP_BY_NAME:
 | 
			
		||||
			if len(mp.dropMessages) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
 | 
			
		||||
				if _, ok := mp.dropMessages[name]; ok {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Drop")
 | 
			
		||||
					return nil, nil
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_DROP_BY_TYPE:
 | 
			
		||||
			if len(mp.dropTypes) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
 | 
			
		||||
				if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Drop")
 | 
			
		||||
					return nil, nil
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_DROP_IF:
 | 
			
		||||
			if len(mp.dropMessagesIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
 | 
			
		||||
				drop, err := dropMessagesIf(¶ms, &mp.dropMessagesIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
				if drop {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Drop")
 | 
			
		||||
					return nil, nil
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_RENAME_BY_NAME:
 | 
			
		||||
			if len(mp.renameMessages) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
 | 
			
		||||
				if newname, ok := mp.renameMessages[name]; ok {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
 | 
			
		||||
					out.SetName(newname)
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
 | 
			
		||||
					out.AddMeta("oldname", name)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_RENAME_IF:
 | 
			
		||||
			if len(mp.renameMessagesIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
 | 
			
		||||
				_, err := renameMessagesIf(out, ¶ms, &mp.renameMessagesIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_ADD_TAG:
 | 
			
		||||
			if len(mp.addTagsIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding tags")
 | 
			
		||||
				_, err = addTagIf(out, ¶ms, &mp.addTagsIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_DELETE_TAG:
 | 
			
		||||
			if len(mp.deleteTagsIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Delete tags")
 | 
			
		||||
				_, err = deleteTagIf(out, ¶ms, &mp.deleteTagsIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_ADD_META:
 | 
			
		||||
			if len(mp.addMetaIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding meta information")
 | 
			
		||||
				_, err = addMetaIf(out, ¶ms, &mp.addMetaIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_DELETE_META:
 | 
			
		||||
			if len(mp.deleteMetaIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Delete meta information")
 | 
			
		||||
				_, err = deleteMetaIf(out, ¶ms, &mp.deleteMetaIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_ADD_FIELD:
 | 
			
		||||
			if len(mp.addFieldIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding fields")
 | 
			
		||||
				_, err = addFieldIf(out, ¶ms, &mp.addFieldIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_DELETE_FIELD:
 | 
			
		||||
			if len(mp.deleteFieldIf) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Delete fields")
 | 
			
		||||
				_, err = deleteFieldIf(out, ¶ms, &mp.deleteFieldIf)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_TAG_META:
 | 
			
		||||
			if len(mp.moveTagToMeta) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
 | 
			
		||||
				_, err := moveTagToMeta(out, ¶ms, &mp.moveTagToMeta)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_TAG_FIELD:
 | 
			
		||||
			if len(mp.moveTagToField) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
 | 
			
		||||
				_, err := moveTagToField(out, ¶ms, &mp.moveTagToField)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_META_TAG:
 | 
			
		||||
			if len(mp.moveMetaToTag) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
 | 
			
		||||
				_, err := moveMetaToTag(out, ¶ms, &mp.moveMetaToTag)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_META_FIELD:
 | 
			
		||||
			if len(mp.moveMetaToField) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
 | 
			
		||||
				_, err := moveMetaToField(out, ¶ms, &mp.moveMetaToField)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_FIELD_META:
 | 
			
		||||
			if len(mp.moveFieldToMeta) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move field to meta")
 | 
			
		||||
				_, err := moveFieldToMeta(out, ¶ms, &mp.moveFieldToMeta)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_MOVE_FIELD_TAG:
 | 
			
		||||
			if len(mp.moveFieldToTag) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Move field to tags")
 | 
			
		||||
				_, err := moveFieldToTag(out, ¶ms, &mp.moveFieldToTag)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		case STAGENAME_NORMALIZE_UNIT:
 | 
			
		||||
			if mp.normalizeUnits {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Normalize units")
 | 
			
		||||
				if lp2.IsMetric(out) {
 | 
			
		||||
					_, err := normalizeUnits(out)
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		case STAGENAME_CHANGE_UNIT_PREFIX:
 | 
			
		||||
			if len(mp.changeUnitPrefix) > 0 {
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
 | 
			
		||||
				if lp2.IsMetric(out) {
 | 
			
		||||
					_, err := changeUnitPrefix(out, ¶ms, &mp.changeUnitPrefix)
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return out, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return out, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get a new instace of a message processor.
 | 
			
		||||
func NewMessageProcessor() (MessageProcessor, error) {
 | 
			
		||||
	mp := new(messageProcessor)
 | 
			
		||||
	err := mp.init()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err := fmt.Errorf("failed to create MessageProcessor: %v", err.Error())
 | 
			
		||||
		cclog.ComponentError("MessageProcessor", err.Error())
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return mp, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										265
									
								
								pkg/messageProcessor/messageProcessorFuncs.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										265
									
								
								pkg/messageProcessor/messageProcessorFuncs.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,265 @@
 | 
			
		||||
package messageprocessor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	units "github.com/ClusterCockpit/cc-units"
 | 
			
		||||
	"github.com/expr-lang/expr"
 | 
			
		||||
	"github.com/expr-lang/expr/vm"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type MessageLocation int
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	MESSAGE_LOCATION_TAGS MessageLocation = iota
 | 
			
		||||
	MESSAGE_LOCATION_META
 | 
			
		||||
	MESSAGE_LOCATION_FIELDS
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Abstract function to move entries from one location to another
 | 
			
		||||
func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, from, to MessageLocation) (bool, error) {
 | 
			
		||||
	for d, data := range *checks {
 | 
			
		||||
		value, err := expr.Run(d, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			var v string
 | 
			
		||||
			var ok bool = false
 | 
			
		||||
			switch from {
 | 
			
		||||
			case MESSAGE_LOCATION_TAGS:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
 | 
			
		||||
				v, ok = message.GetTag(data.Key)
 | 
			
		||||
			case MESSAGE_LOCATION_META:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", message.Meta())
 | 
			
		||||
				v, ok = message.GetMeta(data.Key)
 | 
			
		||||
			case MESSAGE_LOCATION_FIELDS:
 | 
			
		||||
				var x interface{}
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
 | 
			
		||||
				x, ok = message.GetField(data.Key)
 | 
			
		||||
				v = fmt.Sprintf("%v", x)
 | 
			
		||||
			}
 | 
			
		||||
			if ok {
 | 
			
		||||
				switch from {
 | 
			
		||||
				case MESSAGE_LOCATION_TAGS:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
 | 
			
		||||
					message.RemoveTag(data.Key)
 | 
			
		||||
				case MESSAGE_LOCATION_META:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
 | 
			
		||||
					message.RemoveMeta(data.Key)
 | 
			
		||||
				case MESSAGE_LOCATION_FIELDS:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
 | 
			
		||||
					message.RemoveField(data.Key)
 | 
			
		||||
				}
 | 
			
		||||
				switch to {
 | 
			
		||||
				case MESSAGE_LOCATION_TAGS:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
 | 
			
		||||
					message.AddTag(data.Value, v)
 | 
			
		||||
				case MESSAGE_LOCATION_META:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
 | 
			
		||||
					message.AddMeta(data.Value, v)
 | 
			
		||||
				case MESSAGE_LOCATION_FIELDS:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
 | 
			
		||||
					message.AddField(data.Value, v)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				return false, fmt.Errorf("failed to get message entry: %s", data.Key)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
 | 
			
		||||
	for d, data := range *checks {
 | 
			
		||||
		value, err := expr.Run(d, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return true, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			switch location {
 | 
			
		||||
			case MESSAGE_LOCATION_FIELDS:
 | 
			
		||||
				switch data.Key {
 | 
			
		||||
				case "value", "event", "log", "control":
 | 
			
		||||
					return false, errors.New("cannot delete protected fields")
 | 
			
		||||
				default:
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
 | 
			
		||||
					message.RemoveField(data.Key)
 | 
			
		||||
				}
 | 
			
		||||
			case MESSAGE_LOCATION_TAGS:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
 | 
			
		||||
				message.RemoveTag(data.Key)
 | 
			
		||||
			case MESSAGE_LOCATION_META:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
 | 
			
		||||
				message.RemoveMeta(data.Key)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
 | 
			
		||||
	for d, data := range *checks {
 | 
			
		||||
		value, err := expr.Run(d, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return true, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			switch location {
 | 
			
		||||
			case MESSAGE_LOCATION_FIELDS:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
 | 
			
		||||
				message.AddField(data.Key, data.Value)
 | 
			
		||||
			case MESSAGE_LOCATION_TAGS:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
 | 
			
		||||
				message.AddTag(data.Key, data.Value)
 | 
			
		||||
			case MESSAGE_LOCATION_META:
 | 
			
		||||
				cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
 | 
			
		||||
				message.AddMeta(data.Key, data.Value)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func deleteTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return deleteIf(message, params, checks, MESSAGE_LOCATION_TAGS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func addTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return addIf(message, params, checks, MESSAGE_LOCATION_TAGS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveTagToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_META)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveTagToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_FIELDS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func deleteMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return deleteIf(message, params, checks, MESSAGE_LOCATION_META)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func addMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return addIf(message, params, checks, MESSAGE_LOCATION_META)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveMetaToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_TAGS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveMetaToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_FIELDS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func deleteFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return deleteIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func addFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return addIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveFieldToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_TAGS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func moveFieldToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
 | 
			
		||||
	return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_META)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func dropMessagesIf(params *map[string]interface{}, checks *map[*vm.Program]struct{}) (bool, error) {
 | 
			
		||||
	for d := range *checks {
 | 
			
		||||
		value, err := expr.Run(d, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			return true, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func normalizeUnits(message lp2.CCMessage) (bool, error) {
 | 
			
		||||
	if in_unit, ok := message.GetMeta("unit"); ok {
 | 
			
		||||
		u := units.NewUnit(in_unit)
 | 
			
		||||
		if u.Valid() {
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
 | 
			
		||||
			message.AddMeta("unit", u.Short())
 | 
			
		||||
		}
 | 
			
		||||
	} else if in_unit, ok := message.GetTag("unit"); ok {
 | 
			
		||||
		u := units.NewUnit(in_unit)
 | 
			
		||||
		if u.Valid() {
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
 | 
			
		||||
			message.AddTag("unit", u.Short())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
 | 
			
		||||
	for r, n := range *checks {
 | 
			
		||||
		value, err := expr.Run(r, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			newPrefix := units.NewPrefix(n)
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
 | 
			
		||||
			if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
 | 
			
		||||
				u := units.NewUnit(in_unit)
 | 
			
		||||
				if u.Valid() {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
 | 
			
		||||
					conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
 | 
			
		||||
					if conv != nil && out_unit.Valid() {
 | 
			
		||||
						if val, ok := message.GetField("value"); ok {
 | 
			
		||||
							cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
 | 
			
		||||
							message.AddField("value", conv(val))
 | 
			
		||||
							message.AddMeta("unit", out_unit.Short())
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
			} else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix {
 | 
			
		||||
				u := units.NewUnit(in_unit)
 | 
			
		||||
				if u.Valid() {
 | 
			
		||||
					cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
 | 
			
		||||
					conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
 | 
			
		||||
					if conv != nil && out_unit.Valid() {
 | 
			
		||||
						if val, ok := message.GetField("value"); ok {
 | 
			
		||||
							cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
 | 
			
		||||
							message.AddField("value", conv(val))
 | 
			
		||||
							message.AddTag("unit", out_unit.Short())
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
 | 
			
		||||
	for d, n := range *checks {
 | 
			
		||||
		value, err := expr.Run(d, *params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return true, fmt.Errorf("failed to evaluate: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
		if value.(bool) {
 | 
			
		||||
			old := message.Name()
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Rename to", n)
 | 
			
		||||
			message.SetName(n)
 | 
			
		||||
			cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
 | 
			
		||||
			message.AddMeta("oldname", old)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										396
									
								
								pkg/messageProcessor/messageProcessor_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										396
									
								
								pkg/messageProcessor/messageProcessor_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,396 @@
 | 
			
		||||
package messageprocessor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func generate_message_lists(num_lists, num_entries int) ([][]lp.CCMessage, error) {
 | 
			
		||||
	mlist := make([][]lp.CCMessage, 0)
 | 
			
		||||
	for j := 0; j < num_lists; j++ {
 | 
			
		||||
		out := make([]lp.CCMessage, 0)
 | 
			
		||||
		for i := 0; i < num_entries; i++ {
 | 
			
		||||
			var x lp.CCMessage
 | 
			
		||||
			var err error = nil
 | 
			
		||||
			switch {
 | 
			
		||||
			case i%4 == 0:
 | 
			
		||||
				x, err = lp.NewEvent("myevent", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "nothing happend", time.Now())
 | 
			
		||||
			case i%4 == 1:
 | 
			
		||||
				x, err = lp.NewMetric("mymetric", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{"unit": "kByte"}, 12.145, time.Now())
 | 
			
		||||
			case i%4 == 2:
 | 
			
		||||
				x, err = lp.NewLog("mylog", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "disk status: OK", time.Now())
 | 
			
		||||
			case i%4 == 3:
 | 
			
		||||
				x, err = lp.NewGetControl("mycontrol", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, time.Now())
 | 
			
		||||
			}
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				x.AddTag("hostname", "myhost")
 | 
			
		||||
				out = append(out, x)
 | 
			
		||||
			} else {
 | 
			
		||||
				return nil, errors.New("failed to create message")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		mlist = append(mlist, out)
 | 
			
		||||
	}
 | 
			
		||||
	return mlist, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNewMessageProcessor(t *testing.T) {
 | 
			
		||||
	_, err := NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Configs struct {
 | 
			
		||||
	name   string
 | 
			
		||||
	config json.RawMessage
 | 
			
		||||
	drop   bool
 | 
			
		||||
	errors bool
 | 
			
		||||
	pre    func(msg lp.CCMessage) error
 | 
			
		||||
	check  func(msg lp.CCMessage) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var test_configs = []Configs{
 | 
			
		||||
	{
 | 
			
		||||
		name:   "single_dropif_nomatch",
 | 
			
		||||
		config: json.RawMessage(`{"drop_messages_if": [ "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "drop_by_name",
 | 
			
		||||
		config: json.RawMessage(`{"drop_messages": [ "net_bytes_in"]}`),
 | 
			
		||||
		drop:   true,
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "drop_by_type_match",
 | 
			
		||||
		config: json.RawMessage(`{"drop_by_message_type": [ "metric"]}`),
 | 
			
		||||
		drop:   true,
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "drop_by_type_nomatch",
 | 
			
		||||
		config: json.RawMessage(`{"drop_by_message_type": [ "event"]}`),
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "single_dropif_match",
 | 
			
		||||
		config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'"]}`),
 | 
			
		||||
		drop:   true,
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "double_dropif_match_nomatch",
 | 
			
		||||
		config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'", "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`),
 | 
			
		||||
		drop:   true,
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "rename_simple",
 | 
			
		||||
		config: json.RawMessage(`{"rename_messages": { "net_bytes_in" : "net_bytes_out", "rapl_power": "cpu_power"}}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.Name() != "net_bytes_out" {
 | 
			
		||||
				return errors.New("expected name net_bytes_out but still have net_bytes_in")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "rename_match",
 | 
			
		||||
		config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_in'" : "net_bytes_out", "name == 'rapl_power'": "cpu_power"}}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.Name() != "net_bytes_out" {
 | 
			
		||||
				return errors.New("expected name net_bytes_out but still have net_bytes_in")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "rename_nomatch",
 | 
			
		||||
		config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_out'" : "net_bytes_in", "name == 'rapl_power'": "cpu_power"}}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.Name() != "net_bytes_in" {
 | 
			
		||||
				return errors.New("expected name net_bytes_in but still have net_bytes_out")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "add_tag",
 | 
			
		||||
		config: json.RawMessage(`{"add_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "cluster", "value" : "mycluster"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if !msg.HasTag("cluster") {
 | 
			
		||||
				return errors.New("expected new tag 'cluster' but not present")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "del_tag",
 | 
			
		||||
		config: json.RawMessage(`{"delete_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "type"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasTag("type") {
 | 
			
		||||
				return errors.New("expected to have no 'type' but still present")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "add_meta",
 | 
			
		||||
		config: json.RawMessage(`{"add_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "source", "value" : "example"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if !msg.HasMeta("source") {
 | 
			
		||||
				return errors.New("expected new tag 'source' but not present")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "del_meta",
 | 
			
		||||
		config: json.RawMessage(`{"delete_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "unit"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasMeta("unit") {
 | 
			
		||||
				return errors.New("expected to have no 'unit' but still present")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "add_field",
 | 
			
		||||
		config: json.RawMessage(`{"add_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value" : "example"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if !msg.HasField("myfield") {
 | 
			
		||||
				return errors.New("expected new tag 'source' but not present")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "delete_fields_if_protected",
 | 
			
		||||
		config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "value"}]}`),
 | 
			
		||||
		errors: true,
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if !msg.HasField("value") {
 | 
			
		||||
				return errors.New("expected to still have 'value' field because it is a protected field key")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "delete_fields_if_unprotected",
 | 
			
		||||
		config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "testfield"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasField("testfield") {
 | 
			
		||||
				return errors.New("expected to still have 'testfield' field but should be deleted")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		pre: func(msg lp.CCMessage) error {
 | 
			
		||||
			msg.AddField("testfield", 4.123)
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "single_change_prefix_match",
 | 
			
		||||
		config: json.RawMessage(`{"change_unit_prefix": {"name == 'net_bytes_in' && tags.type == 'node'": "M"}}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if u, ok := msg.GetMeta("unit"); ok {
 | 
			
		||||
				if u != "MB" {
 | 
			
		||||
					return fmt.Errorf("expected unit MB but have %s", u)
 | 
			
		||||
				}
 | 
			
		||||
			} else if u, ok := msg.GetTag("unit"); ok {
 | 
			
		||||
				if u != "MB" {
 | 
			
		||||
					return fmt.Errorf("expected unit MB but have %s", u)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "normalize_units",
 | 
			
		||||
		config: json.RawMessage(`{"normalize_units": true}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if u, ok := msg.GetMeta("unit"); ok {
 | 
			
		||||
				if u != "B" {
 | 
			
		||||
					return fmt.Errorf("expected unit B but have %s", u)
 | 
			
		||||
				}
 | 
			
		||||
			} else if u, ok := msg.GetTag("unit"); ok {
 | 
			
		||||
				if u != "B" {
 | 
			
		||||
					return fmt.Errorf("expected unit B but have %s", u)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_tag_to_meta",
 | 
			
		||||
		config: json.RawMessage(`{"move_tag_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasTag("type-id") || !msg.HasMeta("typeid") {
 | 
			
		||||
				return errors.New("moving tag 'type-id' to meta 'typeid' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		pre: func(msg lp.CCMessage) error {
 | 
			
		||||
			msg.AddTag("type-id", "0")
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_tag_to_field",
 | 
			
		||||
		config: json.RawMessage(`{"move_tag_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasTag("type-id") || !msg.HasField("typeid") {
 | 
			
		||||
				return errors.New("moving tag 'type-id' to field 'typeid' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		pre: func(msg lp.CCMessage) error {
 | 
			
		||||
			msg.AddTag("type-id", "0")
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_meta_to_tag",
 | 
			
		||||
		config: json.RawMessage(`{"move_meta_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasMeta("unit") || !msg.HasTag("unit") {
 | 
			
		||||
				return errors.New("moving meta 'unit' to tag 'unit' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_meta_to_field",
 | 
			
		||||
		config: json.RawMessage(`{"move_meta_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasMeta("unit") || !msg.HasField("unit") {
 | 
			
		||||
				return errors.New("moving meta 'unit' to field 'unit' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_field_to_tag",
 | 
			
		||||
		config: json.RawMessage(`{"move_field_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasField("myfield") || !msg.HasTag("field") {
 | 
			
		||||
				return errors.New("moving meta 'myfield' to tag 'field' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		pre: func(msg lp.CCMessage) error {
 | 
			
		||||
			msg.AddField("myfield", 12)
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	{
 | 
			
		||||
		name:   "move_field_to_meta",
 | 
			
		||||
		config: json.RawMessage(`{"move_field_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`),
 | 
			
		||||
		check: func(msg lp.CCMessage) error {
 | 
			
		||||
			if msg.HasField("myfield") || !msg.HasMeta("field") {
 | 
			
		||||
				return errors.New("moving meta 'myfield' to meta 'field' failed")
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
		pre: func(msg lp.CCMessage) error {
 | 
			
		||||
			msg.AddField("myfield", 12)
 | 
			
		||||
			return nil
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestConfigList(t *testing.T) {
 | 
			
		||||
	for _, c := range test_configs {
 | 
			
		||||
		t.Run(c.name, func(t *testing.T) {
 | 
			
		||||
			m, err := lp.NewMetric("net_bytes_in", map[string]string{"type": "node", "type-id": "0"}, map[string]string{"unit": "Byte"}, float64(1024.0), time.Now())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Error(err.Error())
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if c.pre != nil {
 | 
			
		||||
				if err = c.pre(m); err != nil {
 | 
			
		||||
					t.Errorf("error running pre-test function: %v", err.Error())
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			mp, err := NewMessageProcessor()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Error(err.Error())
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			err = mp.FromConfigJSON(c.config)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Error(err.Error())
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			//t.Log(m.ToLineProtocol(nil))
 | 
			
		||||
			out, err := mp.ProcessMessage(m)
 | 
			
		||||
			if err != nil && !c.errors {
 | 
			
		||||
				cclog.SetDebug()
 | 
			
		||||
				mp.ProcessMessage(m)
 | 
			
		||||
				t.Error(err.Error())
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if out == nil && !c.drop {
 | 
			
		||||
				t.Error("fail, message should NOT be dropped but processor signalled dropping")
 | 
			
		||||
				return
 | 
			
		||||
			} else if out != nil && c.drop {
 | 
			
		||||
				t.Error("fail, message should be dropped but processor signalled NO dropping")
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			// {
 | 
			
		||||
			// 	if c.drop {
 | 
			
		||||
			// 		t.Error("fail, message should be dropped but processor signalled NO dropping")
 | 
			
		||||
			// 	} else {
 | 
			
		||||
			// 		t.Error("fail, message should NOT be dropped but processor signalled dropping")
 | 
			
		||||
			// 	}
 | 
			
		||||
			// 	cclog.SetDebug()
 | 
			
		||||
			// 	mp.ProcessMessage(m)
 | 
			
		||||
			// 	return
 | 
			
		||||
			// }
 | 
			
		||||
			if c.check != nil {
 | 
			
		||||
				if err := c.check(out); err != nil {
 | 
			
		||||
					t.Errorf("check failed with %v", err.Error())
 | 
			
		||||
					t.Log("Rerun with debugging")
 | 
			
		||||
					cclog.SetDebug()
 | 
			
		||||
					mp.ProcessMessage(m)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkProcessing(b *testing.B) {
 | 
			
		||||
 | 
			
		||||
	mlist, err := generate_message_lists(b.N, 1000)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		b.Error(err.Error())
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp, err := NewMessageProcessor()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		b.Error(err.Error())
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	err = mp.FromConfigJSON(json.RawMessage(`{"move_meta_to_tag_if": [{"if" : "name == 'mymetric'", "key":"unit", "value":"unit"}]}`))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		b.Error(err.Error())
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b.StartTimer()
 | 
			
		||||
	for i := 0; i < b.N; i++ {
 | 
			
		||||
		for _, m := range mlist[i] {
 | 
			
		||||
			if _, err := mp.ProcessMessage(m); err != nil {
 | 
			
		||||
				b.Errorf("failed processing message '%s': %v", m.ToLineProtocol(nil), err.Error())
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	b.StopTimer()
 | 
			
		||||
	b.ReportMetric(float64(b.Elapsed())/float64(len(mlist)*b.N), "ns/message")
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user