From 72012516003635800a57bce535c5e34cf8f8cc7a Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Fri, 4 Feb 2022 08:52:53 +0100 Subject: [PATCH] Enable basic authentication for NATS --- README.md | 4 +++- lineprotocol.go | 13 +++++++++---- metric-store.go | 7 +++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 791bdf4..d759fc4 100644 --- a/README.md +++ b/README.md @@ -76,9 +76,11 @@ All durations are specified as string that will be parsed [like this](https://pk - `scope`: Unused at the moment, should be something like `"node"`, `"socket"` or `"hwthread"` - `nats`: - `address`: Url of NATS.io server, example: "nats://localhost:4222" + - `username` and `password`: Optional, if provided use those for the connection + - `subscribe-to`: Where to expect the measurements to be published - `http-api`: - `address`: Address to bind to, for example `0.0.0.0:8080` - - `https-cert-file` and `https-key-file`: Optional, if provided enable HTTPS using those files as certificate/key. + - `https-cert-file` and `https-key-file`: Optional, if provided enable HTTPS using those files as certificate/key - `jwt-public-key`: Base64 encoded string, use this to verify requests to the HTTP API - `retention-on-memory`: Keep all values in memory for at least that amount of time - `checkpoints`: diff --git a/lineprotocol.go b/lineprotocol.go index 5ed8967..a1829f1 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -21,7 +21,12 @@ type Metric struct { // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error { - nc, err := nats.Connect(conf.Address) + var opts []nats.Option + if conf.Username != "" && conf.Password != "" { + opts = append(opts, nats.UserInfo(conf.Username, conf.Password)) + } + + nc, err := nats.Connect(conf.Address, opts...) if err != nil { return err } @@ -48,11 +53,11 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er }() } - sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) { msgs <- m }) } else { - sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) if err := handleLine(dec); err != nil { log.Printf("error: %s\n", err.Error()) @@ -64,7 +69,7 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er return err } - log.Printf("NATS subscription to 'updates' on '%s' established\n", conf.Address) + log.Printf("NATS subscription to '%s' on '%s' established\n", conf.SubscribeTo, conf.Address) <-ctx.Done() err = sub.Unsubscribe() diff --git a/metric-store.go b/metric-store.go index d1d16cb..e17d39f 100644 --- a/metric-store.go +++ b/metric-store.go @@ -39,6 +39,13 @@ type HttpConfig struct { type NatsConfig struct { // Address of the nats server Address string `json:"address"` + + // Channel name + SubscribeTo string `json:"subscribe-to"` + + // Username/Password, optional + Username string `json:"username"` + Password string `json:"password"` } type Config struct {