From 5aa9603c01169d3f1de3639d78a3e347e943de12 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Wed, 4 Oct 2023 12:37:25 +0200 Subject: [PATCH] Add batch_size config --- sinks/httpSink.go | 41 ++++++++++++++++++++++++++++++++++++++--- sinks/httpSink.md | 2 ++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 3a33b85..f48a5b5 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,7 +38,13 @@ type HttpSinkConfig struct { IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` idleConnTimeout time.Duration + // Maximum number of points sent to server in single request. + // Default: 1000 + BatchSize int `json:"batch_size,omitempty"` + // Batch all writes arriving in during this duration + // If batch size is reached before the end of this interval, + // the metrics are sent without further delay. // (default '5s', batching can be disabled by setting it to 0) FlushDelay string `json:"flush_delay,omitempty"` flushDelay time.Duration @@ -61,7 +67,8 @@ type HttpSink struct { extended_tag_list []key_value_pair // Flush() runs in another goroutine and accesses the influx line protocol encoder, // so this encoderLock has to protect the encoder - encoderLock sync.Mutex + encoderLock sync.Mutex + encoderRecordCount int // timer to run Flush() flushTimer *time.Timer @@ -133,6 +140,8 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Encode time stamp s.encoder.EndLine(m.Time()) + s.encoderRecordCount++ + // Check for encoder errors err := s.encoder.Err() @@ -149,6 +158,27 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Directly flush if no flush delay is configured return s.Flush() + } else if s.encoderRecordCount > s.config.BatchSize { + + // Flush if batch size limit is reached + + // Stop flush timer if possible + if s.flushTimer != nil { + if ok := s.flushTimer.Stop(); ok { + cclog.ComponentDebug(s.name, "Stopped flush timer. Batch size limit reached before flush delay") + s.timerLock.Unlock() + } + } + + // Start asynchronous flush operation + go func() { + cclog.ComponentDebug(s.name, "Starting flush triggered by batch size limit") + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "Flush triggered by batch size limit: flush failed:", err) + } + }() + return nil + } else if s.timerLock.TryLock() { // Setup flush timer when flush delay is configured @@ -166,9 +196,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error { s.config.flushDelay, func() { defer s.timerLock.Unlock() - cclog.ComponentDebug(s.name, "Starting flush in flush timer") + cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer") if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "Flush timer: flush failed:", err) + cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err) } }) } @@ -184,6 +214,8 @@ func (s *HttpSink) Flush() error { buf := slices.Clone(s.encoder.Bytes()) s.encoder.Reset() + recordCount := s.encoderRecordCount + s.encoderRecordCount = 0 // Unlock encoder usage s.encoderLock.Unlock() @@ -192,6 +224,8 @@ func (s *HttpSink) Flush() error { return nil } + cclog.ComponentDebug(s.name, "Flushing", recordCount, "records") + var res *http.Response for i := 0; i < s.config.MaxRetries; i++ { // Create new request to send buffer @@ -253,6 +287,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { // should be larger than the measurement interval to keep the connection open s.config.IdleConnTimeout = "120s" s.config.Timeout = "5s" + s.config.BatchSize = 1000 s.config.FlushDelay = "5s" s.config.MaxRetries = 3 cclog.ComponentDebug(s.name, "init") diff --git a/sinks/httpSink.md b/sinks/httpSink.md index a90a134..ccb7c4b 100644 --- a/sinks/httpSink.md +++ b/sinks/httpSink.md @@ -18,6 +18,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the "timeout": "5s", "idle_connection_timeout" : "5s", "flush_delay": "2s", + "batch_size": 1000 } } ``` @@ -32,3 +33,4 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the - `max_retries`: Maximum number of retries to connect to the http server - `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open - `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0) +- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay