From c9b8fcdaa75a1a9db35984645d17d76aead4e235 Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Fri, 11 Mar 2022 13:43:03 +0100 Subject: [PATCH] Add config options for retry intervals of InfluxDB clients (#59) --- sinks/influxAsyncSink.go | 59 ++++++++++++++++++++++++++++++++++++---- sinks/influxAsyncSink.md | 12 +++++++- sinks/influxSink.go | 54 ++++++++++++++++++++++++++++-------- sinks/influxSink.md | 12 +++++++- 4 files changed, 119 insertions(+), 18 deletions(-) diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 7b38873..a2cb64a 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" @@ -26,15 +27,21 @@ type InfluxAsyncSinkConfig struct { // Maximum number of points sent to server in single request. Default 5000 BatchSize uint `json:"batch_size,omitempty"` // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . 
Default 1000ms - FlushInterval uint `json:"flush_interval,omitempty"` + FlushInterval uint `json:"flush_interval,omitempty"` + InfluxRetryInterval string `json:"retry_interval"` + InfluxExponentialBase uint `json:"retry_exponential_base"` + InfluxMaxRetries uint `json:"max_retries"` + InfluxMaxRetryTime string `json:"max_retry_time"` } type InfluxAsyncSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPI - errors <-chan error - config InfluxAsyncSinkConfig + client influxdb2.Client + writeApi influxdb2Api.WriteAPI + errors <-chan error + config InfluxAsyncSinkConfig + influxRetryInterval uint + influxMaxRetryTime uint } func (s *InfluxAsyncSink) connect() error { @@ -63,6 +70,11 @@ func (s *InfluxAsyncSink) connect() error { InsecureSkipVerify: true, }, ) + clientOptions.SetMaxRetryInterval(s.influxRetryInterval) + clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) + clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) + clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) ok, err := s.client.Ping(context.Background()) @@ -99,6 +111,33 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { // Set default for maximum number of points sent to server in single request. 
s.config.BatchSize = 100 + s.influxRetryInterval = uint(time.Duration(1) * time.Second) + s.config.InfluxRetryInterval = "1s" + s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour) + s.config.InfluxMaxRetryTime = "168h" + s.config.InfluxMaxRetries = 20 + s.config.InfluxExponentialBase = 2 + + // Default retry intervals (in seconds) + // 1 2 + // 2 4 + // 4 8 + // 8 16 + // 16 32 + // 32 64 + // 64 128 + // 128 256 + // 256 512 + // 512 1024 + // 1024 2048 + // 2048 4096 + // 4096 8192 + // 8192 16384 + // 16384 32768 + // 32768 65536 + // 65536 131072 + // 131072 262144 + // 262144 524288 if len(config) > 0 { err := json.Unmarshal(config, &s.config) @@ -114,6 +153,16 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") } + toUint := func(duration string, def uint) uint { + t, err := time.ParseDuration(duration) + if err == nil { + return uint(t.Milliseconds()) + } + return def + } + s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) + s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) + // Connect to InfluxDB server if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) diff --git a/sinks/influxAsyncSink.md b/sinks/influxAsyncSink.md index 286c93c..951d67d 100644 --- a/sinks/influxAsyncSink.md +++ b/sinks/influxAsyncSink.md @@ -18,6 +18,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go "organization": "myorg", "ssl": true, "batch_size": 200, + "retry_interval" : "1s", + "retry_exponential_base" : 2, + "max_retries": 20, + "max_retry_time" : "168h" } } ``` @@ -31,4 +35,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go - `password`: Password for basic authentification - `organization`: Organization in the InfluxDB - `ssl`: Use SSL connection -- `batch_size`: batch up metrics 
internally, default 100 \ No newline at end of file +- `batch_size`: batch up metrics internally, default 100 +- `retry_interval`: Base retry interval for failed write requests, default 1s +- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2 +- `max_retries`: Maximum number of retry attempts, default 20 +- `max_retry_time`: Maximum time to retry failed writes, default 168h (one week) + +For information about the calculation of the retry interval settings, see [official influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes) \ No newline at end of file diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 11859b2..ed3bb09 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" @@ -15,21 +16,29 @@ import ( type InfluxSinkConfig struct { defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + RetentionPol string `json:"retention_policy,omitempty"` + InfluxRetryInterval string `json:"retry_interval"` + InfluxExponentialBase uint `json:"retry_exponential_base"` + InfluxMaxRetries uint `json:"max_retries"` + InfluxMaxRetryTime string `json:"max_retry_time"` + 
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it } type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - config InfluxSinkConfig + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config InfluxSinkConfig + influxRetryInterval uint + influxMaxRetryTime uint + //influxMaxRetryDelay uint } func (s *InfluxSink) connect() error { @@ -52,6 +61,12 @@ func (s *InfluxSink) connect() error { InsecureSkipVerify: true, }, ) + + clientOptions.SetMaxRetryInterval(s.influxRetryInterval) + clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) + clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) + clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) ok, err := s.client.Ping(context.Background()) @@ -91,6 +106,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } + s.influxRetryInterval = uint(time.Duration(1) * time.Second) + s.config.InfluxRetryInterval = "1s" + s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour) + s.config.InfluxMaxRetryTime = "168h" + s.config.InfluxMaxRetries = 20 + s.config.InfluxExponentialBase = 2 + if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 || @@ -99,6 +121,16 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { return nil, errors.New("not all configuration variables set required by InfluxSink") } + toUint := func(duration string, def uint) uint { + t, err := time.ParseDuration(duration) + if err == nil { + return uint(t.Milliseconds()) + } + return def + } + s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) + s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) + // Connect to InfluxDB 
server if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) diff --git a/sinks/influxSink.md b/sinks/influxSink.md index bd0f576..a099895 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -17,6 +17,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de "password" : "examplepw", "organization": "myorg", "ssl": true, + "retry_interval" : "1s", + "retry_exponential_base" : 2, + "max_retries": 20, + "max_retry_time" : "168h" } } ``` @@ -29,4 +33,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de - `user`: Username for basic authentification - `password`: Password for basic authentification - `organization`: Organization in the InfluxDB -- `ssl`: Use SSL connection \ No newline at end of file +- `ssl`: Use SSL connection +- `retry_interval`: Base retry interval for failed write requests, default 1s +- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2 +- `max_retries`: Maximum number of retry attempts, default 20 +- `max_retry_time`: Maximum time to retry failed writes, default 168h (one week) + +For information about the calculation of the retry interval settings, see [official influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes) \ No newline at end of file