Merge branch 'main' into sqlite3_sink

This commit is contained in:
Thomas Roehl
2021-11-25 18:23:04 +01:00
34 changed files with 2446 additions and 470 deletions

View File

@@ -5,9 +5,10 @@ The base class/configuration is located in `metricSink.go`.
# Sinks
* `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging
* `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, user, password and database name are in the global configuration file
* `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx.
* `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages.
* `sqliteSink.go`: Writes all metrics to a Sqlite3 database. It uses https://github.com/mattn/go-sqlite3
* `httpSink.go`: Sends all metrics to an HTTP endpoint `http://<host>:<port>/<database>` using a POST request. The body of the request will consist of lines in the InfluxDB line protocol. In case password is specified, that password is used as a JWT in the 'Authorization' header.
# Installation
Nothing to do, all sinks are pure Go code

68
sinks/httpSink.go Normal file
View File

@@ -0,0 +1,68 @@
package sinks
import (
"bytes"
"errors"
"fmt"
"net/http"
"time"
lp "github.com/influxdata/line-protocol"
)
type HttpSink struct {
Sink
client *http.Client
url, jwt string
encoder *lp.Encoder
buffer *bytes.Buffer
}
func (s *HttpSink) Init(config SinkConfig) error {
if len(config.Host) == 0 || len(config.Port) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink")
}
s.client = &http.Client{}
s.url = fmt.Sprintf("http://%s:%s/%s", config.Host, config.Port, config.Database)
s.port = config.Port
s.jwt = config.Password
s.buffer = &bytes.Buffer{}
s.encoder = lp.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return nil
}
func (s *HttpSink) Write(point lp.MutableMetric) error {
_, err := s.encoder.Encode(point)
return err
}
func (s *HttpSink) Flush() error {
req, err := http.NewRequest(http.MethodPost, s.url, s.buffer)
if err != nil {
return err
}
if len(s.jwt) != 0 {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt))
}
res, err := s.client.Do(req)
s.buffer.Reset()
if err != nil {
return err
}
if res.StatusCode != 200 {
return errors.New(res.Status)
}
return nil
}
func (s *HttpSink) Close() {
s.client.CloseIdleConnections()
}

View File

