diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 0597e9b..3e2e728 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" @@ -15,11 +16,12 @@ import ( type NatsSinkConfig struct { defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Subject string `json:"subject,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + FlushDelay string `json:"flush-delay,omitempty"` } type NatsSink struct { @@ -28,6 +30,10 @@ type NatsSink struct { encoder *influx.Encoder buffer *bytes.Buffer config NatsSinkConfig + + lock sync.Mutex + flushDelay time.Duration + flushTimer *time.Timer } func (s *NatsSink) connect() error { @@ -54,37 +60,49 @@ func (s *NatsSink) connect() error { } func (s *NatsSink) Write(m lp.CCMetric) error { - if s.client != nil { - _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) - if err != nil { - cclog.ComponentError(s.name, "Write:", err.Error()) - return err - } + s.lock.Lock() + _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) + s.lock.Unlock() + if err != nil { + cclog.ComponentError(s.name, "Write:", err.Error()) + return err } + + if s.flushDelay == 0 { + s.Flush() + } else if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + s.Flush() + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + return nil } func (s *NatsSink) Flush() error { - if s.client != nil { - if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { - cclog.ComponentError(s.name, "Flush:", err.Error()) - return err - } - s.buffer.Reset() + s.lock.Lock() + buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes + s.buffer.Reset() + s.lock.Unlock() + + if err := s.client.Publish(s.config.Subject, buf); err != nil { + cclog.ComponentError(s.name, "Flush:", err.Error()) + return err } return nil } func (s *NatsSink) Close() { - if s.client != nil { - cclog.ComponentDebug(s.name, "Close") - s.client.Close() - } + cclog.ComponentDebug(s.name, "Close") + s.client.Close() } func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s := new(NatsSink) s.name = fmt.Sprintf("NatsSink(%s)", name) + s.flushDelay = 10 * time.Second if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,7 +112,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { } if len(s.config.Host) == 0 || len(s.config.Port) == 0 || - len(s.config.Database) == 0 { + len(s.config.Subject) == 0 { return nil, errors.New("not all configuration variables set required by NatsSink") } // Create lookup map to use meta infos as tags in the output metric @@ -112,5 +130,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) } + + s.flushTimer = nil + if len(s.config.FlushDelay) != 0 { + var err error + s.flushDelay, err = time.ParseDuration(s.config.FlushDelay) + if err != nil { + return nil, err + } + } + return s, nil }