Move all flush operations to the sinks

This commit is contained in:
Holger Obermaier 2022-02-09 19:47:49 +01:00
parent 1d299be3ea
commit a0e97d216a
2 changed files with 21 additions and 14 deletions

View File

@ -22,6 +22,7 @@ type HttpSinkConfig struct {
Timeout string `json:"timeout,omitempty"` Timeout string `json:"timeout,omitempty"`
MaxIdleConns int `json:"max_idle_connections,omitempty"` MaxIdleConns int `json:"max_idle_connections,omitempty"`
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
BatchSize int `json:"batch_size,omitempty"`
} }
type HttpSink struct { type HttpSink struct {
@ -34,14 +35,19 @@ type HttpSink struct {
maxIdleConns int maxIdleConns int
idleConnTimeout time.Duration idleConnTimeout time.Duration
timeout time.Duration timeout time.Duration
batchCounter int
} }
func (s *HttpSink) Init(config json.RawMessage) error { func (s *HttpSink) Init(config json.RawMessage) error {
// Set default values
s.name = "HttpSink" s.name = "HttpSink"
s.config.SSL = false s.config.SSL = false
s.config.MaxIdleConns = 10 s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s" s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s" s.config.Timeout = "5s"
s.config.BatchSize = 20
// Read config
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -87,27 +93,40 @@ func (s *HttpSink) Init(config json.RawMessage) error {
func (s *HttpSink) Write(m lp.CCMetric) error { func (s *HttpSink) Write(m lp.CCMetric) error {
p := m.ToPoint(s.config.MetaAsTags) p := m.ToPoint(s.config.MetaAsTags)
_, err := s.encoder.Encode(p) _, err := s.encoder.Encode(p)
// Flush when received more metrics than batch size
s.batchCounter++
if s.batchCounter > s.config.BatchSize {
s.Flush()
}
return err return err
} }
func (s *HttpSink) Flush() error { func (s *HttpSink) Flush() error {
// Create new request to send buffer
req, err := http.NewRequest(http.MethodPost, s.url, s.buffer) req, err := http.NewRequest(http.MethodPost, s.url, s.buffer)
if err != nil { if err != nil {
return err return err
} }
// Set authorization header
if len(s.jwt) != 0 { if len(s.jwt) != 0 {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt)) req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt))
} }
// Send
res, err := s.client.Do(req) res, err := s.client.Do(req)
// Clear buffer
s.buffer.Reset() s.buffer.Reset()
// Handle error code
if err != nil { if err != nil {
return err return err
} }
if res.StatusCode != 200 { // Handle status code
if res.StatusCode != http.StatusOK {
return errors.New(res.Status) return errors.New(res.Status)
} }
@ -115,5 +134,6 @@ func (s *HttpSink) Flush() error {
} }
func (s *HttpSink) Close() { func (s *HttpSink) Close() {
s.Flush()
s.client.CloseIdleConnections() s.client.CloseIdleConnections()
} }

View File

@ -82,8 +82,6 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
// Start starts the sink managers background task, which // Start starts the sink managers background task, which
// distributes received metrics to the sinks // distributes received metrics to the sinks
func (sm *sinkManager) Start() { func (sm *sinkManager) Start() {
batchcount := 20
sm.wg.Add(1) sm.wg.Add(1)
go func() { go func() {
defer sm.wg.Done() defer sm.wg.Done()
@ -91,7 +89,6 @@ func (sm *sinkManager) Start() {
// Sink manager is done // Sink manager is done
done := func() { done := func() {
for _, s := range sm.sinks { for _, s := range sm.sinks {
s.Flush()
s.Close() s.Close()
} }
@ -111,16 +108,6 @@ func (sm *sinkManager) Start() {
for _, s := range sm.sinks { for _, s := range sm.sinks {
s.Write(p) s.Write(p)
} }
// Flush all outputs
if batchcount == 0 {
cclog.ComponentDebug("SinkManager", "FLUSH")
for _, s := range sm.sinks {
s.Flush()
}
batchcount = 20
}
batchcount--
} }
} }
}() }()