mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2024-11-10 04:27:25 +01:00
Add batch_size option to httpSink
This commit is contained in:
parent
38d4e0a730
commit
feb8f16af8
@ -22,6 +22,7 @@ type HttpSinkConfig struct {
|
|||||||
MaxIdleConns int `json:"max_idle_connections,omitempty"`
|
MaxIdleConns int `json:"max_idle_connections,omitempty"`
|
||||||
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
|
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
|
||||||
FlushDelay string `json:"flush_delay,omitempty"`
|
FlushDelay string `json:"flush_delay,omitempty"`
|
||||||
|
BatchSize int `json:"batch_size,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type HttpSink struct {
|
type HttpSink struct {
|
||||||
@ -36,6 +37,7 @@ type HttpSink struct {
|
|||||||
idleConnTimeout time.Duration
|
idleConnTimeout time.Duration
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
flushDelay time.Duration
|
flushDelay time.Duration
|
||||||
|
batchSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HttpSink) Write(m lp.CCMetric) error {
|
func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||||
@ -57,6 +59,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
|
|||||||
|
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
_, err := s.encoder.Encode(p)
|
_, err := s.encoder.Encode(p)
|
||||||
|
s.batchSize++
|
||||||
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
|
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -68,6 +71,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
|
|||||||
if s.flushDelay == 0 {
|
if s.flushDelay == 0 {
|
||||||
return s.Flush()
|
return s.Flush()
|
||||||
}
|
}
|
||||||
|
if s.batchSize == s.config.BatchSize {
|
||||||
|
return s.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -99,6 +105,7 @@ func (s *HttpSink) Flush() error {
|
|||||||
|
|
||||||
// Clear buffer
|
// Clear buffer
|
||||||
s.buffer.Reset()
|
s.buffer.Reset()
|
||||||
|
s.batchSize = 0
|
||||||
|
|
||||||
// Handle transport/tcp errors
|
// Handle transport/tcp errors
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -132,6 +139,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.config.IdleConnTimeout = "5s"
|
s.config.IdleConnTimeout = "5s"
|
||||||
s.config.Timeout = "5s"
|
s.config.Timeout = "5s"
|
||||||
s.config.FlushDelay = "1s"
|
s.config.FlushDelay = "1s"
|
||||||
|
s.config.BatchSize = 100
|
||||||
|
|
||||||
// Read config
|
// Read config
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
|
@ -15,6 +15,7 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
|
|||||||
"max_idle_connections" : 10,
|
"max_idle_connections" : 10,
|
||||||
"idle_connection_timeout" : "5s",
|
"idle_connection_timeout" : "5s",
|
||||||
"flush_delay": "2s",
|
"flush_delay": "2s",
|
||||||
|
"batch_size" : 100
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -27,3 +28,4 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
|
|||||||
- `max_idle_connections`: Maximally idle connections (default 10)
|
- `max_idle_connections`: Maximally idle connections (default 10)
|
||||||
- `idle_connection_timeout`: Timeout for idle connections (default '5s')
|
- `idle_connection_timeout`: Timeout for idle connections (default '5s')
|
||||||
- `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0)
|
- `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0)
|
||||||
|
- `batch_size`: Maximal number of batched metrics. Either it is flushed because batch size or the `flush_delay` is reached
|
||||||
|
Loading…
Reference in New Issue
Block a user