mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-05 21:25:55 +02:00
Update docs for message processor, router and the default router config file
This commit is contained in:
parent
beeea9e3aa
commit
e91fc6004f
@ -1,15 +1,21 @@
|
||||
# CC Metric Router
|
||||
|
||||
The CCMetric router sits in between the collectors and the sinks and can be used to add and remove tags to/from traversing [CCMetrics](../ccMetric/README.md).
|
||||
The CCMetric router sits in between the collectors and the sinks and can be used to add and remove tags to/from traversing [CCMessages](https://pkg.go.dev/github.com/ClusterCockpit/cc-energy-manager@v0.0.0-20240919152819-92a17f2da4f7/pkg/cc-message).
|
||||
|
||||
|
||||
# Configuration
|
||||
|
||||
**Note**: Use the [message processor configuration](../../pkg/messageProcessor/README.md) with option `process_messages`.
|
||||
|
||||
```json
|
||||
{
|
||||
"num_cache_intervals" : 1,
|
||||
"interval_timestamp" : true,
|
||||
"hostname_tag" : "hostname",
|
||||
"max_forward" : 50,
|
||||
"process_messages": {
|
||||
"see": "pkg/messageProcessor/README.md"
|
||||
},
|
||||
"add_tags" : [
|
||||
{
|
||||
"key" : "cluster",
|
||||
@ -63,6 +69,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
|
||||
|
||||
There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval.
|
||||
|
||||
**Note**: Use the [message processor configuration](../../pkg/messageProcessor/README.md) (option `process_messages`) instead of `add_tags`, `delete_tags`, `drop_metrics`, `drop_metrics_if`, `rename_metrics`, `normalize_units` and `change_unit_prefix`. These options are deprecated and will be removed in future versions. Until then, they are added to the message processor.
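
A minimal sketch of how a program could read such a router configuration while leaving the `process_messages` block untouched for the message processor. The struct `routerConfig` and the helper `parseRouterConfig` are hypothetical names for illustration; only the JSON keys are taken from the configuration example.

```golang
package main

import (
	"encoding/json"
	"fmt"
)

// routerConfig is a hypothetical struct for this sketch. Only the JSON keys
// come from the configuration example, the Go names are assumptions.
type routerConfig struct {
	NumCacheIntervals int             `json:"num_cache_intervals"`
	IntervalTimestamp bool            `json:"interval_timestamp"`
	HostnameTag       string          `json:"hostname_tag"`
	MaxForward        int             `json:"max_forward"`
	ProcessMessages   json.RawMessage `json:"process_messages"`
}

// parseRouterConfig decodes the router options and keeps the
// process_messages block as raw JSON so it can be passed on unparsed.
func parseRouterConfig(data []byte) (routerConfig, error) {
	var cfg routerConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	raw := []byte(`{
		"num_cache_intervals": 1,
		"interval_timestamp": true,
		"hostname_tag": "hostname",
		"max_forward": 50,
		"process_messages": {"drop_messages": ["name_of_message_to_drop"]}
	}`)
	cfg, err := parseRouterConfig(raw)
	if err != nil {
		fmt.Println("invalid router config:", err)
		return
	}
	fmt.Println(cfg.HostnameTag, cfg.MaxForward, string(cfg.ProcessMessages))
}
```

Keeping the block as `json.RawMessage` means the router does not need to know the message processor's options.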
|
||||
|
||||
# Processing order in the router
|
||||
|
||||
- Add the `hostname_tag` tag (if sent by collectors or cache)
|
||||
@ -96,6 +104,8 @@ Every time the router receives a metric through any of the channels, it tries to
|
||||
|
||||
# The `rename_metrics` option
|
||||
|
||||
__deprecated__
|
||||
|
||||
In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, they might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname`.
|
||||
|
||||
```json
|
||||
@ -107,6 +117,8 @@ In the ClusterCockpit world we specified a set of standard metrics. Since some c
|
||||
|
||||
# Conditional manipulation of tags (`add_tags` and `del_tags`)
|
||||
|
||||
__deprecated__
|
||||
|
||||
Common config format:
|
||||
```json
|
||||
{
|
||||
@ -118,6 +130,8 @@ Common config format:
|
||||
|
||||
## The `del_tags` option
|
||||
|
||||
__deprecated__
|
||||
|
||||
The collectors are free to add whatever `key=value` pair to the metric tags (although the usage of tags should be minimized). If you want to delete a tag afterwards, you can do that. When the `if` condition matches on a metric, the `key` is removed from the metric's tags.
|
||||
|
||||
If you want to remove a tag for all metrics, use the condition wildcard `*`. The `value` field can be omitted in the `del_tags` case.
|
||||
@ -129,6 +143,8 @@ Never delete tags:
|
||||
|
||||
## The `add_tags` option
|
||||
|
||||
__deprecated__
|
||||
|
||||
In some cases, metrics should be tagged or an existing tag changed based on some condition. This can be done in the `add_tags` section. When the `if` condition evaluates to `true`, the tag `key` is added or gets changed to the new `value`.
|
||||
|
||||
If the CCMetric name is equal to `temp_package_id_0`, it adds an additional tag `test=testing` to the metric.
|
||||
@ -170,6 +186,8 @@ In some cases, you want to drop a metric and don't get it forwarded to the sinks
|
||||
|
||||
## The `drop_metrics` section
|
||||
|
||||
__deprecated__
|
||||
|
||||
The argument is a list of metric names. No further checks are performed, only a comparison of the metric name.
|
||||
|
||||
```json
|
||||
@ -185,6 +203,8 @@ The example drops all metrics with the name `drop_metric_1` and `drop_metric_2`.
|
||||
|
||||
## The `drop_metrics_if` section
|
||||
|
||||
__deprecated__
|
||||
|
||||
This option takes a list of evaluable conditions and performs them one after the other on **all** metrics incoming from the collectors and the metric cache (aka `interval_aggregates`).
|
||||
|
||||
```json
|
||||
@ -200,15 +220,22 @@ The first line is comparable with the example in `drop_metrics`, it drops all me
|
||||
# Manipulating the metric units
|
||||
|
||||
## The `normalize_units` option
|
||||
|
||||
__deprecated__
|
||||
|
||||
|
||||
The cc-metric-collector tries to read the data from the system as it is reported. If available, it tries to read the metric unit from the system as well (e.g. from `/proc/meminfo`). The problem is that, depending on the source, the metric units are named differently. Just think about `byte`, `Byte`, `B`, `bytes`, ...
|
||||
The [cc-units](https://github.com/ClusterCockpit/cc-units) package provides a normalization option to use the same metric unit name for all metrics. If this option is set to true, all `unit` meta tags are normalized.
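
To illustrate the idea of normalization, here is a small sketch with a hand-written alias table. It does not use the cc-units API; the table `unitAliases`, the function `normalizeUnit` and the canonical spelling `Byte` are assumptions made for this example.

```golang
package main

import "fmt"

// unitAliases maps spellings found on different systems to one canonical
// name. Both the table and the canonical spelling are assumptions.
var unitAliases = map[string]string{
	"byte":  "Byte",
	"Byte":  "Byte",
	"B":     "Byte",
	"bytes": "Byte",
	"Bytes": "Byte",
}

// normalizeUnit returns the canonical name for a known alias and leaves
// unknown units unchanged.
func normalizeUnit(unit string) string {
	if canonical, ok := unitAliases[unit]; ok {
		return canonical
	}
	return unit
}

func main() {
	for _, u := range []string{"byte", "Byte", "B", "bytes", "W"} {
		fmt.Printf("%s -> %s\n", u, normalizeUnit(u))
	}
}
```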
|
||||
|
||||
## The `change_unit_prefix` section
|
||||
|
||||
__deprecated__
|
||||
|
||||
It is often the case that metrics are reported by the system using a rather outdated unit prefix (for example, `/proc/meminfo` still uses kByte although current memory sizes are in the GByte range). If you want to change the prefix of a unit, you can do that with the help of [cc-units](https://github.com/ClusterCockpit/cc-units). The setting works on the metric name and requires the new prefix for the metric. The cc-units package determines the scaling factor.
|
||||
|
||||
# Aggregate metric values of the current interval with the `interval_aggregates` option
|
||||
|
||||
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0
|
||||
**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0 and is **experimental**
|
||||
|
||||
In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but carry the timestamp of the beginning of the previous interval.
|
||||
|
||||
|
@ -10,6 +10,8 @@ lack of flexibility caused some trouble:
|
||||
> resolution for some metrics. The issue was basically the `mem_used` metric showing the currently used memory of the node. Ganglia wants it in `kByte` as provided
|
||||
> by the Linux operating system but CC wanted it in `GByte`.
|
||||
|
||||
With the message processor, the Ganglia sinks can apply the unit prefix changes individually and name the metrics as required by Ganglia.
|
||||
|
||||
## For developers
|
||||
|
||||
Whenever you receive or are about to send a message out, you should provide some processing.
|
||||
@ -20,41 +22,209 @@ New operations can be added to the message processor at runtime. Of course, they
|
||||
or some fields in a configuration file for the processing.
|
||||
|
||||
The message processor uses the following configuration
|
||||
```golang
|
||||
type messageProcessorConfig struct {
|
||||
DropMessages []string `json:"drop_messages"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||
DropMessagesIf []string `json:"drop_messages_if"` // List of evaluatable terms to drop messages
|
||||
RenameMessages map[string]string `json:"rename_messages"` // Map to rename metric name from key to value
|
||||
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the messages
|
||||
```json
|
||||
{
|
||||
"drop_messages": [
|
||||
"name_of_message_to_drop"
|
||||
],
|
||||
"drop_messages_if": [
|
||||
"condition_when_to_drop_message",
|
||||
"name == 'drop_this'",
|
||||
"tag.hostname == 'this_host'",
|
||||
"meta.unit != 'MB'"
|
||||
],
|
||||
"rename_messages" : {
|
||||
"old_message_name" : "new_message_name"
|
||||
},
|
||||
"rename_messages_if": {
|
||||
"condition_when_to_rename_message" : "new_name"
|
||||
},
|
||||
"add_tags_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_tag",
|
||||
"key": "name_for_new_tag",
|
||||
"value": "new_tag_value"
|
||||
}
|
||||
],
|
||||
"delete_tags_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_tag",
|
||||
"key": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"add_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_meta_info",
|
||||
"key": "name_for_new_meta_info",
|
||||
"value": "new_meta_info_value"
|
||||
}
|
||||
],
|
||||
"delete_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_meta_info",
|
||||
"key": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"add_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_add_field",
|
||||
"key": "name_for_new_field",
|
||||
"value": "new_field_value_but_only_string_at_the_moment"
|
||||
}
|
||||
],
|
||||
"delete_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_delete_field",
|
||||
"key": "name_of_field"
|
||||
}
|
||||
],
|
||||
"move_tag_to_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_tag_to_meta_info_including_its_value",
|
||||
"key": "name_of_tag",
|
||||
"value": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"move_tag_to_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_tag_to_fields_including_its_value",
|
||||
"key": "name_of_tag",
|
||||
"value": "name_of_field"
|
||||
}
|
||||
],
|
||||
"move_meta_to_tag_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_meta_info_to_tags_including_its_value",
|
||||
"key": "name_of_meta_info",
|
||||
"value": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"move_meta_to_field_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_meta_info_to_fields_including_its_value",
|
||||
"key": "name_of_meta_info",
|
||||
"value": "name_of_field"
|
||||
}
|
||||
],
|
||||
"move_field_to_tag_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_field_to_tags_including_its_stringified_value",
|
||||
"key": "name_of_field",
|
||||
"value": "name_of_tag"
|
||||
}
|
||||
],
|
||||
"move_field_to_meta_if": [
|
||||
{
|
||||
"if" : "condition_when_to_move_field_to_meta_info_including_its_stringified_value",
|
||||
"key": "name_of_field",
|
||||
"value": "name_of_meta_info"
|
||||
}
|
||||
],
|
||||
"drop_by_message_type": [
|
||||
"metric",
|
||||
"event",
|
||||
"log",
|
||||
"control"
|
||||
],
|
||||
"change_unit_prefix": {
|
||||
"name == 'metric_with_wrong_unit_prefix'" : "G",
|
||||
"only_if_messagetype == 'metric'": "T"
|
||||
},
|
||||
"normalize_units": true,
|
||||
"add_base_env": {
|
||||
"MY_CONSTANT_FOR_CUSTOM_CONDITIONS": 1.0,
|
||||
"output_value_for_test_metrics": 42.0
|
||||
},
|
||||
"stage_order": [
|
||||
"rename_messages_if",
|
||||
"drop_messages"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The options `change_unit_prefix` and `normalize_units` are only applied to CCMetrics. It is not possible to delete the field related to each message type as defined in [cc-specification](https://github.com/ClusterCockpit/cc-specifications/tree/master/interfaces/lineprotocol). In short:
|
||||
- CCMetrics always have to have a field named `value`
|
||||
- CCEvents always have to have a field named `event`
|
||||
- CCLogs always have to have a field named `log`
|
||||
- CCControl messages always have to have a field named `control`
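
The rule from the list above can be sketched as a guard around field deletion. The map `requiredField` and the function `deleteField` are hypothetical and only illustrate the constraint; they are not part of the message processor.

```golang
package main

import "fmt"

// requiredField maps each message type to the field it must always carry,
// following the list above.
var requiredField = map[string]string{
	"metric":  "value",
	"event":   "event",
	"log":     "log",
	"control": "control",
}

// deleteField removes key from fields unless it is the mandatory field of
// the given message type.
func deleteField(msgType string, fields map[string]interface{}, key string) error {
	if required, ok := requiredField[msgType]; ok && required == key {
		return fmt.Errorf("field %q is mandatory for %s messages", key, msgType)
	}
	delete(fields, key)
	return nil
}

func main() {
	fields := map[string]interface{}{"value": 42.0, "extra": "x"}
	fmt.Println(deleteField("metric", fields, "extra")) // removes the optional field
	fmt.Println(deleteField("metric", fields, "value")) // refused
	fmt.Println(len(fields))
}
```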
|
||||
|
||||
With `add_base_env`, one can specify `mykey=myvalue` pairs that can be used in conditions like `tag.type == mykey`.
|
||||
|
||||
The order in which each message is processed can be specified with the `stage_order` option. The stage names are the keys in the JSON configuration, thus `change_unit_prefix`, `move_field_to_meta_if`, etc. Stages can be listed multiple times.
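
Why the order matters can be shown with a small sketch. The types `message` and `stageFunc` and the function `runStages` are hypothetical; the two stage bodies are toy rules that only borrow the stage names from the configuration.

```golang
package main

import "fmt"

// message is a minimal stand-in for a CCMessage (hypothetical type).
type message struct {
	Name    string
	Dropped bool
}

// stageFunc is one processing stage.
type stageFunc func(m *message)

// runStages applies the stages in the configured order. A stage name may
// appear several times; unknown names are reported as an error.
func runStages(order []string, stages map[string]stageFunc, m *message) error {
	for _, name := range order {
		fn, ok := stages[name]
		if !ok {
			return fmt.Errorf("unknown stage %q", name)
		}
		if m.Dropped {
			break // nothing left to process
		}
		fn(m)
	}
	return nil
}

// exampleStages renames old_name to new_name and drops new_name.
func exampleStages() map[string]stageFunc {
	return map[string]stageFunc{
		"rename_messages_if": func(m *message) {
			if m.Name == "old_name" {
				m.Name = "new_name"
			}
		},
		"drop_messages": func(m *message) {
			if m.Name == "new_name" {
				m.Dropped = true
			}
		},
	}
}

func main() {
	a := &message{Name: "old_name"}
	_ = runStages([]string{"rename_messages_if", "drop_messages"}, exampleStages(), a)
	b := &message{Name: "old_name"}
	_ = runStages([]string{"drop_messages", "rename_messages_if"}, exampleStages(), b)
	fmt.Println(a.Dropped, b.Dropped)
}
```

With renaming first, the message is renamed and then dropped; with dropping first, the drop rule does not match yet and the renamed message is kept.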
|
||||
|
||||
### Using the component
|
||||
In order to load the configuration from a `json.RawMessage`:
|
||||
```golang
|
||||
mp, _ := NewMessageProcessor()
|
||||
|
||||
mp, err := NewMessageProcessor()
|
||||
if err != nil {
|
||||
log.Error("failed to create new message processor")
|
||||
}
|
||||
mp.FromConfigJSON(configJson)
|
||||
```
|
||||
|
||||
### Using the component
|
||||
After initialization and adding the different operations, the `ProcessMessage()` function applies all operations and returns whether the message should be dropped.
|
||||
|
||||
```golang
|
||||
m := lp.CCMetric{}
|
||||
|
||||
drop, err := mp.ProcessMessage(m)
|
||||
if !drop {
|
||||
// process further
|
||||
x, err := mp.ProcessMessage(m)
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
if x != nil {
|
||||
// process x further
|
||||
} else {
|
||||
// this message got dropped
|
||||
}
|
||||
```
|
||||
|
||||
#### Overhead
|
||||
|
||||
The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message
|
||||
increases.
|
||||
|
||||
## For users
|
||||
Single operations can be added and removed at runtime
|
||||
```golang
|
||||
type MessageProcessor interface {
|
||||
// Functions to set the execution order of the processing stages
|
||||
SetStages([]string) error
|
||||
DefaultStages() []string
|
||||
// Function to add variables to the base evaluation environment
|
||||
AddBaseEnv(env map[string]interface{}) error
|
||||
// Functions to add and remove rules
|
||||
AddDropMessagesByName(name string) error
|
||||
RemoveDropMessagesByName(name string)
|
||||
AddDropMessagesByCondition(condition string) error
|
||||
RemoveDropMessagesByCondition(condition string)
|
||||
AddRenameMetricByCondition(condition string, name string) error
|
||||
RemoveRenameMetricByCondition(condition string)
|
||||
AddRenameMetricByName(from, to string) error
|
||||
RemoveRenameMetricByName(from string)
|
||||
SetNormalizeUnits(settings bool)
|
||||
AddChangeUnitPrefix(condition string, prefix string) error
|
||||
RemoveChangeUnitPrefix(condition string)
|
||||
AddAddTagsByCondition(condition, key, value string) error
|
||||
RemoveAddTagsByCondition(condition string)
|
||||
AddDeleteTagsByCondition(condition, key, value string) error
|
||||
RemoveDeleteTagsByCondition(condition string)
|
||||
AddAddMetaByCondition(condition, key, value string) error
|
||||
RemoveAddMetaByCondition(condition string)
|
||||
AddDeleteMetaByCondition(condition, key, value string) error
|
||||
RemoveDeleteMetaByCondition(condition string)
|
||||
AddMoveTagToMeta(condition, key, value string) error
|
||||
RemoveMoveTagToMeta(condition string)
|
||||
AddMoveTagToFields(condition, key, value string) error
|
||||
RemoveMoveTagToFields(condition string)
|
||||
AddMoveMetaToTags(condition, key, value string) error
|
||||
RemoveMoveMetaToTags(condition string)
|
||||
AddMoveMetaToFields(condition, key, value string) error
|
||||
RemoveMoveMetaToFields(condition string)
|
||||
AddMoveFieldToTags(condition, key, value string) error
|
||||
RemoveMoveFieldToTags(condition string)
|
||||
AddMoveFieldToMeta(condition, key, value string) error
|
||||
RemoveMoveFieldToMeta(condition string)
|
||||
// Read in a JSON configuration
|
||||
FromConfigJSON(config json.RawMessage) error
|
||||
ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
|
||||
// Processing functions for legacy CCMetric and current CCMessage
|
||||
ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
|
||||
}
|
||||
```
|
||||
|
||||
### Syntax for evaluatable terms
|
||||
|
||||
@ -62,15 +232,16 @@ The message processor uses `gval` for evaluating the terms. It provides a basic
|
||||
|
||||
Accessible for operations are
|
||||
- `name` of the message
|
||||
- `timestamp` of the message
|
||||
- `type`, `type-id` of the message (also `tag_type` and `tag_type-id`)
|
||||
- `stype`, `stype-id` of the message (if message has these tags, also `tag_stype` and `tag_stype-id`)
|
||||
- `timestamp` or `time` of the message
|
||||
- `type`, `type-id` of the message (also `tag_type`, `tag_type-id` and `tag_typeid`)
|
||||
- `stype`, `stype-id` of the message (if message has these tags, also `tag_stype`, `tag_stype-id` and `tag_stypeid`)
|
||||
- `value` for a CCMetric message (also `field_value`)
|
||||
- `event` for a CCEvent message (also `field_event`)
|
||||
- `control` for a CCControl message (also `field_control`)
|
||||
- `log` for a CCLog message (also `field_log`)
|
||||
- `messagetype` or `msgtype`. Possible values are `event`, `metric`, `log` and `control`.
|
||||
|
||||
Generally, all tags are accessible with `tag_<tagkey>`, all meta information with `meta_<metakey>` and fields with `field_<fieldkey>`.
|
||||
Generally, all tags are accessible with `tag_<tagkey>`, `tags_<tagkey>` or `tags.<tagkey>`. Similarly for all fields with `field[s]?[_.]<fieldkey>`. For meta information `meta[_.]<metakey>` (there is no `metas[_.]<metakey>`).
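
A sketch of how such an evaluation environment could be laid out as a Go map. The function `buildEnv` is hypothetical and not the message processor's code; the dotted forms are represented here as nested maps, which is an assumption about how the evaluator resolves them.

```golang
package main

import "fmt"

// buildEnv flattens a message into variable names as listed above.
// Underscore forms become top-level keys, dotted forms nested maps.
func buildEnv(name string, tags, meta map[string]string, fields map[string]interface{}) map[string]interface{} {
	env := map[string]interface{}{
		"name":   name,
		"tags":   tags,   // tags.<tagkey>
		"meta":   meta,   // meta.<metakey>
		"field":  fields, // field.<fieldkey>
		"fields": fields, // fields.<fieldkey>
	}
	for k, v := range tags {
		env["tag_"+k] = v
		env["tags_"+k] = v
	}
	for k, v := range meta {
		env["meta_"+k] = v
	}
	for k, v := range fields {
		env["field_"+k] = v
		env["fields_"+k] = v
	}
	return env
}

func main() {
	env := buildEnv(
		"mem_used",
		map[string]string{"hostname": "node01", "type": "node"},
		map[string]string{"unit": "kByte"},
		map[string]interface{}{"value": 16384000.0},
	)
	fmt.Println(env["tag_hostname"], env["meta_unit"], env["field_value"])
}
```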
|
||||
|
||||
- Comparing strings: `==`, `!=`, `match(str, regex)` (use `%` instead of `\`!)
|
||||
- Combining conditions: `&&`, `||`
|
||||
@ -82,3 +253,9 @@ Often the operations are written in JSON files for loading them at startup. In J
|
||||
- use `''` instead of `""` for strings
|
||||
- for the regexes, use `%` instead of `\`
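
The `%` notation for regular expressions can be illustrated with Go's `regexp` package. The helpers `toGoRegex` and `matches` are hypothetical, and the sketch assumes that every `%` is simply substituted by a backslash before the expression is compiled.

```golang
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// toGoRegex turns the JSON-friendly notation (% as escape character) into
// a pattern that Go's regexp package accepts.
func toGoRegex(pattern string) string {
	return strings.ReplaceAll(pattern, "%", `\`)
}

// matches reports whether value matches the pattern written with %.
func matches(value, pattern string) (bool, error) {
	re, err := regexp.Compile(toGoRegex(pattern))
	if err != nil {
		return false, err
	}
	return re.MatchString(value), nil
}

func main() {
	// "^cpu%d+$" is what one would write in JSON instead of "^cpu\d+$".
	ok, err := matches("cpu12", "^cpu%d+$")
	fmt.Println(ok, err)
}
```

Writing `%` avoids the double escaping (`\\d`) that a backslash would need inside a JSON string.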
|
||||
|
||||
For operations that should be applied on all messages, use the condition `true`.
|
||||
|
||||
### Overhead
|
||||
|
||||
The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message
|
||||
increases. Moreover, the processing creates a copy of the message.
|
39
router.json
@ -1,22 +1,23 @@
|
||||
{
|
||||
"add_tags" : [
|
||||
{
|
||||
"key" : "cluster",
|
||||
"value" : "testcluster",
|
||||
"if" : "*"
|
||||
},
|
||||
{
|
||||
"key" : "test",
|
||||
"value" : "testing",
|
||||
"if" : "name == 'temp_package_id_0'"
|
||||
}
|
||||
],
|
||||
"delete_tags" : [
|
||||
{
|
||||
"key" : "unit",
|
||||
"value" : "*",
|
||||
"if" : "*"
|
||||
}
|
||||
],
|
||||
"process_messages" : {
|
||||
"add_tag_if": [
|
||||
{
|
||||
"key" : "cluster",
|
||||
"value" : "testcluster",
|
||||
"if" : "true"
|
||||
},
|
||||
{
|
||||
"key" : "test",
|
||||
"value" : "testing",
|
||||
"if" : "name == 'temp_package_id_0'"
|
||||
}
|
||||
],
|
||||
"delete_tag_if": [
|
||||
{
|
||||
"key" : "unit",
|
||||
"if" : "true"
|
||||
}
|
||||
]
|
||||
},
|
||||
"interval_timestamp" : true
|
||||
}
|
||||
|