Reverted previous changes.

Made the code to complex without much advantages
This commit is contained in:
Holger Obermaier 2023-10-06 16:56:30 +02:00
parent 94c88f23df
commit fd1cdc5c07

View File

@ -38,13 +38,7 @@ type HttpSinkConfig struct {
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
idleConnTimeout time.Duration idleConnTimeout time.Duration
// Maximum number of points sent to server in single request.
// Default: 1000
BatchSize int `json:"batch_size,omitempty"`
// Batch all writes arriving in during this duration // Batch all writes arriving in during this duration
// If batch size is reached before the end of this interval,
// the metrics are sent without further delay.
// (default '5s', batching can be disabled by setting it to 0) // (default '5s', batching can be disabled by setting it to 0)
FlushDelay string `json:"flush_delay,omitempty"` FlushDelay string `json:"flush_delay,omitempty"`
flushDelay time.Duration flushDelay time.Duration
@ -67,17 +61,13 @@ type HttpSink struct {
extended_tag_list []key_value_pair extended_tag_list []key_value_pair
// Flush() runs in another goroutine and accesses the influx line protocol encoder, // Flush() runs in another goroutine and accesses the influx line protocol encoder,
// so this encoderLock has to protect the encoder // so this encoderLock has to protect the encoder
encoderLock sync.Mutex encoderLock sync.Mutex
encoderRecordCount int
// timer to run Flush() // timer to run Flush()
flushTimer *time.Timer flushTimer *time.Timer
// Lock to assure that only one timer is running at a time // Lock to assure that only one timer is running at a time
timerLock sync.Mutex timerLock sync.Mutex
// WaitGroup for concurrent flush operations
flushWaitGroup sync.WaitGroup
config HttpSinkConfig config HttpSinkConfig
} }
@ -143,8 +133,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
// Encode time stamp // Encode time stamp
s.encoder.EndLine(m.Time()) s.encoder.EndLine(m.Time())
s.encoderRecordCount++
// Check for encoder errors // Check for encoder errors
err := s.encoder.Err() err := s.encoder.Err()
@ -161,25 +149,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
// Directly flush if no flush delay is configured // Directly flush if no flush delay is configured
return s.Flush() return s.Flush()
} else if s.encoderRecordCount > s.config.BatchSize {
// Flush if batch size limit is reached
// Stop flush timer if possible
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
s.timerLock.Unlock()
}
}
// Start asynchronous flush operation
go func() {
cclog.ComponentDebug(s.name, "Starting flush triggered by batch size limit")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush triggered by batch size limit: flush failed:", err)
}
}()
} else if s.timerLock.TryLock() { } else if s.timerLock.TryLock() {
// Setup flush timer when flush delay is configured // Setup flush timer when flush delay is configured
@ -209,8 +178,6 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
} }
func (s *HttpSink) Flush() error { func (s *HttpSink) Flush() error {
s.flushWaitGroup.Add(1)
defer s.flushWaitGroup.Done()
// Lock for encoder usage // Lock for encoder usage
// Own lock for as short as possible: the time it takes to clone the buffer. // Own lock for as short as possible: the time it takes to clone the buffer.
@ -218,8 +185,6 @@ func (s *HttpSink) Flush() error {
buf := slices.Clone(s.encoder.Bytes()) buf := slices.Clone(s.encoder.Bytes())
s.encoder.Reset() s.encoder.Reset()
recordCount := s.encoderRecordCount
s.encoderRecordCount = 0
// Unlock encoder usage // Unlock encoder usage
s.encoderLock.Unlock() s.encoderLock.Unlock()
@ -228,8 +193,6 @@ func (s *HttpSink) Flush() error {
return nil return nil
} }
cclog.ComponentDebug(s.name, "Flush(): Flushing", recordCount, "records")
var res *http.Response var res *http.Response
for i := 0; i < s.config.MaxRetries; i++ { for i := 0; i < s.config.MaxRetries; i++ {
// Create new request to send buffer // Create new request to send buffer
@ -286,9 +249,6 @@ func (s *HttpSink) Close() {
cclog.ComponentError(s.name, "Close(): Flush failed:", err) cclog.ComponentError(s.name, "Close(): Flush failed:", err)
} }
// Wait for flush operations to finish
s.flushWaitGroup.Wait()
s.client.CloseIdleConnections() s.client.CloseIdleConnections()
} }
@ -300,7 +260,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
// should be larger than the measurement interval to keep the connection open // should be larger than the measurement interval to keep the connection open
s.config.IdleConnTimeout = "120s" s.config.IdleConnTimeout = "120s"
s.config.Timeout = "5s" s.config.Timeout = "5s"
s.config.BatchSize = 1000
s.config.FlushDelay = "5s" s.config.FlushDelay = "5s"
s.config.MaxRetries = 3 s.config.MaxRetries = 3
cclog.ComponentDebug(s.name, "Init()") cclog.ComponentDebug(s.name, "Init()")
@ -360,7 +319,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
// Configure influx line protocol encoder // Configure influx line protocol encoder
s.encoder.SetPrecision(influx.Nanosecond) s.encoder.SetPrecision(influx.Nanosecond)
s.encoderRecordCount = 0
s.extended_tag_list = make([]key_value_pair, 0) s.extended_tag_list = make([]key_value_pair, 0)
return s, nil return s, nil