From cdc1811576796e24d1eeac8a64c4b565792f2ece Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 12 Oct 2021 13:43:58 +0200 Subject: [PATCH] Add Flush method to sink interface --- metric-collector.go | 4 ++++ sinks/influxSink.go | 9 +++++++-- sinks/metricSink.go | 1 + sinks/natsSink.go | 4 ++++ sinks/stdoutSink.go | 5 +++++ 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/metric-collector.go b/metric-collector.go index ebcba52..ddaa865 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -269,6 +269,10 @@ func main() { tmpPoints = tmpPoints[1:] } } + + if err := sink.Flush(); err != nil { + log.Printf("sink error: %s\n", err) + } } } }() diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 1b97588..40e681f 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "errors" "fmt" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" lp "github.com/influxdata/line-protocol" @@ -57,8 +58,8 @@ func (s *InfluxSink) Init(config SinkConfig) error { } func (s *InfluxSink) Write(point lp.MutableMetric) error { - var tags map[string]string - var fields map[string]interface{} + tags := map[string]string{} + fields := map[string]interface{}{} for _, t := range point.TagList() { tags[t.Key] = t.Value } @@ -70,6 +71,10 @@ func (s *InfluxSink) Write(point lp.MutableMetric) error { return err } +func (s *InfluxSink) Flush() error { + return nil +} + func (s *InfluxSink) Close() { log.Print("Closing InfluxDB connection") s.client.Close() diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 3b70f78..182495a 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -29,5 +29,6 @@ type Sink struct { type SinkFuncs interface { Init(config SinkConfig) error Write(point lp.MutableMetric) error + Flush() error Close() } diff --git a/sinks/natsSink.go b/sinks/natsSink.go index efdd3fd..0df14f4 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -79,6 +79,10 @@ func (s *NatsSink) Write(point lp.MutableMetric) error { return nil } +func (s *NatsSink) Flush() error { + return nil +} + func (s *NatsSink) Close() { log.Print("Closing Nats connection") if s.client != nil { diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index a8f7a00..34561e0 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "strings" + // "time" lp "github.com/influxdata/line-protocol" ) @@ -54,6 +55,10 @@ func (s *StdoutSink) Write(point lp.MutableMetric) error { return nil } +func (s *StdoutSink) Flush() error { + return nil +} + func (s *StdoutSink) Close() { return }