Add non-blocking InfluxDB sink (#29)

* Add non-blocking InfluxDB sink

* Add configurable batch size
This commit is contained in:
Thomas Gruber 2022-02-07 16:51:46 +01:00 committed by GitHub
parent 6dd95d6fed
commit d1e66201a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 160 additions and 6 deletions

119
sinks/influxAsyncSink.go Normal file
View File

@ -0,0 +1,119 @@
package sinks
import (
// "context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
type InfluxAsyncSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
BatchSize uint `json:"batch_size,omitempty"`
}
type InfluxAsyncSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPI
retPolicy string
errors <-chan error
config InfluxAsyncSinkConfig
}
func (s *InfluxAsyncSink) connect() error {
var auth string
var uri string
if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else {
uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
}
if len(s.config.User) == 0 {
auth = s.config.Password
} else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
}
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
batch := s.config.BatchSize
if batch == 0 {
batch = 100
}
s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetBatchSize(batch).SetTLSConfig(&tls.Config{
InsecureSkipVerify: true,
}))
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
return nil
}
func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
s.name = "InfluxSink"
s.config.BatchSize = 100
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxAsyncSink")
}
err := s.connect()
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
cclog.ComponentError(s.name, err.Error())
}
}()
return err
}
func (s *InfluxAsyncSink) Write(point lp.CCMetric) error {
var p *write.Point
if s.config.MetaAsTags {
tags := map[string]string{}
for k, v := range point.Tags() {
tags[k] = v
}
for k, v := range point.Meta() {
tags[k] = v
}
p = influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time())
} else {
p = influxdb2.NewPoint(point.Name(), point.Tags(), point.Fields(), point.Time())
}
s.writeApi.WritePoint(p)
return nil
}
func (s *InfluxAsyncSink) Flush() error {
s.writeApi.Flush()
return nil
}
func (s *InfluxAsyncSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.writeApi.Flush()
s.client.Close()
}

34
sinks/influxAsyncSink.md Normal file
View File

@ -0,0 +1,34 @@
## `influxasync` sink
The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **non-blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later).
### Configuration structure
```json
{
"<name>": {
"type": "influxasync",
"meta_as_tags" : true,
"database" : "mymetrics",
"host": "dbhost.example.com",
"port": "4222",
"user": "exampleuser",
"password" : "examplepw",
"organization": "myorg",
"ssl": true,
"batch_size": 200,
}
}
```
- `type`: makes the sink an `influxdb` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `database`: All metrics are written to this bucket
- `host`: Hostname of the InfluxDB database server
- `port`: Portnumber (as string) of the InfluxDB database server
- `user`: Username for basic authentification
- `password`: Password for basic authentification
- `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection
- `batch_size`: batch up metrics internally, default 100

View File

@ -1,6 +1,6 @@
## `influxdb` sink ## `influxdb` sink
The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later).
### Configuration structure ### Configuration structure

View File

@ -12,11 +12,12 @@ import (
// Map of all available sinks // Map of all available sinks
var AvailableSinks = map[string]Sink{ var AvailableSinks = map[string]Sink{
"influxdb": new(InfluxSink), "influxdb": new(InfluxSink),
"stdout": new(StdoutSink), "stdout": new(StdoutSink),
"nats": new(NatsSink), "nats": new(NatsSink),
"http": new(HttpSink), "http": new(HttpSink),
"ganglia": new(GangliaSink), "ganglia": new(GangliaSink),
"influxasync": new(InfluxAsyncSink),
} }
// Metric collector manager data structure // Metric collector manager data structure