cc-metric-collector/pkg/messageProcessor
2024-12-23 17:52:34 +01:00
..
messageProcessor_test.go New Message processor (#118) 2024-12-11 19:06:50 +01:00
messageProcessor.go Mark all JSON config fields of message processor as omitempty 2024-12-23 17:52:34 +01:00
messageProcessorFuncs.go Merge develop branch into main (#123) 2024-12-19 23:00:14 +01:00
README.md Merge develop branch into main (#123) 2024-12-19 23:00:14 +01:00

Message Processor Component

Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages. The main CC application using it is cc-metric-collector. The processing part there was originally in the metric router, the central hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the lack of flexibility caused some trouble:

The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different resolution for some metrics. The issue was basically the mem_used metric showing the currently used memory of the node. Ganglia wants it in kByte as provided by the Linux operating system but CC wanted it in GByte.

With the message processor, the Ganglia sinks can apply the unit prefix changes individually and name the metrics as required by Ganglia.

For developers

Whenever you receive or are about to send a message out, you should provide some processing.

Configuration of component

New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file or some fields in a configuration file for the processing.

The message processor uses the following configuration

{
	"drop_messages": [
		"name_of_message_to_drop"
	],
	"drop_messages_if": [
		"condition_when_to_drop_message",
		"name == 'drop_this'",
		"tag.hostname == 'this_host'",
		"meta.unit != 'MB'"
	],
	"rename_messages" : {
		"old_message_name" : "new_message_name"
	},
	"rename_messages_if": {
		"condition_when_to_rename_message" : "new_name"
	},
	"add_tags_if": [
		{
			"if" : "condition_when_to_add_tag",
			"key": "name_for_new_tag",
			"value": "new_tag_value"
		}
	],
	"delete_tags_if": [
		{
			"if" : "condition_when_to_delete_tag",
			"key": "name_of_tag"
		}
	],
	"add_meta_if": [
		{
			"if" : "condition_when_to_add_meta_info",
			"key": "name_for_new_meta_info",
			"value": "new_meta_info_value"
		}
	],
	"delete_meta_if": [
		{
			"if" : "condition_when_to_delete_meta_info",
			"key": "name_of_meta_info"
		}
	],
	"add_field_if": [
		{
			"if" : "condition_when_to_add_field",
			"key": "name_for_new_field",
			"value": "new_field_value_but_only_string_at_the_moment"
		}
	],
	"delete_field_if": [
		{
			"if" : "condition_when_to_delete_field",
			"key": "name_of_field"
		}
	],
	"move_tag_to_meta_if": [
		{
			"if" : "condition_when_to_move_tag_to_meta_info_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_meta_info"
		}
	],
	"move_tag_to_field_if": [
		{
			"if" : "condition_when_to_move_tag_to_fields_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_field"
		}
	],
	"move_meta_to_tag_if": [
		{
			"if" : "condition_when_to_move_meta_info_to_tags_including_its_value",
			"key": "name_of_meta_info",
			"value": "name_of_tag"
		}
	],
	"move_meta_to_field_if": [
		{
			"if" : "condition_when_to_move_meta_info_to_fields_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_meta_info"
		}
	],
	"move_field_to_tag_if": [
		{
			"if" : "condition_when_to_move_field_to_tags_including_its_stringified_value",
			"key": "name_of_field",
			"value": "name_of_tag"
		}
	],
	"move_field_to_meta_if": [
		{
			"if" : "condition_when_to_move_field_to_meta_info_including_its_stringified_value",
			"key": "name_of_field",
			"value": "name_of_meta_info"
		}
	],
	"drop_by_message_type": [
		"metric",
		"event",
		"log",
		"control"
	],
	"change_unit_prefix": {
		"name == 'metric_with_wrong_unit_prefix'" : "G",
		"only_if_messagetype == 'metric'": "T"
	},
	"normalize_units": true,
	"add_base_env": {
		"MY_CONSTANT_FOR_CUSTOM_CONDITIONS": 1.0,
		"output_value_for_test_metrics": 42.0,
	},
	"stage_order": [
		"rename_messages_if",
		"drop_messages"
	]
}

The options change_unit_prefix and normalize_units are only applied to CCMetrics. It is not possible to delete the field related to each message type as defined in cc-specification. In short:

  • CCMetrics always have to have a field named value
  • CCEvents always have to have a field named event
  • CCLogs always have to have a field named log
  • CCControl messages always have to have a field named control

With add_base_env, one can specifiy mykey=myvalue pairs that can be used in conditions like tag.type == mykey.

The order in which each message is processed, can be specified with the stage_order option. The stage names are the keys in the JSON configuration, thus change_unit_prefix, move_field_to_meta_if, etc. Stages can be listed multiple times.

Using the component

In order to load the configuration from a json.RawMessage:

mp, err := NewMessageProcessor()
if err != nil {
	log.Error("failed to create new message processor")
}
mp.FromConfigJSON(configJson)

After initialization and adding the different operations, the ProcessMessage() function applies all operations and returns whether the message should be dropped.

m := lp.CCMetric{}

x, err := mp.ProcessMessage(m)
if err != nil {
	// handle error
}
if x != nil {
    // process x further
} else {
	// this message got dropped
}

Single operations can be added and removed at runtime

type MessageProcessor interface {
	// Functions to set the execution order of the processing stages
	SetStages([]string) error
	DefaultStages() []string
	// Function to add variables to the base evaluation environment
	AddBaseEnv(env map[string]interface{}) error
	// Functions to add and remove rules
	AddDropMessagesByName(name string) error
	RemoveDropMessagesByName(name string)
	AddDropMessagesByCondition(condition string) error
	RemoveDropMessagesByCondition(condition string)
	AddRenameMetricByCondition(condition string, name string) error
	RemoveRenameMetricByCondition(condition string)
	AddRenameMetricByName(from, to string) error
	RemoveRenameMetricByName(from string)
	SetNormalizeUnits(settings bool)
	AddChangeUnitPrefix(condition string, prefix string) error
	RemoveChangeUnitPrefix(condition string)
	AddAddTagsByCondition(condition, key, value string) error
	RemoveAddTagsByCondition(condition string)
	AddDeleteTagsByCondition(condition, key, value string) error
	RemoveDeleteTagsByCondition(condition string)
	AddAddMetaByCondition(condition, key, value string) error
	RemoveAddMetaByCondition(condition string)
	AddDeleteMetaByCondition(condition, key, value string) error
	RemoveDeleteMetaByCondition(condition string)
	AddMoveTagToMeta(condition, key, value string) error
	RemoveMoveTagToMeta(condition string)
	AddMoveTagToFields(condition, key, value string) error
	RemoveMoveTagToFields(condition string)
	AddMoveMetaToTags(condition, key, value string) error
	RemoveMoveMetaToTags(condition string)
	AddMoveMetaToFields(condition, key, value string) error
	RemoveMoveMetaToFields(condition string)
	AddMoveFieldToTags(condition, key, value string) error
	RemoveMoveFieldToTags(condition string)
	AddMoveFieldToMeta(condition, key, value string) error
	RemoveMoveFieldToMeta(condition string)
	// Read in a JSON configuration
	FromConfigJSON(config json.RawMessage) error
	ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
	// Processing functions for legacy CCMetric and current CCMessage
	ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
}

Syntax for evaluatable terms

The message processor uses gval for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations.

Accessible for operations are

  • name of the message
  • timestamp or time of the message
  • type, type-id of the message (also tag_type, tag_type-id and tag_typeid)
  • stype, stype-id of the message (if message has theses tags, also tag_stype, tag_stype-id and tag_stypeid)
  • value for a CCMetric message (also field_value)
  • event for a CCEvent message (also field_event)
  • control for a CCControl message (also field_control)
  • log for a CCLog message (also field_log)
  • messagetype or msgtype. Possible values event, metric, log and control.

Generally, all tags are accessible with tag_<tagkey>, tags_<tagkey> or tags.<tagkey>. Similarly for all fields with field[s]?[_.]<fieldkey>. For meta information meta[_.]<metakey> (there is no metas[_.]<metakey>).

The syntax of expr is accepted with some additions:

  • Comparing strings: ==, !=, str matches regex (use % instead of \!)
  • Combining conditions: &&, ||
  • Comparing numbers: ==, !=, <, >, <=, >=
  • Test lists: <value> in <list>
  • Topological tests: tag_type-id in getCpuListOfType("socket", "1") (test if the metric belongs to socket 1 in local node topology)

Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that:

  • use '' instead of "" for strings
  • for the regexes, use % instead of \

For operations that should be applied on all messages, use the condition true.

Overhead

The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message increases. Moreover, the processing creates a copy of the message.