cc-metric-collector/pkg/messageProcessor
Thomas Gruber 7840de7b82
Merge develop branch into main (#123)
* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Update README.md

Use right JSON type in configuration

* Update sink's README

* Test whether ipmitool or ipmi-sensors can be executed without errors

* Little fixes to the prometheus sink (#115)

* Add uint64 to float64 cast option

* Add prometheus sink to the list of available sinks

* Add aggregated counters by gpu for nvlink errors

---------

Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>

* Ccmessage migration (#119)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Switch to CCMessage for all files.

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Switch to ccmessage also for latest additions in nvidiaMetric

* New Message processor (#118)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in gofish@v0.15.0

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* New message processor to check whether a message should be dropped or manipulate it in flight

* Create a copy of message before manipulation

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Update collector's Makefile and go.mod/sum files

* Use message processor in router, all sinks and all receivers

* Add support for credential file (NKEY) to NATS sink and receiver

* Fix JSON keys in message processor configuration

* Update docs for message processor, router and the default router config file

* Add link to expr syntax and fix regex matching docs

* Update sample collectors

* Minor style change in collector manager

* Some helpers for ccTopology

* LIKWID collector: write log owner change only once

* Fix for metrics without units and reduce debugging messages for messageProcessor

* Use shorted hostname for hostname added by router

* Define default port for NATS

* CPUstat collector: only add unit for applicable metrics

* Add precision option to all sinks using Influx's encoder

* Add message processor to all sink documentation

* Add units to documentation of cpustat collector

---------

Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Co-authored-by: oscarminus <me@oscarminus.de>
Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>
2024-12-19 23:00:14 +01:00
..
messageProcessor_test.go New Message processor (#118) 2024-12-11 19:06:50 +01:00
messageProcessor.go Merge develop branch into main (#123) 2024-12-19 23:00:14 +01:00
messageProcessorFuncs.go Merge develop branch into main (#123) 2024-12-19 23:00:14 +01:00
README.md Merge develop branch into main (#123) 2024-12-19 23:00:14 +01:00

Message Processor Component

Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages. The main CC application using it is cc-metric-collector. The processing part there was originally in the metric router, the central hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the lack of flexibility caused some trouble:

The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different resolution for some metrics. The issue was basically the mem_used metric showing the currently used memory of the node. Ganglia wants it in kByte as provided by the Linux operating system but CC wanted it in GByte.

With the message processor, the Ganglia sinks can apply the unit prefix changes individually and name the metrics as required by Ganglia.

For developers

Whenever you receive or are about to send a message out, you should provide some processing.

Configuration of component

New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file or some fields in a configuration file for the processing.

The message processor uses the following configuration

{
	"drop_messages": [
		"name_of_message_to_drop"
	],
	"drop_messages_if": [
		"condition_when_to_drop_message",
		"name == 'drop_this'",
		"tag.hostname == 'this_host'",
		"meta.unit != 'MB'"
	],
	"rename_messages" : {
		"old_message_name" : "new_message_name"
	},
	"rename_messages_if": {
		"condition_when_to_rename_message" : "new_name"
	},
	"add_tags_if": [
		{
			"if" : "condition_when_to_add_tag",
			"key": "name_for_new_tag",
			"value": "new_tag_value"
		}
	],
	"delete_tags_if": [
		{
			"if" : "condition_when_to_delete_tag",
			"key": "name_of_tag"
		}
	],
	"add_meta_if": [
		{
			"if" : "condition_when_to_add_meta_info",
			"key": "name_for_new_meta_info",
			"value": "new_meta_info_value"
		}
	],
	"delete_meta_if": [
		{
			"if" : "condition_when_to_delete_meta_info",
			"key": "name_of_meta_info"
		}
	],
	"add_field_if": [
		{
			"if" : "condition_when_to_add_field",
			"key": "name_for_new_field",
			"value": "new_field_value_but_only_string_at_the_moment"
		}
	],
	"delete_field_if": [
		{
			"if" : "condition_when_to_delete_field",
			"key": "name_of_field"
		}
	],
	"move_tag_to_meta_if": [
		{
			"if" : "condition_when_to_move_tag_to_meta_info_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_meta_info"
		}
	],
	"move_tag_to_field_if": [
		{
			"if" : "condition_when_to_move_tag_to_fields_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_field"
		}
	],
	"move_meta_to_tag_if": [
		{
			"if" : "condition_when_to_move_meta_info_to_tags_including_its_value",
			"key": "name_of_meta_info",
			"value": "name_of_tag"
		}
	],
	"move_meta_to_field_if": [
		{
			"if" : "condition_when_to_move_meta_info_to_fields_including_its_value",
			"key": "name_of_tag",
			"value": "name_of_meta_info"
		}
	],
	"move_field_to_tag_if": [
		{
			"if" : "condition_when_to_move_field_to_tags_including_its_stringified_value",
			"key": "name_of_field",
			"value": "name_of_tag"
		}
	],
	"move_field_to_meta_if": [
		{
			"if" : "condition_when_to_move_field_to_meta_info_including_its_stringified_value",
			"key": "name_of_field",
			"value": "name_of_meta_info"
		}
	],
	"drop_by_message_type": [
		"metric",
		"event",
		"log",
		"control"
	],
	"change_unit_prefix": {
		"name == 'metric_with_wrong_unit_prefix'" : "G",
		"only_if_messagetype == 'metric'": "T"
	},
	"normalize_units": true,
	"add_base_env": {
		"MY_CONSTANT_FOR_CUSTOM_CONDITIONS": 1.0,
		"output_value_for_test_metrics": 42.0,
	},
	"stage_order": [
		"rename_messages_if",
		"drop_messages"
	]
}

The options change_unit_prefix and normalize_units are only applied to CCMetrics. It is not possible to delete the field related to each message type as defined in cc-specification. In short:

  • CCMetrics always have to have a field named value
  • CCEvents always have to have a field named event
  • CCLogs always have to have a field named log
  • CCControl messages always have to have a field named control

With add_base_env, one can specifiy mykey=myvalue pairs that can be used in conditions like tag.type == mykey.

The order in which each message is processed, can be specified with the stage_order option. The stage names are the keys in the JSON configuration, thus change_unit_prefix, move_field_to_meta_if, etc. Stages can be listed multiple times.

Using the component

In order to load the configuration from a json.RawMessage:

mp, err := NewMessageProcessor()
if err != nil {
	log.Error("failed to create new message processor")
}
mp.FromConfigJSON(configJson)

After initialization and adding the different operations, the ProcessMessage() function applies all operations and returns whether the message should be dropped.

m := lp.CCMetric{}

x, err := mp.ProcessMessage(m)
if err != nil {
	// handle error
}
if x != nil {
    // process x further
} else {
	// this message got dropped
}

Single operations can be added and removed at runtime

type MessageProcessor interface {
	// Functions to set the execution order of the processing stages
	SetStages([]string) error
	DefaultStages() []string
	// Function to add variables to the base evaluation environment
	AddBaseEnv(env map[string]interface{}) error
	// Functions to add and remove rules
	AddDropMessagesByName(name string) error
	RemoveDropMessagesByName(name string)
	AddDropMessagesByCondition(condition string) error
	RemoveDropMessagesByCondition(condition string)
	AddRenameMetricByCondition(condition string, name string) error
	RemoveRenameMetricByCondition(condition string)
	AddRenameMetricByName(from, to string) error
	RemoveRenameMetricByName(from string)
	SetNormalizeUnits(settings bool)
	AddChangeUnitPrefix(condition string, prefix string) error
	RemoveChangeUnitPrefix(condition string)
	AddAddTagsByCondition(condition, key, value string) error
	RemoveAddTagsByCondition(condition string)
	AddDeleteTagsByCondition(condition, key, value string) error
	RemoveDeleteTagsByCondition(condition string)
	AddAddMetaByCondition(condition, key, value string) error
	RemoveAddMetaByCondition(condition string)
	AddDeleteMetaByCondition(condition, key, value string) error
	RemoveDeleteMetaByCondition(condition string)
	AddMoveTagToMeta(condition, key, value string) error
	RemoveMoveTagToMeta(condition string)
	AddMoveTagToFields(condition, key, value string) error
	RemoveMoveTagToFields(condition string)
	AddMoveMetaToTags(condition, key, value string) error
	RemoveMoveMetaToTags(condition string)
	AddMoveMetaToFields(condition, key, value string) error
	RemoveMoveMetaToFields(condition string)
	AddMoveFieldToTags(condition, key, value string) error
	RemoveMoveFieldToTags(condition string)
	AddMoveFieldToMeta(condition, key, value string) error
	RemoveMoveFieldToMeta(condition string)
	// Read in a JSON configuration
	FromConfigJSON(config json.RawMessage) error
	ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
	// Processing functions for legacy CCMetric and current CCMessage
	ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
}

Syntax for evaluatable terms

The message processor uses gval for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations.

Accessible for operations are

  • name of the message
  • timestamp or time of the message
  • type, type-id of the message (also tag_type, tag_type-id and tag_typeid)
  • stype, stype-id of the message (if message has theses tags, also tag_stype, tag_stype-id and tag_stypeid)
  • value for a CCMetric message (also field_value)
  • event for a CCEvent message (also field_event)
  • control for a CCControl message (also field_control)
  • log for a CCLog message (also field_log)
  • messagetype or msgtype. Possible values event, metric, log and control.

Generally, all tags are accessible with tag_<tagkey>, tags_<tagkey> or tags.<tagkey>. Similarly for all fields with field[s]?[_.]<fieldkey>. For meta information meta[_.]<metakey> (there is no metas[_.]<metakey>).

The syntax of expr is accepted with some additions:

  • Comparing strings: ==, !=, str matches regex (use % instead of \!)
  • Combining conditions: &&, ||
  • Comparing numbers: ==, !=, <, >, <=, >=
  • Test lists: <value> in <list>
  • Topological tests: tag_type-id in getCpuListOfType("socket", "1") (test if the metric belongs to socket 1 in local node topology)

Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that:

  • use '' instead of "" for strings
  • for the regexes, use % instead of \

For operations that should be applied on all messages, use the condition true.

Overhead

The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message increases. Moreover, the processing creates a copy of the message.