diff --git a/sinks/httpSink.go b/sinks/httpSink.go index f592cbc..dae743e 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -141,8 +141,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error { // Check that encoding worked if err != nil { - cclog.ComponentError(s.name, "Write(): Encoding failed:", err) - return err + return fmt.Errorf("Encoding failed: %v", err) } if s.config.flushDelay == 0 { @@ -177,6 +176,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error { return nil } +// Flush sends all metrics stored in encoder to HTTP server func (s *HttpSink) Flush() error { // Lock for encoder usage @@ -193,6 +193,8 @@ func (s *HttpSink) Flush() error { return nil } + cclog.ComponentDebug(s.name, "Flush(): Flushing") + var res *http.Response for i := 0; i < s.config.MaxRetries; i++ { // Create new request to send buffer @@ -239,12 +241,16 @@ func (s *HttpSink) Flush() error { } func (s *HttpSink) Close() { + cclog.ComponentDebug(s.name, "Closing HTTP connection") + // Stop existing timer and immediately flush if s.flushTimer != nil { if ok := s.flushTimer.Stop(); ok { s.timerLock.Unlock() } } + + // Flush if err := s.Flush(); err != nil { cclog.ComponentError(s.name, "Close(): Flush failed:", err) } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 5bdf5c6..842aeff 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "sync" "time" @@ -34,14 +33,13 @@ type InfluxSink struct { // Maximum number of points sent to server in single request. // Default: 1000 BatchSize int `json:"batch_size,omitempty"` + // Time interval for delayed sending of metrics. // If the buffers are already filled before the end of this interval, // the metrics are sent without further delay. // Default: 1s FlushInterval string `json:"flush_delay,omitempty"` - // Number of metrics that are dropped when buffer is full - // Default: 100 - DropRate int `json:"drop_rate,omitempty"` + flushDelay time.Duration // Influx client options: @@ -56,15 +54,24 @@ type InfluxSink struct { // Specify whether to use GZip compression in write requests InfluxUseGzip bool `json:"use_gzip"` } - batch []string - flushTimer *time.Timer - flushDelay time.Duration - batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer - flushTimerMutex sync.Mutex // Ensure only one flush timer is running + // influx line protocol encoder encoder influx.Encoder + // number of records stored in the encoder + numRecordsInEncoder int // List of tags and meta data tags which should be used as tags extended_tag_list []key_value_pair + // Flush() runs in another goroutine and accesses the influx line protocol encoder, + // so this encoderLock has to protect the encoder and numRecordsInEncoder + encoderLock sync.Mutex + + // timer to run Flush() + flushTimer *time.Timer + // Lock to assure that only one timer is running at a time + timerLock sync.Mutex + + // WaitGroup to ensure only one send operation is running at a time + sendWaitGroup sync.WaitGroup } // connect connects to the InfluxDB server @@ -159,49 +166,11 @@ func (s *InfluxSink) connect() error { return nil } +// Write sends metric m in influxDB line protocol func (s *InfluxSink) Write(m lp.CCMetric) error { - if s.flushDelay != 0 && s.flushTimerMutex.TryLock() { - // Setup flush timer when flush delay is configured - // and no other timer is already running - if s.flushTimer != nil { - - // Restarting existing flush timer - cclog.ComponentDebug(s.name, "Write():", "Restarting flush timer") - s.flushTimer.Reset(s.flushDelay) - } else { - - // Creating and starting flush timer - cclog.ComponentDebug(s.name, "Write():", "Starting new flush timer") - s.flushTimer = time.AfterFunc( - s.flushDelay, - func() { - defer s.flushTimerMutex.Unlock() - cclog.ComponentDebug(s.name, "Starting flush in flush timer") - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "Flush timer: flush failed:", err) - } - }) - } - } - - // Protect access to batch slice - s.batchMutex.Lock() - - // batch slice full, dropping oldest metric(s) - // e.g. when previous flushes failed and batch slice was not cleared - if len(s.batch) == s.config.BatchSize { - newSize := s.config.BatchSize - s.config.DropRate - - for i := 0; i < newSize; i++ { - s.batch[i] = s.batch[i+s.config.DropRate] - } - for i := newSize; i < s.config.BatchSize; i++ { - s.batch[i] = "" - } - s.batch = s.batch[:newSize] - cclog.ComponentError(s.name, "Write():", "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") - } + // Lock for encoder usage + s.encoderLock.Lock() // Encode measurement name s.encoder.StartLine(m.Name()) @@ -259,70 +228,106 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { // Encode time stamp s.encoder.EndLine(m.Time()) - // Check that encoding worked + // Check for encoder errors if err := s.encoder.Err(); err != nil { - cclog.ComponentError(s.name, "Write():", "Encoding failed:", err) + // Unlock encoder usage + s.encoderLock.Unlock() - // Unlock access to batch slice - s.batchMutex.Unlock() - return err + return fmt.Errorf("Encoding failed: %v", err) } + s.numRecordsInEncoder++ - // Append metric to batch slice - s.batch = append(s.batch, - string( - slices.Clone( - s.encoder.Bytes()))) - s.encoder.Reset() + if s.config.flushDelay == 0 { + // Unlock encoder usage + s.encoderLock.Unlock() - // Flush synchronously if "flush_delay" is zero - // or - // Flush if batch size is reached - if s.flushDelay == 0 || - len(s.batch) == s.config.BatchSize { + // Directly flush if no flush delay is configured + return s.Flush() + } else if s.numRecordsInEncoder == s.config.BatchSize { + // Unlock encoder usage + s.encoderLock.Unlock() // Stop flush timer if s.flushTimer != nil { if ok := s.flushTimer.Stop(); ok { - s.flushTimerMutex.Unlock() + cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay") + s.timerLock.Unlock() } } - // Unlock access to batch slice - s.batchMutex.Unlock() + // Flush if batch size is reached return s.Flush() + } else if s.timerLock.TryLock() { + + // Setup flush timer when flush delay is configured + // and no other timer is already running + if s.flushTimer != nil { + + // Restarting existing flush timer + cclog.ComponentDebug(s.name, "Write(): Restarting flush timer") + s.flushTimer.Reset(s.config.flushDelay) + } else { + + // Creating and starting flush timer + cclog.ComponentDebug(s.name, "Write(): Starting new flush timer") + s.flushTimer = time.AfterFunc( + s.config.flushDelay, + func() { + defer s.timerLock.Unlock() + cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer") + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err) + } + }) + } } - // Unlock access to batch slice - s.batchMutex.Unlock() + // Unlock encoder usage + s.encoderLock.Unlock() return nil } -// Flush sends all metrics buffered in batch slice to InfluxDB server +// Flush sends all metrics stored in encoder to InfluxDB server func (s *InfluxSink) Flush() error { - cclog.ComponentDebug(s.name, "Flushing") - // Lock access to batch slice - s.batchMutex.Lock() - defer s.batchMutex.Unlock() + // Lock for encoder usage + // Own lock for as short as possible: the time it takes to clone the buffer. + s.encoderLock.Lock() - // Nothing to do, batch slice is empty - if len(s.batch) == 0 { + buf := slices.Clone(s.encoder.Bytes()) + numRecordsInBuf := s.numRecordsInEncoder + s.encoder.Reset() + s.numRecordsInEncoder = 0 + + // Unlock encoder usage + s.encoderLock.Unlock() + + if len(buf) == 0 { return nil } - // Send metrics from batch slice - err := s.writeApi.WriteRecord(context.Background(), strings.Join(s.batch, "")) - if err != nil { - cclog.ComponentError(s.name, "Flush():", "Flush of", len(s.batch), "metrics failed:", err) - return err - } + cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics") - // Clear batch slice - for i := range s.batch { - s.batch[i] = "" - } - s.batch = s.batch[:0] + // Asynchron send of encoder metrics + s.sendWaitGroup.Wait() + s.sendWaitGroup.Add(1) + go func() { + defer s.sendWaitGroup.Done() + startTime := time.Now() + err := s.writeApi.WriteRecord(context.Background(), string(buf)) + if err != nil { + cclog.ComponentError( + s.name, + "Flush():", + "Flush failed:", err, + "(number of records =", numRecordsInBuf, + ", buffer size =", len(buf), + ", send duration =", time.Since(startTime), + ")", + ) + return + } + }() return nil } @@ -333,14 +338,18 @@ func (s *InfluxSink) Close() { // Stop existing timer and immediately flush if s.flushTimer != nil { if ok := s.flushTimer.Stop(); ok { - s.flushTimerMutex.Unlock() + s.timerLock.Unlock() } } - s.Flush() + + // Flush if err := s.Flush(); err != nil { cclog.ComponentError(s.name, "Close():", "Flush failed:", err) } + // Wait for send operation to finish + s.sendWaitGroup.Wait() + s.client.Close() } @@ -352,7 +361,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // Set config default values s.config.BatchSize = 1000 s.config.FlushInterval = "1s" - s.config.DropRate = 100 // Read config if len(config) > 0 { @@ -388,24 +396,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.FlushInterval) > 0 { t, err := time.ParseDuration(s.config.FlushInterval) if err == nil { - s.flushDelay = t + s.config.flushDelay = t } } if !(s.config.BatchSize > 0) { return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize) } - if !(s.config.DropRate > 0) { - return s, fmt.Errorf("drop_rate=%d in InfluxDB config must be > 0", s.config.DropRate) - } - if !(s.config.BatchSize > s.config.DropRate) { - return s, fmt.Errorf( - "batch_size=%d must be greater then drop_rate=%d in InfluxDB config", - s.config.BatchSize, s.config.DropRate) - } - - // allocate batch slice - s.batch = make([]string, 0, s.config.BatchSize) // Connect to InfluxDB server if err := s.connect(); err != nil { diff --git a/sinks/influxSink.md b/sinks/influxSink.md index 258ecb5..a7c684e 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -27,9 +27,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de - `meta_as_tags`: print all meta information as tags in the output (optional) - `database`: All metrics are written to this bucket - `host`: Hostname of the InfluxDB database server -- `port`: Portnumber (as string) of the InfluxDB database server -- `user`: Username for basic authentification -- `password`: Password for basic authentification +- `port`: Port number (as string) of the InfluxDB database server +- `user`: Username for basic authentication +- `password`: Password for basic authentication - `organization`: Organization in the InfluxDB - `ssl`: Use SSL connection - `flush_delay`: Group metrics coming in to a single batch