Allow only one timer at a time

This commit is contained in:
Holger Obermaier 2022-06-23 21:53:02 +02:00
parent b7dcbaebcf
commit 9ccc5a6ca7

View File

@ -37,9 +37,6 @@ type InfluxSink struct {
// the metrics are sent without further delay. // the metrics are sent without further delay.
// Default: 1s // Default: 1s
FlushInterval string `json:"flush_delay,omitempty"` FlushInterval string `json:"flush_delay,omitempty"`
// Time interval after which sending of the metrics is retried after a failed sending.
// Default: 5s
RetryInterval string `json:"retry_delay,omitempty"`
// Number of metrics that are dropped when buffer is full // Number of metrics that are dropped when buffer is full
// Default: 100 // Default: 100
DropRate int `json:"drop_rate,omitempty"` DropRate int `json:"drop_rate,omitempty"`
@ -47,7 +44,6 @@ type InfluxSink struct {
batch []*write.Point batch []*write.Point
flushTimer *time.Timer flushTimer *time.Timer
flushDelay time.Duration flushDelay time.Duration
retryDelay time.Duration
batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
} }
@ -106,25 +102,23 @@ func (s *InfluxSink) connect() error {
} }
func (s *InfluxSink) Write(m lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
// Lock access to batch slice
s.batchMutex.Lock()
if len(s.batch) == 0 && s.flushDelay != 0 { if s.flushDelay != 0 && s.flushTimer == nil {
// This is the first write since the last flush, start the flushTimer! // Run a batched flush for all metrics that arrived in the last flush delay interval
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last flush delay interval
s.flushTimer = time.AfterFunc( s.flushTimer = time.AfterFunc(
s.flushDelay, s.flushDelay,
func() { func() {
if err := s.Flush(); err != nil { err := s.Flush()
if err != nil {
cclog.ComponentError(s.name, "Flush timer: flush failed:", err) cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
} }
s.flushTimer = nil
}) })
} }
// Lock access to batch slice
s.batchMutex.Lock()
// batch slice full, dropping oldest metric(s) // batch slice full, dropping oldest metric(s)
// e.g. when previous flushes failed and batch slice was not cleared // e.g. when previous flushes failed and batch slice was not cleared
if len(s.batch) == s.config.BatchSize { if len(s.batch) == s.config.BatchSize {
@ -175,18 +169,7 @@ func (s *InfluxSink) Flush() error {
// Send metrics from batch slice // Send metrics from batch slice
err := s.writeApi.WritePoint(context.Background(), s.batch...) err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil { if err != nil {
cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err) cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err)
// Setup timer to retry flush
time.AfterFunc(
s.retryDelay,
func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Retry timer: Flush failed:", err)
}
})
return err return err
} }
@ -217,7 +200,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// Set config default values // Set config default values
s.config.BatchSize = 1000 s.config.BatchSize = 1000
s.config.FlushInterval = "1s" s.config.FlushInterval = "1s"
s.config.RetryInterval = "5s"
s.config.DropRate = 100 s.config.DropRate = 100
// Read config // Read config
@ -258,14 +240,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
} }
} }
// Configure flush delay duration
if len(s.config.RetryInterval) > 0 {
t, err := time.ParseDuration(s.config.RetryInterval)
if err == nil {
s.retryDelay = t
}
}
if !(s.config.BatchSize > 0) { if !(s.config.BatchSize > 0) {
return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize) return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize)
} }