mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2024-12-25 15:09:05 +01:00
Add config options for retry intervals of InfluxDB clients (#59)
This commit is contained in:
parent
17f37583fc
commit
c9b8fcdaa7
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
@ -26,15 +27,21 @@ type InfluxAsyncSinkConfig struct {
|
||||
// Maximum number of points sent to server in single request. Default 5000
|
||||
BatchSize uint `json:"batch_size,omitempty"`
|
||||
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
|
||||
FlushInterval uint `json:"flush_interval,omitempty"`
|
||||
FlushInterval uint `json:"flush_interval,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
}
|
||||
|
||||
type InfluxAsyncSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) connect() error {
|
||||
@ -63,6 +70,11 @@ func (s *InfluxAsyncSink) connect() error {
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
@ -99,6 +111,33 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
|
||||
// Default retry intervals (in seconds)
|
||||
// 1 2
|
||||
// 2 4
|
||||
// 4 8
|
||||
// 8 16
|
||||
// 16 32
|
||||
// 32 64
|
||||
// 64 128
|
||||
// 128 256
|
||||
// 256 512
|
||||
// 512 1024
|
||||
// 1024 2048
|
||||
// 2048 4096
|
||||
// 4096 8192
|
||||
// 8192 16384
|
||||
// 16384 32768
|
||||
// 32768 65536
|
||||
// 65536 131072
|
||||
// 131072 262144
|
||||
// 262144 524288
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
@ -114,6 +153,16 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
toUint := func(duration string, def uint) uint {
|
||||
t, err := time.ParseDuration(duration)
|
||||
if err == nil {
|
||||
return uint(t.Milliseconds())
|
||||
}
|
||||
return def
|
||||
}
|
||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
|
@ -18,6 +18,10 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
||||
"organization": "myorg",
|
||||
"ssl": true,
|
||||
"batch_size": 200,
|
||||
"retry_interval" : "1s",
|
||||
"retry_exponential_base" : 2,
|
||||
"max_retries": 20,
|
||||
"max_retry_time" : "168h"
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -32,3 +36,9 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
||||
- `organization`: Organization in the InfluxDB
|
||||
- `ssl`: Use SSL connection
|
||||
- `batch_size`: batch up metrics internally, default 100
|
||||
- `retry_interval`: Base retry interval for failed write requests, default 1s
|
||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||
- `max_retries`: Maximal number of retry attempts
|
||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||
|
||||
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
@ -15,21 +16,29 @@ import (
|
||||
|
||||
type InfluxSinkConfig struct {
|
||||
defaultSinkConfig
|
||||
Host string `json:"host,omitempty"`
|
||||
Port string `json:"port,omitempty"`
|
||||
Database string `json:"database,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Organization string `json:"organization,omitempty"`
|
||||
SSL bool `json:"ssl,omitempty"`
|
||||
RetentionPol string `json:"retention_policy,omitempty"`
|
||||
Host string `json:"host,omitempty"`
|
||||
Port string `json:"port,omitempty"`
|
||||
Database string `json:"database,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Organization string `json:"organization,omitempty"`
|
||||
SSL bool `json:"ssl,omitempty"`
|
||||
RetentionPol string `json:"retention_policy,omitempty"`
|
||||
InfluxRetryInterval string `json:"retry_interval"`
|
||||
InfluxExponentialBase uint `json:"retry_exponential_base"`
|
||||
InfluxMaxRetries uint `json:"max_retries"`
|
||||
InfluxMaxRetryTime string `json:"max_retry_time"`
|
||||
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
|
||||
}
|
||||
|
||||
type InfluxSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPIBlocking
|
||||
config InfluxSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPIBlocking
|
||||
config InfluxSinkConfig
|
||||
influxRetryInterval uint
|
||||
influxMaxRetryTime uint
|
||||
//influxMaxRetryDelay uint
|
||||
}
|
||||
|
||||
func (s *InfluxSink) connect() error {
|
||||
@ -52,6 +61,12 @@ func (s *InfluxSink) connect() error {
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
)
|
||||
|
||||
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
|
||||
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
|
||||
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
|
||||
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
|
||||
|
||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
|
||||
ok, err := s.client.Ping(context.Background())
|
||||
@ -91,6 +106,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
|
||||
s.config.InfluxRetryInterval = "1s"
|
||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
||||
s.config.InfluxMaxRetryTime = "168h"
|
||||
s.config.InfluxMaxRetries = 20
|
||||
s.config.InfluxExponentialBase = 2
|
||||
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
@ -99,6 +121,16 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
toUint := func(duration string, def uint) uint {
|
||||
t, err := time.ParseDuration(duration)
|
||||
if err == nil {
|
||||
return uint(t.Milliseconds())
|
||||
}
|
||||
return def
|
||||
}
|
||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||
|
@ -17,6 +17,10 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
"password" : "examplepw",
|
||||
"organization": "myorg",
|
||||
"ssl": true,
|
||||
"retry_interval" : "1s",
|
||||
"retry_exponential_base" : 2,
|
||||
"max_retries": 20,
|
||||
"max_retry_time" : "168h"
|
||||
}
|
||||
}
|
||||
```
|
||||
@ -30,3 +34,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
||||
- `password`: Password for basic authentification
|
||||
- `organization`: Organization in the InfluxDB
|
||||
- `ssl`: Use SSL connection
|
||||
- `retry_interval`: Base retry interval for failed write requests, default 1s
|
||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||
- `max_retries`: Maximal number of retry attempts
|
||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||
|
||||
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
Loading…
Reference in New Issue
Block a user