From 95c0803a3c42680bc2d074f52c5f1fcc265f0eb6 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 20 Dec 2024 15:43:19 +0100 Subject: [PATCH] Use common function to add message to ILP encoder --- sinks/httpSink.go | 64 +------------------------------------------ sinks/metricSink.go | 56 +++++++++++++++++++++++++++++++++++++ sinks/natsSink.go | 67 +++++++-------------------------------------- 3 files changed, 67 insertions(+), 120 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 44d6dea..67dd85a 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -51,11 +51,6 @@ type HttpSinkConfig struct { Precision string `json:"precision,omitempty"` } -type key_value_pair struct { - key string - value string -} - type HttpSink struct { sink client *http.Client @@ -84,64 +79,7 @@ func (s *HttpSink) Write(msg lp.CCMessage) error { // Lock for encoder usage s.encoderLock.Lock() - // Encode measurement name - s.encoder.StartLine(m.Name()) - - // copy tags and meta data which should be used as tags - s.extended_tag_list = s.extended_tag_list[:0] - for key, value := range m.Tags() { - s.extended_tag_list = - append( - s.extended_tag_list, - key_value_pair{ - key: key, - value: value, - }, - ) - } - // for _, key := range s.config.MetaAsTags { - // if value, ok := m.GetMeta(key); ok { - // s.extended_tag_list = - // append( - // s.extended_tag_list, - // key_value_pair{ - // key: key, - // value: value, - // }, - // ) - // } - // } - - // Encode tags (they musts be in lexical order) - slices.SortFunc( - s.extended_tag_list, - func(a key_value_pair, b key_value_pair) int { - if a.key < b.key { - return -1 - } - if a.key > b.key { - return +1 - } - return 0 - }, - ) - for i := range s.extended_tag_list { - s.encoder.AddTag( - s.extended_tag_list[i].key, - s.extended_tag_list[i].value, - ) - } - - // Encode fields - for key, value := range m.Fields() { - s.encoder.AddField(key, influx.MustNewValue(value)) - } - - // Encode time stamp - s.encoder.EndLine(m.Time()) - - // Check for encoder errors - err := s.encoder.Err() + err = EncoderAdd(&s.encoder, m) // Unlock encoder usage s.encoderLock.Unlock() diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 4cac04b..f3e95a3 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -5,6 +5,8 @@ import ( lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" + influx "github.com/influxdata/line-protocol/v2/lineprotocol" + "golang.org/x/exp/slices" ) type defaultSinkConfig struct { @@ -30,3 +32,57 @@ type Sink interface { func (s *sink) Name() string { return s.name } + +type key_value_pair struct { + key string + value string +} + +func EncoderAdd(encoder *influx.Encoder, msg lp.CCMessage) error { + // Encode measurement name + encoder.StartLine(msg.Name()) + + tag_list := make([]key_value_pair, 0, 10) + + // copy tags and meta data which should be used as tags + for key, value := range msg.Tags() { + tag_list = + append( + tag_list, + key_value_pair{ + key: key, + value: value, + }, + ) + } + // Encode tags (they musts be in lexical order) + slices.SortFunc( + tag_list, + func(a key_value_pair, b key_value_pair) int { + if a.key < b.key { + return -1 + } + if a.key > b.key { + return +1 + } + return 0 + }, + ) + for i := range tag_list { + encoder.AddTag( + tag_list[i].key, + tag_list[i].value, + ) + } + + // Encode fields + for key, value := range msg.Fields() { + encoder.AddField(key, influx.MustNewValue(value)) + } + + // Encode time stamp + encoder.EndLine(msg.Time()) + + // Return encoder errors + return encoder.Err() +} diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 1982bfe..b2f0b18 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -30,19 +30,16 @@ type NatsSinkConfig struct { Precision string `json:"precision,omitempty"` } - type NatsSink struct { sink - client *nats.Conn - encoder influx.Encoder - buffer *bytes.Buffer - config NatsSinkConfig + client *nats.Conn + encoder influx.Encoder + encoderLock sync.Mutex + config NatsSinkConfig - lock sync.Mutex flushDelay time.Duration flushTimer *time.Timer - - extended_tag_list []key_value_pair + //timerLock sync.Mutex } func (s *NatsSink) connect() error { @@ -78,54 +75,11 @@ func (s *NatsSink) connect() error { func (s *NatsSink) Write(m lp.CCMessage) error { msg, err := s.mp.ProcessMessage(m) if err == nil && msg != nil { - s.lock.Lock() - // Encode measurement name - s.encoder.StartLine(msg.Name()) + s.encoderLock.Lock() - // copy tags and meta data which should be used as tags - s.extended_tag_list = s.extended_tag_list[:0] - for key, value := range m.Tags() { - s.extended_tag_list = - append( - s.extended_tag_list, - key_value_pair{ - key: key, - value: value, - }, - ) - } - // Encode tags (they musts be in lexical order) - slices.SortFunc( - s.extended_tag_list, - func(a key_value_pair, b key_value_pair) int { - if a.key < b.key { - return -1 - } - if a.key > b.key { - return +1 - } - return 0 - }, - ) - for i := range s.extended_tag_list { - s.encoder.AddTag( - s.extended_tag_list[i].key, - s.extended_tag_list[i].value, - ) - } + err = EncoderAdd(&s.encoder, msg) - // Encode fields - for key, value := range msg.Fields() { - s.encoder.AddField(key, influx.MustNewValue(value)) - } - - // Encode time stamp - s.encoder.EndLine(msg.Time()) - - // Check for encoder errors - err := s.encoder.Err() - - s.lock.Unlock() + s.encoderLock.Unlock() if err != nil { cclog.ComponentError(s.name, "Write:", err.Error()) return err @@ -146,10 +100,10 @@ func (s *NatsSink) Write(m lp.CCMessage) error { } func (s *NatsSink) Flush() error { - s.lock.Lock() + s.encoderLock.Lock() buf := slices.Clone(s.encoder.Bytes()) s.encoder.Reset() - s.lock.Unlock() + s.encoderLock.Unlock() if len(buf) == 0 { return nil @@ -233,7 +187,6 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } - s.extended_tag_list = make([]key_value_pair, 0) return s, nil }