From a0e97d216a392fd1979611430e02ba8bf589d157 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Wed, 9 Feb 2022 19:47:49 +0100 Subject: [PATCH] Move all flush operations to the sinks --- sinks/httpSink.go | 22 +++++++++++++++++++++- sinks/sinkManager.go | 13 ------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 15d38fb..844de0b 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -22,6 +22,7 @@ type HttpSinkConfig struct { Timeout string `json:"timeout,omitempty"` MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` + BatchSize int `json:"batch_size,omitempty"` } type HttpSink struct { @@ -34,14 +35,19 @@ type HttpSink struct { maxIdleConns int idleConnTimeout time.Duration timeout time.Duration + batchCounter int } func (s *HttpSink) Init(config json.RawMessage) error { + // Set default values s.name = "HttpSink" s.config.SSL = false s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" + s.config.BatchSize = 20 + + // Read config if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -87,27 +93,40 @@ func (s *HttpSink) Init(config json.RawMessage) error { func (s *HttpSink) Write(m lp.CCMetric) error { p := m.ToPoint(s.config.MetaAsTags) _, err := s.encoder.Encode(p) + + // Flush when received more metrics than batch size + s.batchCounter++ + if s.batchCounter > s.config.BatchSize { + s.Flush() + } return err } func (s *HttpSink) Flush() error { + // Create new request to send buffer req, err := http.NewRequest(http.MethodPost, s.url, s.buffer) if err != nil { return err } + // Set authorization header if len(s.jwt) != 0 { req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt)) } + // Send res, err := s.client.Do(req) + + // Clear buffer s.buffer.Reset() + // Handle error code if err != nil { return err } - if res.StatusCode != 200 { + // Handle status code + if res.StatusCode != http.StatusOK { return errors.New(res.Status) } @@ -115,5 +134,6 @@ func (s *HttpSink) Flush() error { } func (s *HttpSink) Close() { + s.Flush() s.client.CloseIdleConnections() } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index b319eb4..80877c8 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -82,8 +82,6 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { // Start starts the sink managers background task, which // distributes received metrics to the sinks func (sm *sinkManager) Start() { - batchcount := 20 - sm.wg.Add(1) go func() { defer sm.wg.Done() @@ -91,7 +89,6 @@ func (sm *sinkManager) Start() { // Sink manager is done done := func() { for _, s := range sm.sinks { - s.Flush() s.Close() } @@ -111,16 +108,6 @@ func (sm *sinkManager) Start() { for _, s := range sm.sinks { s.Write(p) } - - // Flush all outputs - if batchcount == 0 { - cclog.ComponentDebug("SinkManager", "FLUSH") - for _, s := range sm.sinks { - s.Flush() - } - batchcount = 20 - } - batchcount-- } } }()