Fix JSON keys in message processor configuration

This commit is contained in:
Thomas Roehl 2024-12-12 05:23:54 +01:00
parent 8fd60afad9
commit beeea9e3aa

View File

@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" lplegacy "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/expr-lang/expr" "github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm" "github.com/expr-lang/expr/vm"
) )
// Message processor add/delete tag/meta configuration // Message processor add/delete tag/meta configuration
type messageProcessorTagConfig struct { type messageProcessorTagConfig struct {
Key string `json:"key"` // Tag name Key string `json:"key"` // Tag name
Value string `json:"value"` // Tag value Value string `json:"value,omitempty"` // Tag value
Condition string `json:"if"` // Condition for adding or removing corresponding tag Condition string `json:"if"` // Condition for adding or removing corresponding tag
} }
type messageProcessorConfig struct { type messageProcessorConfig struct {
@ -32,8 +32,8 @@ type messageProcessorConfig struct {
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met
AddFieldIf []messageProcessorTagConfig `json:"add_fields_if"` // List of fields that are added when the condition is met AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met
DelFieldIf []messageProcessorTagConfig `json:"delete_fields_if"` // List of fields that are removed when the condition is met DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met
DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"` MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"` MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"`
@ -117,8 +117,8 @@ type MessageProcessor interface {
// Read in a JSON configuration // Read in a JSON configuration
FromConfigJSON(config json.RawMessage) error FromConfigJSON(config json.RawMessage) error
// Processing functions for legacy CCMetric and current CCMessage // Processing functions for legacy CCMetric and current CCMessage
ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error) ProcessMetric(m lplegacy.CCMetric) (lp.CCMessage, error)
ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error)
//EvalToBool(condition string, parameters map[string]interface{}) (bool, error) //EvalToBool(condition string, parameters map[string]interface{}) (bool, error)
//EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error) //EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error)
//EvalToString(condition string, parameters map[string]interface{}) (string, error) //EvalToString(condition string, parameters map[string]interface{}) (string, error)
@ -261,8 +261,8 @@ var baseenv = map[string]interface{}{
"log": "", "log": "",
}, },
"timestamp": 1234567890, "timestamp": 1234567890,
"msg": lp2.EmptyMessage(), "msg": lp.EmptyMessage(),
"message": lp2.EmptyMessage(), "message": lp.EmptyMessage(),
} }
func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} { func addBaseEnvWalker(values map[string]interface{}) map[string]interface{} {
@ -759,8 +759,8 @@ func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error {
return nil return nil
} }
func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, error) { func (mp *messageProcessor) ProcessMetric(metric lplegacy.CCMetric) (lp.CCMessage, error) {
m, err := lp2.NewMessage( m, err := lp.NewMessage(
metric.Name(), metric.Name(),
metric.Tags(), metric.Tags(),
metric.Meta(), metric.Meta(),
@ -774,9 +774,9 @@ func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, er
} }
func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) { func (mp *messageProcessor) ProcessMessage(m lp.CCMessage) (lp.CCMessage, error) {
var err error = nil var err error = nil
var out lp2.CCMessage = lp2.FromMessage(m) var out lp.CCMessage = lp.FromMessage(m)
name := out.Name() name := out.Name()
@ -945,7 +945,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, erro
case STAGENAME_NORMALIZE_UNIT: case STAGENAME_NORMALIZE_UNIT:
if mp.normalizeUnits { if mp.normalizeUnits {
cclog.ComponentDebug("MessageProcessor", "Normalize units") cclog.ComponentDebug("MessageProcessor", "Normalize units")
if lp2.IsMetric(out) { if lp.IsMetric(out) {
_, err := normalizeUnits(out) _, err := normalizeUnits(out)
if err != nil { if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error()) return out, fmt.Errorf("failed to evaluate: %v", err.Error())
@ -958,7 +958,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, erro
case STAGENAME_CHANGE_UNIT_PREFIX: case STAGENAME_CHANGE_UNIT_PREFIX:
if len(mp.changeUnitPrefix) > 0 { if len(mp.changeUnitPrefix) > 0 {
cclog.ComponentDebug("MessageProcessor", "Change unit prefix") cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
if lp2.IsMetric(out) { if lp.IsMetric(out) {
_, err := changeUnitPrefix(out, &params, &mp.changeUnitPrefix) _, err := changeUnitPrefix(out, &params, &mp.changeUnitPrefix)
if err != nil { if err != nil {
return out, fmt.Errorf("failed to evaluate: %v", err.Error()) return out, fmt.Errorf("failed to evaluate: %v", err.Error())