Add Flush method to sink interface

This commit is contained in:
Lou Knauer 2021-10-12 13:43:58 +02:00
parent dc4b8d13c2
commit cdc1811576
5 changed files with 21 additions and 2 deletions

View File

@ -269,6 +269,10 @@ func main() {
tmpPoints = tmpPoints[1:] tmpPoints = tmpPoints[1:]
} }
} }
if err := sink.Flush(); err != nil {
log.Printf("sink error: %s\n", err)
}
} }
} }
}() }()

View File

@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
lp "github.com/influxdata/line-protocol" lp "github.com/influxdata/line-protocol"
@ -57,8 +58,8 @@ func (s *InfluxSink) Init(config SinkConfig) error {
} }
func (s *InfluxSink) Write(point lp.MutableMetric) error { func (s *InfluxSink) Write(point lp.MutableMetric) error {
var tags map[string]string tags := map[string]string{}
var fields map[string]interface{} fields := map[string]interface{}{}
for _, t := range point.TagList() { for _, t := range point.TagList() {
tags[t.Key] = t.Value tags[t.Key] = t.Value
} }
@ -70,6 +71,10 @@ func (s *InfluxSink) Write(point lp.MutableMetric) error {
return err return err
} }
func (s *InfluxSink) Flush() error {
return nil
}
func (s *InfluxSink) Close() { func (s *InfluxSink) Close() {
log.Print("Closing InfluxDB connection") log.Print("Closing InfluxDB connection")
s.client.Close() s.client.Close()

View File

@ -29,5 +29,6 @@ type Sink struct {
type SinkFuncs interface { type SinkFuncs interface {
Init(config SinkConfig) error Init(config SinkConfig) error
Write(point lp.MutableMetric) error Write(point lp.MutableMetric) error
Flush() error
Close() Close()
} }

View File

@ -79,6 +79,10 @@ func (s *NatsSink) Write(point lp.MutableMetric) error {
return nil return nil
} }
func (s *NatsSink) Flush() error {
return nil
}
func (s *NatsSink) Close() { func (s *NatsSink) Close() {
log.Print("Closing Nats connection") log.Print("Closing Nats connection")
if s.client != nil { if s.client != nil {

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"math" "math"
"strings" "strings"
// "time" // "time"
lp "github.com/influxdata/line-protocol" lp "github.com/influxdata/line-protocol"
) )
@ -54,6 +55,10 @@ func (s *StdoutSink) Write(point lp.MutableMetric) error {
return nil return nil
} }
func (s *StdoutSink) Flush() error {
return nil
}
func (s *StdoutSink) Close() { func (s *StdoutSink) Close() {
return return
} }