Meta to tags list and map for sinks (#63)

* Change ccMetric->Influx functions

* Use a meta_as_tags string list in config but create a lookup map afterwards

* Add meta as tag logic to sampleSink
This commit is contained in:
Thomas Gruber 2022-03-15 16:16:26 +01:00 committed by GitHub
parent aa1afd745e
commit 57629a2e0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 52 additions and 28 deletions

View File

@ -24,8 +24,8 @@ type ccMetric struct {
// ccMetric access functions // ccMetric access functions
type CCMetric interface { type CCMetric interface {
ToPoint(metaAsTags bool) *write.Point // Generate influxDB point for data type ccMetric ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric
ToLineProtocol(metaAsTags bool) string // Generate influxDB line protocol for data type ccMetric ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric
Name() string // Get metric name Name() string // Get metric name
SetName(name string) // Set metric name SetName(name string) // Set metric name
@ -61,25 +61,18 @@ func (m *ccMetric) String() string {
} }
// ToLineProtocol generates influxDB line protocol for data type ccMetric // ToLineProtocol generates influxDB line protocol for data type ccMetric
func (m *ccMetric) ToPoint(metaAsTags bool) (p *write.Point) { func (m *ccMetric) ToPoint(metaAsTags map[string]bool) (p *write.Point) {
if !metaAsTags {
p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm) p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm)
} else { for key, ok1 := range metaAsTags {
tags := make(map[string]string, len(m.tags)+len(m.meta)) if val, ok2 := m.GetMeta(key); ok1 && ok2 {
for key, value := range m.tags { p.AddTag(key, val)
tags[key] = value
} }
for key, value := range m.meta {
tags[key] = value
} }
p = influxdb2.NewPoint(m.name, tags, m.fields, m.tm) return p
}
return
} }
// ToLineProtocol generates influxDB line protocol for data type ccMetric // ToLineProtocol generates influxDB line protocol for data type ccMetric
func (m *ccMetric) ToLineProtocol(metaAsTags bool) string { func (m *ccMetric) ToLineProtocol(metaAsTags map[string]bool) string {
return write.PointToLineProtocol( return write.PointToLineProtocol(
m.ToPoint(metaAsTags), m.ToPoint(metaAsTags),

View File

@ -53,7 +53,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
}) })
} }
p := m.ToPoint(s.config.MetaAsTags) p := m.ToPoint(s.meta_as_tags)
s.lock.Lock() s.lock.Lock()
_, err := s.encoder.Encode(p) _, err := s.encoder.Encode(p)
@ -159,6 +159,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.flushDelay = t s.flushDelay = t
} }
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
tr := &http.Transport{ tr := &http.Transport{
MaxIdleConns: s.maxIdleConns, MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout, IdleConnTimeout: s.idleConnTimeout,

View File

@ -89,7 +89,7 @@ func (s *InfluxAsyncSink) connect() error {
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
s.writeApi.WritePoint( s.writeApi.WritePoint(
m.ToPoint(s.config.MetaAsTags), m.ToPoint(s.meta_as_tags),
) )
return nil return nil
} }
@ -152,6 +152,11 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Password) == 0 { len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
toUint := func(duration string, def uint) uint { toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration) t, err := time.ParseDuration(duration)

View File

@ -83,7 +83,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
err := err :=
s.writeApi.WritePoint( s.writeApi.WritePoint(
context.Background(), context.Background(),
m.ToPoint(s.config.MetaAsTags), m.ToPoint(s.meta_as_tags),
) )
return err return err
} }
@ -120,6 +120,11 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Password) == 0 { len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink") return nil, errors.New("not all configuration variables set required by InfluxSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
toUint := func(duration string, def uint) uint { toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration) t, err := time.ParseDuration(duration)

View File

@ -5,12 +5,12 @@ import (
) )
type defaultSinkConfig struct { type defaultSinkConfig struct {
MetaAsTags bool `json:"meta_as_tags,omitempty"` MetaAsTags []string `json:"meta_as_tags,omitempty"`
Type string `json:"type"` Type string `json:"type"`
} }
type sink struct { type sink struct {
meta_as_tags bool // Use meta data tags as tags meta_as_tags map[string]bool // Use meta data tags as tags
name string // Name of the sink name string // Name of the sink
} }

View File

@ -55,7 +55,7 @@ func (s *NatsSink) connect() error {
func (s *NatsSink) Write(m lp.CCMetric) error { func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil { if s.client != nil {
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Write:", err.Error()) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
@ -97,6 +97,11 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
len(s.config.Database) == 0 { len(s.config.Database) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink") return nil, errors.New("not all configuration variables set required by NatsSink")
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Setup Influx line protocol // Setup Influx line protocol
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025) s.buffer.Grow(1025)
@ -105,7 +110,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s.encoder.SetMaxLineBytes(1024) s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection // Setup infos for connection
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)
} }
return s, nil return s, nil
} }

View File

@ -10,14 +10,14 @@ import (
) )
type SampleSinkConfig struct { type SampleSinkConfig struct {
// defines JSON tags for 'type' and 'meta_as_tags' // defines JSON tags for 'type' and 'meta_as_tags' (string list)
// See: metricSink.go // See: metricSink.go
defaultSinkConfig defaultSinkConfig
// Additional config options, for SampleSink // Additional config options, for SampleSink
} }
type SampleSink struct { type SampleSink struct {
// declares elements 'name' and 'meta_as_tags' // declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink sink
config SampleSinkConfig // entry point to the SampleSinkConfig config SampleSinkConfig // entry point to the SampleSinkConfig
} }
@ -28,6 +28,7 @@ type SampleSink struct {
// Code to submit a single CCMetric to the sink // Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMetric) error { func (s *SampleSink) Write(point lp.CCMetric) error {
// based on s.meta_as_tags use meta infos as tags
log.Print(point) log.Print(point)
return nil return nil
} }
@ -62,6 +63,12 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
} }
} }
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Check if all required fields in the config are set // Check if all required fields in the config are set
// E.g. use 'len(s.config.Option) > 0' for string settings // E.g. use 'len(s.config.Option) > 0' for string settings

View File

@ -63,7 +63,11 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
s.output = f s.output = f
} }
} }
s.meta_as_tags = s.config.MetaAsTags // Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
return s, nil return s, nil
} }