diff --git a/pkg/messageProcessor/messageProcessor.go b/pkg/messageProcessor/messageProcessor.go index a82d301..adfc0a3 100644 --- a/pkg/messageProcessor/messageProcessor.go +++ b/pkg/messageProcessor/messageProcessor.go @@ -802,45 +802,45 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) switch s { case STAGENAME_DROP_BY_NAME: if len(mp.dropMessages) > 0 { - cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name) + //cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name) if _, ok := mp.dropMessages[name]; ok { - cclog.ComponentDebug("MessageProcessor", "Drop") + //cclog.ComponentDebug("MessageProcessor", "Drop") return nil, nil } } case STAGENAME_DROP_BY_TYPE: if len(mp.dropTypes) > 0 { - cclog.ComponentDebug("MessageProcessor", "Dropping by message type") + //cclog.ComponentDebug("MessageProcessor", "Dropping by message type") if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok { - cclog.ComponentDebug("MessageProcessor", "Drop") + //cclog.ComponentDebug("MessageProcessor", "Drop") return nil, nil } } case STAGENAME_DROP_IF: if len(mp.dropMessagesIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Dropping by condition") + //cclog.ComponentDebug("MessageProcessor", "Dropping by condition") drop, err := dropMessagesIf(¶ms, &mp.dropMessagesIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) } if drop { - cclog.ComponentDebug("MessageProcessor", "Drop") + //cclog.ComponentDebug("MessageProcessor", "Drop") return nil, nil } } case STAGENAME_RENAME_BY_NAME: if len(mp.renameMessages) > 0 { - cclog.ComponentDebug("MessageProcessor", "Renaming by name match") + //cclog.ComponentDebug("MessageProcessor", "Renaming by name match") if newname, ok := mp.renameMessages[name]; ok { - cclog.ComponentDebug("MessageProcessor", "Rename to", newname) + //cclog.ComponentDebug("MessageProcessor", "Rename to", newname) out.SetName(newname) - cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name) + //cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name) out.AddMeta("oldname", name) } } case STAGENAME_RENAME_IF: if len(mp.renameMessagesIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Renaming by condition") + //cclog.ComponentDebug("MessageProcessor", "Renaming by condition") _, err := renameMessagesIf(out, ¶ms, &mp.renameMessagesIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -848,7 +848,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_ADD_TAG: if len(mp.addTagsIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Adding tags") + //cclog.ComponentDebug("MessageProcessor", "Adding tags") _, err = addTagIf(out, ¶ms, &mp.addTagsIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -856,7 +856,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_DELETE_TAG: if len(mp.deleteTagsIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Delete tags") + //cclog.ComponentDebug("MessageProcessor", "Delete tags") _, err = deleteTagIf(out, ¶ms, &mp.deleteTagsIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -864,7 +864,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_ADD_META: if len(mp.addMetaIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Adding meta information") + //cclog.ComponentDebug("MessageProcessor", "Adding meta information") _, err = addMetaIf(out, ¶ms, &mp.addMetaIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -872,7 +872,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_DELETE_META: if len(mp.deleteMetaIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Delete meta information") + //cclog.ComponentDebug("MessageProcessor", "Delete meta information") _, err = deleteMetaIf(out, ¶ms, &mp.deleteMetaIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -880,7 +880,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_ADD_FIELD: if len(mp.addFieldIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Adding fields") + //cclog.ComponentDebug("MessageProcessor", "Adding fields") _, err = addFieldIf(out, ¶ms, &mp.addFieldIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -888,7 +888,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_DELETE_FIELD: if len(mp.deleteFieldIf) > 0 { - cclog.ComponentDebug("MessageProcessor", "Delete fields") + //cclog.ComponentDebug("MessageProcessor", "Delete fields") _, err = deleteFieldIf(out, ¶ms, &mp.deleteFieldIf) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -896,7 +896,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_TAG_META: if len(mp.moveTagToMeta) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move tag to meta") + //cclog.ComponentDebug("MessageProcessor", "Move tag to meta") _, err := moveTagToMeta(out, ¶ms, &mp.moveTagToMeta) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -904,7 +904,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_TAG_FIELD: if len(mp.moveTagToField) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move tag to fields") + //cclog.ComponentDebug("MessageProcessor", "Move tag to fields") _, err := moveTagToField(out, ¶ms, &mp.moveTagToField) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -912,7 +912,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_META_TAG: if len(mp.moveMetaToTag) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move meta to tags") + //cclog.ComponentDebug("MessageProcessor", "Move meta to tags") _, err := moveMetaToTag(out, ¶ms, &mp.moveMetaToTag) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -920,7 +920,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_META_FIELD: if len(mp.moveMetaToField) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move meta to fields") + //cclog.ComponentDebug("MessageProcessor", "Move meta to fields") _, err := moveMetaToField(out, ¶ms, &mp.moveMetaToField) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -928,7 +928,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_FIELD_META: if len(mp.moveFieldToMeta) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move field to meta") + //cclog.ComponentDebug("MessageProcessor", "Move field to meta") _, err := moveFieldToMeta(out, ¶ms, &mp.moveFieldToMeta) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -936,7 +936,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_MOVE_FIELD_TAG: if len(mp.moveFieldToTag) > 0 { - cclog.ComponentDebug("MessageProcessor", "Move field to tags") + //cclog.ComponentDebug("MessageProcessor", "Move field to tags") _, err := moveFieldToTag(out, ¶ms, &mp.moveFieldToTag) if err != nil { return out, fmt.Errorf("failed to evaluate: %v", err.Error()) @@ -944,7 +944,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) } case STAGENAME_NORMALIZE_UNIT: if mp.normalizeUnits { - cclog.ComponentDebug("MessageProcessor", "Normalize units") + //cclog.ComponentDebug("MessageProcessor", "Normalize units") if lp.IsMetric(out) { _, err := normalizeUnits(out) if err != nil { @@ -957,7 +957,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) case STAGENAME_CHANGE_UNIT_PREFIX: if len(mp.changeUnitPrefix) > 0 { - cclog.ComponentDebug("MessageProcessor", "Change unit prefix") + //cclog.ComponentDebug("MessageProcessor", "Change unit prefix") if lp.IsMetric(out) { _, err := changeUnitPrefix(out, ¶ms, &mp.changeUnitPrefix) if err != nil { diff --git a/pkg/messageProcessor/messageProcessorFuncs.go b/pkg/messageProcessor/messageProcessorFuncs.go index 23c261e..8fa5661 100644 --- a/pkg/messageProcessor/messageProcessorFuncs.go +++ b/pkg/messageProcessor/messageProcessorFuncs.go @@ -5,7 +5,6 @@ import ( "fmt" lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" - cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" units "github.com/ClusterCockpit/cc-units" "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" @@ -26,49 +25,47 @@ func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks if err != nil { return false, fmt.Errorf("failed to evaluate: %v", err.Error()) } - cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to) + //cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to) if value.(bool) { var v string var ok bool = false switch from { case MESSAGE_LOCATION_TAGS: - cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key) v, ok = message.GetTag(data.Key) case MESSAGE_LOCATION_META: - cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key) - cclog.ComponentDebug("MessageProcessor", message.Meta()) + //cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key) + //cclog.ComponentDebug("MessageProcessor", message.Meta()) v, ok = message.GetMeta(data.Key) case MESSAGE_LOCATION_FIELDS: var x interface{} - cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key) x, ok = message.GetField(data.Key) v = fmt.Sprintf("%v", x) } if ok { switch from { case MESSAGE_LOCATION_TAGS: - cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key) message.RemoveTag(data.Key) case MESSAGE_LOCATION_META: - cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key) message.RemoveMeta(data.Key) case MESSAGE_LOCATION_FIELDS: - cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key) message.RemoveField(data.Key) } switch to { case MESSAGE_LOCATION_TAGS: - cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v) + //cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v) message.AddTag(data.Value, v) case MESSAGE_LOCATION_META: - cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v) + //cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v) message.AddMeta(data.Value, v) case MESSAGE_LOCATION_FIELDS: - cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v) + //cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v) message.AddField(data.Value, v) } - } else { - return false, fmt.Errorf("failed to get message entry: %s", data.Key) } } } @@ -88,14 +85,14 @@ func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map case "value", "event", "log", "control": return false, errors.New("cannot delete protected fields") default: - cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key) message.RemoveField(data.Key) } case MESSAGE_LOCATION_TAGS: - cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key) message.RemoveTag(data.Key) case MESSAGE_LOCATION_META: - cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key) + //cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key) message.RemoveMeta(data.Key) } } @@ -112,13 +109,13 @@ func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*v if value.(bool) { switch location { case MESSAGE_LOCATION_FIELDS: - cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value) + //cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value) message.AddField(data.Key, data.Value) case MESSAGE_LOCATION_TAGS: - cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value) + //cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value) message.AddTag(data.Key, data.Value) case MESSAGE_LOCATION_META: - cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value) + //cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value) message.AddMeta(data.Key, data.Value) } } @@ -191,13 +188,13 @@ func normalizeUnits(message lp2.CCMessage) (bool, error) { if in_unit, ok := message.GetMeta("unit"); ok { u := units.NewUnit(in_unit) if u.Valid() { - cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) + //cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) message.AddMeta("unit", u.Short()) } } else if in_unit, ok := message.GetTag("unit"); ok { u := units.NewUnit(in_unit) if u.Valid() { - cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) + //cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short()) message.AddTag("unit", u.Short()) } } @@ -212,15 +209,15 @@ func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, che } if value.(bool) { newPrefix := units.NewPrefix(n) - cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String()) + //cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String()) if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix { u := units.NewUnit(in_unit) if u.Valid() { - cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) + //cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) if conv != nil && out_unit.Valid() { if val, ok := message.GetField("value"); ok { - cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) + //cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) message.AddField("value", conv(val)) message.AddMeta("unit", out_unit.Short()) } @@ -230,11 +227,11 @@ func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, che } else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix { u := units.NewUnit(in_unit) if u.Valid() { - cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) + //cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short()) conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix) if conv != nil && out_unit.Valid() { if val, ok := message.GetField("value"); ok { - cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) + //cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short()) message.AddField("value", conv(val)) message.AddTag("unit", out_unit.Short()) } @@ -255,9 +252,9 @@ func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, che } if value.(bool) { old := message.Name() - cclog.ComponentDebug("MessageProcessor", "Rename to", n) + //cclog.ComponentDebug("MessageProcessor", "Rename to", n) message.SetName(n) - cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old) + //cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old) message.AddMeta("oldname", old) } }