Add config option flush_interval

This commit is contained in:
Holger Obermaier 2022-02-10 09:43:02 +01:00
parent 82138df48e
commit 2aa8c812a6

View File

@ -22,7 +22,10 @@ type InfluxAsyncSinkConfig struct {
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"` RetentionPol string `json:"retention_policy,omitempty"`
// Maximum number of points sent to server in single request. Default 5000
BatchSize uint `json:"batch_size,omitempty"` BatchSize uint `json:"batch_size,omitempty"`
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
FlushInterval uint `json:"flush_interval,omitempty"`
} }
type InfluxAsyncSink struct { type InfluxAsyncSink struct {
@ -48,12 +51,13 @@ func (s *InfluxAsyncSink) connect() error {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
batch := s.config.BatchSize
if batch == 0 {
batch = 100
}
clientOptions := influxdb2.DefaultOptions() clientOptions := influxdb2.DefaultOptions()
clientOptions.SetBatchSize(batch) if s.config.BatchSize != 0 {
clientOptions.SetBatchSize(s.config.BatchSize)
}
if s.config.FlushInterval != 0 {
clientOptions.SetFlushInterval(s.config.FlushInterval)
}
clientOptions.SetTLSConfig( clientOptions.SetTLSConfig(
&tls.Config{ &tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
@ -66,7 +70,10 @@ func (s *InfluxAsyncSink) connect() error {
func (s *InfluxAsyncSink) Init(config json.RawMessage) error { func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
s.name = "InfluxSink" s.name = "InfluxSink"
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100 s.config.BatchSize = 100
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {