From feb8f16af8b5ef37414a456088d2572e1cb21273 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 4 May 2022 13:05:10 +0200 Subject: [PATCH] Add batch_size option to httpSink --- sinks/httpSink.go | 8 ++++++++ sinks/httpSink.md | 2 ++ 2 files changed, 10 insertions(+) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 7713638..58aae45 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -22,6 +22,7 @@ type HttpSinkConfig struct { MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` + BatchSize int `json:"batch_size,omitempty"` } type HttpSink struct { @@ -36,6 +37,7 @@ type HttpSink struct { idleConnTimeout time.Duration timeout time.Duration flushDelay time.Duration + batchSize int } func (s *HttpSink) Write(m lp.CCMetric) error { @@ -57,6 +59,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error { s.lock.Lock() _, err := s.encoder.Encode(p) + s.batchSize++ s.lock.Unlock() // defer does not work here as Flush() takes the lock as well if err != nil { @@ -68,6 +71,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error { if s.flushDelay == 0 { return s.Flush() } + if s.batchSize == s.config.BatchSize { + return s.Flush() + } return err } @@ -99,6 +105,7 @@ func (s *HttpSink) Flush() error { // Clear buffer s.buffer.Reset() + s.batchSize = 0 // Handle transport/tcp errors if err != nil { @@ -132,6 +139,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" s.config.FlushDelay = "1s" + s.config.BatchSize = 100 // Read config if len(config) > 0 { diff --git a/sinks/httpSink.md b/sinks/httpSink.md index 23203a2..dd03173 100644 --- a/sinks/httpSink.md +++ b/sinks/httpSink.md @@ -15,6 +15,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the "max_idle_connections" : 10, "idle_connection_timeout" : "5s", "flush_delay": "2s", + "batch_size" : 100 } } ``` @@ -27,3 +28,4 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the - `max_idle_connections`: Maximally idle connections (default 10) - `idle_connection_timeout`: Timeout for idle connections (default '5s') - `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0) +- `batch_size`: Maximal number of batched metrics. Either it is flushed because batch size or the `flush_delay` is reached