mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 10:45:06 +01:00 
			
		
		
		
	Automatically flush batched writes in the HTTP sink (#31)
* Add error handling for Sink.Write * simplify HttpSink config * HttpSink: dynamically sized batches flushed after timer * fix panic if sink type does not exist
This commit is contained in:
		@@ -6,49 +6,45 @@ import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 | 
			
		||||
	influx "github.com/influxdata/line-protocol"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type HttpSinkConfig struct {
 | 
			
		||||
	defaultSinkConfig
 | 
			
		||||
	Host            string `json:"host,omitempty"`
 | 
			
		||||
	Port            string `json:"port,omitempty"`
 | 
			
		||||
	Database        string `json:"database,omitempty"`
 | 
			
		||||
	URL             string `json:"url,omitempty"`
 | 
			
		||||
	JWT             string `json:"jwt,omitempty"`
 | 
			
		||||
	SSL             bool   `json:"ssl,omitempty"`
 | 
			
		||||
	Timeout         string `json:"timeout,omitempty"`
 | 
			
		||||
	MaxIdleConns    int    `json:"max_idle_connections,omitempty"`
 | 
			
		||||
	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
 | 
			
		||||
	BatchSize       int    `json:"batch_size,omitempty"`
 | 
			
		||||
	FlushDelay      string `json:"flush_delay,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type HttpSink struct {
 | 
			
		||||
	sink
 | 
			
		||||
	client          *http.Client
 | 
			
		||||
	url, jwt        string
 | 
			
		||||
	encoder         *influx.Encoder
 | 
			
		||||
	lock            sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
 | 
			
		||||
	buffer          *bytes.Buffer
 | 
			
		||||
	flushTimer      *time.Timer
 | 
			
		||||
	config          HttpSinkConfig
 | 
			
		||||
	maxIdleConns    int
 | 
			
		||||
	idleConnTimeout time.Duration
 | 
			
		||||
	timeout         time.Duration
 | 
			
		||||
	batchCounter    int
 | 
			
		||||
	flushDelay      time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *HttpSink) Init(config json.RawMessage) error {
 | 
			
		||||
	// Set default values
 | 
			
		||||
	s.name = "HttpSink"
 | 
			
		||||
	s.config.SSL = false
 | 
			
		||||
	s.config.MaxIdleConns = 10
 | 
			
		||||
	s.config.IdleConnTimeout = "5s"
 | 
			
		||||
	s.config.Timeout = "5s"
 | 
			
		||||
	s.config.BatchSize = 20
 | 
			
		||||
 | 
			
		||||
	// Reset counter
 | 
			
		||||
	s.batchCounter = 0
 | 
			
		||||
	s.config.FlushDelay = "1s"
 | 
			
		||||
 | 
			
		||||
	// Read config
 | 
			
		||||
	if len(config) > 0 {
 | 
			
		||||
@@ -57,8 +53,8 @@ func (s *HttpSink) Init(config json.RawMessage) error {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 {
 | 
			
		||||
		return errors.New("`host`, `port` and `database` config options required for TCP sink")
 | 
			
		||||
	if len(s.config.URL) == 0 {
 | 
			
		||||
		return errors.New("`url` config option is required for HTTP sink")
 | 
			
		||||
	}
 | 
			
		||||
	if s.config.MaxIdleConns > 0 {
 | 
			
		||||
		s.maxIdleConns = s.config.MaxIdleConns
 | 
			
		||||
@@ -75,17 +71,17 @@ func (s *HttpSink) Init(config json.RawMessage) error {
 | 
			
		||||
			s.timeout = t
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(s.config.FlushDelay) > 0 {
 | 
			
		||||
		t, err := time.ParseDuration(s.config.FlushDelay)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			s.flushDelay = t
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	tr := &http.Transport{
 | 
			
		||||
		MaxIdleConns:    s.maxIdleConns,
 | 
			
		||||
		IdleConnTimeout: s.idleConnTimeout,
 | 
			
		||||
	}
 | 
			
		||||
	s.client = &http.Client{Transport: tr, Timeout: s.timeout}
 | 
			
		||||
	proto := "http"
 | 
			
		||||
	if s.config.SSL {
 | 
			
		||||
		proto = "https"
 | 
			
		||||
	}
 | 
			
		||||
	s.url = fmt.Sprintf("%s://%s:%s/%s", proto, s.config.Host, s.config.Port, s.config.Database)
 | 
			
		||||
	s.jwt = s.config.JWT
 | 
			
		||||
	s.buffer = &bytes.Buffer{}
 | 
			
		||||
	s.encoder = influx.NewEncoder(s.buffer)
 | 
			
		||||
	s.encoder.SetPrecision(time.Second)
 | 
			
		||||
@@ -94,35 +90,57 @@ func (s *HttpSink) Init(config json.RawMessage) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *HttpSink) Write(m lp.CCMetric) error {
 | 
			
		||||
	p := m.ToPoint(s.config.MetaAsTags)
 | 
			
		||||
	_, err := s.encoder.Encode(p)
 | 
			
		||||
	if s.buffer.Len() == 0 && s.flushDelay != 0 {
 | 
			
		||||
		// This is the first write since the last flush, start the flushTimer!
 | 
			
		||||
		if s.flushTimer != nil && s.flushTimer.Stop() {
 | 
			
		||||
			cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	// Flush when received more metrics than batch size
 | 
			
		||||
	s.batchCounter++
 | 
			
		||||
	if s.batchCounter > s.config.BatchSize {
 | 
			
		||||
		s.Flush()
 | 
			
		||||
		// Run a batched flush for all lines that have arrived in the last second
 | 
			
		||||
		s.flushTimer = time.AfterFunc(s.flushDelay, func() {
 | 
			
		||||
			if err := s.Flush(); err != nil {
 | 
			
		||||
				cclog.ComponentError("HttpSink", "flush failed:", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	p := m.ToPoint(s.config.MetaAsTags)
 | 
			
		||||
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	_, err := s.encoder.Encode(p)
 | 
			
		||||
	s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Flush synchronously if "flush_delay" is zero
 | 
			
		||||
	if s.flushDelay == 0 {
 | 
			
		||||
		return s.Flush()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *HttpSink) Flush() error {
 | 
			
		||||
	// buffer is read by client.Do, prevent concurrent modifications
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	// Do not flush empty buffer
 | 
			
		||||
	if s.batchCounter == 0 {
 | 
			
		||||
	if s.buffer.Len() == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Reset counter
 | 
			
		||||
	s.batchCounter = 0
 | 
			
		||||
 | 
			
		||||
	// Create new request to send buffer
 | 
			
		||||
	req, err := http.NewRequest(http.MethodPost, s.url, s.buffer)
 | 
			
		||||
	req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Set authorization header
 | 
			
		||||
	if len(s.jwt) != 0 {
 | 
			
		||||
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt))
 | 
			
		||||
	if len(s.config.JWT) != 0 {
 | 
			
		||||
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Send
 | 
			
		||||
@@ -131,12 +149,12 @@ func (s *HttpSink) Flush() error {
 | 
			
		||||
	// Clear buffer
 | 
			
		||||
	s.buffer.Reset()
 | 
			
		||||
 | 
			
		||||
	// Handle error code
 | 
			
		||||
	// Handle transport/tcp errors
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Handle status code
 | 
			
		||||
	// Handle application errors
 | 
			
		||||
	if res.StatusCode != http.StatusOK {
 | 
			
		||||
		return errors.New(res.Status)
 | 
			
		||||
	}
 | 
			
		||||
@@ -145,6 +163,9 @@ func (s *HttpSink) Flush() error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *HttpSink) Close() {
 | 
			
		||||
	s.Flush()
 | 
			
		||||
	s.flushTimer.Stop()
 | 
			
		||||
	if err := s.Flush(); err != nil {
 | 
			
		||||
		cclog.ComponentError("HttpSink", "flush failed:", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
	s.client.CloseIdleConnections()
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -9,25 +9,21 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
 | 
			
		||||
  "<name>": {
 | 
			
		||||
    "type": "http",
 | 
			
		||||
    "meta_as_tags" : true,
 | 
			
		||||
    "database" : "mymetrics",
 | 
			
		||||
    "host": "dbhost.example.com",
 | 
			
		||||
    "port": "4222",
 | 
			
		||||
    "jwt" : "0x0000q231",
 | 
			
		||||
    "ssl" : false,
 | 
			
		||||
    "url" : "https://my-monitoring.example.com:1234/api/write",
 | 
			
		||||
    "jwt" : "blabla.blabla.blabla",
 | 
			
		||||
    "timeout": "5s",
 | 
			
		||||
    "max_idle_connections" : 10,
 | 
			
		||||
    "idle_connection_timeout" : "5s"
 | 
			
		||||
    "idle_connection_timeout" : "5s",
 | 
			
		||||
    "flush_delay": "2s",
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
- `type`: makes the sink an `http` sink
 | 
			
		||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
 | 
			
		||||
- `database`: All metrics are written to this bucket 
 | 
			
		||||
- `host`: Hostname of the InfluxDB database server
 | 
			
		||||
- `port`: Port number (as string) of the InfluxDB database server
 | 
			
		||||
- `jwt`: JSON web token for authentication
 | 
			
		||||
- `ssl`: Activate SSL encryption
 | 
			
		||||
- `url`: The full URL of the endpoint
 | 
			
		||||
- `jwt`: JSON web token for authentication (using the *Bearer* scheme)
 | 
			
		||||
- `timeout`: General timeout for the HTTP client (default '5s')
 | 
			
		||||
- `max_idle_connections`: Maximum number of idle connections (default 10)
 | 
			
		||||
- `idle_connection_timeout`: Timeout for idle connections (default '5s')
 | 
			
		||||
- `flush_delay`: Batch all writes arriving within this duration (default '1s'; batching can be disabled by setting it to 0)
 | 
			
		||||
 
 | 
			
		||||
@@ -106,7 +106,9 @@ func (sm *sinkManager) Start() {
 | 
			
		||||
				// Send received metric to all outputs
 | 
			
		||||
				cclog.ComponentDebug("SinkManager", "WRITE", p)
 | 
			
		||||
				for _, s := range sm.sinks {
 | 
			
		||||
					s.Write(p)
 | 
			
		||||
					if err := s.Write(p); err != nil {
 | 
			
		||||
						cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error())
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
@@ -131,7 +133,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if _, found := AvailableSinks[sinkConfig.Type]; !found {
 | 
			
		||||
		cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error())
 | 
			
		||||
		cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	s := AvailableSinks[sinkConfig.Type]
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user