@@ -2,12 +2,14 @@ package sinks
import (
"context"
"crypto/tls"
"errors"
"fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
lp "github.com/influxdata/line-protocol"
"log"
"time"
)
type InfluxSink struct {
@@ -19,14 +21,20 @@ type InfluxSink struct {
func (s *InfluxSink) connect() error {
var auth string
uri := fmt.Sprintf("http://%s:%s", s.host, s.port)
var uri string
if s.ssl {
uri = fmt.Sprintf("https://%s:%s", s.host, s.port)
} else {
uri = fmt.Sprintf("http://%s:%s", s.host, s.port)
}
if len(s.user) == 0 {
auth = s.password
} else {
auth = fmt.Sprintf("%s:%s", s.user, s.password)
}
log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database)
s.client = influxdb2.NewClient(uri, auth)
s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true}))
s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database)
return nil
}
@@ -45,15 +53,28 @@ func (s *InfluxSink) Init(config SinkConfig) error {
s.organization = config.Organization
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
return s.connect()
}
func (s *InfluxSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
p := influxdb2.NewPoint(measurement, tags, fields, t)
func (s *InfluxSink) Write(point lp.MutableMetric) error {
tags := map[string]string{}
fields := map[string]interface{}{}
for _, t := range point.TagList() {
tags[t.Key] = t.Value
}
for _, f := range point.FieldList() {
fields[f.Key] = f.Value
}
p := influxdb2.NewPoint(point.Name(), tags, fields, point.Time())
err := s.writeApi.WritePoint(context.Background(), p)
return err
}
func (s *InfluxSink) Flush() error {
return nil
}
func (s *InfluxSink) Close() {
log.Print("Closing InfluxDB connection")
s.client.Close()

View File

@@ -1,7 +1,8 @@
package sinks
import (
"time"
// "time"
lp "github.com/influxdata/line-protocol"
)
type SinkConfig struct {
@@ -12,6 +13,7 @@ type SinkConfig struct {
Password string `json:"password"`
Organization string `json:"organization"`
Type string `json:"type"`
SSL bool `json:"ssl"`
}
type Sink struct {
@@ -21,10 +23,12 @@ type Sink struct {
password string
database string
organization string
ssl bool
}
type SinkFuncs interface {
Init(config SinkConfig) error
Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error
Write(point lp.MutableMetric) error
Flush() error
Close()
}

View File

@@ -4,7 +4,7 @@ import (
"bytes"
"errors"
"fmt"
protocol "github.com/influxdata/line-protocol"
lp "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go"
"log"
"time"
@@ -13,7 +13,7 @@ import (
type NatsSink struct {
Sink
client *nats.Conn
encoder *protocol.Encoder
encoder *lp.Encoder
buffer *bytes.Buffer
}
@@ -46,31 +46,43 @@ func (s *NatsSink) Init(config SinkConfig) error {
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = protocol.NewEncoder(s.buffer)
s.encoder = lp.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
return s.connect()
}
func (s *NatsSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
func (s *NatsSink) Write(point lp.MutableMetric) error {
if s.client != nil {
m, err := protocol.New(measurement, tags, fields, t)
if err != nil {
log.Print(err)
return err
}
_, err = s.encoder.Encode(m)
// var tags map[string]string
// var fields map[string]interface{}
// for _, t := range point.TagList() {
// tags[t.Key] = t.Value
// }
// for _, f := range point.FieldList() {
// fields[f.Key] = f.Value
// }
// m, err := protocol.New(point.Name(), tags, fields, point.Time())
// if err != nil {
// log.Print(err)
// return err
// }
_, err := s.encoder.Encode(point)
if err != nil {
log.Print(err)
return err
}
s.client.Publish(s.database, s.buffer.Bytes())
s.buffer.Reset()
}
return nil
}
func (s *NatsSink) Flush() error {
return nil
}
func (s *NatsSink) Close() {
log.Print("Closing Nats connection")
if s.client != nil {

View File

@@ -4,7 +4,9 @@ import (
"fmt"
"math"
"strings"
"time"
// "time"
lp "github.com/influxdata/line-protocol"
)
type StdoutSink struct {
@@ -15,30 +17,48 @@ func (s *StdoutSink) Init(config SinkConfig) error {
return nil
}
func (s *StdoutSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
func (s *StdoutSink) Write(point lp.MutableMetric) error {
var tagsstr []string
var fieldstr []string
for k, v := range tags {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", k, v))
for _, t := range point.TagList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value))
}
for k, v := range fields {
switch v.(type) {
for _, f := range point.FieldList() {
switch f.Value.(type) {
case float64:
if !math.IsNaN(v.(float64)) {
fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", k, v.(float64)))
if !math.IsNaN(f.Value.(float64)) {
fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", f.Key, f.Value.(float64)))
} else {
fieldstr = append(fieldstr, fmt.Sprintf("%s=0.0", f.Key))
}
case float32:
if !math.IsNaN(float64(f.Value.(float32))) {
fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", f.Key, f.Value.(float32)))
} else {
fieldstr = append(fieldstr, fmt.Sprintf("%s=0.0", f.Key))
}
case int:
fieldstr = append(fieldstr, fmt.Sprintf("%s=%d", f.Key, f.Value.(int)))
case int64:
fieldstr = append(fieldstr, fmt.Sprintf("%s=%d", f.Key, f.Value.(int64)))
case string:
fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", k, v.(string)))
fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", f.Key, f.Value.(string)))
default:
fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", f.Key, f.Value))
}
}
if len(tagsstr) > 0 {
fmt.Printf("%s,%s %s %d\n", measurement, strings.Join(tagsstr, ","), strings.Join(fieldstr, ","), t.Unix())
fmt.Printf("%s,%s %s %d\n", point.Name(), strings.Join(tagsstr, ","), strings.Join(fieldstr, ","), point.Time().Unix())
} else {
fmt.Printf("%s %s %d\n", measurement, strings.Join(fieldstr, ","), t.Unix())
fmt.Printf("%s %s %d\n", point.Name(), strings.Join(fieldstr, ","), point.Time().Unix())
}
return nil
}
func (s *StdoutSink) Flush() error {
return nil
}
func (s *StdoutSink) Close() {
return
}