automatic flush in NatsSink

This commit is contained in:
Lou Knauer 2022-06-02 10:55:11 +02:00
parent e13695307f
commit 7708f78796

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
@ -15,11 +16,12 @@ import (
type NatsSinkConfig struct { type NatsSinkConfig struct {
defaultSinkConfig defaultSinkConfig
Host string `json:"host,omitempty"` Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"` Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"` Subject string `json:"subject,omitempty"`
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
FlushDelay string `json:"flush-delay,omitempty"`
} }
type NatsSink struct { type NatsSink struct {
@ -28,6 +30,10 @@ type NatsSink struct {
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config NatsSinkConfig config NatsSinkConfig
lock sync.Mutex
flushDelay time.Duration
flushTimer *time.Timer
} }
func (s *NatsSink) connect() error { func (s *NatsSink) connect() error {
@ -54,37 +60,49 @@ func (s *NatsSink) connect() error {
} }
func (s *NatsSink) Write(m lp.CCMetric) error { func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil { s.lock.Lock()
_, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags))
if err != nil { s.lock.Unlock()
cclog.ComponentError(s.name, "Write:", err.Error()) if err != nil {
return err cclog.ComponentError(s.name, "Write:", err.Error())
} return err
} }
if s.flushDelay == 0 {
s.Flush()
} else if s.flushTimer == nil {
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
s.Flush()
})
} else {
s.flushTimer.Reset(s.flushDelay)
}
return nil return nil
} }
func (s *NatsSink) Flush() error { func (s *NatsSink) Flush() error {
if s.client != nil { s.lock.Lock()
if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes
cclog.ComponentError(s.name, "Flush:", err.Error()) s.buffer.Reset()
return err s.lock.Unlock()
}
s.buffer.Reset() if err := s.client.Publish(s.config.Subject, buf); err != nil {
cclog.ComponentError(s.name, "Flush:", err.Error())
return err
} }
return nil return nil
} }
func (s *NatsSink) Close() { func (s *NatsSink) Close() {
if s.client != nil { cclog.ComponentDebug(s.name, "Close")
cclog.ComponentDebug(s.name, "Close") s.client.Close()
s.client.Close()
}
} }
func NewNatsSink(name string, config json.RawMessage) (Sink, error) { func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s := new(NatsSink) s := new(NatsSink)
s.name = fmt.Sprintf("NatsSink(%s)", name) s.name = fmt.Sprintf("NatsSink(%s)", name)
s.flushDelay = 10 * time.Second
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -94,7 +112,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
} }
if len(s.config.Host) == 0 || if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 || len(s.config.Port) == 0 ||
len(s.config.Database) == 0 { len(s.config.Subject) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink") return nil, errors.New("not all configuration variables set required by NatsSink")
} }
// Create lookup map to use meta infos as tags in the output metric // Create lookup map to use meta infos as tags in the output metric
@ -112,5 +130,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
if err := s.connect(); err != nil { if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err) return nil, fmt.Errorf("unable to connect: %v", err)
} }
s.flushTimer = nil
if len(s.config.FlushDelay) != 0 {
var err error
s.flushDelay, err = time.ParseDuration(s.config.FlushDelay)
if err != nil {
return nil, err
}
}
return s, nil return s, nil
} }