Add tags in lexical order as required by AddTag()

This commit is contained in:
Holger Obermaier 2023-09-19 13:33:25 +02:00
parent 9e73849081
commit c472029c2d

View File

@ -44,13 +44,19 @@ type HttpSinkConfig struct {
type HttpSink struct {
sink
client *http.Client
client *http.Client
// influx line protocol encoder
encoder influx.Encoder
// Flush() runs in another goroutine, so this encoderLock has to protect the encoder
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
// so this encoderLock has to protect the encoder
encoderLock sync.Mutex
timerLock sync.Mutex
flushTimer *time.Timer
config HttpSinkConfig
// timer to run Flush()
flushTimer *time.Timer
// Lock to assure that only one timer is running at a time
timerLock sync.Mutex
config HttpSinkConfig
}
// Write sends metric m as http message
@ -62,18 +68,24 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
// Encode measurement name
s.encoder.StartLine(m.Name())
// Encode tags
// copy tags and meta data which should be used as tags
tags := make(map[string]string)
keys := make([]string, 0)
for key, value := range m.Tags() {
s.encoder.AddTag(key, value)
keys = append(keys, key)
tags[key] = value
}
for _, key := range s.config.MetaAsTags {
if value, ok := m.GetMeta(key); ok {
keys = append(keys, key)
tags[key] = value
}
}
// Encode metadata as tags
for key, use_meta_as_tag := range s.meta_as_tags {
if use_meta_as_tag {
if val, ok := m.GetMeta(key); ok {
s.encoder.AddTag(key, val)
}
}
// Encode tags
slices.Sort(keys)
for _, key := range keys {
s.encoder.AddTag(key, tags[key])
}
// Encode fields
@ -234,11 +246,8 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
cclog.ComponentDebug(s.name, "flushDelay", t)
}
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Create http client
s.client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 1, // We will only ever talk to one host.
@ -246,6 +255,9 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
},
Timeout: s.config.timeout,
}
// Configure influx line protocol encoder
s.encoder.SetPrecision(influx.Second)
return s, nil
}