diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 0668f27..90f8da8 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -41,10 +41,11 @@ type InfluxSink struct { // Default: 100 DropRate int `json:"drop_rate,omitempty"` } - batch []*write.Point - flushTimer *time.Timer - flushDelay time.Duration - batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + flushTimerMutex sync.Mutex // Ensure only one flush timer is running } // connect connects to the InfluxDB server @@ -103,16 +104,17 @@ func (s *InfluxSink) connect() error { func (s *InfluxSink) Write(m lp.CCMetric) error { - if s.flushDelay != 0 && s.flushTimer == nil { + if s.flushDelay != 0 && s.flushTimerMutex.TryLock() { // Run a batched flush for all metrics that arrived in the last flush delay interval + cclog.ComponentDebug(s.name, "Starting new flush timer") s.flushTimer = time.AfterFunc( s.flushDelay, func() { - err := s.Flush() - if err != nil { + defer s.flushTimerMutex.Unlock() + cclog.ComponentDebug(s.name, "Starting flush in flush timer") + if err := s.Flush(); err != nil { cclog.ComponentError(s.name, "Flush timer: flush failed:", err) } - s.flushTimer = nil }) } @@ -131,7 +133,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { s.batch[i] = nil } s.batch = s.batch[:newSize] - cclog.ComponentError(s.name, "Batch slice full, dropping ", s.config.DropRate, "oldest metric(s)") + cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") } // Append metric to batch slice