From 9f65365f9d0869efd217c6b159408fc2ba6a521b Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Fri, 29 Sep 2023 10:36:42 +0200 Subject: [PATCH] Add Influx client options --- sinks/influxSink.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ sinks/influxSink.md | 6 ++++++ 2 files changed, 51 insertions(+) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 2ed2267..6bad453 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -40,6 +40,17 @@ type InfluxSink struct { // Number of metrics that are dropped when buffer is full // Default: 100 DropRate int `json:"drop_rate,omitempty"` + + // Influx client options: + + // maximum delay between each retry attempt + InfluxMaxRetryInterval string `json:"retry_interval,omitempty"` + // base for the exponential retry delay + InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"` + // maximum count of retry attempts of failed writes + InfluxMaxRetries uint `json:"max_retries,omitempty"` + // maximum total retry timeout + InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` } batch []*write.Point flushTimer *time.Timer @@ -78,6 +89,40 @@ func (s *InfluxSink) connect() error { // Set influxDB client options clientOptions := influxdb2.DefaultOptions() + // Set the maximum delay between each retry attempt + if len(s.config.InfluxMaxRetryInterval) > 0 { + if t, err := time.ParseDuration(s.config.InfluxMaxRetryInterval); err == nil { + influxMaxRetryInterval := uint(t.Milliseconds()) + cclog.ComponentDebug(s.name, "Influx MaxRetryInterval", s.config.InfluxMaxRetryInterval) + clientOptions.SetMaxRetryInterval(influxMaxRetryInterval) + } else { + cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) + } + } + + // Set the base for the exponential retry delay + if s.config.InfluxExponentialBase != 0 { + cclog.ComponentDebug(s.name, "Influx Exponential Base", s.config.InfluxExponentialBase) + clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) + } + + // Set maximum count of retry attempts of failed writes + if s.config.InfluxMaxRetries != 0 { + cclog.ComponentDebug(s.name, "Influx Max Retries", s.config.InfluxMaxRetries) + clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + } + + // Set the maximum total retry timeout + if len(s.config.InfluxMaxRetryTime) > 0 { + if t, err := time.ParseDuration(s.config.InfluxMaxRetryTime); err == nil { + influxMaxRetryTime := uint(t.Milliseconds()) + cclog.ComponentDebug(s.name, "MaxRetryTime", s.config.InfluxMaxRetryTime) + clientOptions.SetMaxRetryTime(influxMaxRetryTime) + } else { + cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) + } + } + // Do not check InfluxDB certificate clientOptions.SetTLSConfig( &tls.Config{ diff --git a/sinks/influxSink.md b/sinks/influxSink.md index 8f9ce83..f811b11 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -35,3 +35,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de - `flush_delay`: Group metrics coming in to a single batch - `batch_size`: Maximal batch size +Influx client options: + +- `retry_interval`: maximum delay between each retry attempt +- `retry_exponential_base`: base for the exponential retry delay +- `max_retries`: maximum count of retry attempts of failed writes +- `max_retry_time`: maximum total retry timeout