InfluxDB sink with non-blocking writes

This commit is contained in:
Thomas Roehl 2021-06-30 17:02:40 +02:00
parent 586c6c12ac
commit aa8cac14d7
3 changed files with 82 additions and 0 deletions

View File

@ -33,6 +33,7 @@ var Sinks = map[string]sinks.SinkFuncs{
"influxdb": &sinks.InfluxSink{},
"stdout": &sinks.StdoutSink{},
"nats": &sinks.NatsSink{},
"influxasync": &sinks.InfluxAsyncSink{},
}
var Receivers = map[string]receivers.ReceiverFuncs{

View File

@ -6,6 +6,7 @@ The base class/configuration is located in `metricSink.go`.
# Sinks
* `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging
* `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, user, password, database name and organisation are in the global configuration file. It uses the v2 API of Influx.
* `influxAsyncSink.go`: Writes all metrics to an InfluxDB database instance using a non-blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx.
* `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages.
# Installation

80
sinks/influxAsyncSink.go Normal file
View File

@ -0,0 +1,80 @@
package sinks
import (
// "context"
"crypto/tls"
"errors"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"log"
"time"
)
type InfluxAsyncSink struct {
Sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPI
retPolicy string
errors <-chan error
}
func (s *InfluxAsyncSink) connect() error {
var auth string
var uri string
if s.ssl {
uri = fmt.Sprintf("https://%s:%s", s.host, s.port)
} else {
uri = fmt.Sprintf("http://%s:%s", s.host, s.port)
}
if len(s.user) == 0 {
auth = s.password
} else {
auth = fmt.Sprintf("%s:%s", s.user, s.password)
}
log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database)
s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetBatchSize(20).SetTLSConfig(&tls.Config{
InsecureSkipVerify: true,
}))
s.writeApi = s.client.WriteAPI(s.organization, s.database)
return nil
}
func (s *InfluxAsyncSink) Init(config SinkConfig) error {
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 ||
len(config.Organization) == 0 ||
len(config.Password) == 0 {
return errors.New("Not all configuration variables set required by InfluxSink")
}
s.host = config.Host
s.port = config.Port
s.database = config.Database
s.organization = config.Organization
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
err := s.connect()
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
log.Print(err)
}
}()
return err
}
func (s *InfluxAsyncSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
p := influxdb2.NewPoint(measurement, tags, fields, t)
s.writeApi.WritePoint(p)
s.writeApi.Flush()
return nil
}
func (s *InfluxAsyncSink) Close() {
log.Print("Closing InfluxDB connection")
s.writeApi.Flush()
s.client.Close()
}