Use a meta_as_tags string list in config but create a lookup map afterwards

This commit is contained in:
Thomas Roehl 2022-03-09 11:01:41 +01:00
parent 08968b1894
commit 4800b33ceb
6 changed files with 34 additions and 10 deletions

View File

@ -53,7 +53,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
}) })
} }
p := m.ToPoint(s.config.MetaAsTags) p := m.ToPoint(s.meta_as_tags)
s.lock.Lock() s.lock.Lock()
_, err := s.encoder.Encode(p) _, err := s.encoder.Encode(p)
@ -159,6 +159,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.flushDelay = t s.flushDelay = t
} }
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
tr := &http.Transport{ tr := &http.Transport{
MaxIdleConns: s.maxIdleConns, MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout, IdleConnTimeout: s.idleConnTimeout,

View File

@ -77,7 +77,7 @@ func (s *InfluxAsyncSink) connect() error {
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
s.writeApi.WritePoint( s.writeApi.WritePoint(
m.ToPoint(s.config.MetaAsTags), m.ToPoint(s.meta_as_tags),
) )
return nil return nil
} }
@ -113,6 +113,11 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Password) == 0 { len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Connect to InfluxDB server // Connect to InfluxDB server
if err := s.connect(); err != nil { if err := s.connect(); err != nil {

View File

@ -68,7 +68,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
err := err :=
s.writeApi.WritePoint( s.writeApi.WritePoint(
context.Background(), context.Background(),
m.ToPoint(s.config.MetaAsTags), m.ToPoint(s.meta_as_tags),
) )
return err return err
} }
@ -98,6 +98,11 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Password) == 0 { len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink") return nil, errors.New("not all configuration variables set required by InfluxSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Connect to InfluxDB server // Connect to InfluxDB server
if err := s.connect(); err != nil { if err := s.connect(); err != nil {

View File

@ -5,13 +5,13 @@ import (
) )
type defaultSinkConfig struct { type defaultSinkConfig struct {
MetaAsTags bool `json:"meta_as_tags,omitempty"` MetaAsTags []string `json:"meta_as_tags,omitempty"`
Type string `json:"type"` Type string `json:"type"`
} }
type sink struct { type sink struct {
meta_as_tags bool // Use meta data tags as tags meta_as_tags map[string]bool // Use meta data tags as tags
name string // Name of the sink name string // Name of the sink
} }
type Sink interface { type Sink interface {

View File

@ -55,7 +55,7 @@ func (s *NatsSink) connect() error {
func (s *NatsSink) Write(m lp.CCMetric) error { func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil { if s.client != nil {
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Write:", err.Error()) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
@ -97,6 +97,11 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Database) == 0 { len(s.config.Database) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink") return nil, errors.New("not all configuration variables set required by NatsSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Setup Influx line protocol // Setup Influx line protocol
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025) s.buffer.Grow(1025)
@ -105,7 +110,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s.encoder.SetMaxLineBytes(1024) s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection // Setup infos for connection
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)
} }
return s, nil return s, nil
} }

View File

@ -63,7 +63,11 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
s.output = f s.output = f
} }
} }
s.meta_as_tags = s.config.MetaAsTags // Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
return s, nil return s, nil
} }