InfluxAsyncSink: Add custom flush mechanism

This commit is contained in:
Thomas Roehl 2022-04-04 02:56:23 +02:00
parent ecdb4c1bcf
commit 69f7c19659

View File

@ -32,6 +32,7 @@ type InfluxAsyncSinkConfig struct {
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
InfluxMaxRetries uint `json:"max_retries,omitempty"`
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
}
type InfluxAsyncSink struct {
@ -42,6 +43,8 @@ type InfluxAsyncSink struct {
config InfluxAsyncSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
customFlushInterval time.Duration
flushTimer *time.Timer
}
func (s *InfluxAsyncSink) connect() error {
@ -102,6 +105,15 @@ func (s *InfluxAsyncSink) connect() error {
}
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
if s.customFlushInterval != 0 && s.flushTimer == nil {
// Run a batched flush for all lines that have arrived in the defined interval
s.flushTimer = time.AfterFunc(s.customFlushInterval, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
s.writeApi.WritePoint(
m.ToPoint(s.meta_as_tags),
)
@ -109,7 +121,11 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
}
func (s *InfluxAsyncSink) Flush() error {
cclog.ComponentDebug(s.name, "Flushing")
s.writeApi.Flush()
if s.customFlushInterval != 0 && s.flushTimer != nil {
s.flushTimer = nil
}
return nil
}
@ -132,6 +148,8 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.config.InfluxMaxRetries = 0
s.config.InfluxExponentialBase = 0
s.config.FlushInterval = 0
s.config.CustomFlushInterval = ""
s.customFlushInterval = time.Duration(0)
// Default retry intervals (in seconds)
// 1 2
@ -183,6 +201,15 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Use a own timer for calling Flush()
if len(s.config.CustomFlushInterval) > 0 {
t, err := time.ParseDuration(s.config.CustomFlushInterval)
if err != nil {
return nil, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
}
s.customFlushInterval = t
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)