diff --git a/sinks/httpSink.go b/sinks/httpSink.go index d9e6106..f592cbc 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,13 +38,7 @@ type HttpSinkConfig struct { IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` idleConnTimeout time.Duration - // Maximum number of points sent to server in single request. - // Default: 1000 - BatchSize int `json:"batch_size,omitempty"` - // Batch all writes arriving in during this duration - // If batch size is reached before the end of this interval, - // the metrics are sent without further delay. // (default '5s', batching can be disabled by setting it to 0) FlushDelay string `json:"flush_delay,omitempty"` flushDelay time.Duration @@ -67,17 +61,13 @@ type HttpSink struct { extended_tag_list []key_value_pair // Flush() runs in another goroutine and accesses the influx line protocol encoder, // so this encoderLock has to protect the encoder - encoderLock sync.Mutex - encoderRecordCount int + encoderLock sync.Mutex // timer to run Flush() flushTimer *time.Timer // Lock to assure that only one timer is running at a time timerLock sync.Mutex - // WaitGroup for concurrent flush operations - flushWaitGroup sync.WaitGroup - config HttpSinkConfig } @@ -143,8 +133,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Encode time stamp s.encoder.EndLine(m.Time()) - s.encoderRecordCount++ - // Check for encoder errors err := s.encoder.Err() @@ -161,25 +149,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Directly flush if no flush delay is configured return s.Flush() - } else if s.encoderRecordCount > s.config.BatchSize { - - // Flush if batch size limit is reached - - // Stop flush timer if possible - if s.flushTimer != nil { - if ok := s.flushTimer.Stop(); ok { - cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay") - s.timerLock.Unlock() - } - } - - // Start asynchronous flush operation - go func() { - cclog.ComponentDebug(s.name, "Starting flush triggered by batch size limit") - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "Flush triggered by batch size limit: flush failed:", err) - } - }() } else if s.timerLock.TryLock() { // Setup flush timer when flush delay is configured @@ -209,8 +178,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error { } func (s *HttpSink) Flush() error { - s.flushWaitGroup.Add(1) - defer s.flushWaitGroup.Done() // Lock for encoder usage // Own lock for as short as possible: the time it takes to clone the buffer. @@ -218,8 +185,6 @@ func (s *HttpSink) Flush() error { buf := slices.Clone(s.encoder.Bytes()) s.encoder.Reset() - recordCount := s.encoderRecordCount - s.encoderRecordCount = 0 // Unlock encoder usage s.encoderLock.Unlock() @@ -228,8 +193,6 @@ func (s *HttpSink) Flush() error { return nil } - cclog.ComponentDebug(s.name, "Flush(): Flushing", recordCount, "records") - var res *http.Response for i := 0; i < s.config.MaxRetries; i++ { // Create new request to send buffer @@ -286,9 +249,6 @@ func (s *HttpSink) Close() { cclog.ComponentError(s.name, "Close(): Flush failed:", err) } - // Wait for flush operations to finish - s.flushWaitGroup.Wait() - s.client.CloseIdleConnections() } @@ -300,7 +260,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { // should be larger than the measurement interval to keep the connection open s.config.IdleConnTimeout = "120s" s.config.Timeout = "5s" - s.config.BatchSize = 1000 s.config.FlushDelay = "5s" s.config.MaxRetries = 3 cclog.ComponentDebug(s.name, "Init()") @@ -360,7 +319,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { // Configure influx line protocol encoder s.encoder.SetPrecision(influx.Nanosecond) - s.encoderRecordCount = 0 s.extended_tag_list = make([]key_value_pair, 0) return s, nil