ClusterCockpit/cc-metric-collector (mirror of https://github.com/ClusterCockpit/cc-metric-collector.git)
Add asynchronous send of encoder metrics
@@ -141,8 +141,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {

	// Check that encoding worked
	if err != nil {
		cclog.ComponentError(s.name, "Write(): Encoding failed:", err)
		return err
		return fmt.Errorf("Encoding failed: %v", err)
	}

	if s.config.flushDelay == 0 {
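The hunk above replaces a log-and-return with a single wrapped error, leaving the reporting decision to the caller. A minimal sketch of that pattern follows, with hypothetical names (write, encode, errEncode) standing in for the sink code; note that %w, rather than the %v used in the commit, would additionally keep the cause inspectable via errors.Is:

// Sketch of returning a wrapped error instead of logging at the call site.
// All names here are illustrative, not the sink's actual identifiers.
package main

import (
	"errors"
	"fmt"
)

var errEncode = errors.New("invalid field value")

func encode() error { return errEncode }

func write() error {
	if err := encode(); err != nil {
		// Wrap instead of logging here; the caller decides how to report it.
		return fmt.Errorf("encoding failed: %w", err)
	}
	return nil
}

func main() {
	if err := write(); err != nil {
		fmt.Println(err)                       // encoding failed: invalid field value
		fmt.Println(errors.Is(err, errEncode)) // true: %w preserves the cause
	}
}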
@@ -177,6 +176,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
	return nil
}

// Flush sends all metrics stored in encoder to HTTP server
func (s *HttpSink) Flush() error {

	// Lock for encoder usage
@@ -193,6 +193,8 @@ func (s *HttpSink) Flush() error {
		return nil
	}

	cclog.ComponentDebug(s.name, "Flush(): Flushing")

	var res *http.Response
	for i := 0; i < s.config.MaxRetries; i++ {
		// Create new request to send buffer
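As the loop above begins to show, Flush() retries the HTTP send up to MaxRetries times. A self-contained sketch of such a bounded retry loop, assuming a plain net/http POST with a made-up endpoint and payload; the sink's actual request construction, headers, and any backoff are not visible in this excerpt:

// Bounded-retry POST: a sketch under assumed names, not the sink's code.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func send(url string, payload []byte, maxRetries int) error {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		res, err := http.Post(url, "text/plain", bytes.NewReader(payload))
		if err != nil {
			lastErr = err // network error: try again
			continue
		}
		res.Body.Close()
		if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusNoContent {
			return nil // server accepted the batch
		}
		lastErr = fmt.Errorf("server returned %s", res.Status)
	}
	return fmt.Errorf("send failed after %d attempts: %w", maxRetries, lastErr)
}

func main() {
	err := send("http://localhost:8080/write", []byte("m,host=n1 value=1\n"), 3)
	fmt.Println(err)
}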
@@ -239,12 +241,16 @@ func (s *HttpSink) Flush() error {
}

func (s *HttpSink) Close() {
	cclog.ComponentDebug(s.name, "Closing HTTP connection")

	// Stop existing timer and immediately flush
	if s.flushTimer != nil {
		if ok := s.flushTimer.Stop(); ok {
			s.timerLock.Unlock()
		}
	}

	// Flush
	if err := s.Flush(); err != nil {
		cclog.ComponentError(s.name, "Close(): Flush failed:", err)
	}
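Close() must release timerLock itself when it successfully stops a pending timer, because the timer callback that would normally release the lock will never run. A compact sketch of this single-flight timer pattern, with assumed names (sink, armTimer, close) rather than the sink's real methods:

// One pending flush timer at a time: the mutex is held while a timer is
// armed, and whoever prevents the callback from firing must unlock it.
package main

import (
	"fmt"
	"sync"
	"time"
)

type sink struct {
	timerLock  sync.Mutex // held while a flush timer is pending
	flushTimer *time.Timer
}

func (s *sink) armTimer(d time.Duration) {
	// Only arm a timer if none is pending; the callback releases the lock.
	if s.timerLock.TryLock() {
		s.flushTimer = time.AfterFunc(d, func() {
			defer s.timerLock.Unlock()
			fmt.Println("timer fired: flushing")
		})
	}
}

func (s *sink) close() {
	// If Stop() returns true the callback will never run, so the
	// pending lock must be released here instead.
	if s.flushTimer != nil {
		if ok := s.flushTimer.Stop(); ok {
			s.timerLock.Unlock()
		}
	}
	fmt.Println("final flush")
}

func main() {
	s := &sink{}
	s.armTimer(time.Minute)
	s.close()
}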

@@ -6,7 +6,6 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

@@ -34,14 +33,13 @@ type InfluxSink struct {
		// Maximum number of points sent to server in single request.
		// Default: 1000
		BatchSize int `json:"batch_size,omitempty"`

		// Time interval for delayed sending of metrics.
		// If the buffers are already filled before the end of this interval,
		// the metrics are sent without further delay.
		// Default: 1s
		FlushInterval string `json:"flush_delay,omitempty"`
		// Number of metrics that are dropped when buffer is full
		// Default: 100
		DropRate int `json:"drop_rate,omitempty"`
		flushDelay    time.Duration

		// Influx client options:

@@ -56,15 +54,24 @@ type InfluxSink struct {
		// Specify whether to use GZip compression in write requests
		InfluxUseGzip bool `json:"use_gzip"`
	}
	batch           []string
	flushTimer      *time.Timer
	flushDelay      time.Duration
	batchMutex      sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
	flushTimerMutex sync.Mutex // Ensure only one flush timer is running

	// influx line protocol encoder
	encoder influx.Encoder
	// number of records stored in the encoder
	numRecordsInEncoder int
	// List of tags and meta data tags which should be used as tags
	extended_tag_list []key_value_pair
	// Flush() runs in another goroutine and accesses the influx line protocol encoder,
	// so this encoderLock has to protect the encoder and numRecordsInEncoder
	encoderLock sync.Mutex

	// timer to run Flush()
	flushTimer *time.Timer
	// Lock to assure that only one timer is running at a time
	timerLock sync.Mutex

	// WaitGroup to ensure only one send operation is running at a time
	sendWaitGroup sync.WaitGroup
}

// connect connects to the InfluxDB server
@@ -159,49 +166,11 @@ func (s *InfluxSink) connect() error {
	return nil
}

// Write sends metric m in influxDB line protocol
func (s *InfluxSink) Write(m lp.CCMetric) error {
	if s.flushDelay != 0 && s.flushTimerMutex.TryLock() {

		// Setup flush timer when flush delay is configured
		// and no other timer is already running
		if s.flushTimer != nil {

			// Restarting existing flush timer
			cclog.ComponentDebug(s.name, "Write():", "Restarting flush timer")
			s.flushTimer.Reset(s.flushDelay)
		} else {

			// Creating and starting flush timer
			cclog.ComponentDebug(s.name, "Write():", "Starting new flush timer")
			s.flushTimer = time.AfterFunc(
				s.flushDelay,
				func() {
					defer s.flushTimerMutex.Unlock()
					cclog.ComponentDebug(s.name, "Starting flush in flush timer")
					if err := s.Flush(); err != nil {
						cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
					}
				})
		}
	}

	// Protect access to batch slice
	s.batchMutex.Lock()

	// batch slice full, dropping oldest metric(s)
	// e.g. when previous flushes failed and batch slice was not cleared
	if len(s.batch) == s.config.BatchSize {
		newSize := s.config.BatchSize - s.config.DropRate

		for i := 0; i < newSize; i++ {
			s.batch[i] = s.batch[i+s.config.DropRate]
		}
		for i := newSize; i < s.config.BatchSize; i++ {
			s.batch[i] = ""
		}
		s.batch = s.batch[:newSize]
		cclog.ComponentError(s.name, "Write():", "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)")
	}
	// Lock for encoder usage
	s.encoderLock.Lock()

	// Encode measurement name
	s.encoder.StartLine(m.Name())
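Write() now serializes each metric straight into a shared line-protocol encoder instead of appending strings to a batch slice. A standalone sketch of the encoder API this relies on (StartLine, AddTag, AddField, EndLine, Err, Bytes), assuming the v2 line-protocol package github.com/influxdata/line-protocol/v2/lineprotocol, which the import alias influx in the diff suggests:

// Minimal line-protocol encoding example under the assumed v2 package.
package main

import (
	"fmt"
	"time"

	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
)

func main() {
	var enc influx.Encoder
	enc.SetPrecision(influx.Nanosecond)

	enc.StartLine("temperature")      // measurement name
	enc.AddTag("hostname", "node001") // tags must be added before fields
	enc.AddField("value", influx.MustNewValue(42.0))
	enc.EndLine(time.Now()) // appends the timestamp and terminates the line

	// Always check the accumulated encoder error before using the bytes.
	if err := enc.Err(); err != nil {
		fmt.Println("encoding failed:", err)
		return
	}
	fmt.Printf("%s", enc.Bytes()) // e.g. temperature,hostname=node001 value=42 1700000000000000000
}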
@@ -259,70 +228,106 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
	// Encode time stamp
	s.encoder.EndLine(m.Time())

	// Check that encoding worked
	// Check for encoder errors
	if err := s.encoder.Err(); err != nil {
		cclog.ComponentError(s.name, "Write():", "Encoding failed:", err)
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Unlock access to batch slice
		s.batchMutex.Unlock()
		return err
		return fmt.Errorf("Encoding failed: %v", err)
	}
	s.numRecordsInEncoder++

	// Append metric to batch slice
	s.batch = append(s.batch,
		string(
			slices.Clone(
				s.encoder.Bytes())))
	s.encoder.Reset()
	if s.config.flushDelay == 0 {
		// Unlock encoder usage
		s.encoderLock.Unlock()

	// Flush synchronously if "flush_delay" is zero
	// or
	// Flush if batch size is reached
	if s.flushDelay == 0 ||
		len(s.batch) == s.config.BatchSize {
		// Directly flush if no flush delay is configured
		return s.Flush()
	} else if s.numRecordsInEncoder == s.config.BatchSize {
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Stop flush timer
		if s.flushTimer != nil {
			if ok := s.flushTimer.Stop(); ok {
				s.flushTimerMutex.Unlock()
				cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
				s.timerLock.Unlock()
			}
		}

		// Unlock access to batch slice
		s.batchMutex.Unlock()
		// Flush if batch size is reached
		return s.Flush()
	} else if s.timerLock.TryLock() {

		// Setup flush timer when flush delay is configured
		// and no other timer is already running
		if s.flushTimer != nil {

			// Restarting existing flush timer
			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
			s.flushTimer.Reset(s.config.flushDelay)
		} else {

			// Creating and starting flush timer
			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
			s.flushTimer = time.AfterFunc(
				s.config.flushDelay,
				func() {
					defer s.timerLock.Unlock()
					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
					if err := s.Flush(); err != nil {
						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
					}
				})
		}
	}

	// Unlock access to batch slice
	s.batchMutex.Unlock()
	// Unlock encoder usage
	s.encoderLock.Unlock()
	return nil
}

// Flush sends all metrics buffered in batch slice to InfluxDB server
// Flush sends all metrics stored in encoder to InfluxDB server
func (s *InfluxSink) Flush() error {
	cclog.ComponentDebug(s.name, "Flushing")

	// Lock access to batch slice
	s.batchMutex.Lock()
	defer s.batchMutex.Unlock()
	// Lock for encoder usage
	// Hold the lock only as long as it takes to clone the buffer.
	s.encoderLock.Lock()

	// Nothing to do, batch slice is empty
	if len(s.batch) == 0 {
	buf := slices.Clone(s.encoder.Bytes())
	numRecordsInBuf := s.numRecordsInEncoder
	s.encoder.Reset()
	s.numRecordsInEncoder = 0

	// Unlock encoder usage
	s.encoderLock.Unlock()

	if len(buf) == 0 {
		return nil
	}

	// Send metrics from batch slice
	err := s.writeApi.WriteRecord(context.Background(), strings.Join(s.batch, ""))
	if err != nil {
		cclog.ComponentError(s.name, "Flush():", "Flush of", len(s.batch), "metrics failed:", err)
		return err
	}
	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")

	// Clear batch slice
	for i := range s.batch {
		s.batch[i] = ""
	}
	s.batch = s.batch[:0]
	// Asynchronously send the encoder metrics
	s.sendWaitGroup.Wait()
	s.sendWaitGroup.Add(1)
	go func() {
		defer s.sendWaitGroup.Done()
		startTime := time.Now()
		err := s.writeApi.WriteRecord(context.Background(), string(buf))
		if err != nil {
			cclog.ComponentError(
				s.name,
				"Flush():",
				"Flush failed:", err,
				"(number of records =", numRecordsInBuf,
				", buffer size =", len(buf),
				", send duration =", time.Since(startTime),
				")",
			)
			return
		}
	}()

	return nil
}
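This is the change the commit title refers to: Flush() clones the encoder buffer while holding the lock, then hands the network write to a goroutine, so Write() is no longer blocked for the duration of the HTTP round trip. The WaitGroup's Wait-before-Add sequence keeps at most one send in flight and preserves record order. A self-contained sketch of that pattern, with a stand-in send() in place of the InfluxDB client's WriteRecord:

// Clone under lock, send asynchronously; names and send() are illustrative.
package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

type sink struct {
	mu    sync.Mutex     // protects buf between producers and flush()
	buf   bytes.Buffer   // stands in for the line-protocol encoder
	sends sync.WaitGroup // at most one send goroutine in flight
}

func (s *sink) flush() {
	// Hold the lock only long enough to steal a copy of the buffer.
	s.mu.Lock()
	payload := append([]byte(nil), s.buf.Bytes()...)
	s.buf.Reset()
	s.mu.Unlock()

	if len(payload) == 0 {
		return
	}

	// Wait for the previous send so sends stay ordered and never overlap.
	s.sends.Wait()
	s.sends.Add(1)
	go func() {
		defer s.sends.Done()
		send(payload)
	}()
}

func send(p []byte) {
	time.Sleep(10 * time.Millisecond) // simulate network latency
	fmt.Printf("sent %d bytes\n", len(p))
}

func main() {
	s := &sink{}
	s.mu.Lock()
	s.buf.WriteString("m,host=n1 value=1\n")
	s.mu.Unlock()
	s.flush()
	s.sends.Wait() // drain the in-flight send before exit, as Close() does
}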
@@ -333,14 +338,18 @@ func (s *InfluxSink) Close() {
	// Stop existing timer and immediately flush
	if s.flushTimer != nil {
		if ok := s.flushTimer.Stop(); ok {
			s.flushTimerMutex.Unlock()
			s.timerLock.Unlock()
		}
	}
	s.Flush()

	// Flush
	if err := s.Flush(); err != nil {
		cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
	}

	// Wait for send operation to finish
	s.sendWaitGroup.Wait()

	s.client.Close()
}

@@ -352,7 +361,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
	// Set config default values
	s.config.BatchSize = 1000
	s.config.FlushInterval = "1s"
	s.config.DropRate = 100

	// Read config
	if len(config) > 0 {
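NewInfluxSink assigns defaults before parsing the user's JSON, so json.Unmarshal only overwrites the fields the configuration actually sets. A small sketch of this defaults-then-unmarshal idiom, using an assumed two-field config struct:

// Defaults first, then unmarshal: absent JSON keys keep their defaults.
package main

import (
	"encoding/json"
	"fmt"
)

type config struct {
	BatchSize     int    `json:"batch_size,omitempty"`
	FlushInterval string `json:"flush_delay,omitempty"`
}

func main() {
	// Set defaults before reading the user-supplied configuration.
	cfg := config{
		BatchSize:     1000,
		FlushInterval: "1s",
	}

	raw := []byte(`{"flush_delay": "5s"}`)
	if err := json.Unmarshal(raw, &cfg); err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.BatchSize, cfg.FlushInterval) // 1000 5s: batch_size kept its default
}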
@@ -388,24 +396,13 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
	if len(s.config.FlushInterval) > 0 {
		t, err := time.ParseDuration(s.config.FlushInterval)
		if err == nil {
			s.flushDelay = t
			s.config.flushDelay = t
		}
	}

	if !(s.config.BatchSize > 0) {
		return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize)
	}
	if !(s.config.DropRate > 0) {
		return s, fmt.Errorf("drop_rate=%d in InfluxDB config must be > 0", s.config.DropRate)
	}
	if !(s.config.BatchSize > s.config.DropRate) {
		return s, fmt.Errorf(
			"batch_size=%d must be greater than drop_rate=%d in InfluxDB config",
			s.config.BatchSize, s.config.DropRate)
	}

	// allocate batch slice
	s.batch = make([]string, 0, s.config.BatchSize)

	// Connect to InfluxDB server
	if err := s.connect(); err != nil {
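Note that the parse above silently keeps a zero flushDelay when flush_delay is malformed. time.ParseDuration accepts strings such as "300ms", "1s", or "2m30s"; a short sketch with an explicit error path instead of the silent fallback:

// Parsing a human-readable flush delay; malformed input is reported
// rather than silently falling back to zero.
package main

import (
	"fmt"
	"time"
)

func parseFlushDelay(s string) (time.Duration, error) {
	if s == "" {
		return 0, nil // zero means synchronous flushing
	}
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid flush_delay %q: %w", s, err)
	}
	return d, nil
}

func main() {
	for _, in := range []string{"1s", "250ms", "oops"} {
		d, err := parseFlushDelay(in)
		fmt.Println(in, "->", d, err)
	}
}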

@@ -27,9 +27,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `database`: All metrics are written to this bucket
- `host`: Hostname of the InfluxDB database server
- `port`: Portnumber (as string) of the InfluxDB database server
- `user`: Username for basic authentification
- `password`: Password for basic authentification
- `port`: Port number (as string) of the InfluxDB database server
- `user`: Username for basic authentication
- `password`: Password for basic authentication
- `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection
- `flush_delay`: Group metrics coming in to a single batch
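For reference, a hypothetical `influxdb` sink configuration built from the keys documented above plus the `flush_delay`/`batch_size` behavior this commit touches. The sink name and the `type` key follow the usual cc-metric-collector sink config layout, and all values are illustrative only:

{
  "mysink": {
    "type": "influxdb",
    "host": "localhost",
    "port": "8086",
    "database": "mymetrics",
    "organization": "myorg",
    "user": "myuser",
    "password": "mypass",
    "ssl": true,
    "flush_delay": "1s",
    "batch_size": 1000
  }
}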