From ccce00d64f0493e528daf070a2dd99e32557f055 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 11 Oct 2024 04:43:57 +0200 Subject: [PATCH] Add support for credential file (NKEY) to NATS sink and receiver --- receivers/natsReceiver.go | 27 ++++++++++++++++++++++----- receivers/natsReceiver.md | 8 +++++++- sinks/natsSink.go | 9 +++++++++ sinks/natsSink.md | 2 ++ 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index ea0cc3b..02bcfdc 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" @@ -13,10 +14,13 @@ import ( ) type NatsReceiverConfig struct { - Type string `json:"type"` - Addr string `json:"address"` - Port string `json:"port"` - Subject string `json:"subject"` + Type string `json:"type"` + Addr string `json:"address"` + Port string `json:"port"` + Subject string `json:"subject"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + NkeyFile string `json:"nkey_file,omitempty"` } type NatsReceiver struct { @@ -109,6 +113,7 @@ func (r *NatsReceiver) Close() { // NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { + var uinfo nats.Option = nil r := new(NatsReceiver) r.name = fmt.Sprintf("NatsReceiver(%s)", name) @@ -133,10 +138,22 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { "source": r.name, } + if len(r.config.User) > 0 && len(r.config.Password) > 0 { + uinfo = nats.UserInfo(r.config.User, r.config.Password) + } else if len(r.config.NkeyFile) > 0 { + _, err := os.Stat(r.config.NkeyFile) + if err == nil { + uinfo = nats.UserCredentials(r.config.NkeyFile) + } else { + cclog.ComponentError(r.name, "NKEY file", r.config.NkeyFile, "does not exist: %v", err.Error()) + return nil, err + } + } + // Connect to NATS server url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port) cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject) - if nc, err := nats.Connect(url); err == nil { + if nc, err := nats.Connect(url, uinfo); err == nil { r.nc = nc } else { r.nc = nil diff --git a/receivers/natsReceiver.md b/receivers/natsReceiver.md index d0b2166..0882dcf 100644 --- a/receivers/natsReceiver.md +++ b/receivers/natsReceiver.md @@ -10,7 +10,10 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats "type": "nats", "address" : "nats-server.example.org", "port" : "4222", - "subject" : "subject" + "subject" : "subject", + "user": "natsuser", + "password": "natssecret", + "nkey_file": "/path/to/nkey_file" } } ``` @@ -19,6 +22,9 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats - `address`: Address of the NATS control server - `port`: Port of the NATS control server - `subject`: Subscribes to this subject and receive metrics +- `user`: Connect to nats using this user +- `password`: Connect to nats using this password +- `nkey_file`: Path to credentials file with NKEY ### Debugging diff --git a/sinks/natsSink.go b/sinks/natsSink.go index db446ca..32ff963 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "sync" "time" @@ -22,6 +23,7 @@ type NatsSinkConfig struct { User string `json:"user,omitempty"` Password string `json:"password,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` + NkeyFile string `json:"nkey_file,omitempty"` } type NatsSink struct { @@ -42,6 +44,13 @@ func (s *NatsSink) connect() error { var nc *nats.Conn if len(s.config.User) > 0 && len(s.config.Password) > 0 { uinfo = nats.UserInfo(s.config.User, s.config.Password) + } else if len(s.config.NkeyFile) > 0 { + if _, err := os.Stat(s.config.NkeyFile); err == nil { + uinfo = nats.UserCredentials(s.config.NkeyFile) + } else { + cclog.ComponentError(s.name, "NKEY file", s.config.NkeyFile, "does not exist: %v", err.Error()) + return err + } } uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port) cclog.ComponentDebug(s.name, "Connect to", uri) diff --git a/sinks/natsSink.md b/sinks/natsSink.md index 4c7d9d0..ee32e80 100644 --- a/sinks/natsSink.md +++ b/sinks/natsSink.md @@ -13,6 +13,7 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is "port": "4222", "user": "exampleuser", "password" : "examplepw", + "nkey_file": "/path/to/nkey_file", "meta_as_tags" : [], } } @@ -25,3 +26,4 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is - `user`: Username for basic authentication - `password`: Password for basic authentication - `meta_as_tags`: print all meta information as tags in the output (optional) +- `nkey_file`: Path to credentials file with NKEY