Add timeout options to httpSink

This commit is contained in:
Thomas Roehl 2022-02-08 18:06:07 +01:00
parent fec3c5981d
commit 7f78a5baf2
2 changed files with 49 additions and 15 deletions

View File

@ -19,6 +19,9 @@ type HttpSinkConfig struct {
Database string `json:"database,omitempty"` Database string `json:"database,omitempty"`
JWT string `json:"jwt,omitempty"` JWT string `json:"jwt,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
Timeout string `json:"timeout,omitempty"`
MaxIdleConns int `json:"max_idle_connections,omitempty"`
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
} }
type HttpSink struct { type HttpSink struct {
@ -28,11 +31,17 @@ type HttpSink struct {
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config HttpSinkConfig config HttpSinkConfig
maxIdleConns int
idleConnTimeout time.Duration
timeout time.Duration
} }
func (s *HttpSink) Init(config json.RawMessage) error { func (s *HttpSink) Init(config json.RawMessage) error {
s.name = "HttpSink" s.name = "HttpSink"
s.config.SSL = false s.config.SSL = false
s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -42,8 +51,26 @@ func (s *HttpSink) Init(config json.RawMessage) error {
if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 { if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink") return errors.New("`host`, `port` and `database` config options required for TCP sink")
} }
if s.config.MaxIdleConns > 0 {
s.client = &http.Client{} s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil {
s.idleConnTimeout = t
}
}
if len(s.config.Timeout) > 0 {
t, err := time.ParseDuration(s.config.Timeout)
if err == nil {
s.timeout = t
}
}
tr := &http.Transport{
MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout,
}
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
proto := "http" proto := "http"
if s.config.SSL { if s.config.SSL {
proto = "https" proto = "https"
@ -58,7 +85,8 @@ func (s *HttpSink) Init(config json.RawMessage) error {
} }
func (s *HttpSink) Write(m lp.CCMetric) error { func (s *HttpSink) Write(m lp.CCMetric) error {
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) p := m.ToPoint(s.config.MetaAsTags)
_, err := s.encoder.Encode(p)
return err return err
} }

View File

@ -13,7 +13,10 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
"host": "dbhost.example.com", "host": "dbhost.example.com",
"port": "4222", "port": "4222",
"jwt" : "0x0000q231", "jwt" : "0x0000q231",
"ssl" : false "ssl" : false,
"timeout": "5s",
"max_idle_connections" : 10,
"idle_connection_timeout" : "5s"
} }
} }
``` ```
@ -25,3 +28,6 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
- `port`: Portnumber (as string) of the InfluxDB database server - `port`: Portnumber (as string) of the InfluxDB database server
- `jwt`: JSON web tokens for authentification - `jwt`: JSON web tokens for authentification
- `ssl`: Activate SSL encryption - `ssl`: Activate SSL encryption
- `timeout`: General timeout for the HTTP client (default '5s')
- `max_idle_connections`: Maximally idle connections (default 10)
- `idle_connection_timeout`: Timeout for idle connections (default '5s')