From ee4bd558f1ba0a6515793085c842f05207d83ad4 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Fri, 6 May 2022 11:44:57 +0200 Subject: [PATCH] Cleanup: Remove unused code --- sinks/influxAsyncSink.go | 18 ++++--- sinks/influxSink.go | 102 ++++++++++++--------------------------- 2 files changed, 41 insertions(+), 79 deletions(-) diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 31d127d..bf88079 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -25,7 +25,6 @@ type InfluxAsyncSinkConfig struct { Password string `json:"password,omitempty"` Organization string `json:"organization,omitempty"` SSL bool `json:"ssl,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` // Maximum number of points sent to server in single request. Default 5000 BatchSize uint `json:"batch_size,omitempty"` // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms @@ -186,12 +185,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") + if len(s.config.Port) == 0 { + return nil, errors.New("Missing port configuration required by InfluxSink") + } + if len(s.config.Database) == 0 { + return nil, errors.New("Missing database configuration required by InfluxSink") + } + if len(s.config.Organization) == 0 { + return nil, errors.New("Missing organization configuration required by InfluxSink") + } + if len(s.config.Password) == 0 { + return nil, errors.New("Missing password configuration required by InfluxSink") } // Create lookup map to use meta infos as tags in the output metric s.meta_as_tags = make(map[string]bool) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index b382c38..212647d 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -16,37 +16,28 @@ import ( "github.com/influxdata/influxdb-client-go/v2/api/write" ) -type InfluxSinkConfig struct { - defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - FlushDelay string `json:"flush_delay,omitempty"` - BatchSize int `json:"batch_size,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` - // InfluxRetryInterval string `json:"retry_interval"` - // InfluxExponentialBase uint `json:"retry_exponential_base"` - // InfluxMaxRetries uint `json:"max_retries"` - // InfluxMaxRetryTime string `json:"max_retry_time"` - //InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it -} - type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - config InfluxSinkConfig - influxRetryInterval uint - influxMaxRetryTime uint - batch []*write.Point - flushTimer *time.Timer - flushDelay time.Duration - lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer - //influxMaxRetryDelay uint + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + // Maximum number of points sent to server in single request. Default 100 + BatchSize int `json:"batch_size,omitempty"` + // Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s + FlushInterval string `json:"flush_delay,omitempty"` + } + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer } // connect connects to the InfluxDB server @@ -76,23 +67,6 @@ func (s *InfluxSink) connect() error { // Set influxDB client options clientOptions := influxdb2.DefaultOptions() - // if s.influxRetryInterval != 0 { - // cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval) - // clientOptions.SetMaxRetryInterval(s.influxRetryInterval) - // } - // if s.influxMaxRetryTime != 0 { - // cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime) - // clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) - // } - // if s.config.InfluxExponentialBase != 0 { - // cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase) - // clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) - // } - // if s.config.InfluxMaxRetries != 0 { - // cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries) - // clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) - // } - // Do not check InfluxDB certificate clientOptions.SetTLSConfig( &tls.Config{ @@ -126,11 +100,13 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { } // Run a batched flush for all lines that have arrived in the last flush delay interval - s.flushTimer = time.AfterFunc(s.flushDelay, func() { - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) - } - }) + s.flushTimer = time.AfterFunc( + s.flushDelay, + func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) } // Append metric to batch slice @@ -194,7 +170,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // Set config default values s.config.BatchSize = 100 - s.config.FlushDelay = "1s" + s.config.FlushInterval = "1s" // Read config if len(config) > 0 { @@ -203,12 +179,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } - s.influxRetryInterval = 0 - s.influxMaxRetryTime = 0 - // s.config.InfluxRetryInterval = "" - // s.config.InfluxMaxRetryTime = "" - // s.config.InfluxMaxRetries = 0 - // s.config.InfluxExponentialBase = 0 if len(s.config.Host) == 0 { return nil, errors.New("Missing host configuration required by InfluxSink") @@ -232,21 +202,9 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s.meta_as_tags[k] = true } - // toUint := func(duration string, def uint) uint { - // if len(duration) > 0 { - // t, err := time.ParseDuration(duration) - // if err == nil { - // return uint(t.Milliseconds()) - // } - // } - // return def - // } - // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) - // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) - // Configure flush delay duration - if len(s.config.FlushDelay) > 0 { - t, err := time.ParseDuration(s.config.FlushDelay) + if len(s.config.FlushInterval) > 0 { + t, err := time.ParseDuration(s.config.FlushInterval) if err == nil { s.flushDelay = t }