diff --git a/pkg/messageProcessor/README.md b/pkg/messageProcessor/README.md new file mode 100644 index 0000000..2fef020 --- /dev/null +++ b/pkg/messageProcessor/README.md @@ -0,0 +1,84 @@ +# Message Processor Component + +Multiple parts of in the ClusterCockit ecosystem require the processing of CCMessages. +The main CC application using it is `cc-metric-collector`. The processing part there was originally in the metric router, the central +hub connecting collectors (reading local data), receivers (receiving remote data) and sinks (sending data). Already in early stages, the +lack of flexibility caused some trouble: + +> The sysadmins wanted to keep operating their Ganglia based monitoring infrastructure while we developed the CC stack. Ganglia wants the core metrics with +> a specific name and resolution (right unit prefix) but there was no conversion of the data in the CC stack, so CC frontend developers wanted a different +> resolution for some metrics. The issue was basically the `mem_used` metric showing the currently used memory of the node. Ganglia wants it in `kByte` as provided +> by the Linux operating system but CC wanted it in `GByte`. + +## For developers + +Whenever you receive or are about to send a message out, you should provide some processing. + +### Configuration of component + +New operations can be added to the message processor at runtime. Of course, they can also be removed again. For the initial setup, having a configuration file +or some fields in a configuration file for the processing. + +The message processor uses the following configuration +```golang +type messageProcessorConfig struct { + DropMessages []string `json:"drop_messages"` // List of metric names to drop. For fine-grained dropping use drop_messages_if + DropMessagesIf []string `json:"drop_messages_if"` // List of evaluatable terms to drop messages + RenameMessages map[string]string `json:"rename_messages"` // Map to rename metric name from key to value + NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units + ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the messages +} +``` + +In order to load the configuration from a `json.RawMessage`: +```golang +mp, _ := NewMessageProcessor() + +mp.FromConfigJSON(configJson) +``` + +### Using the component +After initialization and adding the different operations, the `ProcessMessage()` function applies all operations and returns whether the message should be dropped. + +```golang +m := lp.CCMetric{} + +drop, err := mp.ProcessMessage(m) +if !drop { + // process further +} +``` + +#### Overhead + +The operations taking conditions are pre-processed, which is commonly the time consuming part but, of course, with each added operation, the time to process a message +increases. + +## For users + +### Syntax for evaluatable terms + +The message processor uses `gval` for evaluating the terms. It provides a basic set of operators like string comparison and arithmetic operations. + +Accessible for operations are +- `name` of the message +- `timestamp` of the message +- `type`, `type-id` of the message (also `tag_type` and `tag_type-id`) +- `stype`, `stype-id` of the message (if message has theses tags, also `tag_stype` and `tag_stype-id`) +- `value` for a CCMetric message (also `field_value`) +- `event` for a CCEvent message (also `field_event`) +- `control` for a CCControl message (also `field_control`) +- `log` for a CCLog message (also `field_log`) + +Generally, all tags are accessible with `tag_`, all meta information with `meta_` and fields with `field_`. + +- Comparing strings: `==`, `!=`, `match(str, regex)` (use `%` instead of `\`!) +- Combining conditions: `&&`, `||` +- Comparing numbers: `==`, `!=`, `<`, `>`, `<=`, `>=` +- Test lists: ` in ` +- Topological tests: `tag_type-id in getCpuListOfType("socket", "1")` (test if the metric belongs to socket 1 in local node topology) + +Often the operations are written in JSON files for loading them at startup. In JSON, some characters are not allowed. Therefore, the term syntax reflects that: +- use `''` instead of `""` for strings +- for the regexes, use `%` instead of `\` + diff --git a/pkg/messageProcessor/messageProcessor.go b/pkg/messageProcessor/messageProcessor.go new file mode 100644 index 0000000..16bd542 --- /dev/null +++ b/pkg/messageProcessor/messageProcessor.go @@ -0,0 +1,985 @@ +package messageprocessor + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + + lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" +) + +// Message processor add/delete tag/meta configuration +type messageProcessorTagConfig struct { + Key string `json:"key"` // Tag name + Value string `json:"value"` // Tag value + Condition string `json:"if"` // Condition for adding or removing corresponding tag +} + +type messageProcessorConfig struct { + StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones + DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if + DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages + RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename + RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition + NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units + ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages + AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met + DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met + AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met + DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met + AddFieldIf []messageProcessorTagConfig `json:"add_fields_if"` // List of fields that are added when the condition is met + DelFieldIf []messageProcessorTagConfig `json:"delete_fields_if"` // List of fields that are removed when the condition is met + DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped + MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"` + MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"` + MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"` + MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"` + MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"` + MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"` + AddBaseEnv map[string]interface{} `json:"add_base_env"` +} + +type messageProcessor struct { + + // For thread-safety + mutex sync.RWMutex + + // mapping contains all evalables as strings to gval.Evaluable + // because it is not possible to get the original string out of + // a gval.Evaluable + mapping map[string]*vm.Program + + stages []string // order of stage execution + dropMessages map[string]struct{} // internal lookup map + dropTypes map[string]struct{} // internal lookup map + dropMessagesIf map[*vm.Program]struct{} // pre-processed dropMessagesIf + renameMessages map[string]string // internal lookup map + renameMessagesIf map[*vm.Program]string // pre-processed RenameMessagesIf + changeUnitPrefix map[*vm.Program]string // pre-processed ChangeUnitPrefix + normalizeUnits bool + addTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddTagsIf + deleteTagsIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelTagsIf + addMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddMetaIf + deleteMetaIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelMetaIf + addFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed AddFieldIf + deleteFieldIf map[*vm.Program]messageProcessorTagConfig // pre-processed DelFieldIf + moveTagToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToMeta + moveTagToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveTagToField + moveMetaToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToTag + moveMetaToField map[*vm.Program]messageProcessorTagConfig // pre-processed MoveMetaToField + moveFieldToTag map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToTag + moveFieldToMeta map[*vm.Program]messageProcessorTagConfig // pre-processed MoveFieldToMeta +} + +type MessageProcessor interface { + // Functions to set the execution order of the processing stages + SetStages([]string) error + DefaultStages() []string + // Function to add variables to the base evaluation environment + AddBaseEnv(env map[string]interface{}) error + // Functions to add and remove rules + AddDropMessagesByName(name string) error + RemoveDropMessagesByName(name string) + AddDropMessagesByCondition(condition string) error + RemoveDropMessagesByCondition(condition string) + AddRenameMetricByCondition(condition string, name string) error + RemoveRenameMetricByCondition(condition string) + AddRenameMetricByName(from, to string) error + RemoveRenameMetricByName(from string) + SetNormalizeUnits(settings bool) + AddChangeUnitPrefix(condition string, prefix string) error + RemoveChangeUnitPrefix(condition string) + AddAddTagsByCondition(condition, key, value string) error + RemoveAddTagsByCondition(condition string) + AddDeleteTagsByCondition(condition, key, value string) error + RemoveDeleteTagsByCondition(condition string) + AddAddMetaByCondition(condition, key, value string) error + RemoveAddMetaByCondition(condition string) + AddDeleteMetaByCondition(condition, key, value string) error + RemoveDeleteMetaByCondition(condition string) + AddMoveTagToMeta(condition, key, value string) error + RemoveMoveTagToMeta(condition string) + AddMoveTagToFields(condition, key, value string) error + RemoveMoveTagToFields(condition string) + AddMoveMetaToTags(condition, key, value string) error + RemoveMoveMetaToTags(condition string) + AddMoveMetaToFields(condition, key, value string) error + RemoveMoveMetaToFields(condition string) + AddMoveFieldToTags(condition, key, value string) error + RemoveMoveFieldToTags(condition string) + AddMoveFieldToMeta(condition, key, value string) error + RemoveMoveFieldToMeta(condition string) + // Read in a JSON configuration + FromConfigJSON(config json.RawMessage) error + // Processing functions for legacy CCMetric and current CCMessage + ProcessMetric(m lp.CCMetric) (bool, error) + ProcessMessage(m lp2.CCMessage) (bool, error) + //EvalToBool(condition string, parameters map[string]interface{}) (bool, error) + //EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error) + //EvalToString(condition string, parameters map[string]interface{}) (string, error) +} + +const ( + STAGENAME_DROP_BY_NAME string = "drop_by_name" + STAGENAME_DROP_BY_TYPE string = "drop_by_type" + STAGENAME_DROP_IF string = "drop_if" + STAGENAME_ADD_TAG string = "add_tag" + STAGENAME_DELETE_TAG string = "delete_tag" + STAGENAME_MOVE_TAG_META string = "move_tag_to_meta" + STAGENAME_MOVE_TAG_FIELD string = "move_tag_to_fields" + STAGENAME_ADD_META string = "add_meta" + STAGENAME_DELETE_META string = "delete_meta" + STAGENAME_MOVE_META_TAG string = "move_meta_to_tags" + STAGENAME_MOVE_META_FIELD string = "move_meta_to_fields" + STAGENAME_ADD_FIELD string = "add_field" + STAGENAME_DELETE_FIELD string = "delete_field" + STAGENAME_MOVE_FIELD_TAG string = "move_field_to_tags" + STAGENAME_MOVE_FIELD_META string = "move_field_to_meta" + STAGENAME_RENAME_BY_NAME string = "rename" + STAGENAME_RENAME_IF string = "rename_if" + STAGENAME_CHANGE_UNIT_PREFIX string = "change_unit_prefix" + STAGENAME_NORMALIZE_UNIT string = "normalize_unit" +) + +var StageNames = []string{ + STAGENAME_DROP_BY_NAME, + STAGENAME_DROP_BY_TYPE, + STAGENAME_DROP_IF, + STAGENAME_ADD_TAG, + STAGENAME_DELETE_TAG, + STAGENAME_MOVE_TAG_META, + STAGENAME_MOVE_TAG_FIELD, + STAGENAME_ADD_META, + STAGENAME_DELETE_META, + STAGENAME_MOVE_META_TAG, + STAGENAME_MOVE_META_FIELD, + STAGENAME_ADD_FIELD, + STAGENAME_DELETE_FIELD, + STAGENAME_MOVE_FIELD_TAG, + STAGENAME_MOVE_FIELD_META, + STAGENAME_RENAME_BY_NAME, + STAGENAME_RENAME_IF, + STAGENAME_CHANGE_UNIT_PREFIX, + STAGENAME_NORMALIZE_UNIT, +} + +var paramMapPool = sync.Pool{ + New: func() any { + return make(map[string]interface{}) + }, +} + +func sanitizeExprString(key string) string { + return strings.ReplaceAll(key, "type-id", "typeid") +} + +func getParamMap(point lp.CCMetric) map[string]interface{} { + params := paramMapPool.Get().(map[string]interface{}) + params["message"] = point + params["msg"] = point + params["name"] = point.Name() + params["timestamp"] = point.Time().Unix() + params["time"] = params["timestamp"] + + fields := paramMapPool.Get().(map[string]interface{}) + for key, value := range point.Fields() { + fields[key] = value + switch key { + case "value": + params["messagetype"] = "metric" + params["value"] = value + params["metric"] = value + case "event": + params["messagetype"] = "event" + params["event"] = value + case "control": + params["messagetype"] = "control" + params["control"] = value + case "log": + params["messagetype"] = "log" + params["log"] = value + default: + params["messagetype"] = "unknown" + } + } + params["msgtype"] = params["messagetype"] + params["fields"] = fields + params["field"] = fields + tags := paramMapPool.Get().(map[string]interface{}) + for key, value := range point.Tags() { + tags[sanitizeExprString(key)] = value + } + params["tags"] = tags + params["tag"] = tags + meta := paramMapPool.Get().(map[string]interface{}) + for key, value := range point.Meta() { + meta[sanitizeExprString(key)] = value + } + params["meta"] = meta + return params +} + +var baseenv = map[string]interface{}{ + "name": "", + "messagetype": "unknown", + "msgtype": "unknown", + "tag": map[string]interface{}{ + "type": "unknown", + "typeid": "0", + "stype": "unknown", + "stypeid": "0", + "hostname": "localhost", + "cluster": "nocluster", + }, + "tags": map[string]interface{}{ + "type": "unknown", + "typeid": "0", + "stype": "unknown", + "stypeid": "0", + "hostname": "localhost", + "cluster": "nocluster", + }, + "meta": map[string]interface{}{ + "unit": "invalid", + "source": "unknown", + }, + "fields": map[string]interface{}{ + "value": 0, + "event": "", + "control": "", + "log": "", + }, + "field": map[string]interface{}{ + "value": 0, + "event": "", + "control": "", + "log": "", + }, + "timestamp": 1234567890, + "msg": lp2.EmptyMessage(), + "message": lp2.EmptyMessage(), +} + +func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} { + out := make(map[string]interface{}) + for k, v := range values { + switch value := v.(type) { + case int, int32, int64, uint, uint32, uint64, string, float32, float64: + out[k] = value + case map[string]interface{}: + if _, ok := baseenv[k]; !ok { + out[k] = addBaseEnvWalker(value) + } + } + } + return out +} + +func (mp *messageProcessor) AddBaseEnv(env map[string]interface{}) error { + for k, v := range env { + switch value := v.(type) { + case int, int32, int64, uint, uint32, uint64, string, float32, float64: + baseenv[k] = value + case map[string]interface{}: + if _, ok := baseenv[k]; !ok { + baseenv[k] = addBaseEnvWalker(value) + } + } + } + return nil +} + +func (mp *messageProcessor) init() error { + mp.stages = make([]string, 0) + mp.mapping = make(map[string]*vm.Program) + mp.dropMessages = make(map[string]struct{}) + mp.dropTypes = make(map[string]struct{}) + mp.dropMessagesIf = make(map[*vm.Program]struct{}) + mp.renameMessages = make(map[string]string) + mp.renameMessagesIf = make(map[*vm.Program]string) + mp.changeUnitPrefix = make(map[*vm.Program]string) + mp.addTagsIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.addMetaIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.addFieldIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.deleteTagsIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.deleteMetaIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.deleteFieldIf = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveFieldToMeta = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveFieldToTag = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveMetaToField = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveMetaToTag = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveTagToField = make(map[*vm.Program]messageProcessorTagConfig) + mp.moveTagToMeta = make(map[*vm.Program]messageProcessorTagConfig) + mp.normalizeUnits = false + return nil +} + +func (mp *messageProcessor) AddDropMessagesByName(name string) error { + mp.mutex.Lock() + if _, ok := mp.dropMessages[name]; !ok { + mp.dropMessages[name] = struct{}{} + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) RemoveDropMessagesByName(name string) { + mp.mutex.Lock() + delete(mp.dropMessages, name) + mp.mutex.Unlock() +} + +func (mp *messageProcessor) AddDropMessagesByType(typestring string) error { + valid := []string{"metric", "event", "control", "log"} + isValid := false + for _, t := range valid { + if t == typestring { + isValid = true + break + } + } + if isValid { + mp.mutex.Lock() + if _, ok := mp.dropTypes[typestring]; !ok { + cclog.ComponentDebug("MessageProcessor", "Adding type", typestring, "for dropping") + mp.dropTypes[typestring] = struct{}{} + } + mp.mutex.Unlock() + } else { + return fmt.Errorf("invalid message type %s", typestring) + } + return nil +} + +func (mp *messageProcessor) RemoveDropMessagesByType(typestring string) { + mp.mutex.Lock() + delete(mp.dropTypes, typestring) + mp.mutex.Unlock() +} + +func (mp *messageProcessor) addTagConfig(condition, key, value string, config *map[*vm.Program]messageProcessorTagConfig) error { + var err error + evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool()) + if err != nil { + return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error()) + } + mp.mutex.Lock() + if _, ok := (*config)[evaluable]; !ok { + mp.mapping[condition] = evaluable + (*config)[evaluable] = messageProcessorTagConfig{ + Condition: condition, + Key: key, + Value: value, + } + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) removeTagConfig(condition string, config *map[*vm.Program]messageProcessorTagConfig) { + mp.mutex.Lock() + if e, ok := mp.mapping[condition]; ok { + delete(mp.mapping, condition) + delete(*config, e) + } + mp.mutex.Unlock() +} + +func (mp *messageProcessor) AddAddTagsByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.addTagsIf) +} + +func (mp *messageProcessor) RemoveAddTagsByCondition(condition string) { + mp.removeTagConfig(condition, &mp.addTagsIf) +} + +func (mp *messageProcessor) AddDeleteTagsByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.deleteTagsIf) +} + +func (mp *messageProcessor) RemoveDeleteTagsByCondition(condition string) { + mp.removeTagConfig(condition, &mp.deleteTagsIf) +} + +func (mp *messageProcessor) AddAddMetaByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.addMetaIf) +} + +func (mp *messageProcessor) RemoveAddMetaByCondition(condition string) { + mp.removeTagConfig(condition, &mp.addMetaIf) +} + +func (mp *messageProcessor) AddDeleteMetaByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.deleteMetaIf) +} + +func (mp *messageProcessor) RemoveDeleteMetaByCondition(condition string) { + mp.removeTagConfig(condition, &mp.deleteMetaIf) +} + +func (mp *messageProcessor) AddAddFieldByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.addFieldIf) +} + +func (mp *messageProcessor) RemoveAddFieldByCondition(condition string) { + mp.removeTagConfig(condition, &mp.addFieldIf) +} + +func (mp *messageProcessor) AddDeleteFieldByCondition(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.deleteFieldIf) +} + +func (mp *messageProcessor) RemoveDeleteFieldByCondition(condition string) { + mp.removeTagConfig(condition, &mp.deleteFieldIf) +} + +func (mp *messageProcessor) AddDropMessagesByCondition(condition string) error { + + var err error + evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool()) + if err != nil { + return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error()) + } + mp.mutex.Lock() + if _, ok := mp.dropMessagesIf[evaluable]; !ok { + mp.mapping[condition] = evaluable + mp.dropMessagesIf[evaluable] = struct{}{} + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) RemoveDropMessagesByCondition(condition string) { + mp.mutex.Lock() + if e, ok := mp.mapping[condition]; ok { + delete(mp.mapping, condition) + delete(mp.dropMessagesIf, e) + } + mp.mutex.Unlock() +} + +func (mp *messageProcessor) AddRenameMetricByCondition(condition string, name string) error { + + var err error + evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool()) + if err != nil { + return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error()) + } + mp.mutex.Lock() + if _, ok := mp.renameMessagesIf[evaluable]; !ok { + mp.mapping[condition] = evaluable + mp.renameMessagesIf[evaluable] = name + } else { + mp.renameMessagesIf[evaluable] = name + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) RemoveRenameMetricByCondition(condition string) { + mp.mutex.Lock() + if e, ok := mp.mapping[condition]; ok { + delete(mp.mapping, condition) + delete(mp.renameMessagesIf, e) + } + mp.mutex.Unlock() +} + +func (mp *messageProcessor) SetNormalizeUnits(setting bool) { + mp.normalizeUnits = setting +} + +func (mp *messageProcessor) AddChangeUnitPrefix(condition string, prefix string) error { + + var err error + evaluable, err := expr.Compile(sanitizeExprString(condition), expr.Env(baseenv), expr.AsBool()) + if err != nil { + return fmt.Errorf("failed to create condition evaluable of '%s': %v", condition, err.Error()) + } + mp.mutex.Lock() + if _, ok := mp.changeUnitPrefix[evaluable]; !ok { + mp.mapping[condition] = evaluable + mp.changeUnitPrefix[evaluable] = prefix + } else { + mp.changeUnitPrefix[evaluable] = prefix + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) RemoveChangeUnitPrefix(condition string) { + mp.mutex.Lock() + if e, ok := mp.mapping[condition]; ok { + delete(mp.mapping, condition) + delete(mp.changeUnitPrefix, e) + } + mp.mutex.Unlock() +} + +func (mp *messageProcessor) AddRenameMetricByName(from, to string) error { + mp.mutex.Lock() + if _, ok := mp.renameMessages[from]; !ok { + mp.renameMessages[from] = to + } + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) RemoveRenameMetricByName(from string) { + mp.mutex.Lock() + delete(mp.renameMessages, from) + mp.mutex.Unlock() +} + +func (mp *messageProcessor) AddMoveTagToMeta(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveTagToMeta) +} + +func (mp *messageProcessor) RemoveMoveTagToMeta(condition string) { + mp.removeTagConfig(condition, &mp.moveTagToMeta) +} + +func (mp *messageProcessor) AddMoveTagToFields(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveTagToField) +} + +func (mp *messageProcessor) RemoveMoveTagToFields(condition string) { + mp.removeTagConfig(condition, &mp.moveTagToField) +} + +func (mp *messageProcessor) AddMoveMetaToTags(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveMetaToTag) +} + +func (mp *messageProcessor) RemoveMoveMetaToTags(condition string) { + mp.removeTagConfig(condition, &mp.moveMetaToTag) +} + +func (mp *messageProcessor) AddMoveMetaToFields(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveMetaToField) +} + +func (mp *messageProcessor) RemoveMoveMetaToFields(condition string) { + mp.removeTagConfig(condition, &mp.moveMetaToField) +} + +func (mp *messageProcessor) AddMoveFieldToTags(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveFieldToTag) +} + +func (mp *messageProcessor) RemoveMoveFieldToTags(condition string) { + mp.removeTagConfig(condition, &mp.moveFieldToTag) +} + +func (mp *messageProcessor) AddMoveFieldToMeta(condition, key, value string) error { + return mp.addTagConfig(condition, key, value, &mp.moveFieldToMeta) +} + +func (mp *messageProcessor) RemoveMoveFieldToMeta(condition string) { + mp.removeTagConfig(condition, &mp.moveFieldToMeta) +} + +func (mp *messageProcessor) SetStages(stages []string) error { + newstages := make([]string, 0) + if len(stages) == 0 { + mp.mutex.Lock() + mp.stages = newstages + mp.mutex.Unlock() + return nil + } + for i, s := range stages { + valid := false + for _, v := range StageNames { + if s == v { + valid = true + } + } + if valid { + newstages = append(newstages, s) + } else { + return fmt.Errorf("invalid stage %s at index %d", s, i) + } + } + mp.mutex.Lock() + mp.stages = newstages + mp.mutex.Unlock() + return nil +} + +func (mp *messageProcessor) DefaultStages() []string { + return StageNames +} + +func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error { + var c messageProcessorConfig + + err := json.Unmarshal(config, &c) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + + if len(c.StageOrder) > 0 { + err = mp.SetStages(c.StageOrder) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } else { + err = mp.SetStages(mp.DefaultStages()) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + + for _, m := range c.DropMessages { + err = mp.AddDropMessagesByName(m) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, m := range c.DropByType { + err = mp.AddDropMessagesByType(m) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, m := range c.DropMessagesIf { + err = mp.AddDropMessagesByCondition(m) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for k, v := range c.RenameMessagesIf { + err = mp.AddRenameMetricByCondition(k, v) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for k, v := range c.RenameMessages { + err = mp.AddRenameMetricByName(k, v) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for k, v := range c.ChangeUnitPrefix { + err = mp.AddChangeUnitPrefix(k, v) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.AddTagsIf { + err = mp.AddAddTagsByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.AddMetaIf { + err = mp.AddAddMetaByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.AddFieldIf { + err = mp.AddAddFieldByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.DelTagsIf { + err = mp.AddDeleteTagsByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.DelMetaIf { + err = mp.AddDeleteMetaByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.DelFieldIf { + err = mp.AddDeleteFieldByCondition(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveTagToMeta { + err = mp.AddMoveTagToMeta(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveTagToField { + err = mp.AddMoveTagToFields(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveMetaToTag { + err = mp.AddMoveMetaToTags(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveMetaToField { + err = mp.AddMoveMetaToFields(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveFieldToTag { + err = mp.AddMoveFieldToTags(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, c := range c.MoveFieldToMeta { + err = mp.AddMoveFieldToMeta(c.Condition, c.Key, c.Value) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + for _, m := range c.DropByType { + err = mp.AddDropMessagesByType(m) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + if len(c.AddBaseEnv) > 0 { + err = mp.AddBaseEnv(c.AddBaseEnv) + if err != nil { + return fmt.Errorf("failed to process config JSON: %v", err.Error()) + } + } + mp.SetNormalizeUnits(c.NormalizeUnits) + return nil +} + +func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) { + m, err := lp2.NewMessage( + metric.Name(), + metric.Tags(), + metric.Meta(), + metric.Fields(), + metric.Time(), + ) + if err != nil { + return true, fmt.Errorf("failed to parse metric to message: %v", err.Error()) + } + return mp.ProcessMessage(m) + +} + +func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { + var drop bool = false + var err error = nil + name := m.Name() + + if len(mp.stages) == 0 { + mp.SetStages(mp.DefaultStages()) + } + + mp.mutex.RLock() + defer mp.mutex.RUnlock() + + params := getParamMap(m) + + defer func() { + params["field"] = nil + params["tag"] = nil + paramMapPool.Put(params["fields"]) + paramMapPool.Put(params["tags"]) + paramMapPool.Put(params["meta"]) + paramMapPool.Put(params) + }() + + for _, s := range mp.stages { + switch s { + case STAGENAME_DROP_BY_NAME: + if len(mp.dropMessages) > 0 { + cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name) + if _, ok := mp.dropMessages[name]; ok { + cclog.ComponentDebug("MessageProcessor", "Drop") + return true, nil + } + } + case STAGENAME_DROP_BY_TYPE: + if len(mp.dropTypes) > 0 { + cclog.ComponentDebug("MessageProcessor", "Dropping by message type") + if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok { + cclog.ComponentDebug("MessageProcessor", "Drop") + return true, nil + } + } + case STAGENAME_DROP_IF: + if len(mp.dropMessagesIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Dropping by condition") + drop, err = dropMessagesIf(¶ms, &mp.dropMessagesIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if drop { + cclog.ComponentDebug("MessageProcessor", "Drop") + } + } + case STAGENAME_RENAME_BY_NAME: + if len(mp.renameMessages) > 0 { + cclog.ComponentDebug("MessageProcessor", "Renaming by name match") + if newname, ok := mp.renameMessages[name]; ok { + cclog.ComponentDebug("MessageProcessor", "Rename to", newname) + m.SetName(newname) + cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name) + m.AddMeta("oldname", name) + } + } + case STAGENAME_RENAME_IF: + if len(mp.renameMessagesIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Renaming by condition") + _, err := renameMessagesIf(m, ¶ms, &mp.renameMessagesIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_ADD_TAG: + if len(mp.addTagsIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Adding tags") + _, err = addTagIf(m, ¶ms, &mp.addTagsIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_DELETE_TAG: + if len(mp.deleteTagsIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Delete tags") + _, err = deleteTagIf(m, ¶ms, &mp.deleteTagsIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_ADD_META: + if len(mp.addMetaIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Adding meta information") + _, err = addMetaIf(m, ¶ms, &mp.addMetaIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_DELETE_META: + if len(mp.deleteMetaIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Delete meta information") + _, err = deleteMetaIf(m, ¶ms, &mp.deleteMetaIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_ADD_FIELD: + if len(mp.addFieldIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Adding fields") + _, err = addFieldIf(m, ¶ms, &mp.addFieldIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_DELETE_FIELD: + if len(mp.deleteFieldIf) > 0 { + cclog.ComponentDebug("MessageProcessor", "Delete fields") + _, err = deleteFieldIf(m, ¶ms, &mp.deleteFieldIf) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_TAG_META: + if len(mp.moveTagToMeta) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move tag to meta") + _, err := moveTagToMeta(m, ¶ms, &mp.moveTagToMeta) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_TAG_FIELD: + if len(mp.moveTagToField) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move tag to fields") + _, err := moveTagToField(m, ¶ms, &mp.moveTagToField) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_META_TAG: + if len(mp.moveMetaToTag) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move meta to tags") + _, err := moveMetaToTag(m, ¶ms, &mp.moveMetaToTag) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_META_FIELD: + if len(mp.moveMetaToField) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move meta to fields") + _, err := moveMetaToField(m, ¶ms, &mp.moveMetaToField) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_FIELD_META: + if len(mp.moveFieldToMeta) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move field to meta") + _, err := moveFieldToMeta(m, ¶ms, &mp.moveFieldToMeta) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_MOVE_FIELD_TAG: + if len(mp.moveFieldToTag) > 0 { + cclog.ComponentDebug("MessageProcessor", "Move field to tags") + _, err := moveFieldToTag(m, ¶ms, &mp.moveFieldToTag) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } + case STAGENAME_NORMALIZE_UNIT: + if mp.normalizeUnits { + cclog.ComponentDebug("MessageProcessor", "Normalize units") + if lp2.IsMetric(m) { + _, err := normalizeUnits(m) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } else { + cclog.ComponentDebug("MessageProcessor", "skipped, no metric") + } + } + + case STAGENAME_CHANGE_UNIT_PREFIX: + if len(mp.changeUnitPrefix) > 0 { + cclog.ComponentDebug("MessageProcessor", "Change unit prefix") + if lp2.IsMetric(m) { + _, err := changeUnitPrefix(m, ¶ms, &mp.changeUnitPrefix) + if err != nil { + return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + } else { + cclog.ComponentDebug("MessageProcessor", "skipped, no metric") + } + } + } + + } + + return drop, nil +} + +// Get a new instace of a message processor. +func NewMessageProcessor() (MessageProcessor, error) { + mp := new(messageProcessor) + err := mp.init() + if err != nil { + err := fmt.Errorf("failed to create MessageProcessor: %v", err.Error()) + cclog.ComponentError("MessageProcessor", err.Error()) + return nil, err + } + return mp, nil +} diff --git a/pkg/messageProcessor/messageProcessorFuncs.go b/pkg/messageProcessor/messageProcessorFuncs.go new file mode 100644 index 0000000..23c261e --- /dev/null +++ b/pkg/messageProcessor/messageProcessorFuncs.go @@ -0,0 +1,265 @@ +package messageprocessor + +import ( + "errors" + "fmt" + + lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" + units "github.com/ClusterCockpit/cc-units" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" +) + +type MessageLocation int + +const ( + MESSAGE_LOCATION_TAGS MessageLocation = iota + MESSAGE_LOCATION_META + MESSAGE_LOCATION_FIELDS +) + +// Abstract function to move entries from one location to another +func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, from, to MessageLocation) (bool, error) { + for d, data := range *checks { + value, err := expr.Run(d, *params) + if err != nil { + return false, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to) + if value.(bool) { + var v string + var ok bool = false + switch from { + case MESSAGE_LOCATION_TAGS: + cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key) + v, ok = message.GetTag(data.Key) + case MESSAGE_LOCATION_META: + cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key) + cclog.ComponentDebug("MessageProcessor", message.Meta()) + v, ok = message.GetMeta(data.Key) + case MESSAGE_LOCATION_FIELDS: + var x interface{} + cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key) + x, ok = message.GetField(data.Key) + v = fmt.Sprintf("%v", x) + } + if ok { + switch from { + case MESSAGE_LOCATION_TAGS: + cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key) + message.RemoveTag(data.Key) + case MESSAGE_LOCATION_META: + cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key) + message.RemoveMeta(data.Key) + case MESSAGE_LOCATION_FIELDS: + cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key) + message.RemoveField(data.Key) + } + switch to { + case MESSAGE_LOCATION_TAGS: + cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v) + message.AddTag(data.Value, v) + case MESSAGE_LOCATION_META: + cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v) + message.AddMeta(data.Value, v) + case MESSAGE_LOCATION_FIELDS: + cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v) + message.AddField(data.Value, v) + } + } else { + return false, fmt.Errorf("failed to get message entry: %s", data.Key) + } + } + } + return false, nil +} + +func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) { + for d, data := range *checks { + value, err := expr.Run(d, *params) + if err != nil { + return true, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if value.(bool) { + switch location { + case MESSAGE_LOCATION_FIELDS: + switch data.Key { + case "value", "event", "log", "control": + return false, errors.New("cannot delete protected fields") + default: + cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key) + message.RemoveField(data.Key) + } + case MESSAGE_LOCATION_TAGS: + cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key) + message.RemoveTag(data.Key) + case MESSAGE_LOCATION_META: + cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key) + message.RemoveMeta(data.Key) + } + } + } + return false, nil +} + +func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) { + for d, data := range *checks { + value, err := expr.Run(d, *params) + if err != nil { + return true, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if value.(bool) { + switch location { + case MESSAGE_LOCATION_FIELDS: + cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value) + message.AddField(data.Key, data.Value) + case MESSAGE_LOCATION_TAGS: + cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value) + message.AddTag(data.Key, data.Value) + case MESSAGE_LOCATION_META: + cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value) + message.AddMeta(data.Key, data.Value) + } + } + } + return false, nil +} + +func deleteTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return deleteIf(message, params, checks, MESSAGE_LOCATION_TAGS) +} + +func addTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return addIf(message, params, checks, MESSAGE_LOCATION_TAGS) +} + +func moveTagToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_META) +} + +func moveTagToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_FIELDS) +} + +func deleteMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return deleteIf(message, params, checks, MESSAGE_LOCATION_META) +} + +func addMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return addIf(message, params, checks, MESSAGE_LOCATION_META) +} + +func moveMetaToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_TAGS) +} + +func moveMetaToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_FIELDS) +} + +func deleteFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return deleteIf(message, params, checks, MESSAGE_LOCATION_FIELDS) +} + +func addFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return addIf(message, params, checks, MESSAGE_LOCATION_FIELDS) +} + +func moveFieldToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_TAGS) +} + +func moveFieldToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) { + return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_META) +} + +func dropMessagesIf(params *map[string]interface{}, checks *map[*vm.Program]struct{}) (bool, error) { + for d := range *checks { + value, err := expr.Run(d, *params) + if err != nil { + return false, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if value.(bool) { + return true, nil + } + } + return false, nil +} + +func normalizeUnits(message lp2.CCMessage) (bool, error) { + if in_unit, ok := message.GetMeta("unit"); ok { + u := units.NewUnit(in_unit) + if u.Valid() { + cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) + message.AddMeta("unit", u.Short()) + } + } else if in_unit, ok := message.GetTag("unit"); ok { + u := units.NewUnit(in_unit) + if u.Valid() { + cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) + message.AddTag("unit", u.Short()) + } + } + return false, nil +} + +func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) { + for r, n := range *checks { + value, err := expr.Run(r, *params) + if err != nil { + return false, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if value.(bool) { + newPrefix := units.NewPrefix(n) + cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String()) + if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { + u := units.NewUnit(in_unit) + if u.Valid() { + cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) + conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) + if conv != nil && out_unit.Valid() { + if val, ok := message.GetField("value"); ok { + cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) + message.AddField("value", conv(val)) + message.AddMeta("unit", out_unit.Short()) + } + } + } + + } else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix { + u := units.NewUnit(in_unit) + if u.Valid() { + cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) + conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) + if conv != nil && out_unit.Valid() { + if val, ok := message.GetField("value"); ok { + cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) + message.AddField("value", conv(val)) + message.AddTag("unit", out_unit.Short()) + } + } + } + + } + } + } + return false, nil +} + +func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) { + for d, n := range *checks { + value, err := expr.Run(d, *params) + if err != nil { + return true, fmt.Errorf("failed to evaluate: %v", err.Error()) + } + if value.(bool) { + old := message.Name() + cclog.ComponentDebug("MessageProcessor", "Rename to", n) + message.SetName(n) + cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old) + message.AddMeta("oldname", old) + } + } + return false, nil +} diff --git a/pkg/messageProcessor/messageProcessor_test.go b/pkg/messageProcessor/messageProcessor_test.go new file mode 100644 index 0000000..b04e932 --- /dev/null +++ b/pkg/messageProcessor/messageProcessor_test.go @@ -0,0 +1,387 @@ +package messageprocessor + +import ( + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" +) + +func generate_message_lists(num_lists, num_entries int) ([][]lp.CCMessage, error) { + mlist := make([][]lp.CCMessage, 0) + for j := 0; j < num_lists; j++ { + out := make([]lp.CCMessage, 0) + for i := 0; i < num_entries; i++ { + var x lp.CCMessage + var err error = nil + switch { + case i%4 == 0: + x, err = lp.NewEvent("myevent", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "nothing happend", time.Now()) + case i%4 == 1: + x, err = lp.NewMetric("mymetric", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{"unit": "kByte"}, 12.145, time.Now()) + case i%4 == 2: + x, err = lp.NewLog("mylog", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, "disk status: OK", time.Now()) + case i%4 == 3: + x, err = lp.NewGetControl("mycontrol", map[string]string{"type": "socket", "type-id": "0"}, map[string]string{}, time.Now()) + } + if err == nil { + x.AddTag("hostname", "myhost") + out = append(out, x) + } else { + return nil, errors.New("failed to create message") + } + } + mlist = append(mlist, out) + } + return mlist, nil +} + +func TestNewMessageProcessor(t *testing.T) { + _, err := NewMessageProcessor() + if err != nil { + t.Error(err.Error()) + } +} + +type Configs struct { + name string + config json.RawMessage + drop bool + errors bool + pre func(msg lp.CCMessage) error + check func(msg lp.CCMessage) error +} + +var test_configs = []Configs{ + { + name: "single_dropif_nomatch", + config: json.RawMessage(`{"drop_messages_if": [ "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`), + }, + { + name: "drop_by_name", + config: json.RawMessage(`{"drop_messages": [ "net_bytes_in"]}`), + drop: true, + }, + { + name: "drop_by_type_match", + config: json.RawMessage(`{"drop_by_message_type": [ "metric"]}`), + drop: true, + }, + { + name: "drop_by_type_nomatch", + config: json.RawMessage(`{"drop_by_message_type": [ "event"]}`), + }, + { + name: "single_dropif_match", + config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'"]}`), + drop: true, + }, + { + name: "double_dropif_match_nomatch", + config: json.RawMessage(`{"drop_messages_if": [ "name == 'net_bytes_in' && tags.type == 'node'", "name == 'testname' && tags.type == 'socket' && tags.typeid % 2 == 1"]}`), + drop: true, + }, + { + name: "rename_simple", + config: json.RawMessage(`{"rename_messages": { "net_bytes_in" : "net_bytes_out", "rapl_power": "cpu_power"}}`), + check: func(msg lp.CCMessage) error { + if msg.Name() != "net_bytes_out" { + return errors.New("expected name net_bytes_out but still have net_bytes_in") + } + return nil + }, + }, + { + name: "rename_match", + config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_in'" : "net_bytes_out", "name == 'rapl_power'": "cpu_power"}}`), + check: func(msg lp.CCMessage) error { + if msg.Name() != "net_bytes_out" { + return errors.New("expected name net_bytes_out but still have net_bytes_in") + } + return nil + }, + }, + { + name: "rename_nomatch", + config: json.RawMessage(`{"rename_messages_if": { "name == 'net_bytes_out'" : "net_bytes_in", "name == 'rapl_power'": "cpu_power"}}`), + check: func(msg lp.CCMessage) error { + if msg.Name() != "net_bytes_in" { + return errors.New("expected name net_bytes_in but still have net_bytes_out") + } + return nil + }, + }, + { + name: "add_tag", + config: json.RawMessage(`{"add_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "cluster", "value" : "mycluster"}]}`), + check: func(msg lp.CCMessage) error { + if !msg.HasTag("cluster") { + return errors.New("expected new tag 'cluster' but not present") + } + return nil + }, + }, + { + name: "del_tag", + config: json.RawMessage(`{"delete_tags_if": [{"if": "name == 'net_bytes_in'", "key" : "type"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasTag("type") { + return errors.New("expected to have no 'type' but still present") + } + return nil + }, + }, + { + name: "add_meta", + config: json.RawMessage(`{"add_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "source", "value" : "example"}]}`), + check: func(msg lp.CCMessage) error { + if !msg.HasMeta("source") { + return errors.New("expected new tag 'source' but not present") + } + return nil + }, + }, + { + name: "del_meta", + config: json.RawMessage(`{"delete_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "unit"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasMeta("unit") { + return errors.New("expected to have no 'unit' but still present") + } + return nil + }, + }, + { + name: "add_field", + config: json.RawMessage(`{"add_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value" : "example"}]}`), + check: func(msg lp.CCMessage) error { + if !msg.HasField("myfield") { + return errors.New("expected new tag 'source' but not present") + } + return nil + }, + }, + { + name: "delete_fields_if_protected", + config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "value"}]}`), + errors: true, + check: func(msg lp.CCMessage) error { + if !msg.HasField("value") { + return errors.New("expected to still have 'value' field because it is a protected field key") + } + return nil + }, + }, + { + name: "delete_fields_if_unprotected", + config: json.RawMessage(`{"delete_fields_if": [{"if": "name == 'net_bytes_in'", "key" : "testfield"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasField("testfield") { + return errors.New("expected to still have 'testfield' field but should be deleted") + } + return nil + }, + pre: func(msg lp.CCMessage) error { + msg.AddField("testfield", 4.123) + return nil + }, + }, + { + name: "single_change_prefix_match", + config: json.RawMessage(`{"change_unit_prefix": {"name == 'net_bytes_in' && tags.type == 'node'": "M"}}`), + check: func(msg lp.CCMessage) error { + if u, ok := msg.GetMeta("unit"); ok { + if u != "MB" { + return fmt.Errorf("expected unit MB but have %s", u) + } + } else if u, ok := msg.GetTag("unit"); ok { + if u != "MB" { + return fmt.Errorf("expected unit MB but have %s", u) + } + } + return nil + }, + }, + { + name: "normalize_units", + config: json.RawMessage(`{"normalize_units": true}`), + check: func(msg lp.CCMessage) error { + if u, ok := msg.GetMeta("unit"); ok { + if u != "B" { + return fmt.Errorf("expected unit B but have %s", u) + } + } else if u, ok := msg.GetTag("unit"); ok { + if u != "B" { + return fmt.Errorf("expected unit B but have %s", u) + } + } + return nil + }, + }, + { + name: "move_tag_to_meta", + config: json.RawMessage(`{"move_tag_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasTag("type-id") || !msg.HasMeta("typeid") { + return errors.New("moving tag 'type-id' to meta 'typeid' failed") + } + return nil + }, + pre: func(msg lp.CCMessage) error { + msg.AddTag("type-id", "0") + return nil + }, + }, + { + name: "move_tag_to_field", + config: json.RawMessage(`{"move_tag_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "type-id", "value": "typeid"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasTag("type-id") || !msg.HasField("typeid") { + return errors.New("moving tag 'type-id' to field 'typeid' failed") + } + return nil + }, + pre: func(msg lp.CCMessage) error { + msg.AddTag("type-id", "0") + return nil + }, + }, + { + name: "move_meta_to_tag", + config: json.RawMessage(`{"move_meta_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasMeta("unit") || !msg.HasTag("unit") { + return errors.New("moving meta 'unit' to tag 'unit' failed") + } + return nil + }, + }, + { + name: "move_meta_to_field", + config: json.RawMessage(`{"move_meta_to_field_if": [{"if": "name == 'net_bytes_in'", "key" : "unit", "value": "unit"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasMeta("unit") || !msg.HasField("unit") { + return errors.New("moving meta 'unit' to field 'unit' failed") + } + return nil + }, + }, + { + name: "move_field_to_tag", + config: json.RawMessage(`{"move_field_to_tag_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasField("myfield") || !msg.HasTag("field") { + return errors.New("moving meta 'myfield' to tag 'field' failed") + } + return nil + }, + pre: func(msg lp.CCMessage) error { + msg.AddField("myfield", 12) + return nil + }, + }, + { + name: "move_field_to_meta", + config: json.RawMessage(`{"move_field_to_meta_if": [{"if": "name == 'net_bytes_in'", "key" : "myfield", "value": "field"}]}`), + check: func(msg lp.CCMessage) error { + if msg.HasField("myfield") || !msg.HasMeta("field") { + return errors.New("moving meta 'myfield' to meta 'field' failed") + } + return nil + }, + pre: func(msg lp.CCMessage) error { + msg.AddField("myfield", 12) + return nil + }, + }, +} + +func TestConfigList(t *testing.T) { + for _, c := range test_configs { + t.Run(c.name, func(t *testing.T) { + m, err := lp.NewMetric("net_bytes_in", map[string]string{"type": "node", "type-id": "0"}, map[string]string{"unit": "Byte"}, float64(1024.0), time.Now()) + if err != nil { + t.Error(err.Error()) + return + } + if c.pre != nil { + if err = c.pre(m); err != nil { + t.Errorf("error running pre-test function: %v", err.Error()) + return + } + } + + mp, err := NewMessageProcessor() + if err != nil { + t.Error(err.Error()) + return + } + err = mp.FromConfigJSON(c.config) + if err != nil { + t.Error(err.Error()) + return + } + //t.Log(m.ToLineProtocol(nil)) + drop, err := mp.ProcessMessage(m) + if err != nil && !c.errors { + cclog.SetDebug() + mp.ProcessMessage(m) + t.Error(err.Error()) + return + } + if drop != c.drop { + if c.drop { + t.Error("fail, message should be dropped but processor signalled NO dropping") + } else { + t.Error("fail, message should NOT be dropped but processor signalled dropping") + } + cclog.SetDebug() + mp.ProcessMessage(m) + } + if c.check != nil { + if err := c.check(m); err != nil { + t.Errorf("check failed with %v", err.Error()) + t.Log("Rerun with debugging") + cclog.SetDebug() + mp.ProcessMessage(m) + } + } + }) + } +} + +func BenchmarkProcessing(b *testing.B) { + + mlist, err := generate_message_lists(b.N, 1000) + if err != nil { + b.Error(err.Error()) + return + } + + mp, err := NewMessageProcessor() + if err != nil { + b.Error(err.Error()) + return + } + err = mp.FromConfigJSON(json.RawMessage(`{"move_meta_to_tag_if": [{"if" : "name == 'mymetric'", "key":"unit", "value":"unit"}]}`)) + if err != nil { + b.Error(err.Error()) + return + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + for _, m := range mlist[i] { + if _, err := mp.ProcessMessage(m); err != nil { + b.Errorf("failed processing message '%s': %v", m.ToLineProtocol(nil), err.Error()) + return + } + } + } + b.StopTimer() + b.ReportMetric(float64(b.Elapsed())/float64(len(mlist)*b.N), "ns/message") +}