From 485223c5901cdddf0020a2740ba982b69d929482 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 12 Oct 2021 13:44:38 +0200 Subject: [PATCH] Add new httpSink This sink is compatible with the HTTP API of cc-metric-store. Example config.json section: ``` "sink": { "type": "http", "host": "localhost", "port": "8080", "database": "api/write", "password": "" }, ``` The password/JWT can be omitted. --- README.md | 2 +- metric-collector.go | 1 + sinks/README.md | 1 + sinks/httpSink.go | 68 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 sinks/httpSink.go diff --git a/README.md b/README.md index ec2b263..b1f4aac 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ The `interval` defines how often the metrics should be read and send to the sink All available collectors are listed in the above JSON. A more detailed list can be found in the [README for collectors](./collectors/README.md). -There are currently three sinks supported `influxdb`, `nats` and `stdout`. See [README for sinks](./sinks/README.md). +There are currently four sinks supported `influxdb`, `nats`, `http` and `stdout`. See [README for sinks](./sinks/README.md). In the `default_tags` section, one can define key-value-pairs (only strings) that are added to each sent out metric. This can be useful for cluster names like in the example JSON or information like rank or island for orientation. diff --git a/metric-collector.go b/metric-collector.go index ddaa865..ff67f3c 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -36,6 +36,7 @@ var Sinks = map[string]sinks.SinkFuncs{ "influxdb": &sinks.InfluxSink{}, "stdout": &sinks.StdoutSink{}, "nats": &sinks.NatsSink{}, + "http": &sinks.HttpSink{}, } var Receivers = map[string]receivers.ReceiverFuncs{ diff --git a/sinks/README.md b/sinks/README.md index 03856ba..43dd802 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -7,6 +7,7 @@ The base class/configuration is located in `metricSink.go`. * `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging * `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx. * `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages. +* `httpSink.go`: Sends all metrics to an HTTP endpoint `http://:/` using a POST request. The body of the request will consist of lines in the InfluxDB line protocol. In case password is specified, that password is used as a JWT in the 'Authorization' header. # Installation Nothing to do, all sinks are pure Go code diff --git a/sinks/httpSink.go b/sinks/httpSink.go new file mode 100644 index 0000000..e443ceb --- /dev/null +++ b/sinks/httpSink.go @@ -0,0 +1,68 @@ +package sinks + +import ( + "bytes" + "errors" + "fmt" + "net/http" + "time" + + lp "github.com/influxdata/line-protocol" +) + +type HttpSink struct { + Sink + client *http.Client + url, jwt string + encoder *lp.Encoder + buffer *bytes.Buffer +} + +func (s *HttpSink) Init(config SinkConfig) error { + if len(config.Host) == 0 || len(config.Port) == 0 { + return errors.New("`host`, `port` and `database` config options required for TCP sink") + } + + s.client = &http.Client{} + s.url = fmt.Sprintf("http://%s:%s/%s", config.Host, config.Port, config.Database) + s.port = config.Port + s.jwt = config.Password + s.buffer = &bytes.Buffer{} + s.encoder = lp.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) + + return nil +} + +func (s *HttpSink) Write(point lp.MutableMetric) error { + _, err := s.encoder.Encode(point) + return err +} + +func (s *HttpSink) Flush() error { + req, err := http.NewRequest(http.MethodPost, s.url, s.buffer) + if err != nil { + return err + } + + if len(s.jwt) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt)) + } + + res, err := s.client.Do(req) + s.buffer.Reset() + + if err != nil { + return err + } + + if res.StatusCode != 200 { + return errors.New(res.Status) + } + + return nil +} + +func (s *HttpSink) Close() { + s.client.CloseIdleConnections() +}