Stop flush timer, when immediatelly flushing

This commit is contained in:
Holger Obermaier 2023-10-09 11:01:01 +02:00
parent 39ae211530
commit dd40c852ca

View File

@ -87,7 +87,7 @@ func (s *InfluxSink) connect() error {
} else { } else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
cclog.ComponentDebug(s.name, cclog.ComponentDebug(s.name, "connect():",
"Using URI='"+uri+"'", "Using URI='"+uri+"'",
"Org='"+s.config.Organization+"'", "Org='"+s.config.Organization+"'",
"Bucket='"+s.config.Database+"'") "Bucket='"+s.config.Database+"'")
@ -99,22 +99,22 @@ func (s *InfluxSink) connect() error {
if len(s.config.InfluxMaxRetryInterval) > 0 { if len(s.config.InfluxMaxRetryInterval) > 0 {
if t, err := time.ParseDuration(s.config.InfluxMaxRetryInterval); err == nil { if t, err := time.ParseDuration(s.config.InfluxMaxRetryInterval); err == nil {
influxMaxRetryInterval := uint(t.Milliseconds()) influxMaxRetryInterval := uint(t.Milliseconds())
cclog.ComponentDebug(s.name, "Influx MaxRetryInterval", s.config.InfluxMaxRetryInterval) cclog.ComponentDebug(s.name, "connect():", "Influx MaxRetryInterval", s.config.InfluxMaxRetryInterval)
clientOptions.SetMaxRetryInterval(influxMaxRetryInterval) clientOptions.SetMaxRetryInterval(influxMaxRetryInterval)
} else { } else {
cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) cclog.ComponentError(s.name, "connect():", "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval)
} }
} }
// Set the base for the exponential retry delay // Set the base for the exponential retry delay
if s.config.InfluxExponentialBase != 0 { if s.config.InfluxExponentialBase != 0 {
cclog.ComponentDebug(s.name, "Influx Exponential Base", s.config.InfluxExponentialBase) cclog.ComponentDebug(s.name, "connect():", "Influx Exponential Base", s.config.InfluxExponentialBase)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
} }
// Set maximum count of retry attempts of failed writes // Set maximum count of retry attempts of failed writes
if s.config.InfluxMaxRetries != 0 { if s.config.InfluxMaxRetries != 0 {
cclog.ComponentDebug(s.name, "Influx Max Retries", s.config.InfluxMaxRetries) cclog.ComponentDebug(s.name, "connect():", "Influx Max Retries", s.config.InfluxMaxRetries)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
} }
@ -122,10 +122,10 @@ func (s *InfluxSink) connect() error {
if len(s.config.InfluxMaxRetryTime) > 0 { if len(s.config.InfluxMaxRetryTime) > 0 {
if t, err := time.ParseDuration(s.config.InfluxMaxRetryTime); err == nil { if t, err := time.ParseDuration(s.config.InfluxMaxRetryTime); err == nil {
influxMaxRetryTime := uint(t.Milliseconds()) influxMaxRetryTime := uint(t.Milliseconds())
cclog.ComponentDebug(s.name, "MaxRetryTime", s.config.InfluxMaxRetryTime) cclog.ComponentDebug(s.name, "connect():", "MaxRetryTime", s.config.InfluxMaxRetryTime)
clientOptions.SetMaxRetryTime(influxMaxRetryTime) clientOptions.SetMaxRetryTime(influxMaxRetryTime)
} else { } else {
cclog.ComponentError(s.name, "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval) cclog.ComponentError(s.name, "connect():", "Failed to parse duration for Influx MaxRetryInterval: ", s.config.InfluxMaxRetryInterval)
} }
} }
@ -163,12 +163,12 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
if s.flushTimer != nil { if s.flushTimer != nil {
// Restarting existing flush timer // Restarting existing flush timer
cclog.ComponentDebug(s.name, "Restarting flush timer") cclog.ComponentDebug(s.name, "Write():", "Restarting flush timer")
s.flushTimer.Reset(s.flushDelay) s.flushTimer.Reset(s.flushDelay)
} else { } else {
// Creating and starting flush timer // Creating and starting flush timer
cclog.ComponentDebug(s.name, "Starting new flush timer") cclog.ComponentDebug(s.name, "Write():", "Starting new flush timer")
s.flushTimer = time.AfterFunc( s.flushTimer = time.AfterFunc(
s.flushDelay, s.flushDelay,
func() { func() {
@ -181,8 +181,9 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
} }
} }
// Lock access to batch slice // Protect access to batch slice
s.batchMutex.Lock() s.batchMutex.Lock()
defer s.batchMutex.Unlock()
// batch slice full, dropping oldest metric(s) // batch slice full, dropping oldest metric(s)
// e.g. when previous flushes failed and batch slice was not cleared // e.g. when previous flushes failed and batch slice was not cleared
@ -196,7 +197,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
s.batch[i] = "" s.batch[i] = ""
} }
s.batch = s.batch[:newSize] s.batch = s.batch[:newSize]
cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") cclog.ComponentError(s.name, "Write():", "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)")
} }
// Encode measurement name // Encode measurement name
@ -257,7 +258,7 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
// Check that encoding worked // Check that encoding worked
if err := s.encoder.Err(); err != nil { if err := s.encoder.Err(); err != nil {
cclog.ComponentError(s.name, "Write(): Encoding failed:", err) cclog.ComponentError(s.name, "Write():", "Encoding failed:", err)
return err return err
} }
@ -273,13 +274,17 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
// Flush if batch size is reached // Flush if batch size is reached
if s.flushDelay == 0 || if s.flushDelay == 0 ||
len(s.batch) == s.config.BatchSize { len(s.batch) == s.config.BatchSize {
// Unlock access to batch slice
s.batchMutex.Unlock() // Stop flush timer
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
s.flushTimerMutex.Unlock()
}
}
return s.Flush() return s.Flush()
} }
// Unlock access to batch slice
s.batchMutex.Unlock()
return nil return nil
} }
@ -299,7 +304,7 @@ func (s *InfluxSink) Flush() error {
// Send metrics from batch slice // Send metrics from batch slice
err := s.writeApi.WriteRecord(context.Background(), strings.Join(s.batch, "")) err := s.writeApi.WriteRecord(context.Background(), strings.Join(s.batch, ""))
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err) cclog.ComponentError(s.name, "Flush():", "Flush of", len(s.batch), "metrics failed:", err)
return err return err
} }
@ -314,11 +319,18 @@ func (s *InfluxSink) Flush() error {
func (s *InfluxSink) Close() { func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection") cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.flushTimer.Stop()
// Stop existing timer and immediately flush
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
s.flushTimerMutex.Unlock()
}
}
s.Flush() s.Flush()
if err := s.Flush(); err != nil { if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Close(): Flush failed:", err) cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
} }
s.client.Close() s.client.Close()
} }