From beeea9e3aab8d829694b4017c36283b33bb3bef6 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Thu, 12 Dec 2024 05:23:54 +0100 Subject: [PATCH] Fix JSON keys in message processor configuration --- pkg/messageProcessor/messageProcessor.go | 34 ++++++++++++------------ 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/messageProcessor/messageProcessor.go b/pkg/messageProcessor/messageProcessor.go index 3a3d1cf..a82d301 100644 --- a/pkg/messageProcessor/messageProcessor.go +++ b/pkg/messageProcessor/messageProcessor.go @@ -6,18 +6,18 @@ import ( "strings" "sync" - lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" + lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" - lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" + lplegacy "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" ) // Message processor add/delete tag/meta configuration type messageProcessorTagConfig struct { - Key string `json:"key"` // Tag name - Value string `json:"value"` // Tag value - Condition string `json:"if"` // Condition for adding or removing corresponding tag + Key string `json:"key"` // Tag name + Value string `json:"value,omitempty"` // Tag value + Condition string `json:"if"` // Condition for adding or removing corresponding tag } type messageProcessorConfig struct { @@ -32,8 +32,8 @@ type messageProcessorConfig struct { DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met - AddFieldIf []messageProcessorTagConfig `json:"add_fields_if"` // List of fields that are added when the condition is met - DelFieldIf []messageProcessorTagConfig `json:"delete_fields_if"` // List of fields that are removed when the condition is met + AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met + DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"` MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"` @@ -117,8 +117,8 @@ type MessageProcessor interface { // Read in a JSON configuration FromConfigJSON(config json.RawMessage) error // Processing functions for legacy CCMetric and current CCMessage - ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error) - ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) + ProcessMetric(m lplegacy.CCMetric) (lp.CCMessage, error) + ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) //EvalToBool(condition string, parameters map[string]interface{}) (bool, error) //EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error) //EvalToString(condition string, parameters map[string]interface{}) (string, error) @@ -261,8 +261,8 @@ var baseenv = map[string]interface{}{ "log": "", }, "timestamp": 1234567890, - "msg": lp2.EmptyMessage(), - "message": lp2.EmptyMessage(), + "msg": lp.EmptyMessage(), + "message": lp.EmptyMessage(), } func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} { @@ -759,8 +759,8 @@ func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error { return nil } -func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, error) { - m, err := lp2.NewMessage( +func (mp *messageProcessor) ProcessMetric(metric lplegacy.CCMetric) (lp.CCMessage, error) { + m, err := lp.NewMessage( metric.Name(), metric.Tags(), metric.Meta(), @@ -774,9 +774,9 @@ func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, er } -func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) { +func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) { var err error = nil - var out lp2.CCMessage = lp2.FromMessage(m) + var out lp.CCMessage = lp.FromMessage(m) name := out.Name() @@ -945,7 +945,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, erro case STAGENAME_NORMALIZE_UNIT: if mp.normalizeUnits { cclog.ComponentDebug("MessageProcessor", "Normalize units") - if lp2.IsMetric(out) { + if lp.IsMetric(out) { _, err := normalizeUnits(out) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -958,7 +958,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, erro case STAGENAME_CHANGE_UNIT_PREFIX: if len(mp.changeUnitPrefix) > 0 { cclog.ComponentDebug("MessageProcessor", "Change unit prefix") - if lp2.IsMetric(out) { + if lp.IsMetric(out) { _, err := changeUnitPrefix(out, ¶ms, &mp.changeUnitPrefix) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error())