From 7708f78796c9c6cc7736fde1aa819db2a6c66e72 Mon Sep 17 00:00:00 2001
From: Lou Knauer <lou.knauer@gmx.de>
Date: Thu, 2 Jun 2022 10:55:11 +0200
Subject: [PATCH] automatic flush in NatsSink

---
 sinks/natsSink.go | 72 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 22 deletions(-)

diff --git a/sinks/natsSink.go b/sinks/natsSink.go
index 0597e9b..3e2e728 100644
--- a/sinks/natsSink.go
+++ b/sinks/natsSink.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
@@ -15,11 +16,12 @@ import (
 
 type NatsSinkConfig struct {
 	defaultSinkConfig
-	Host     string `json:"host,omitempty"`
-	Port     string `json:"port,omitempty"`
-	Database string `json:"database,omitempty"`
-	User     string `json:"user,omitempty"`
-	Password string `json:"password,omitempty"`
+	Host       string `json:"host,omitempty"`
+	Port       string `json:"port,omitempty"`
+	Subject    string `json:"subject,omitempty"`
+	User       string `json:"user,omitempty"`
+	Password   string `json:"password,omitempty"`
+	FlushDelay string `json:"flush-delay,omitempty"`
 }
 
 type NatsSink struct {
@@ -28,6 +30,10 @@ type NatsSink struct {
 	encoder *influx.Encoder
 	buffer  *bytes.Buffer
 	config  NatsSinkConfig
+
+	lock       sync.Mutex
+	flushDelay time.Duration
+	flushTimer *time.Timer
 }
 
 func (s *NatsSink) connect() error {
@@ -54,37 +60,49 @@ func (s *NatsSink) connect() error {
 }
 
 func (s *NatsSink) Write(m lp.CCMetric) error {
-	if s.client != nil {
-		_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
-		if err != nil {
-			cclog.ComponentError(s.name, "Write:", err.Error())
-			return err
-		}
+	s.lock.Lock()
+	_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
+	s.lock.Unlock()
+	if err != nil {
+		cclog.ComponentError(s.name, "Write:", err.Error())
+		return err
 	}
+
+	if s.flushDelay == 0 {
+		s.Flush()
+	} else if s.flushTimer == nil {
+		s.flushTimer = time.AfterFunc(s.flushDelay, func() {
+			s.Flush()
+		})
+	} else {
+		s.flushTimer.Reset(s.flushDelay)
+	}
+
 	return nil
 }
 
 func (s *NatsSink) Flush() error {
-	if s.client != nil {
-		if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil {
-			cclog.ComponentError(s.name, "Flush:", err.Error())
-			return err
-		}
-		s.buffer.Reset()
+	s.lock.Lock()
+	buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes
+	s.buffer.Reset()
+	s.lock.Unlock()
+
+	if err := s.client.Publish(s.config.Subject, buf); err != nil {
+		cclog.ComponentError(s.name, "Flush:", err.Error())
+		return err
 	}
 	return nil
 }
 
 func (s *NatsSink) Close() {
-	if s.client != nil {
-		cclog.ComponentDebug(s.name, "Close")
-		s.client.Close()
-	}
+	cclog.ComponentDebug(s.name, "Close")
+	s.client.Close()
 }
 
 func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
 	s := new(NatsSink)
 	s.name = fmt.Sprintf("NatsSink(%s)", name)
+	s.flushDelay = 10 * time.Second
 	if len(config) > 0 {
 		err := json.Unmarshal(config, &s.config)
 		if err != nil {
@@ -94,7 +112,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
 	}
 	if len(s.config.Host) == 0 ||
 		len(s.config.Port) == 0 ||
-		len(s.config.Database) == 0 {
+		len(s.config.Subject) == 0 {
 		return nil, errors.New("not all configuration variables set required by NatsSink")
 	}
 	// Create lookup map to use meta infos as tags in the output metric
@@ -112,5 +130,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
 	if err := s.connect(); err != nil {
 		return nil, fmt.Errorf("unable to connect: %v", err)
 	}
+
+	s.flushTimer = nil
+	if len(s.config.FlushDelay) != 0 {
+		var err error
+		s.flushDelay, err = time.ParseDuration(s.config.FlushDelay)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return s, nil
 }