From 98ba5efc69ddbe25a9d49cdfe15552c1e0a157eb Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 12 Jul 2024 17:40:31 +0200 Subject: [PATCH] Create a copy of message before manipulation --- pkg/messageProcessor/messageProcessor.go | 96 ++++++++++--------- pkg/messageProcessor/messageProcessor_test.go | 29 ++++-- 2 files changed, 68 insertions(+), 57 deletions(-) diff --git a/pkg/messageProcessor/messageProcessor.go b/pkg/messageProcessor/messageProcessor.go index 16bd542..3a3d1cf 100644 --- a/pkg/messageProcessor/messageProcessor.go +++ b/pkg/messageProcessor/messageProcessor.go @@ -117,8 +117,8 @@ type MessageProcessor interface { // Read in a JSON configuration FromConfigJSON(config json.RawMessage) error // Processing functions for legacy CCMetric and current CCMessage - ProcessMetric(m lp.CCMetric) (bool, error) - ProcessMessage(m lp2.CCMessage) (bool, error) + ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error) + ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) //EvalToBool(condition string, parameters map[string]interface{}) (bool, error) //EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error) //EvalToString(condition string, parameters map[string]interface{}) (string, error) @@ -759,7 +759,7 @@ func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error { return nil } -func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) { +func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, error) { m, err := lp2.NewMessage( metric.Name(), metric.Tags(), @@ -768,16 +768,17 @@ func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) { metric.Time(), ) if err != nil { - return true, fmt.Errorf("failed to parse metric to message: %v", err.Error()) + return m, fmt.Errorf("failed to parse metric to message: %v", err.Error()) } return mp.ProcessMessage(m) } -func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { - var drop bool = false +func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) { var err error = nil - name := m.Name() + var out lp2.CCMessage = lp2.FromMessage(m) + + name := out.Name() if len(mp.stages) == 0 { mp.SetStages(mp.DefaultStages()) @@ -786,7 +787,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { mp.mutex.RLock() defer mp.mutex.RUnlock() - params := getParamMap(m) + params := getParamMap(out) defer func() { params["field"] = nil @@ -804,7 +805,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name) if _, ok := mp.dropMessages[name]; ok { cclog.ComponentDebug("MessageProcessor", "Drop") - return true, nil + return nil, nil } } case STAGENAME_DROP_BY_TYPE: @@ -812,18 +813,19 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { cclog.ComponentDebug("MessageProcessor", "Dropping by message type") if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok { cclog.ComponentDebug("MessageProcessor", "Drop") - return true, nil + return nil, nil } } case STAGENAME_DROP_IF: if len(mp.dropMessagesIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Dropping by condition") - drop, err = dropMessagesIf(¶ms, &mp.dropMessagesIf) + drop, err := dropMessagesIf(¶ms, &mp.dropMessagesIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } if drop { cclog.ComponentDebug("MessageProcessor", "Drop") + return nil, nil } } case STAGENAME_RENAME_BY_NAME: @@ -831,122 +833,122 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { cclog.ComponentDebug("MessageProcessor", "Renaming by name match") if newname, ok := mp.renameMessages[name]; ok { cclog.ComponentDebug("MessageProcessor", "Rename to", newname) - m.SetName(newname) + out.SetName(newname) cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name) - m.AddMeta("oldname", name) + out.AddMeta("oldname", name) } } case STAGENAME_RENAME_IF: if len(mp.renameMessagesIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Renaming by condition") - _, err := renameMessagesIf(m, ¶ms, &mp.renameMessagesIf) + _, err := renameMessagesIf(out, ¶ms, &mp.renameMessagesIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_ADD_TAG: if len(mp.addTagsIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Adding tags") - _, err = addTagIf(m, ¶ms, &mp.addTagsIf) + _, err = addTagIf(out, ¶ms, &mp.addTagsIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_DELETE_TAG: if len(mp.deleteTagsIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Delete tags") - _, err = deleteTagIf(m, ¶ms, &mp.deleteTagsIf) + _, err = deleteTagIf(out, ¶ms, &mp.deleteTagsIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_ADD_META: if len(mp.addMetaIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Adding meta information") - _, err = addMetaIf(m, ¶ms, &mp.addMetaIf) + _, err = addMetaIf(out, ¶ms, &mp.addMetaIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_DELETE_META: if len(mp.deleteMetaIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Delete meta information") - _, err = deleteMetaIf(m, ¶ms, &mp.deleteMetaIf) + _, err = deleteMetaIf(out, ¶ms, &mp.deleteMetaIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_ADD_FIELD: if len(mp.addFieldIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Adding fields") - _, err = addFieldIf(m, ¶ms, &mp.addFieldIf) + _, err = addFieldIf(out, ¶ms, &mp.addFieldIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_DELETE_FIELD: if len(mp.deleteFieldIf) > 0 { cclog.ComponentDebug("MessageProcessor", "Delete fields") - _, err = deleteFieldIf(m, ¶ms, &mp.deleteFieldIf) + _, err = deleteFieldIf(out, ¶ms, &mp.deleteFieldIf) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_TAG_META: if len(mp.moveTagToMeta) > 0 { cclog.ComponentDebug("MessageProcessor", "Move tag to meta") - _, err := moveTagToMeta(m, ¶ms, &mp.moveTagToMeta) + _, err := moveTagToMeta(out, ¶ms, &mp.moveTagToMeta) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_TAG_FIELD: if len(mp.moveTagToField) > 0 { cclog.ComponentDebug("MessageProcessor", "Move tag to fields") - _, err := moveTagToField(m, ¶ms, &mp.moveTagToField) + _, err := moveTagToField(out, ¶ms, &mp.moveTagToField) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_META_TAG: if len(mp.moveMetaToTag) > 0 { cclog.ComponentDebug("MessageProcessor", "Move meta to tags") - _, err := moveMetaToTag(m, ¶ms, &mp.moveMetaToTag) + _, err := moveMetaToTag(out, ¶ms, &mp.moveMetaToTag) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_META_FIELD: if len(mp.moveMetaToField) > 0 { cclog.ComponentDebug("MessageProcessor", "Move meta to fields") - _, err := moveMetaToField(m, ¶ms, &mp.moveMetaToField) + _, err := moveMetaToField(out, ¶ms, &mp.moveMetaToField) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_FIELD_META: if len(mp.moveFieldToMeta) > 0 { cclog.ComponentDebug("MessageProcessor", "Move field to meta") - _, err := moveFieldToMeta(m, ¶ms, &mp.moveFieldToMeta) + _, err := moveFieldToMeta(out, ¶ms, &mp.moveFieldToMeta) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_MOVE_FIELD_TAG: if len(mp.moveFieldToTag) > 0 { cclog.ComponentDebug("MessageProcessor", "Move field to tags") - _, err := moveFieldToTag(m, ¶ms, &mp.moveFieldToTag) + _, err := moveFieldToTag(out, ¶ms, &mp.moveFieldToTag) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } case STAGENAME_NORMALIZE_UNIT: if mp.normalizeUnits { cclog.ComponentDebug("MessageProcessor", "Normalize units") - if lp2.IsMetric(m) { - _, err := normalizeUnits(m) + if lp2.IsMetric(out) { + _, err := normalizeUnits(out) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } else { cclog.ComponentDebug("MessageProcessor", "skipped, no metric") @@ -956,10 +958,10 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { case STAGENAME_CHANGE_UNIT_PREFIX: if len(mp.changeUnitPrefix) > 0 { cclog.ComponentDebug("MessageProcessor", "Change unit prefix") - if lp2.IsMetric(m) { - _, err := changeUnitPrefix(m, ¶ms, &mp.changeUnitPrefix) + if lp2.IsMetric(out) { + _, err := changeUnitPrefix(out, ¶ms, &mp.changeUnitPrefix) if err != nil { - return drop, fmt.Errorf("failed to evaluate: %v", err.Error()) + return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } } else { cclog.ComponentDebug("MessageProcessor", "skipped, no metric") @@ -969,7 +971,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) { } - return drop, nil + return out, nil } // Get a new instace of a message processor. diff --git a/pkg/messageProcessor/messageProcessor_test.go b/pkg/messageProcessor/messageProcessor_test.go index b04e932..55a3088 100644 --- a/pkg/messageProcessor/messageProcessor_test.go +++ b/pkg/messageProcessor/messageProcessor_test.go @@ -326,28 +326,37 @@ func TestConfigList(t *testing.T) { return } //t.Log(m.ToLineProtocol(nil)) - drop, err := mp.ProcessMessage(m) + out, err := mp.ProcessMessage(m) if err != nil && !c.errors { cclog.SetDebug() mp.ProcessMessage(m) t.Error(err.Error()) return } - if drop != c.drop { - if c.drop { - t.Error("fail, message should be dropped but processor signalled NO dropping") - } else { - t.Error("fail, message should NOT be dropped but processor signalled dropping") - } - cclog.SetDebug() - mp.ProcessMessage(m) + if out == nil && !c.drop { + t.Error("fail, message should NOT be dropped but processor signalled dropping") + return + } else if out != nil && c.drop { + t.Error("fail, message should be dropped but processor signalled NO dropping") + return } + // { + // if c.drop { + // t.Error("fail, message should be dropped but processor signalled NO dropping") + // } else { + // t.Error("fail, message should NOT be dropped but processor signalled dropping") + // } + // cclog.SetDebug() + // mp.ProcessMessage(m) + // return + // } if c.check != nil { - if err := c.check(m); err != nil { + if err := c.check(out); err != nil { t.Errorf("check failed with %v", err.Error()) t.Log("Rerun with debugging") cclog.SetDebug() mp.ProcessMessage(m) + return } } })