diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 22ecfce..ce46bab 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -6,49 +6,45 @@ import ( "errors" "fmt" "net/http" + "sync" "time" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" ) type HttpSinkConfig struct { defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` + URL string `json:"url,omitempty"` JWT string `json:"jwt,omitempty"` - SSL bool `json:"ssl,omitempty"` Timeout string `json:"timeout,omitempty"` MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` - BatchSize int `json:"batch_size,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` } type HttpSink struct { sink client *http.Client - url, jwt string encoder *influx.Encoder + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer buffer *bytes.Buffer + flushTimer *time.Timer config HttpSinkConfig maxIdleConns int idleConnTimeout time.Duration timeout time.Duration - batchCounter int + flushDelay time.Duration } func (s *HttpSink) Init(config json.RawMessage) error { // Set default values s.name = "HttpSink" - s.config.SSL = false s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" - s.config.BatchSize = 20 - - // Reset counter - s.batchCounter = 0 + s.config.FlushDelay = "1s" // Read config if len(config) > 0 { @@ -57,8 +53,8 @@ func (s *HttpSink) Init(config json.RawMessage) error { return err } } - if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 { - return errors.New("`host`, `port` and `database` config options required for TCP sink") + if len(s.config.URL) == 0 { + return errors.New("`url` config option is required for HTTP sink") } if s.config.MaxIdleConns > 0 { s.maxIdleConns = s.config.MaxIdleConns @@ -75,17 +71,17 @@ func (s *HttpSink) Init(config json.RawMessage) error { s.timeout = t } } + if len(s.config.FlushDelay) > 0 { + t, err := time.ParseDuration(s.config.FlushDelay) + if err == nil { + s.flushDelay = t + } + } tr := &http.Transport{ MaxIdleConns: s.maxIdleConns, IdleConnTimeout: s.idleConnTimeout, } s.client = &http.Client{Transport: tr, Timeout: s.timeout} - proto := "http" - if s.config.SSL { - proto = "https" - } - s.url = fmt.Sprintf("%s://%s:%s/%s", proto, s.config.Host, s.config.Port, s.config.Database) - s.jwt = s.config.JWT s.buffer = &bytes.Buffer{} s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) @@ -94,35 +90,57 @@ func (s *HttpSink) Init(config json.RawMessage) error { } func (s *HttpSink) Write(m lp.CCMetric) error { - p := m.ToPoint(s.config.MetaAsTags) - _, err := s.encoder.Encode(p) + if s.buffer.Len() == 0 && s.flushDelay != 0 { + // This is the first write since the last flush, start the flushTimer! + if s.flushTimer != nil && s.flushTimer.Stop() { + cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?") + } - // Flush when received more metrics than batch size - s.batchCounter++ - if s.batchCounter > s.config.BatchSize { - s.Flush() + // Run a batched flush for all lines that have arrived in the last second + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError("HttpSink", "flush failed:", err.Error()) + } + }) } + + p := m.ToPoint(s.config.MetaAsTags) + + s.lock.Lock() + _, err := s.encoder.Encode(p) + s.lock.Unlock() // defer does not work here as Flush() takes the lock as well + + if err != nil { + return err + } + + // Flush synchronously if "flush_delay" is zero + if s.flushDelay == 0 { + return s.Flush() + } + return err } func (s *HttpSink) Flush() error { + // buffer is read by client.Do, prevent concurrent modifications + s.lock.Lock() + defer s.lock.Unlock() + // Do not flush empty buffer - if s.batchCounter == 0 { + if s.buffer.Len() == 0 { return nil } - // Reset counter - s.batchCounter = 0 - // Create new request to send buffer - req, err := http.NewRequest(http.MethodPost, s.url, s.buffer) + req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) if err != nil { return err } // Set authorization header - if len(s.jwt) != 0 { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt)) + if len(s.config.JWT) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) } // Send @@ -131,12 +149,12 @@ func (s *HttpSink) Flush() error { // Clear buffer s.buffer.Reset() - // Handle error code + // Handle transport/tcp errors if err != nil { return err } - // Handle status code + // Handle application errors if res.StatusCode != http.StatusOK { return errors.New(res.Status) } @@ -145,6 +163,9 @@ func (s *HttpSink) Flush() error { } func (s *HttpSink) Close() { - s.Flush() + s.flushTimer.Stop() + if err := s.Flush(); err != nil { + cclog.ComponentError("HttpSink", "flush failed:", err.Error()) + } s.client.CloseIdleConnections() } diff --git a/sinks/httpSink.md b/sinks/httpSink.md index fe466e8..23203a2 100644 --- a/sinks/httpSink.md +++ b/sinks/httpSink.md @@ -9,25 +9,21 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the "": { "type": "http", "meta_as_tags" : true, - "database" : "mymetrics", - "host": "dbhost.example.com", - "port": "4222", - "jwt" : "0x0000q231", - "ssl" : false, + "url" : "https://my-monitoring.example.com:1234/api/write", + "jwt" : "blabla.blabla.blabla", "timeout": "5s", "max_idle_connections" : 10, - "idle_connection_timeout" : "5s" + "idle_connection_timeout" : "5s", + "flush_delay": "2s", } } ``` - `type`: makes the sink an `http` sink - `meta_as_tags`: print all meta information as tags in the output (optional) -- `database`: All metrics are written to this bucket -- `host`: Hostname of the InfluxDB database server -- `port`: Portnumber (as string) of the InfluxDB database server -- `jwt`: JSON web tokens for authentification -- `ssl`: Activate SSL encryption +- `url`: The full URL of the endpoint +- `jwt`: JSON web tokens for authentification (Using the *Bearer* scheme) - `timeout`: General timeout for the HTTP client (default '5s') - `max_idle_connections`: Maximally idle connections (default 10) -- `idle_connection_timeout`: Timeout for idle connections (default '5s') \ No newline at end of file +- `idle_connection_timeout`: Timeout for idle connections (default '5s') +- `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0) diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 80877c8..ff6e01d 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -106,7 +106,9 @@ func (sm *sinkManager) Start() { // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.sinks { - s.Write(p) + if err := s.Write(p); err != nil { + cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) + } } } } @@ -131,7 +133,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { } } if _, found := AvailableSinks[sinkConfig.Type]; !found { - cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error()) + cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) return err } s := AvailableSinks[sinkConfig.Type]