From aa8cac14d7bc39b1dcd232711344604aac9ee855 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 30 Jun 2021 17:02:40 +0200 Subject: [PATCH] InfluxDB sink with non-blocking writes --- metric-collector.go | 1 + sinks/README.md | 1 + sinks/influxAsyncSink.go | 80 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 sinks/influxAsyncSink.go diff --git a/metric-collector.go b/metric-collector.go index 840837f..c4e7538 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -33,6 +33,7 @@ var Sinks = map[string]sinks.SinkFuncs{ "influxdb": &sinks.InfluxSink{}, "stdout": &sinks.StdoutSink{}, "nats": &sinks.NatsSink{}, + "influxasync": &sinks.InfluxAsyncSink{}, } var Receivers = map[string]receivers.ReceiverFuncs{ diff --git a/sinks/README.md b/sinks/README.md index 059ed88..4ab2a2f 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -6,6 +6,7 @@ The base class/configuration is located in `metricSink.go`. # Sinks * `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging * `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, user, password, database name and organisation are in the global configuration file. It uses the v2 API of Influx. +* `influxAsyncSink.go`: Writes all metrics to an InfluxDB database instance using a non-blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx. * `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages. # Installation diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go new file mode 100644 index 0000000..3564848 --- /dev/null +++ b/sinks/influxAsyncSink.go @@ -0,0 +1,80 @@ +package sinks + +import ( + // "context" + "crypto/tls" + "errors" + "fmt" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" + "log" + "time" +) + +type InfluxAsyncSink struct { + Sink + client influxdb2.Client + writeApi influxdb2Api.WriteAPI + retPolicy string + errors <-chan error +} + +func (s *InfluxAsyncSink) connect() error { + var auth string + var uri string + if s.ssl { + uri = fmt.Sprintf("https://%s:%s", s.host, s.port) + } else { + uri = fmt.Sprintf("http://%s:%s", s.host, s.port) + } + if len(s.user) == 0 { + auth = s.password + } else { + auth = fmt.Sprintf("%s:%s", s.user, s.password) + } + log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) + s.client = influxdb2.NewClientWithOptions(uri, auth, + influxdb2.DefaultOptions().SetBatchSize(20).SetTLSConfig(&tls.Config{ + InsecureSkipVerify: true, + })) + s.writeApi = s.client.WriteAPI(s.organization, s.database) + return nil +} + +func (s *InfluxAsyncSink) Init(config SinkConfig) error { + if len(config.Host) == 0 || + len(config.Port) == 0 || + len(config.Database) == 0 || + len(config.Organization) == 0 || + len(config.Password) == 0 { + return errors.New("Not all configuration variables set required by InfluxSink") + } + s.host = config.Host + s.port = config.Port + s.database = config.Database + s.organization = config.Organization + s.user = config.User + s.password = config.Password + s.ssl = config.SSL + err := s.connect() + s.errors = s.writeApi.Errors() + go func() { + for err := range s.errors { + log.Print(err) + } + }() + return err +} + +func (s *InfluxAsyncSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { + p := influxdb2.NewPoint(measurement, tags, fields, t) + s.writeApi.WritePoint(p) + s.writeApi.Flush() + return nil +} + +func (s *InfluxAsyncSink) Close() { + log.Print("Closing InfluxDB connection") + s.writeApi.Flush() + s.client.Close() +}