From 57629a2e0a2d19f2079941d2aa78052ee273639d Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Tue, 15 Mar 2022 16:16:26 +0100 Subject: [PATCH] Meta to tags list and map for sinks (#63) * Change ccMetric->Influx functions * Use a meta_as_tags string list in config but create a lookup map afterwards * Add meta as tag logic to sampleSink --- internal/ccMetric/ccMetric.go | 25 +++++++++---------------- sinks/httpSink.go | 7 ++++++- sinks/influxAsyncSink.go | 7 ++++++- sinks/influxSink.go | 7 ++++++- sinks/metricSink.go | 8 ++++---- sinks/natsSink.go | 9 +++++++-- sinks/sampleSink.go | 11 +++++++++-- sinks/stdoutSink.go | 6 +++++- 8 files changed, 52 insertions(+), 28 deletions(-) diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 1de325a..661b9a4 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -24,8 +24,8 @@ type ccMetric struct { // ccMetric access functions type CCMetric interface { - ToPoint(metaAsTags bool) *write.Point // Generate influxDB point for data type ccMetric - ToLineProtocol(metaAsTags bool) string // Generate influxDB line protocol for data type ccMetric + ToPoint(metaAsTags map[string]bool) *write.Point // Generate influxDB point for data type ccMetric + ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMetric Name() string // Get metric name SetName(name string) // Set metric name @@ -61,25 +61,18 @@ func (m *ccMetric) String() string { } // ToLineProtocol generates influxDB line protocol for data type ccMetric -func (m *ccMetric) ToPoint(metaAsTags bool) (p *write.Point) { - - if !metaAsTags { - p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm) - } else { - tags := make(map[string]string, len(m.tags)+len(m.meta)) - for key, value := range m.tags { - tags[key] = value +func (m *ccMetric) ToPoint(metaAsTags map[string]bool) (p *write.Point) { + p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm) + for key, ok1 := range metaAsTags { + if val, ok2 := m.GetMeta(key); ok1 && ok2 { + p.AddTag(key, val) } - for key, value := range m.meta { - tags[key] = value - } - p = influxdb2.NewPoint(m.name, tags, m.fields, m.tm) } - return + return p } // ToLineProtocol generates influxDB line protocol for data type ccMetric -func (m *ccMetric) ToLineProtocol(metaAsTags bool) string { +func (m *ccMetric) ToLineProtocol(metaAsTags map[string]bool) string { return write.PointToLineProtocol( m.ToPoint(metaAsTags), diff --git a/sinks/httpSink.go b/sinks/httpSink.go index c2dd2ea..398eaf3 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -53,7 +53,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error { }) } - p := m.ToPoint(s.config.MetaAsTags) + p := m.ToPoint(s.meta_as_tags) s.lock.Lock() _, err := s.encoder.Encode(p) @@ -159,6 +159,11 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.flushDelay = t } } + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } tr := &http.Transport{ MaxIdleConns: s.maxIdleConns, IdleConnTimeout: s.idleConnTimeout, diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index a2cb64a..213f2d6 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -89,7 +89,7 @@ func (s *InfluxAsyncSink) connect() error { func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { s.writeApi.WritePoint( - m.ToPoint(s.config.MetaAsTags), + m.ToPoint(s.meta_as_tags), ) return nil } @@ -152,6 +152,11 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { len(s.config.Password) == 0 { return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") } + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } toUint := func(duration string, def uint) uint { t, err := time.ParseDuration(duration) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index ed3bb09..1987342 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -83,7 +83,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { err := s.writeApi.WritePoint( context.Background(), - m.ToPoint(s.config.MetaAsTags), + m.ToPoint(s.meta_as_tags), ) return err } @@ -120,6 +120,11 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { len(s.config.Password) == 0 { return nil, errors.New("not all configuration variables set required by InfluxSink") } + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } toUint := func(duration string, def uint) uint { t, err := time.ParseDuration(duration) diff --git a/sinks/metricSink.go b/sinks/metricSink.go index d5356d0..c6c6860 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -5,13 +5,13 @@ import ( ) type defaultSinkConfig struct { - MetaAsTags bool `json:"meta_as_tags,omitempty"` - Type string `json:"type"` + MetaAsTags []string `json:"meta_as_tags,omitempty"` + Type string `json:"type"` } type sink struct { - meta_as_tags bool // Use meta data tags as tags - name string // Name of the sink + meta_as_tags map[string]bool // Use meta data tags as tags + name string // Name of the sink } type Sink interface { diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 0d7987e..0597e9b 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -55,7 +55,7 @@ func (s *NatsSink) connect() error { func (s *NatsSink) Write(m lp.CCMetric) error { if s.client != nil { - _, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) + _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) if err != nil { cclog.ComponentError(s.name, "Write:", err.Error()) return err @@ -97,6 +97,11 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { len(s.config.Database) == 0 { return nil, errors.New("not all configuration variables set required by NatsSink") } + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } // Setup Influx line protocol s.buffer = &bytes.Buffer{} s.buffer.Grow(1025) @@ -105,7 +110,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s.encoder.SetMaxLineBytes(1024) // Setup infos for connection if err := s.connect(); err != nil { - return nil, fmt.Errorf("Unable to connect: %v", err) + return nil, fmt.Errorf("unable to connect: %v", err) } return s, nil } diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go index 3913a29..2a823e6 100644 --- a/sinks/sampleSink.go +++ b/sinks/sampleSink.go @@ -10,14 +10,14 @@ import ( ) type SampleSinkConfig struct { - // defines JSON tags for 'type' and 'meta_as_tags' + // defines JSON tags for 'type' and 'meta_as_tags' (string list) // See: metricSink.go defaultSinkConfig // Additional config options, for SampleSink } type SampleSink struct { - // declares elements 'name' and 'meta_as_tags' + // declares elements 'name' and 'meta_as_tags' (string to bool map!) sink config SampleSinkConfig // entry point to the SampleSinkConfig } @@ -28,6 +28,7 @@ type SampleSink struct { // Code to submit a single CCMetric to the sink func (s *SampleSink) Write(point lp.CCMetric) error { + // based on s.meta_as_tags use meta infos as tags log.Print(point) return nil } @@ -62,6 +63,12 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { } } + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } + // Check if all required fields in the config are set // E.g. use 'len(s.config.Option) > 0' for string settings diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index acf2621..e091af3 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -63,7 +63,11 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { s.output = f } } - s.meta_as_tags = s.config.MetaAsTags + // Create lookup map to use meta infos as tags in the output metric + s.meta_as_tags = make(map[string]bool) + for _, k := range s.config.MetaAsTags { + s.meta_as_tags[k] = true + } return s, nil }