mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-01-26 22:09:06 +01:00
7840de7b82
* Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * Update README.md Use right JSON type in configuration * Update sink's README * Test whether ipmitool or ipmi-sensors can be executed without errors * Little fixes to the prometheus sink (#115) * Add uint64 to float64 cast option * Add prometheus sink to the list of available sinks * Add aggregated counters by gpu for nvlink errors --------- Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de> * Ccmessage migration (#119) * Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * Switch to CCMessage for all files. 
--------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> * Switch to ccmessage also for latest additions in nvidiaMetric * New Message processor (#118) * Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * New message processor to check whether a message should be dropped or manipulate it in flight * Create a copy of message before manipulation --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> * Update collector's Makefile and go.mod/sum files * Use message processor in router, all sinks and all receivers * Add support for credential file (NKEY) to NATS sink and receiver * Fix JSON keys in message processor configuration * Update docs for message processor, router and the default router config file * Add link to expr syntax and fix regex matching docs * Update sample collectors * Minor style change in collector manager * Some helpers for ccTopology * LIKWID collector: write log owner change only once * Fix for metrics without units and reduce debugging messages for messageProcessor * Use shorted hostname for hostname added by router * Define default port for NATS * CPUstat collector: only add unit for applicable metrics * Add precision option to all sinks using Influx's encoder * Add message processor to all sink documentation * Add units to documentation of cpustat 
collector --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Co-authored-by: oscarminus <me@oscarminus.de> Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>
263 lines
9.8 KiB
Go
263 lines
9.8 KiB
Go
package messageprocessor
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
lp2 "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
|
units "github.com/ClusterCockpit/cc-units"
|
|
"github.com/expr-lang/expr"
|
|
"github.com/expr-lang/expr/vm"
|
|
)
|
|
|
|
// MessageLocation selects one of the three key/value sections of a message
// that the processing helpers in this file read from or write to.
type MessageLocation int

