Add configurable batch size

This commit is contained in:
Thomas Roehl 2022-02-07 16:33:46 +01:00
parent d3dc796244
commit 7d5b3c7d73
2 changed files with 10 additions and 2 deletions

View File

@ -24,6 +24,7 @@ type InfluxAsyncSinkConfig struct {
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"` RetentionPol string `json:"retention_policy,omitempty"`
BatchSize uint `json:"batch_size,omitempty"`
} }
type InfluxAsyncSink struct { type InfluxAsyncSink struct {
@ -49,8 +50,12 @@ func (s *InfluxAsyncSink) connect() error {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
batch := s.config.BatchSize
if batch == 0 {
batch = 100
}
s.client = influxdb2.NewClientWithOptions(uri, auth, s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetBatchSize(20).SetTLSConfig(&tls.Config{ influxdb2.DefaultOptions().SetBatchSize(batch).SetTLSConfig(&tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
})) }))
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
@ -59,6 +64,7 @@ func (s *InfluxAsyncSink) connect() error {
func (s *InfluxAsyncSink) Init(config json.RawMessage) error { func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
s.name = "InfluxSink" s.name = "InfluxSink"
s.config.BatchSize = 100
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {

View File

@ -17,6 +17,7 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
"password" : "examplepw", "password" : "examplepw",
"organization": "myorg", "organization": "myorg",
"ssl": true, "ssl": true,
"batch_size": 200,
} }
} }
``` ```
@ -30,3 +31,4 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
- `password`: Password for basic authentification - `password`: Password for basic authentification
- `organization`: Organization in the InfluxDB - `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection - `ssl`: Use SSL connection
- `batch_size`: batch up metrics internally, default 100