From 39bfeab283d5ee746c82d1c6435f78aa66285fdc Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 10 Feb 2022 09:54:44 +0100 Subject: [PATCH] HttpSink: dynamically sized batches flushed after timer --- sinks/httpSink.go | 70 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 25880ea..ce46bab 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -6,8 +6,10 @@ import ( "errors" "fmt" "net/http" + "sync" "time" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" ) @@ -19,19 +21,21 @@ type HttpSinkConfig struct { Timeout string `json:"timeout,omitempty"` MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` - BatchSize int `json:"batch_size,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` } type HttpSink struct { sink client *http.Client encoder *influx.Encoder + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer buffer *bytes.Buffer + flushTimer *time.Timer config HttpSinkConfig maxIdleConns int idleConnTimeout time.Duration timeout time.Duration - batchCounter int + flushDelay time.Duration } func (s *HttpSink) Init(config json.RawMessage) error { @@ -40,10 +44,7 @@ func (s *HttpSink) Init(config json.RawMessage) error { s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" - s.config.BatchSize = 20 - - // Reset counter - s.batchCounter = 0 + s.config.FlushDelay = "1s" // Read config if len(config) > 0 { @@ -70,6 +71,12 @@ func (s *HttpSink) Init(config json.RawMessage) error { s.timeout = t } } + if len(s.config.FlushDelay) > 0 { + t, err := time.ParseDuration(s.config.FlushDelay) + if err == nil { + s.flushDelay = t + } + } tr := &http.Transport{ MaxIdleConns: s.maxIdleConns, IdleConnTimeout: s.idleConnTimeout, @@ -83,26 +90,48 @@ func (s *HttpSink) Init(config json.RawMessage) error { } func (s *HttpSink) Write(m lp.CCMetric) error { - p := m.ToPoint(s.config.MetaAsTags) - _, err := s.encoder.Encode(p) + if s.buffer.Len() == 0 && s.flushDelay != 0 { + // This is the first write since the last flush, start the flushTimer! + if s.flushTimer != nil && s.flushTimer.Stop() { + cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?") + } - // Flush when received more metrics than batch size - s.batchCounter++ - if s.batchCounter > s.config.BatchSize { - s.Flush() + // Run a batched flush for all lines that have arrived in the last second + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError("HttpSink", "flush failed:", err.Error()) + } + }) } + + p := m.ToPoint(s.config.MetaAsTags) + + s.lock.Lock() + _, err := s.encoder.Encode(p) + s.lock.Unlock() // defer does not work here as Flush() takes the lock as well + + if err != nil { + return err + } + + // Flush synchronously if "flush_delay" is zero + if s.flushDelay == 0 { + return s.Flush() + } + return err } func (s *HttpSink) Flush() error { + // buffer is read by client.Do, prevent concurrent modifications + s.lock.Lock() + defer s.lock.Unlock() + // Do not flush empty buffer - if s.batchCounter == 0 { + if s.buffer.Len() == 0 { return nil } - // Reset counter - s.batchCounter = 0 - // Create new request to send buffer req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) if err != nil { @@ -120,12 +149,12 @@ func (s *HttpSink) Flush() error { // Clear buffer s.buffer.Reset() - // Handle error code + // Handle transport/tcp errors if err != nil { return err } - // Handle status code + // Handle application errors if res.StatusCode != http.StatusOK { return errors.New(res.Status) } @@ -134,6 +163,9 @@ func (s *HttpSink) Flush() error { } func (s *HttpSink) Close() { - s.Flush() + s.flushTimer.Stop() + if err := s.Flush(); err != nil { + cclog.ComponentError("HttpSink", "flush failed:", err.Error()) + } s.client.CloseIdleConnections() }