From 9ccc5a6ca7f6cdcc9e73e2cbdb6252add5cbf54f Mon Sep 17 00:00:00 2001
From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Date: Thu, 23 Jun 2022 21:53:02 +0200
Subject: [PATCH] Allow only one timer at a time

---
 sinks/influxSink.go | 42 ++++++++----------------------------------
 1 file changed, 8 insertions(+), 34 deletions(-)

diff --git a/sinks/influxSink.go b/sinks/influxSink.go
index 3b1382b..0668f27 100644
--- a/sinks/influxSink.go
+++ b/sinks/influxSink.go
@@ -37,9 +37,6 @@ type InfluxSink struct {
 	// the metrics are sent without further delay.
 	// Default: 1s
 	FlushInterval string `json:"flush_delay,omitempty"`
-	// Time interval after which sending of the metrics is retried after a failed sending.
-	// Default: 5s
-	RetryInterval string `json:"retry_delay,omitempty"`
 	// Number of metrics that are dropped when buffer is full
 	// Default: 100
 	DropRate int `json:"drop_rate,omitempty"`
@@ -47,7 +44,6 @@ type InfluxSink struct {
 	batch      []*write.Point
 	flushTimer *time.Timer
 	flushDelay time.Duration
-	retryDelay time.Duration
 	batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
 }
 
@@ -106,25 +102,23 @@ func (s *InfluxSink) connect() error {
 }
 
 func (s *InfluxSink) Write(m lp.CCMetric) error {
-	// Lock access to batch slice
-	s.batchMutex.Lock()
 
-	if len(s.batch) == 0 && s.flushDelay != 0 {
-		// This is the first write since the last flush, start the flushTimer!
-		if s.flushTimer != nil && s.flushTimer.Stop() {
-			cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
-		}
-
-		// Run a batched flush for all lines that have arrived in the last flush delay interval
+	if s.flushDelay != 0 && s.flushTimer == nil {
+		// Run a batched flush for all metrics that arrived in the last flush delay interval
 		s.flushTimer = time.AfterFunc(
 			s.flushDelay,
 			func() {
-				if err := s.Flush(); err != nil {
+				err := s.Flush()
+				if err != nil {
 					cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
 				}
+				s.flushTimer = nil
 			})
 	}
 
+	// Lock access to batch slice
+	s.batchMutex.Lock()
+
 	// batch slice full, dropping oldest metric(s)
 	// e.g. when previous flushes failed and batch slice was not cleared
 	if len(s.batch) == s.config.BatchSize {
@@ -175,18 +169,7 @@ func (s *InfluxSink) Flush() error {
 	// Send metrics from batch slice
 	err := s.writeApi.WritePoint(context.Background(), s.batch...)
 	if err != nil {
-		cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err)
-
-		// Setup timer to retry flush
-		time.AfterFunc(
-			s.retryDelay,
-			func() {
-				if err := s.Flush(); err != nil {
-					cclog.ComponentError(s.name, "Retry timer: Flush failed:", err)
-				}
-			})
-
 		return err
 	}
 
@@ -217,7 +200,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 	// Set config default values
 	s.config.BatchSize = 1000
 	s.config.FlushInterval = "1s"
-	s.config.RetryInterval = "5s"
 	s.config.DropRate = 100
 
 	// Read config
@@ -258,14 +240,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 		}
 	}
 
-	// Configure flush delay duration
-	if len(s.config.RetryInterval) > 0 {
-		t, err := time.ParseDuration(s.config.RetryInterval)
-		if err == nil {
-			s.retryDelay = t
-		}
-	}
-
 	if !(s.config.BatchSize > 0) {
 		return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize)
 	}
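
For reference, a minimal, self-contained sketch of the single-timer pattern the patch adopts: Write arms the flush timer only when none is pending, and the timer callback clears it again after flushing, so at most one timer exists at a time. The type and names here (Sink, Write, Flush, a string batch) are simplified stand-ins for illustration, not the real InfluxSink API; like the patch, the sketch reads and resets flushTimer from two goroutines without extra synchronization.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Sink struct {
	flushDelay time.Duration
	flushTimer *time.Timer

	batchMutex sync.Mutex // protects batch, which Flush() accesses from the timer goroutine
	batch      []string
}

func (s *Sink) Write(metric string) {
	// Arm the timer only if no flush is already pending; the callback
	// sets flushTimer back to nil, so the next Write after a flush
	// starts a fresh delay interval.
	if s.flushDelay != 0 && s.flushTimer == nil {
		s.flushTimer = time.AfterFunc(
			s.flushDelay,
			func() {
				if err := s.Flush(); err != nil {
					fmt.Println("flush timer: flush failed:", err)
				}
				s.flushTimer = nil
			})
	}

	// Lock access to batch slice before appending
	s.batchMutex.Lock()
	s.batch = append(s.batch, metric)
	s.batchMutex.Unlock()
}

func (s *Sink) Flush() error {
	s.batchMutex.Lock()
	defer s.batchMutex.Unlock()

	// Stand-in for sending the batch to the real backend
	fmt.Println("flushing", len(s.batch), "metrics")
	s.batch = s.batch[:0]
	return nil
}

func main() {
	s := &Sink{flushDelay: 100 * time.Millisecond}
	s.Write("metric1")
	s.Write("metric2") // arrives within the delay window; no second timer is armed
	time.Sleep(200 * time.Millisecond)
}
```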