Reuse flush timer

This commit is contained in:
Holger Obermaier 2023-09-26 15:04:39 +02:00
parent 19ec6d06db
commit 1e606a1aa1

View File

@ -105,17 +105,28 @@ func (s *InfluxSink) connect() error {
func (s *InfluxSink) Write(m lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
if s.flushDelay != 0 && s.flushTimerMutex.TryLock() { if s.flushDelay != 0 && s.flushTimerMutex.TryLock() {
// Run a batched flush for all metrics that arrived in the last flush delay interval
cclog.ComponentDebug(s.name, "Starting new flush timer") // Setup flush timer when flush delay is configured
s.flushTimer = time.AfterFunc( // and no other timer is already running
s.flushDelay, if s.flushTimer != nil {
func() {
defer s.flushTimerMutex.Unlock() // Restarting existing flush timer
cclog.ComponentDebug(s.name, "Starting flush in flush timer") cclog.ComponentDebug(s.name, "Restarting flush timer")
if err := s.Flush(); err != nil { s.flushTimer.Reset(s.flushDelay)
cclog.ComponentError(s.name, "Flush timer: flush failed:", err) } else {
}
}) // Creating and starting flush timer
cclog.ComponentDebug(s.name, "Starting new flush timer")
s.flushTimer = time.AfterFunc(
s.flushDelay,
func() {
defer s.flushTimerMutex.Unlock()
cclog.ComponentDebug(s.name, "Starting flush in flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
}
})
}
} }
// Lock access to batch slice // Lock access to batch slice