Add batch_size config

author Holger Obermaier
date 2023-10-04 12:37:25 +02:00
parent 0db1cda27f
commit 5aa9603c01
2 changed files with 40 additions and 3 deletions


@@ -38,7 +38,13 @@ type HttpSinkConfig struct {
 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
 	idleConnTimeout time.Duration
+	// Maximum number of points sent to server in single request.
+	// Default: 1000
+	BatchSize int `json:"batch_size,omitempty"`
 	// Batch all writes arriving in during this duration
+	// If batch size is reached before the end of this interval,
+	// the metrics are sent without further delay.
 	// (default '5s', batching can be disabled by setting it to 0)
 	FlushDelay string `json:"flush_delay,omitempty"`
 	flushDelay time.Duration
@@ -61,7 +67,8 @@ type HttpSink struct {
 	extended_tag_list []key_value_pair
 	// Flush() runs in another goroutine and accesses the influx line protocol encoder,
 	// so this encoderLock has to protect the encoder
 	encoderLock sync.Mutex
+	encoderRecordCount int
 	// timer to run Flush()
 	flushTimer *time.Timer
@@ -133,6 +140,8 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
 	// Encode time stamp
 	s.encoder.EndLine(m.Time())
+	s.encoderRecordCount++
 	// Check for encoder errors
 	err := s.encoder.Err()
@@ -149,6 +158,27 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
 		// Directly flush if no flush delay is configured
 		return s.Flush()
+	} else if s.encoderRecordCount > s.config.BatchSize {
+		// Flush if batch size limit is reached
+		// Stop flush timer if possible
+		if s.flushTimer != nil {
+			if ok := s.flushTimer.Stop(); ok {
+				cclog.ComponentDebug(s.name, "Stopped flush timer. Batch size limit reached before flush delay")
+				s.timerLock.Unlock()
+			}
+		}
+		// Start asynchronous flush operation
+		go func() {
+			cclog.ComponentDebug(s.name, "Starting flush triggered by batch size limit")
+			if err := s.Flush(); err != nil {
+				cclog.ComponentError(s.name, "Flush triggered by batch size limit: flush failed:", err)
+			}
+		}()
+		return nil
 	} else if s.timerLock.TryLock() {
 		// Setup flush timer when flush delay is configured
@@ -166,9 +196,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
 			s.config.flushDelay,
 			func() {
 				defer s.timerLock.Unlock()
-				cclog.ComponentDebug(s.name, "Starting flush in flush timer")
+				cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
 				if err := s.Flush(); err != nil {
-					cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
+					cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
 				}
 			})
 	}
@@ -184,6 +214,8 @@ func (s *HttpSink) Flush() error {
 	buf := slices.Clone(s.encoder.Bytes())
 	s.encoder.Reset()
+	recordCount := s.encoderRecordCount
+	s.encoderRecordCount = 0
 	// Unlock encoder usage
 	s.encoderLock.Unlock()
@@ -192,6 +224,8 @@ func (s *HttpSink) Flush() error {
 		return nil
 	}
+	cclog.ComponentDebug(s.name, "Flushing", recordCount, "records")
 	var res *http.Response
 	for i := 0; i < s.config.MaxRetries; i++ {
 		// Create new request to send buffer
@@ -253,6 +287,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
 	// should be larger than the measurement interval to keep the connection open
 	s.config.IdleConnTimeout = "120s"
 	s.config.Timeout = "5s"
+	s.config.BatchSize = 1000
 	s.config.FlushDelay = "5s"
 	s.config.MaxRetries = 3
 	cclog.ComponentDebug(s.name, "init")
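Taken together, the hunks above implement a simple counting batcher: every encoded metric increments `encoderRecordCount`, and a flush is triggered either when that count exceeds `batch_size` or when the `flush_delay` timer fires, whichever comes first. The following self-contained Go sketch illustrates the same pattern in isolation; all identifiers and the string-slice buffer are illustrative, not the project's actual code.

```go
// Sketch of the batching pattern used above: count records as they are
// written and flush either when the batch size limit is exceeded or when the
// flush-delay timer fires, whichever happens first. Illustrative only.
package main

import (
	"fmt"
	"sync"
	"time"
)

type batcher struct {
	batchSize  int           // e.g. 1000, matching the new default
	flushDelay time.Duration // e.g. 5s; 0 disables batching

	mu      sync.Mutex
	records []string
	timer   *time.Timer
}

func (b *batcher) write(rec string) {
	b.mu.Lock()
	b.records = append(b.records, rec)
	count := len(b.records)
	b.mu.Unlock()

	if b.flushDelay == 0 {
		// Batching disabled: flush synchronously.
		b.flush()
		return
	}
	if count > b.batchSize {
		// Batch size limit reached: flush without waiting for the timer.
		go b.flush()
		return
	}
	b.mu.Lock()
	if b.timer == nil {
		// First record of a new batch: arm the flush-delay timer.
		b.timer = time.AfterFunc(b.flushDelay, b.flush)
	}
	b.mu.Unlock()
}

func (b *batcher) flush() {
	b.mu.Lock()
	batch := b.records
	b.records = nil
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	b.mu.Unlock()

	if len(batch) == 0 {
		return
	}
	// A real sink would POST the encoded batch here.
	fmt.Printf("flushing %d records\n", len(batch))
}

func main() {
	b := &batcher{batchSize: 3, flushDelay: 100 * time.Millisecond}
	for i := 0; i < 10; i++ {
		b.write(fmt.Sprintf("metric %d", i))
	}
	time.Sleep(200 * time.Millisecond) // allow the timer-driven flush to run
}
```

Flushing in a separate goroutine when the size limit is hit keeps the write path from blocking on the HTTP request, which is why the commit wraps `s.Flush()` in `go func() { ... }()` for the batch-size case.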


@@ -18,6 +18,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
     "timeout": "5s",
     "idle_connection_timeout" : "5s",
     "flush_delay": "2s",
+    "batch_size": 1000
   }
 }
 ```
@@ -32,3 +33,4 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
 - `max_retries`: Maximum number of retries to connect to the http server
 - `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open
 - `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0)
+- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
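As a worked example of how the two options interact: with `"flush_delay": "2s"` and `"batch_size": 1000`, buffered metrics are sent at the latest every two seconds, and immediately as soon as the batch size limit is exceeded. A minimal options block using only the keys documented above might look like the sketch below; values are illustrative, and the sink type and server address of a real configuration are omitted here.

```json
{
  "timeout": "5s",
  "idle_connection_timeout": "120s",
  "max_retries": 3,
  "flush_delay": "2s",
  "batch_size": 1000
}
```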