From dd40c852ca2492a43df368267d49570250114c45 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 9 Oct 2023 11:01:01 +0200 Subject: [PATCH] Stop flush timer, when immediatelly flushing --- sinks/influxSink.go | 50 ++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index e9b2580..83b9dbb 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -87,7 +87,7 @@ func (s *InfluxSink) connect() error { } else { auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) } - cclog.ComponentDebug(s.name, + cclog.ComponentDebug(s.name, "connect():", "Using URI='"+uri+"'", "Org='"+s.config.Organization+"'", "Bucket='"+s.config.Database+"'") @@ -99,22 +99,22 @@ func (s *InfluxSink) connect() error { if len(s.config.InfluxMaxRetryInterval) > 0 { if t, err := time.ParseDuration(s.config.InfluxMaxRetryInterval); err == nil { influxMaxRetryInterval := uint(t.Milliseconds()) - cclog.ComponentDebug(s.name, "Influx MaxRetryInterval", s.config.InfluxMaxRetryInterval) + cclog.ComponentDebug(s.name, "connect():", "Influx MaxRetryInterval", s.config.InfluxMaxRetryInterval) clientOptions.SetMaxRetryInterval(influxMaxRetryInterval) } else { - cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) + cclog.ComponentError(s.name, "connect():", "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) } } // Set the base for the exponential retry delay if s.config.InfluxExponentialBase != 0 { - cclog.ComponentDebug(s.name, "Influx Exponential Base", s.config.InfluxExponentialBase) + cclog.ComponentDebug(s.name, "connect():", "Influx Exponential Base", s.config.InfluxExponentialBase) clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) } // Set maximum count of retry attempts of failed writes if s.config.InfluxMaxRetries != 0 { - cclog.ComponentDebug(s.name, "Influx Max Retries", s.config.InfluxMaxRetries) + cclog.ComponentDebug(s.name, "connect():", "Influx Max Retries", s.config.InfluxMaxRetries) clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) } @@ -122,10 +122,10 @@ func (s *InfluxSink) connect() error { if len(s.config.InfluxMaxRetryTime) > 0 { if t, err := time.ParseDuration(s.config.InfluxMaxRetryTime); err == nil { influxMaxRetryTime := uint(t.Milliseconds()) - cclog.ComponentDebug(s.name, "MaxRetryTime", s.config.InfluxMaxRetryTime) + cclog.ComponentDebug(s.name, "connect():", "MaxRetryTime", s.config.InfluxMaxRetryTime) clientOptions.SetMaxRetryTime(influxMaxRetryTime) } else { - cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) + cclog.ComponentError(s.name, "connect():", "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) } } @@ -163,12 +163,12 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { if s.flushTimer != nil { // Restarting existing flush timer - cclog.ComponentDebug(s.name, "Restarting flush timer") + cclog.ComponentDebug(s.name, "Write():", "Restarting flush timer") s.flushTimer.Reset(s.flushDelay) } else { // Creating and starting flush timer - cclog.ComponentDebug(s.name, "Starting new flush timer") + cclog.ComponentDebug(s.name, "Write():", "Starting new flush timer") s.flushTimer = time.AfterFunc( s.flushDelay, func() { @@ -181,8 +181,9 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { } } - // Lock access to batch slice + // Protect access to batch slice s.batchMutex.Lock() + defer s.batchMutex.Unlock() // batch slice full, dropping oldest metric(s) // e.g. when previous flushes failed and batch slice was not cleared @@ -196,7 +197,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { s.batch[i] = "" } s.batch = s.batch[:newSize] - cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") + cclog.ComponentError(s.name, "Write():", "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") } // Encode measurement name @@ -257,7 +258,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { // Check that encoding worked if err := s.encoder.Err(); err != nil { - cclog.ComponentError(s.name, "Write(): Encoding failed:", err) + cclog.ComponentError(s.name, "Write():", "Encoding failed:", err) return err } @@ -273,13 +274,17 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { // Flush if batch size is reached if s.flushDelay == 0 || len(s.batch) == s.config.BatchSize { - // Unlock access to batch slice - s.batchMutex.Unlock() + + // Stop flush timer + if s.flushTimer != nil { + if ok := s.flushTimer.Stop(); ok { + s.flushTimerMutex.Unlock() + } + } + return s.Flush() } - // Unlock access to batch slice - s.batchMutex.Unlock() return nil } @@ -299,7 +304,7 @@ func (s *InfluxSink) Flush() error { // Send metrics from batch slice err := s.writeApi.WriteRecord(context.Background(), strings.Join(s.batch, "")) if err != nil { - cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err) + cclog.ComponentError(s.name, "Flush():", "Flush of", len(s.batch), "metrics failed:", err) return err } @@ -314,11 +319,18 @@ func (s *InfluxSink) Flush() error { func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") - s.flushTimer.Stop() + + // Stop existing timer and immediately flush + if s.flushTimer != nil { + if ok := s.flushTimer.Stop(); ok { + s.flushTimerMutex.Unlock() + } + } s.Flush() if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "Close(): Flush failed:", err) + cclog.ComponentError(s.name, "Close():", "Flush failed:", err) } + s.client.Close() }