diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 213f2d6..e22f941 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -28,10 +28,10 @@ type InfluxAsyncSinkConfig struct { BatchSize uint `json:"batch_size,omitempty"` // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms FlushInterval uint `json:"flush_interval,omitempty"` - InfluxRetryInterval string `json:"retry_interval"` - InfluxExponentialBase uint `json:"retry_exponential_base"` - InfluxMaxRetries uint `json:"max_retries"` - InfluxMaxRetryTime string `json:"max_retry_time"` + InfluxRetryInterval string `json:"retry_interval,omitempty"` + InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"` + InfluxMaxRetries uint `json:"max_retries,omitempty"` + InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` } type InfluxAsyncSink struct { @@ -60,20 +60,34 @@ func (s *InfluxAsyncSink) connect() error { cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) clientOptions := influxdb2.DefaultOptions() if s.config.BatchSize != 0 { + cclog.ComponentDebug(s.name, "Batch size", s.config.BatchSize) clientOptions.SetBatchSize(s.config.BatchSize) } if s.config.FlushInterval != 0 { + cclog.ComponentDebug(s.name, "Flush interval", s.config.FlushInterval) clientOptions.SetFlushInterval(s.config.FlushInterval) } + if s.influxRetryInterval != 0 { + cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval) + clientOptions.SetMaxRetryInterval(s.influxRetryInterval) + } + if s.influxMaxRetryTime != 0 { + cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime) + clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) + } + if s.config.InfluxExponentialBase != 0 { + cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase) + clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) + } + if s.config.InfluxMaxRetries != 0 { + cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries) + clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + } clientOptions.SetTLSConfig( &tls.Config{ InsecureSkipVerify: true, }, - ) - clientOptions.SetMaxRetryInterval(s.influxRetryInterval) - clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) - clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) - clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + ).SetPrecision(time.Second) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) @@ -110,13 +124,14 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { s.name = fmt.Sprintf("InfluxSink(%s)", name) // Set default for maximum number of points sent to server in single request. - s.config.BatchSize = 100 - s.influxRetryInterval = uint(time.Duration(1) * time.Second) - s.config.InfluxRetryInterval = "1s" - s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour) - s.config.InfluxMaxRetryTime = "168h" - s.config.InfluxMaxRetries = 20 - s.config.InfluxExponentialBase = 2 + s.config.BatchSize = 0 + s.influxRetryInterval = 0 + //s.config.InfluxRetryInterval = "1s" + s.influxMaxRetryTime = 0 + //s.config.InfluxMaxRetryTime = "168h" + s.config.InfluxMaxRetries = 0 + s.config.InfluxExponentialBase = 0 + s.config.FlushInterval = 0 // Default retry intervals (in seconds) // 1 2