Use mutex to ensure only on flush timer is running

This commit is contained in:
Holger Obermaier 2022-06-24 09:08:20 +02:00
parent 9ccc5a6ca7
commit 04819d9db2

View File

@ -41,10 +41,11 @@ type InfluxSink struct {
// Default: 100 // Default: 100
DropRate int `json:"drop_rate,omitempty"` DropRate int `json:"drop_rate,omitempty"`
} }
batch []*write.Point batch []*write.Point
flushTimer *time.Timer flushTimer *time.Timer
flushDelay time.Duration flushDelay time.Duration
batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
flushTimerMutex sync.Mutex // Ensure only one flush timer is running
} }
// connect connects to the InfluxDB server // connect connects to the InfluxDB server
@ -103,16 +104,17 @@ func (s *InfluxSink) connect() error {
func (s *InfluxSink) Write(m lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
if s.flushDelay != 0 && s.flushTimer == nil { if s.flushDelay != 0 && s.flushTimerMutex.TryLock() {
// Run a batched flush for all metrics that arrived in the last flush delay interval // Run a batched flush for all metrics that arrived in the last flush delay interval
cclog.ComponentDebug(s.name, "Starting new flush timer")
s.flushTimer = time.AfterFunc( s.flushTimer = time.AfterFunc(
s.flushDelay, s.flushDelay,
func() { func() {
err := s.Flush() defer s.flushTimerMutex.Unlock()
if err != nil { cclog.ComponentDebug(s.name, "Starting flush in flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush timer: flush failed:", err) cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
} }
s.flushTimer = nil
}) })
} }
@ -131,7 +133,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
s.batch[i] = nil s.batch[i] = nil
} }
s.batch = s.batch[:newSize] s.batch = s.batch[:newSize]
cclog.ComponentError(s.name, "Batch slice full, dropping ", s.config.DropRate, "oldest metric(s)") cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)")
} }
// Append metric to batch slice // Append metric to batch slice