mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-24 23:05:06 +02:00 
			
		
		
		
	HttpSink: dynamically sized batches flushed after timer
This commit is contained in:
		| @@ -6,8 +6,10 @@ import ( | |||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
| 	influx "github.com/influxdata/line-protocol" | 	influx "github.com/influxdata/line-protocol" | ||||||
| ) | ) | ||||||
| @@ -19,19 +21,21 @@ type HttpSinkConfig struct { | |||||||
| 	Timeout         string `json:"timeout,omitempty"` | 	Timeout         string `json:"timeout,omitempty"` | ||||||
| 	MaxIdleConns    int    `json:"max_idle_connections,omitempty"` | 	MaxIdleConns    int    `json:"max_idle_connections,omitempty"` | ||||||
| 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` | 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` | ||||||
| 	BatchSize       int    `json:"batch_size,omitempty"` | 	FlushDelay      string `json:"flush_delay,omitempty"` | ||||||
| } | } | ||||||
|  |  | ||||||
| type HttpSink struct { | type HttpSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	client          *http.Client | 	client          *http.Client | ||||||
| 	encoder         *influx.Encoder | 	encoder         *influx.Encoder | ||||||
|  | 	lock            sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | ||||||
| 	buffer          *bytes.Buffer | 	buffer          *bytes.Buffer | ||||||
|  | 	flushTimer      *time.Timer | ||||||
| 	config          HttpSinkConfig | 	config          HttpSinkConfig | ||||||
| 	maxIdleConns    int | 	maxIdleConns    int | ||||||
| 	idleConnTimeout time.Duration | 	idleConnTimeout time.Duration | ||||||
| 	timeout         time.Duration | 	timeout         time.Duration | ||||||
| 	batchCounter    int | 	flushDelay      time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Init(config json.RawMessage) error { | func (s *HttpSink) Init(config json.RawMessage) error { | ||||||
| @@ -40,10 +44,7 @@ func (s *HttpSink) Init(config json.RawMessage) error { | |||||||
| 	s.config.MaxIdleConns = 10 | 	s.config.MaxIdleConns = 10 | ||||||
| 	s.config.IdleConnTimeout = "5s" | 	s.config.IdleConnTimeout = "5s" | ||||||
| 	s.config.Timeout = "5s" | 	s.config.Timeout = "5s" | ||||||
| 	s.config.BatchSize = 20 | 	s.config.FlushDelay = "1s" | ||||||
|  |  | ||||||
| 	// Reset counter |  | ||||||
| 	s.batchCounter = 0 |  | ||||||
|  |  | ||||||
| 	// Read config | 	// Read config | ||||||
| 	if len(config) > 0 { | 	if len(config) > 0 { | ||||||
| @@ -70,6 +71,12 @@ func (s *HttpSink) Init(config json.RawMessage) error { | |||||||
| 			s.timeout = t | 			s.timeout = t | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	if len(s.config.FlushDelay) > 0 { | ||||||
|  | 		t, err := time.ParseDuration(s.config.FlushDelay) | ||||||
|  | 		if err == nil { | ||||||
|  | 			s.flushDelay = t | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| 	tr := &http.Transport{ | 	tr := &http.Transport{ | ||||||
| 		MaxIdleConns:    s.maxIdleConns, | 		MaxIdleConns:    s.maxIdleConns, | ||||||
| 		IdleConnTimeout: s.idleConnTimeout, | 		IdleConnTimeout: s.idleConnTimeout, | ||||||
| @@ -83,26 +90,48 @@ func (s *HttpSink) Init(config json.RawMessage) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Write(m lp.CCMetric) error { | func (s *HttpSink) Write(m lp.CCMetric) error { | ||||||
| 	p := m.ToPoint(s.config.MetaAsTags) | 	if s.buffer.Len() == 0 && s.flushDelay != 0 { | ||||||
| 	_, err := s.encoder.Encode(p) | 		// This is the first write since the last flush, start the flushTimer! | ||||||
|  | 		if s.flushTimer != nil && s.flushTimer.Stop() { | ||||||
|  | 			cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?") | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 	// Flush when received more metrics than batch size | 		// Run a batched flush for all lines that have arrived in the last second | ||||||
| 	s.batchCounter++ | 		s.flushTimer = time.AfterFunc(s.flushDelay, func() { | ||||||
| 	if s.batchCounter > s.config.BatchSize { | 			if err := s.Flush(); err != nil { | ||||||
| 		s.Flush() | 				cclog.ComponentError("HttpSink", "flush failed:", err.Error()) | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	p := m.ToPoint(s.config.MetaAsTags) | ||||||
|  |  | ||||||
|  | 	s.lock.Lock() | ||||||
|  | 	_, err := s.encoder.Encode(p) | ||||||
|  | 	s.lock.Unlock() // defer does not work here as Flush() takes the lock as well | ||||||
|  |  | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Flush synchronously if "flush_delay" is zero | ||||||
|  | 	if s.flushDelay == 0 { | ||||||
|  | 		return s.Flush() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Flush() error { | func (s *HttpSink) Flush() error { | ||||||
|  | 	// buffer is read by client.Do, prevent concurrent modifications | ||||||
|  | 	s.lock.Lock() | ||||||
|  | 	defer s.lock.Unlock() | ||||||
|  |  | ||||||
| 	// Do not flush empty buffer | 	// Do not flush empty buffer | ||||||
| 	if s.batchCounter == 0 { | 	if s.buffer.Len() == 0 { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Reset counter |  | ||||||
| 	s.batchCounter = 0 |  | ||||||
|  |  | ||||||
| 	// Create new request to send buffer | 	// Create new request to send buffer | ||||||
| 	req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) | 	req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -120,12 +149,12 @@ func (s *HttpSink) Flush() error { | |||||||
| 	// Clear buffer | 	// Clear buffer | ||||||
| 	s.buffer.Reset() | 	s.buffer.Reset() | ||||||
|  |  | ||||||
| 	// Handle error code | 	// Handle transport/tcp errors | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Handle status code | 	// Handle application errors | ||||||
| 	if res.StatusCode != http.StatusOK { | 	if res.StatusCode != http.StatusOK { | ||||||
| 		return errors.New(res.Status) | 		return errors.New(res.Status) | ||||||
| 	} | 	} | ||||||
| @@ -134,6 +163,9 @@ func (s *HttpSink) Flush() error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Close() { | func (s *HttpSink) Close() { | ||||||
| 	s.Flush() | 	s.flushTimer.Stop() | ||||||
|  | 	if err := s.Flush(); err != nil { | ||||||
|  | 		cclog.ComponentError("HttpSink", "flush failed:", err.Error()) | ||||||
|  | 	} | ||||||
| 	s.client.CloseIdleConnections() | 	s.client.CloseIdleConnections() | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user