mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-01-24 21:09:06 +01:00
Add new httpSink
This sink is compatible with the HTTP API of cc-metric-store. Example config.json section: ``` "sink": { "type": "http", "host": "localhost", "port": "8080", "database": "api/write", "password": "<JWT>" }, ``` The password/JWT can be omitted.
This commit is contained in:
parent
cdc1811576
commit
485223c590
@ -46,7 +46,7 @@ The `interval` defines how often the metrics should be read and send to the sink
|
||||
|
||||
All available collectors are listed in the above JSON. A more detailed list can be found in the [README for collectors](./collectors/README.md).
|
||||
|
||||
There are currently three sinks supported `influxdb`, `nats` and `stdout`. See [README for sinks](./sinks/README.md).
|
||||
There are currently four sinks supported `influxdb`, `nats`, `http` and `stdout`. See [README for sinks](./sinks/README.md).
|
||||
|
||||
In the `default_tags` section, one can define key-value-pairs (only strings) that are added to each sent out metric. This can be useful for cluster names like in the example JSON or information like rank or island for orientation.
|
||||
|
||||
|
@ -36,6 +36,7 @@ var Sinks = map[string]sinks.SinkFuncs{
|
||||
"influxdb": &sinks.InfluxSink{},
|
||||
"stdout": &sinks.StdoutSink{},
|
||||
"nats": &sinks.NatsSink{},
|
||||
"http": &sinks.HttpSink{},
|
||||
}
|
||||
|
||||
var Receivers = map[string]receivers.ReceiverFuncs{
|
||||
|
@ -7,6 +7,7 @@ The base class/configuration is located in `metricSink.go`.
|
||||
* `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging
|
||||
* `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx.
|
||||
* `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages.
|
||||
* `httpSink.go`: Sends all metrics to an HTTP endpoint `http://<host>:<port>/<database>` using a POST request. The body of the request will consist of lines in the InfluxDB line protocol. In case password is specified, that password is used as a JWT in the 'Authorization' header.
|
||||
|
||||
# Installation
|
||||
Nothing to do, all sinks are pure Go code
|
||||
|
68
sinks/httpSink.go
Normal file
68
sinks/httpSink.go
Normal file
@ -0,0 +1,68 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
lp "github.com/influxdata/line-protocol"
|
||||
)
|
||||
|
||||
type HttpSink struct {
|
||||
Sink
|
||||
client *http.Client
|
||||
url, jwt string
|
||||
encoder *lp.Encoder
|
||||
buffer *bytes.Buffer
|
||||
}
|
||||
|
||||
func (s *HttpSink) Init(config SinkConfig) error {
|
||||
if len(config.Host) == 0 || len(config.Port) == 0 {
|
||||
return errors.New("`host`, `port` and `database` config options required for TCP sink")
|
||||
}
|
||||
|
||||
s.client = &http.Client{}
|
||||
s.url = fmt.Sprintf("http://%s:%s/%s", config.Host, config.Port, config.Database)
|
||||
s.port = config.Port
|
||||
s.jwt = config.Password
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = lp.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *HttpSink) Write(point lp.MutableMetric) error {
|
||||
_, err := s.encoder.Encode(point)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *HttpSink) Flush() error {
|
||||
req, err := http.NewRequest(http.MethodPost, s.url, s.buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(s.jwt) != 0 {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.jwt))
|
||||
}
|
||||
|
||||
res, err := s.client.Do(req)
|
||||
s.buffer.Reset()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
return errors.New(res.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *HttpSink) Close() {
|
||||
s.client.CloseIdleConnections()
|
||||
}
|
Loading…
Reference in New Issue
Block a user