Improved dropping of metrics that failed to send

Author: Holger Obermaier
Date:   2022-06-21 07:59:24 +02:00
parent  580d21d8bb
commit  0ca6d1a794


@@ -33,10 +33,12 @@ type InfluxSink struct {
BatchSize int `json:"batch_size,omitempty"`
// Time interval after which the batch is flushed even if the batch size has not been reached. Default: 1s
FlushInterval string `json:"flush_delay,omitempty"`
// Time interval to wait before a failed flush is retried. Default: 5s
RetryInterval string `json:"retry_delay,omitempty"`
}
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
retryDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
}
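
Side note, not part of the diff: the hunk above only introduces the retry_delay configuration string and the matching retryDelay duration field. Below is a minimal, self-contained sketch of how a JSON snippet carrying the new key could be decoded and converted, reusing the json tags shown in the struct; the program layout and the type name timingConfig are invented for the example.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// timingConfig mirrors only the timing-related fields of the sink config above.
type timingConfig struct {
	BatchSize     int    `json:"batch_size,omitempty"`
	FlushInterval string `json:"flush_delay,omitempty"`
	RetryInterval string `json:"retry_delay,omitempty"`
}

func main() {
	// Example values; the defaults set later in NewInfluxSink are "1s" and "5s".
	raw := []byte(`{"batch_size": 100, "flush_delay": "1s", "retry_delay": "5s"}`)

	var cfg timingConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	flushDelay, _ := time.ParseDuration(cfg.FlushInterval)
	retryDelay, _ := time.ParseDuration(cfg.RetryInterval)
	fmt.Println("batch size:", cfg.BatchSize, "flush delay:", flushDelay, "retry delay:", retryDelay)
}

The real sink performs the same string-to-duration conversion in NewInfluxSink further down in this diff.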
@@ -92,6 +94,8 @@ func (s *InfluxSink) connect() error {
}
func (s *InfluxSink) Write(m lp.CCMetric) error {
// Lock access to batch slice
s.lock.Lock()
if len(s.batch) == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
@@ -109,22 +113,34 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
})
}
// batch slice full, dropping oldest metric
// e.g. when previous flushes failed and batch slice was not cleared
if len(s.batch) == s.config.BatchSize {
newSize := len(s.batch) - 1
for i := 0; i < newSize; i++ {
s.batch[i] = s.batch[i+1]
}
s.batch[newSize] = nil
s.batch = s.batch[:newSize]
cclog.ComponentError(s.name, "Batch slice full, dropping oldest metric")
}
// Append metric to batch slice
p := m.ToPoint(s.meta_as_tags)
s.lock.Lock()
s.batch = append(s.batch, p)
s.lock.Unlock()
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
// or
// Flush if batch size is reached
if len(s.batch) == s.config.BatchSize {
if s.flushDelay == 0 ||
len(s.batch) == s.config.BatchSize {
// Unlock access to batch slice
s.lock.Unlock()
return s.Flush()
}
// Unlock access to batch slice
s.lock.Unlock()
return nil
}
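
The Write() hunks above turn the batch into a bounded buffer: if the slice already holds batch_size points, typically because earlier flushes failed and the slice was never cleared, the oldest point is shifted out before the new one is appended. Here is a standalone sketch of the same shift-and-truncate idiom, using strings instead of *write.Point; the names batchSize and push and the example values are illustrative only.

package main

import "fmt"

// batchSize stands in for s.config.BatchSize.
const batchSize = 3

// push appends m, dropping the oldest entry first when the buffer is full.
func push(batch []string, m string) []string {
	// Buffer already full (e.g. previous flushes failed and it was never
	// cleared): shift everything left by one and drop the oldest element.
	if len(batch) == batchSize {
		newSize := len(batch) - 1
		for i := 0; i < newSize; i++ {
			batch[i] = batch[i+1]
		}
		batch[newSize] = "" // release the slot; the real code sets the *write.Point to nil
		batch = batch[:newSize]
		fmt.Println("batch full, dropping oldest metric")
	}
	return append(batch, m)
}

func main() {
	batch := make([]string, 0, batchSize)
	for _, m := range []string{"m1", "m2", "m3", "m4", "m5"} {
		batch = push(batch, m)
	}
	fmt.Println(batch) // prints [m3 m4 m5]
}

Dropping from the front keeps the newest samples, which matches the "dropping oldest metric" message logged above.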
@@ -142,20 +158,27 @@ func (s *InfluxSink) Flush() error {
// Send metrics from batch slice
err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil {
// Setup timer to retry flush
time.AfterFunc(
s.retryDelay,
func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush retry failed:", err.Error())
}
})
cclog.ComponentError(s.name, "flush failed:", err.Error())
return err
}
// Clear batch slice
// (when sending the metrics failed, metrics get dropped and are lost)
for i := range s.batch {
s.batch[i] = nil
}
s.batch = s.batch[:0]
// Report errors for sending metrics
if err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
return err
}
return nil
}
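
In the Flush() hunk, a failed WritePoint call no longer clears the batch; instead a timer started with time.AfterFunc retries the flush after retryDelay, so buffered points survive temporary outages (up to the drop-oldest limit handled in Write). Below is a minimal standalone sketch of that retry pattern; flushOnce, its fail-twice behaviour and the done channel are invented for the example.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var attempts int32

// flushOnce stands in for writeApi.WritePoint: it fails twice, then succeeds.
func flushOnce() error {
	if atomic.AddInt32(&attempts, 1) < 3 {
		return errors.New("server unreachable")
	}
	return nil
}

// flush mimics the error path of Flush(): on failure the data is kept and a
// new attempt is scheduled after retryDelay instead of dropping the batch.
func flush(retryDelay time.Duration, done chan<- struct{}) {
	if err := flushOnce(); err != nil {
		fmt.Println("flush failed:", err.Error(), "- retrying in", retryDelay)
		time.AfterFunc(retryDelay, func() { flush(retryDelay, done) })
		return
	}
	fmt.Println("flush succeeded after", atomic.LoadInt32(&attempts), "attempts")
	close(done)
}

func main() {
	done := make(chan struct{})
	flush(200*time.Millisecond, done)
	<-done // wait for the (retried) flush to succeed
}

Because each retry runs on a freshly scheduled timer, the failing call returns immediately and the caller is never blocked by an unreachable server.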
@@ -174,6 +197,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// Set config default values
s.config.BatchSize = 100
s.config.FlushInterval = "1s"
s.config.RetryInterval = "5s"
// Read config
if len(config) > 0 {
@@ -213,6 +237,14 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
}
}
// Configure retry delay duration
if len(s.config.RetryInterval) > 0 {
t, err := time.ParseDuration(s.config.RetryInterval)
if err == nil {
s.retryDelay = t
}
}
// allocate batch slice
s.batch = make([]*write.Point, 0, s.config.BatchSize)
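
The new retry_delay string is converted with time.ParseDuration, exactly like flush_delay; when parsing fails, the error is ignored and retryDelay keeps its previous value (the "5s" default set in NewInfluxSink). A short sketch of that fallback behaviour with illustrative inputs:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed starting point: the "5s" default assigned in NewInfluxSink.
	retryDelay := 5 * time.Second

	for _, raw := range []string{"250ms", "2m", "not-a-duration"} {
		// Invalid strings are ignored and the previously set value is kept,
		// mirroring the `if err == nil` pattern in the hunk above.
		if t, err := time.ParseDuration(raw); err == nil {
			retryDelay = t
		}
		fmt.Printf("%-15s -> retry delay %v\n", raw, retryDelay)
	}
}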