Use common function to add message to ILP encoder

This commit is contained in:
Thomas Roehl 2024-12-20 15:43:19 +01:00
parent 7840de7b82
commit 27faafef78
3 changed files with 67 additions and 120 deletions

View File

@ -51,11 +51,6 @@ type HttpSinkConfig struct {
Precision string `json:"precision,omitempty"` Precision string `json:"precision,omitempty"`
} }
type key_value_pair struct {
key string
value string
}
type HttpSink struct { type HttpSink struct {
sink sink
client *http.Client client *http.Client
@ -84,64 +79,7 @@ func (s *HttpSink) Write(msg lp.CCMessage) error {
// Lock for encoder usage // Lock for encoder usage
s.encoderLock.Lock() s.encoderLock.Lock()
// Encode measurement name err = EncoderAdd(&s.encoder, m)
s.encoder.StartLine(m.Name())
// copy tags and meta data which should be used as tags
s.extended_tag_list = s.extended_tag_list[:0]
for key, value := range m.Tags() {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
// for _, key := range s.config.MetaAsTags {
// if value, ok := m.GetMeta(key); ok {
// s.extended_tag_list =
// append(
// s.extended_tag_list,
// key_value_pair{
// key: key,
// value: value,
// },
// )
// }
// }
// Encode tags (they musts be in lexical order)
slices.SortFunc(
s.extended_tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range s.extended_tag_list {
s.encoder.AddTag(
s.extended_tag_list[i].key,
s.extended_tag_list[i].value,
)
}
// Encode fields
for key, value := range m.Fields() {
s.encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
s.encoder.EndLine(m.Time())
// Check for encoder errors
err := s.encoder.Err()
// Unlock encoder usage // Unlock encoder usage
s.encoderLock.Unlock() s.encoderLock.Unlock()

View File

@ -5,6 +5,8 @@ import (
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
"golang.org/x/exp/slices"
) )
type defaultSinkConfig struct { type defaultSinkConfig struct {
@ -30,3 +32,57 @@ type Sink interface {
func (s *sink) Name() string { func (s *sink) Name() string {
return s.name return s.name
} }
type key_value_pair struct {
key string
value string
}
func EncoderAdd(encoder *influx.Encoder, msg lp.CCMessage) error {
// Encode measurement name
encoder.StartLine(msg.Name())
tag_list := make([]key_value_pair, 0, 10)
// copy tags and meta data which should be used as tags
for key, value := range msg.Tags() {
tag_list =
append(
tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
// Encode tags (they musts be in lexical order)
slices.SortFunc(
tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range tag_list {
encoder.AddTag(
tag_list[i].key,
tag_list[i].value,
)
}
// Encode fields
for key, value := range msg.Fields() {
encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
encoder.EndLine(msg.Time())
// Return encoder errors
return encoder.Err()
}

View File

@ -30,19 +30,16 @@ type NatsSinkConfig struct {
Precision string `json:"precision,omitempty"` Precision string `json:"precision,omitempty"`
} }
type NatsSink struct { type NatsSink struct {
sink sink
client *nats.Conn client *nats.Conn
encoder influx.Encoder encoder influx.Encoder
buffer *bytes.Buffer encoderLock sync.Mutex
config NatsSinkConfig config NatsSinkConfig
lock sync.Mutex
flushDelay time.Duration flushDelay time.Duration
flushTimer *time.Timer flushTimer *time.Timer
//timerLock sync.Mutex
extended_tag_list []key_value_pair
} }
func (s *NatsSink) connect() error { func (s *NatsSink) connect() error {
@ -78,54 +75,11 @@ func (s *NatsSink) connect() error {
func (s *NatsSink) Write(m lp.CCMessage) error { func (s *NatsSink) Write(m lp.CCMessage) error {
msg, err := s.mp.ProcessMessage(m) msg, err := s.mp.ProcessMessage(m)
if err == nil && msg != nil { if err == nil && msg != nil {
s.lock.Lock() s.encoderLock.Lock()
// Encode measurement name
s.encoder.StartLine(msg.Name())
// copy tags and meta data which should be used as tags err = EncoderAdd(&s.encoder, msg)
s.extended_tag_list = s.extended_tag_list[:0]
for key, value := range m.Tags() {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
// Encode tags (they musts be in lexical order)
slices.SortFunc(
s.extended_tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range s.extended_tag_list {
s.encoder.AddTag(
s.extended_tag_list[i].key,
s.extended_tag_list[i].value,
)
}
// Encode fields s.encoderLock.Unlock()
for key, value := range msg.Fields() {
s.encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
s.encoder.EndLine(msg.Time())
// Check for encoder errors
err := s.encoder.Err()
s.lock.Unlock()
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Write:", err.Error()) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
@ -146,10 +100,10 @@ func (s *NatsSink) Write(m lp.CCMessage) error {
} }
func (s *NatsSink) Flush() error { func (s *NatsSink) Flush() error {
s.lock.Lock() s.encoderLock.Lock()
buf := slices.Clone(s.encoder.Bytes()) buf := slices.Clone(s.encoder.Bytes())
s.encoder.Reset() s.encoder.Reset()
s.lock.Unlock() s.encoderLock.Unlock()
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil
@ -233,7 +187,6 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
s.extended_tag_list = make([]key_value_pair, 0)
return s, nil return s, nil
} }