From 0ca6d1a7944275be98021df3e84c4de2b92de1b5 Mon Sep 17 00:00:00 2001
From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Date: Tue, 21 Jun 2022 07:59:24 +0200
Subject: [PATCH] Improved dropping of metrics failed to send

---
 sinks/influxSink.go | 60 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 46 insertions(+), 14 deletions(-)

diff --git a/sinks/influxSink.go b/sinks/influxSink.go
index 78dda56..dd3d5a3 100644
--- a/sinks/influxSink.go
+++ b/sinks/influxSink.go
@@ -33,10 +33,12 @@ type InfluxSink struct {
 		BatchSize int `json:"batch_size,omitempty"`
 		// Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s
 		FlushInterval string `json:"flush_delay,omitempty"`
+		RetryInterval string `json:"retry_delay,omitempty"`
 	}
 	batch      []*write.Point
 	flushTimer *time.Timer
 	flushDelay time.Duration
+	retryDelay time.Duration
 	lock       sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
 }
 
@@ -92,6 +94,8 @@ func (s *InfluxSink) connect() error {
 }
 
 func (s *InfluxSink) Write(m lp.CCMetric) error {
+	// Lock access to batch slice
+	s.lock.Lock()
 
 	if len(s.batch) == 0 && s.flushDelay != 0 {
 		// This is the first write since the last flush, start the flushTimer!
@@ -109,22 +113,34 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
 			})
 	}
 
+	// batch slice full, dropping oldest metric
+	// e.g. when previous flushes failed and batch slice was not cleared
+	if len(s.batch) == s.config.BatchSize {
+		newSize := len(s.batch) - 1
+		for i := 0; i < newSize; i++ {
+			s.batch[i] = s.batch[i+1]
+		}
+		s.batch[newSize] = nil
+		s.batch = s.batch[:newSize]
+		cclog.ComponentError(s.name, "Batch slice full, dropping oldest metric")
+	}
+
 	// Append metric to batch slice
 	p := m.ToPoint(s.meta_as_tags)
-	s.lock.Lock()
 	s.batch = append(s.batch, p)
-	s.lock.Unlock()
 
 	// Flush synchronously if "flush_delay" is zero
-	if s.flushDelay == 0 {
-		return s.Flush()
-	}
-
+	// or
 	// Flush if batch size is reached
-	if len(s.batch) == s.config.BatchSize {
+	if s.flushDelay == 0 ||
+		len(s.batch) == s.config.BatchSize {
+		// Unlock access to batch slice
+		s.lock.Unlock()
 		return s.Flush()
 	}
 
+	// Unlock access to batch slice
+	s.lock.Unlock()
 	return nil
 }
 
@@ -142,20 +158,27 @@ func (s *InfluxSink) Flush() error {
 
 	// Send metrics from batch slice
 	err := s.writeApi.WritePoint(context.Background(), s.batch...)
+	if err != nil {
+
+		// Setup timer to retry flush
+		time.AfterFunc(
+			s.retryDelay,
+			func() {
+				if err := s.Flush(); err != nil {
+					cclog.ComponentError(s.name, "flush retry failed:", err.Error())
+				}
+			})
+
+		cclog.ComponentError(s.name, "flush failed:", err.Error())
+		return err
+	}
 
 	// Clear batch slice
-	// (when sending the metrics failed, metrics get dropped and are lost)
 	for i := range s.batch {
 		s.batch[i] = nil
 	}
 	s.batch = s.batch[:0]
 
-	// Report errors for sending metrics
-	if err != nil {
-		cclog.ComponentError(s.name, "flush failed:", err.Error())
-		return err
-	}
-
 	return nil
 }
 
@@ -174,6 +197,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 	// Set config default values
 	s.config.BatchSize = 100
 	s.config.FlushInterval = "1s"
+	s.config.RetryInterval = "5s"
 
 	// Read config
 	if len(config) > 0 {
@@ -213,6 +237,14 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 		}
 	}
 
+	// Configure retry delay duration
+	if len(s.config.RetryInterval) > 0 {
+		t, err := time.ParseDuration(s.config.RetryInterval)
+		if err == nil {
+			s.retryDelay = t
+		}
+	}
+
 	// allocate batch slice
 	s.batch = make([]*write.Point, 0, s.config.BatchSize)
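
For reference, the hunks above combine two techniques that are easier to see in isolation: Write() drops the oldest point when the batch slice is already full (which happens when earlier flushes failed and the slice was therefore never cleared), and Flush() keeps the batch on error and schedules another attempt with time.AfterFunc after retry_delay. The following minimal, self-contained Go sketch illustrates that combination under simplified assumptions; boundedBatch, Add, Flush, send and attempts are illustrative names, not taken from influxSink.go, and the string batch stands in for []*write.Point.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type boundedBatch struct {
	lock       sync.Mutex
	batch      []string
	batchSize  int
	retryDelay time.Duration
}

func (b *boundedBatch) Add(entry string) error {
	b.lock.Lock()

	// Batch slice full, e.g. because earlier flushes failed: drop the oldest entry.
	if len(b.batch) == b.batchSize {
		copy(b.batch, b.batch[1:])
		b.batch = b.batch[:len(b.batch)-1]
		fmt.Println("batch full, dropping oldest entry")
	}
	b.batch = append(b.batch, entry)

	// Flush synchronously once the batch is full again.
	if len(b.batch) == b.batchSize {
		b.lock.Unlock()
		return b.Flush()
	}
	b.lock.Unlock()
	return nil
}

func (b *boundedBatch) Flush() error {
	b.lock.Lock()
	defer b.lock.Unlock()

	if len(b.batch) == 0 {
		return nil
	}
	if err := send(b.batch); err != nil {
		// Keep the batch and schedule a retry, mirroring the time.AfterFunc call in the diff.
		time.AfterFunc(b.retryDelay, func() {
			if rerr := b.Flush(); rerr != nil {
				fmt.Println("flush retry failed:", rerr)
			}
		})
		fmt.Println("flush failed:", err)
		return err
	}
	b.batch = b.batch[:0] // sent successfully, clear the batch
	return nil
}

// send stands in for the real write call; it fails a few times to exercise the retry path.
var attempts int

func send(entries []string) error {
	attempts++
	if attempts < 4 {
		return errors.New("backend unavailable")
	}
	fmt.Println("sent", len(entries), "entries")
	return nil
}

func main() {
	b := &boundedBatch{batchSize: 3, retryDelay: 100 * time.Millisecond}
	for i := 0; i < 5; i++ {
		if err := b.Add(fmt.Sprintf("metric-%d", i)); err != nil {
			fmt.Println("add:", err)
		}
	}
	time.Sleep(500 * time.Millisecond) // give the retry timers time to fire
}

As in the patch, every failed flush schedules its own retry timer; a retry that fires after a later flush has already emptied the batch simply finds nothing to send and returns nil.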