Fix for metrics without units and reduce debugging messages for messageProcessor

This commit is contained in:
Thomas Roehl 2024-12-19 14:33:04 +01:00
parent 2f6f8c846a
commit 83d5ad72fd
2 changed files with 50 additions and 53 deletions

View File

@ -802,45 +802,45 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
switch s {
case STAGENAME_DROP_BY_NAME:
if len(mp.dropMessages) > 0 {
cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
//cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
if _, ok := mp.dropMessages[name]; ok {
cclog.ComponentDebug("MessageProcessor", "Drop")
//cclog.ComponentDebug("MessageProcessor", "Drop")
return nil, nil
}
}
case STAGENAME_DROP_BY_TYPE:
if len(mp.dropTypes) > 0 {
cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
//cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok {
cclog.ComponentDebug("MessageProcessor", "Drop")
//cclog.ComponentDebug("MessageProcessor", "Drop")
return nil, nil
}
}
case STAGENAME_DROP_IF:
if len(mp.dropMessagesIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
//cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
drop, err := dropMessagesIf(&params, &mp.dropMessagesIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
if drop {
cclog.ComponentDebug("MessageProcessor", "Drop")
//cclog.ComponentDebug("MessageProcessor", "Drop")
return nil, nil
}
}
case STAGENAME_RENAME_BY_NAME:
if len(mp.renameMessages) > 0 {
cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
//cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
if newname, ok := mp.renameMessages[name]; ok {
cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
//cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
out.SetName(newname)
cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
//cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
out.AddMeta("oldname", name)
}
}
case STAGENAME_RENAME_IF:
if len(mp.renameMessagesIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
//cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
_, err := renameMessagesIf(out, &params, &mp.renameMessagesIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -848,7 +848,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_ADD_TAG:
if len(mp.addTagsIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding tags")
//cclog.ComponentDebug("MessageProcessor", "Adding tags")
_, err = addTagIf(out, &params, &mp.addTagsIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -856,7 +856,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_DELETE_TAG:
if len(mp.deleteTagsIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete tags")
//cclog.ComponentDebug("MessageProcessor", "Delete tags")
_, err = deleteTagIf(out, &params, &mp.deleteTagsIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -864,7 +864,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_ADD_META:
if len(mp.addMetaIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding meta information")
//cclog.ComponentDebug("MessageProcessor", "Adding meta information")
_, err = addMetaIf(out, &params, &mp.addMetaIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -872,7 +872,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_DELETE_META:
if len(mp.deleteMetaIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete meta information")
//cclog.ComponentDebug("MessageProcessor", "Delete meta information")
_, err = deleteMetaIf(out, &params, &mp.deleteMetaIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -880,7 +880,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_ADD_FIELD:
if len(mp.addFieldIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding fields")
//cclog.ComponentDebug("MessageProcessor", "Adding fields")
_, err = addFieldIf(out, &params, &mp.addFieldIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -888,7 +888,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_DELETE_FIELD:
if len(mp.deleteFieldIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete fields")
//cclog.ComponentDebug("MessageProcessor", "Delete fields")
_, err = deleteFieldIf(out, &params, &mp.deleteFieldIf)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -896,7 +896,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_TAG_META:
if len(mp.moveTagToMeta) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
//cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
_, err := moveTagToMeta(out, &params, &mp.moveTagToMeta)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -904,7 +904,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_TAG_FIELD:
if len(mp.moveTagToField) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
//cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
_, err := moveTagToField(out, &params, &mp.moveTagToField)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -912,7 +912,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_META_TAG:
if len(mp.moveMetaToTag) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
//cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
_, err := moveMetaToTag(out, &params, &mp.moveMetaToTag)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -920,7 +920,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_META_FIELD:
if len(mp.moveMetaToField) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
//cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
_, err := moveMetaToField(out, &params, &mp.moveMetaToField)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -928,7 +928,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_FIELD_META:
if len(mp.moveFieldToMeta) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move field to meta")
//cclog.ComponentDebug("MessageProcessor", "Move field to meta")
_, err := moveFieldToMeta(out, &params, &mp.moveFieldToMeta)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -936,7 +936,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_MOVE_FIELD_TAG:
if len(mp.moveFieldToTag) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move field to tags")
//cclog.ComponentDebug("MessageProcessor", "Move field to tags")
_, err := moveFieldToTag(out, &params, &mp.moveFieldToTag)
if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -944,7 +944,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
}
case STAGENAME_NORMALIZE_UNIT:
if mp.normalizeUnits {
cclog.ComponentDebug("MessageProcessor", "Normalize units")
//cclog.ComponentDebug("MessageProcessor", "Normalize units")
if lp.IsMetric(out) {
_, err := normalizeUnits(out)
if err != nil {
@ -957,7 +957,7 @@ func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
case STAGENAME_CHANGE_UNIT_PREFIX:
if len(mp.changeUnitPrefix) > 0 {
cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
//cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
if lp.IsMetric(out) {
_, err := changeUnitPrefix(out, &params, &mp.changeUnitPrefix)
if err != nil {

View File

@ -5,7 +5,6 @@ import (
"fmt"
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
units "github.com/ClusterCockpit/cc-units"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
@ -26,49 +25,47 @@ func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks
if err != nil {
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
}
cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
//cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
if value.(bool) {
var v string
var ok bool = false
switch from {
case MESSAGE_LOCATION_TAGS:
cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
v, ok = message.GetTag(data.Key)
case MESSAGE_LOCATION_META:
cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
cclog.ComponentDebug("MessageProcessor", message.Meta())
//cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
//cclog.ComponentDebug("MessageProcessor", message.Meta())
v, ok = message.GetMeta(data.Key)
case MESSAGE_LOCATION_FIELDS:
var x interface{}
cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
x, ok = message.GetField(data.Key)
v = fmt.Sprintf("%v", x)
}
if ok {
switch from {
case MESSAGE_LOCATION_TAGS:
cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
message.RemoveTag(data.Key)
case MESSAGE_LOCATION_META:
cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
message.RemoveMeta(data.Key)
case MESSAGE_LOCATION_FIELDS:
cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
message.RemoveField(data.Key)
}
switch to {
case MESSAGE_LOCATION_TAGS:
cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
message.AddTag(data.Value, v)
case MESSAGE_LOCATION_META:
cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
message.AddMeta(data.Value, v)
case MESSAGE_LOCATION_FIELDS:
cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
message.AddField(data.Value, v)
}
} else {
return false, fmt.Errorf("failed to get message entry: %s", data.Key)
}
}
}
@ -88,14 +85,14 @@ func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map
case "value", "event", "log", "control":
return false, errors.New("cannot delete protected fields")
default:
cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
message.RemoveField(data.Key)
}
case MESSAGE_LOCATION_TAGS:
cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
message.RemoveTag(data.Key)
case MESSAGE_LOCATION_META:
cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
//cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
message.RemoveMeta(data.Key)
}
}
@ -112,13 +109,13 @@ func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*v
if value.(bool) {
switch location {
case MESSAGE_LOCATION_FIELDS:
cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
message.AddField(data.Key, data.Value)
case MESSAGE_LOCATION_TAGS:
cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
message.AddTag(data.Key, data.Value)
case MESSAGE_LOCATION_META:
cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
message.AddMeta(data.Key, data.Value)
}
}
@ -191,13 +188,13 @@ func normalizeUnits(message lp2.CCMessage) (bool, error) {
if in_unit, ok := message.GetMeta("unit"); ok {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
message.AddMeta("unit", u.Short())
}
} else if in_unit, ok := message.GetTag("unit"); ok {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
message.AddTag("unit", u.Short())
}
}
@ -212,15 +209,15 @@ func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, che
}
if value.(bool) {
newPrefix := units.NewPrefix(n)
cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
//cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
if conv != nil && out_unit.Valid() {
if val, ok := message.GetField("value"); ok {
cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
message.AddField("value", conv(val))
message.AddMeta("unit", out_unit.Short())
}
@ -230,11 +227,11 @@ func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, che
} else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix {
u := units.NewUnit(in_unit)
if u.Valid() {
cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
if conv != nil && out_unit.Valid() {
if val, ok := message.GetField("value"); ok {
cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
message.AddField("value", conv(val))
message.AddTag("unit", out_unit.Short())
}
@ -255,9 +252,9 @@ func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, che
}
if value.(bool) {
old := message.Name()
cclog.ComponentDebug("MessageProcessor", "Rename to", n)
//cclog.ComponentDebug("MessageProcessor", "Rename to", n)
message.SetName(n)
cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
//cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
message.AddMeta("oldname", old)
}
}