2021-03-26 16:48:09 +01:00
|
|
|
package sinks
|
|
|
|
|
|
|
|
import (
|
2024-12-19 23:00:14 +01:00
|
|
|
"encoding/json"
|
|
|
|
|
|
|
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
|
|
|
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
2024-12-20 15:43:19 +01:00
|
|
|
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
|
|
"golang.org/x/exp/slices"
|
2021-03-26 16:48:09 +01:00
|
|
|
)
|
|
|
|
|
2022-02-04 18:12:24 +01:00
|
|
|
type defaultSinkConfig struct {
|
2024-12-19 23:00:14 +01:00
|
|
|
MetaAsTags []string `json:"meta_as_tags,omitempty"`
|
|
|
|
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
|
|
|
|
Type string `json:"type"`
|
2021-05-18 15:16:10 +02:00
|
|
|
}
|
|
|
|
|
2022-01-25 15:37:43 +01:00
|
|
|
type sink struct {
|
2024-12-19 23:00:14 +01:00
|
|
|
meta_as_tags map[string]bool // Use meta data tags as tags
|
|
|
|
mp mp.MessageProcessor // message processor for the sink
|
|
|
|
name string // Name of the sink
|
2021-03-26 16:48:09 +01:00
|
|
|
}
|
|
|
|
|
2022-01-25 15:37:43 +01:00
|
|
|
type Sink interface {
|
2024-12-19 23:00:14 +01:00
|
|
|
Write(point lp.CCMessage) error // Write metric to the sink
|
|
|
|
Flush() error // Flush buffered metrics
|
|
|
|
Close() // Close / finish metric sink
|
|
|
|
Name() string // Name of the metric sink
|
2022-01-25 15:37:43 +01:00
|
|
|
}
|
|
|
|
|
2022-02-28 12:16:48 +01:00
|
|
|
// Name returns the name of the metric sink
|
2022-01-25 15:37:43 +01:00
|
|
|
func (s *sink) Name() string {
|
|
|
|
return s.name
|
2021-03-26 16:48:09 +01:00
|
|
|
}
|
2024-12-20 15:43:19 +01:00
|
|
|
|
|
|
|
type key_value_pair struct {
|
|
|
|
key string
|
|
|
|
value string
|
|
|
|
}
|
|
|
|
|
|
|
|
func EncoderAdd(encoder *influx.Encoder, msg lp.CCMessage) error {
|
|
|
|
// Encode measurement name
|
|
|
|
encoder.StartLine(msg.Name())
|
|
|
|
|
|
|
|
tag_list := make([]key_value_pair, 0, 10)
|
|
|
|
|
|
|
|
// copy tags and meta data which should be used as tags
|
|
|
|
for key, value := range msg.Tags() {
|
|
|
|
tag_list =
|
|
|
|
append(
|
|
|
|
tag_list,
|
|
|
|
key_value_pair{
|
|
|
|
key: key,
|
|
|
|
value: value,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
}
|
|
|
|
// Encode tags (they musts be in lexical order)
|
|
|
|
slices.SortFunc(
|
|
|
|
tag_list,
|
|
|
|
func(a key_value_pair, b key_value_pair) int {
|
|
|
|
if a.key < b.key {
|
|
|
|
return -1
|
|
|
|
}
|
|
|
|
if a.key > b.key {
|
|
|
|
return +1
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
},
|
|
|
|
)
|
|
|
|
for i := range tag_list {
|
|
|
|
encoder.AddTag(
|
|
|
|
tag_list[i].key,
|
|
|
|
tag_list[i].value,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Encode fields
|
|
|
|
for key, value := range msg.Fields() {
|
|
|
|
encoder.AddField(key, influx.MustNewValue(value))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Encode time stamp
|
|
|
|
encoder.EndLine(msg.Time())
|
|
|
|
|
|
|
|
// Return encoder errors
|
|
|
|
return encoder.Err()
|
|
|
|
}
|