From c472029c2d19e62ab03e223672238e2f94f3dae7 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Tue, 19 Sep 2023 13:33:25 +0200 Subject: [PATCH] Add tags in lexical order as required by AddTag() --- sinks/httpSink.go | 50 +++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 83440d0..c449d1b 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -44,13 +44,19 @@ type HttpSinkConfig struct { type HttpSink struct { sink - client *http.Client + client *http.Client + // influx line protocol encoder encoder influx.Encoder - // Flush() runs in another goroutine, so this encoderLock has to protect the encoder + // Flush() runs in another goroutine and accesses the influx line protocol encoder, + // so this encoderLock has to protect the encoder encoderLock sync.Mutex - timerLock sync.Mutex - flushTimer *time.Timer - config HttpSinkConfig + + // timer to run Flush() + flushTimer *time.Timer + // Lock to assure that only one timer is running at a time + timerLock sync.Mutex + + config HttpSinkConfig } // Write sends metric m as http message @@ -62,18 +68,24 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Encode measurement name s.encoder.StartLine(m.Name()) - // Encode tags + // copy tags and meta data which should be used as tags + tags := make(map[string]string) + keys := make([]string, 0) for key, value := range m.Tags() { - s.encoder.AddTag(key, value) + keys = append(keys, key) + tags[key] = value + } + for _, key := range s.config.MetaAsTags { + if value, ok := m.GetMeta(key); ok { + keys = append(keys, key) + tags[key] = value + } } - // Encode metadata as tags - for key, use_meta_as_tag := range s.meta_as_tags { - if use_meta_as_tag { - if val, ok := m.GetMeta(key); ok { - s.encoder.AddTag(key, val) - } - } + // Encode tags + slices.Sort(keys) + for _, key := range keys { + s.encoder.AddTag(key, tags[key]) } // Encode fields @@ -234,11 +246,8 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { cclog.ComponentDebug(s.name, "flushDelay", t) } } - // Create lookup map to use meta infos as tags in the output metric - s.meta_as_tags = make(map[string]bool) - for _, k := range s.config.MetaAsTags { - s.meta_as_tags[k] = true - } + + // Create http client s.client = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 1, // We will only ever talk to one host. @@ -246,6 +255,9 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { }, Timeout: s.config.timeout, } + + // Configure influx line protocol encoder s.encoder.SetPrecision(influx.Second) + return s, nil }