Add support for credential file (NKEY) to NATS sink and receiver

This commit is contained in:
Thomas Roehl 2024-10-11 04:43:57 +02:00
parent a36f8fe19d
commit ccce00d64f
4 changed files with 40 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
@ -13,10 +14,13 @@ import (
) )
type NatsReceiverConfig struct { type NatsReceiverConfig struct {
Type string `json:"type"` Type string `json:"type"`
Addr string `json:"address"` Addr string `json:"address"`
Port string `json:"port"` Port string `json:"port"`
Subject string `json:"subject"` Subject string `json:"subject"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
NkeyFile string `json:"nkey_file,omitempty"`
} }
type NatsReceiver struct { type NatsReceiver struct {
@ -109,6 +113,7 @@ func (r *NatsReceiver) Close() {
// NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server // NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
var uinfo nats.Option = nil
r := new(NatsReceiver) r := new(NatsReceiver)
r.name = fmt.Sprintf("NatsReceiver(%s)", name) r.name = fmt.Sprintf("NatsReceiver(%s)", name)
@ -133,10 +138,22 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
"source": r.name, "source": r.name,
} }
if len(r.config.User) > 0 && len(r.config.Password) > 0 {
uinfo = nats.UserInfo(r.config.User, r.config.Password)
} else if len(r.config.NkeyFile) > 0 {
_, err := os.Stat(r.config.NkeyFile)
if err == nil {
uinfo = nats.UserCredentials(r.config.NkeyFile)
} else {
cclog.ComponentError(r.name, "NKEY file", r.config.NkeyFile, "does not exist: %v", err.Error())
return nil, err
}
}
// Connect to NATS server // Connect to NATS server
url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port) url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port)
cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject) cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject)
if nc, err := nats.Connect(url); err == nil { if nc, err := nats.Connect(url, uinfo); err == nil {
r.nc = nc r.nc = nc
} else { } else {
r.nc = nil r.nc = nil

View File

@ -10,7 +10,10 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
"type": "nats", "type": "nats",
"address" : "nats-server.example.org", "address" : "nats-server.example.org",
"port" : "4222", "port" : "4222",
"subject" : "subject" "subject" : "subject",
"user": "natsuser",
"password": "natssecret",
"nkey_file": "/path/to/nkey_file"
} }
} }
``` ```
@ -19,6 +22,9 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
- `address`: Address of the NATS control server - `address`: Address of the NATS control server
- `port`: Port of the NATS control server - `port`: Port of the NATS control server
- `subject`: Subscribes to this subject and receive metrics - `subject`: Subscribes to this subject and receive metrics
- `user`: Connect to nats using this user
- `password`: Connect to nats using this password
- `nkey_file`: Path to credentials file with NKEY
### Debugging ### Debugging

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"sync" "sync"
"time" "time"
@ -22,6 +23,7 @@ type NatsSinkConfig struct {
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"` FlushDelay string `json:"flush_delay,omitempty"`
NkeyFile string `json:"nkey_file,omitempty"`
} }
type NatsSink struct { type NatsSink struct {
@ -42,6 +44,13 @@ func (s *NatsSink) connect() error {
var nc *nats.Conn var nc *nats.Conn
if len(s.config.User) > 0 && len(s.config.Password) > 0 { if len(s.config.User) > 0 && len(s.config.Password) > 0 {
uinfo = nats.UserInfo(s.config.User, s.config.Password) uinfo = nats.UserInfo(s.config.User, s.config.Password)
} else if len(s.config.NkeyFile) > 0 {
if _, err := os.Stat(s.config.NkeyFile); err == nil {
uinfo = nats.UserCredentials(s.config.NkeyFile)
} else {
cclog.ComponentError(s.name, "NKEY file", s.config.NkeyFile, "does not exist: %v", err.Error())
return err
}
} }
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port) uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
cclog.ComponentDebug(s.name, "Connect to", uri) cclog.ComponentDebug(s.name, "Connect to", uri)

View File

@ -13,6 +13,7 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
"port": "4222", "port": "4222",
"user": "exampleuser", "user": "exampleuser",
"password" : "examplepw", "password" : "examplepw",
"nkey_file": "/path/to/nkey_file",
"meta_as_tags" : [], "meta_as_tags" : [],
} }
} }
@ -25,3 +26,4 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
- `user`: Username for basic authentication - `user`: Username for basic authentication
- `password`: Password for basic authentication - `password`: Password for basic authentication
- `meta_as_tags`: print all meta information as tags in the output (optional) - `meta_as_tags`: print all meta information as tags in the output (optional)
- `nkey_file`: Path to credentials file with NKEY