// Section selectors understood by moveInMessage, deleteIf and addIf.
const (
	// MESSAGE_LOCATION_TAGS addresses the tags of a message.
	MESSAGE_LOCATION_TAGS MessageLocation = 0
	// MESSAGE_LOCATION_META addresses the meta information of a message.
	MESSAGE_LOCATION_META MessageLocation = 1
	// MESSAGE_LOCATION_FIELDS addresses the fields of a message.
	MESSAGE_LOCATION_FIELDS MessageLocation = 2
)
|
|
|
|
// Abstract function to move entries from one location to another
|
|
func moveInMessage(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, from, to MessageLocation) (bool, error) {
|
|
for d, data := range *checks {
|
|
value, err := expr.Run(d, *params)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
//cclog.ComponentDebug("MessageProcessor", "Move from", from, "to", to)
|
|
if value.(bool) {
|
|
var v string
|
|
var ok bool = false
|
|
switch from {
|
|
case MESSAGE_LOCATION_TAGS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Getting tag key", data.Key)
|
|
v, ok = message.GetTag(data.Key)
|
|
case MESSAGE_LOCATION_META:
|
|
//cclog.ComponentDebug("MessageProcessor", "Getting meta key", data.Key)
|
|
//cclog.ComponentDebug("MessageProcessor", message.Meta())
|
|
v, ok = message.GetMeta(data.Key)
|
|
case MESSAGE_LOCATION_FIELDS:
|
|
var x interface{}
|
|
//cclog.ComponentDebug("MessageProcessor", "Getting field key", data.Key)
|
|
x, ok = message.GetField(data.Key)
|
|
v = fmt.Sprintf("%v", x)
|
|
}
|
|
if ok {
|
|
switch from {
|
|
case MESSAGE_LOCATION_TAGS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing tag key", data.Key)
|
|
message.RemoveTag(data.Key)
|
|
case MESSAGE_LOCATION_META:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing meta key", data.Key)
|
|
message.RemoveMeta(data.Key)
|
|
case MESSAGE_LOCATION_FIELDS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing field key", data.Key)
|
|
message.RemoveField(data.Key)
|
|
}
|
|
switch to {
|
|
case MESSAGE_LOCATION_TAGS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", v)
|
|
message.AddTag(data.Value, v)
|
|
case MESSAGE_LOCATION_META:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", v)
|
|
message.AddMeta(data.Value, v)
|
|
case MESSAGE_LOCATION_FIELDS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", v)
|
|
message.AddField(data.Value, v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func deleteIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
|
for d, data := range *checks {
|
|
value, err := expr.Run(d, *params)
|
|
if err != nil {
|
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
if value.(bool) {
|
|
switch location {
|
|
case MESSAGE_LOCATION_FIELDS:
|
|
switch data.Key {
|
|
case "value", "event", "log", "control":
|
|
return false, errors.New("cannot delete protected fields")
|
|
default:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing field for", data.Key)
|
|
message.RemoveField(data.Key)
|
|
}
|
|
case MESSAGE_LOCATION_TAGS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing tag for", data.Key)
|
|
message.RemoveTag(data.Key)
|
|
case MESSAGE_LOCATION_META:
|
|
//cclog.ComponentDebug("MessageProcessor", "Removing meta for", data.Key)
|
|
message.RemoveMeta(data.Key)
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func addIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig, location MessageLocation) (bool, error) {
|
|
for d, data := range *checks {
|
|
value, err := expr.Run(d, *params)
|
|
if err != nil {
|
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
if value.(bool) {
|
|
switch location {
|
|
case MESSAGE_LOCATION_FIELDS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding field", data.Value, "->", data.Value)
|
|
message.AddField(data.Key, data.Value)
|
|
case MESSAGE_LOCATION_TAGS:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding tag", data.Value, "->", data.Value)
|
|
message.AddTag(data.Key, data.Value)
|
|
case MESSAGE_LOCATION_META:
|
|
//cclog.ComponentDebug("MessageProcessor", "Adding meta", data.Value, "->", data.Value)
|
|
message.AddMeta(data.Key, data.Value)
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func deleteTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
|
}
|
|
|
|
func addTagIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return addIf(message, params, checks, MESSAGE_LOCATION_TAGS)
|
|
}
|
|
|
|
func moveTagToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_META)
|
|
}
|
|
|
|
func moveTagToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_TAGS, MESSAGE_LOCATION_FIELDS)
|
|
}
|
|
|
|
func deleteMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_META)
|
|
}
|
|
|
|
func addMetaIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return addIf(message, params, checks, MESSAGE_LOCATION_META)
|
|
}
|
|
|
|
func moveMetaToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_TAGS)
|
|
}
|
|
|
|
func moveMetaToField(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_META, MESSAGE_LOCATION_FIELDS)
|
|
}
|
|
|
|
func deleteFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return deleteIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
|
}
|
|
|
|
func addFieldIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return addIf(message, params, checks, MESSAGE_LOCATION_FIELDS)
|
|
}
|
|
|
|
func moveFieldToTag(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_TAGS)
|
|
}
|
|
|
|
func moveFieldToMeta(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]messageProcessorTagConfig) (bool, error) {
|
|
return moveInMessage(message, params, checks, MESSAGE_LOCATION_FIELDS, MESSAGE_LOCATION_META)
|
|
}
|
|
|
|
func dropMessagesIf(params *map[string]interface{}, checks *map[*vm.Program]struct{}) (bool, error) {
|
|
for d := range *checks {
|
|
value, err := expr.Run(d, *params)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
if value.(bool) {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func normalizeUnits(message lp2.CCMessage) (bool, error) {
|
|
if in_unit, ok := message.GetMeta("unit"); ok {
|
|
u := units.NewUnit(in_unit)
|
|
if u.Valid() {
|
|
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
|
message.AddMeta("unit", u.Short())
|
|
}
|
|
} else if in_unit, ok := message.GetTag("unit"); ok {
|
|
u := units.NewUnit(in_unit)
|
|
if u.Valid() {
|
|
//cclog.ComponentDebug("MessageProcessor", "Update unit with", u.Short())
|
|
message.AddTag("unit", u.Short())
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func changeUnitPrefix(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
|
for r, n := range *checks {
|
|
value, err := expr.Run(r, *params)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
if value.(bool) {
|
|
newPrefix := units.NewPrefix(n)
|
|
//cclog.ComponentDebug("MessageProcessor", "Condition matches, change to prefix", newPrefix.String())
|
|
if in_unit, ok := message.GetMeta("unit"); ok && newPrefix != units.InvalidPrefix {
|
|
u := units.NewUnit(in_unit)
|
|
if u.Valid() {
|
|
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
|
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
|
if conv != nil && out_unit.Valid() {
|
|
if val, ok := message.GetField("value"); ok {
|
|
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
|
message.AddField("value", conv(val))
|
|
message.AddMeta("unit", out_unit.Short())
|
|
}
|
|
}
|
|
}
|
|
|
|
} else if in_unit, ok := message.GetTag("unit"); ok && newPrefix != units.InvalidPrefix {
|
|
u := units.NewUnit(in_unit)
|
|
if u.Valid() {
|
|
//cclog.ComponentDebug("MessageProcessor", "Input unit", u.Short())
|
|
conv, out_unit := units.GetUnitPrefixFactor(u, newPrefix)
|
|
if conv != nil && out_unit.Valid() {
|
|
if val, ok := message.GetField("value"); ok {
|
|
//cclog.ComponentDebug("MessageProcessor", "Update unit with", out_unit.Short())
|
|
message.AddField("value", conv(val))
|
|
message.AddTag("unit", out_unit.Short())
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func renameMessagesIf(message lp2.CCMessage, params *map[string]interface{}, checks *map[*vm.Program]string) (bool, error) {
|
|
for d, n := range *checks {
|
|
value, err := expr.Run(d, *params)
|
|
if err != nil {
|
|
return true, fmt.Errorf("failed to evaluate: %v", err.Error())
|
|
}
|
|
if value.(bool) {
|
|
old := message.Name()
|
|
//cclog.ComponentDebug("MessageProcessor", "Rename to", n)
|
|
message.SetName(n)
|
|
//cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", old)
|
|
message.AddMeta("oldname", old)
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|