Allow multiple nats subscriptions

This commit is contained in:
Lou Knauer 2022-02-22 14:03:45 +01:00
parent d8e25063f9
commit 902fcf9510
3 changed files with 63 additions and 51 deletions

View File

@ -77,7 +77,9 @@ All durations are specified as string that will be parsed [like this](https://pk
- `nats`:
- `address`: Url of NATS.io server, example: "nats://localhost:4222"
- `username` and `password`: Optional, if provided use those for the connection
- `subscribe-to`: Where to expect the measurements to be published
- `subscriptions`:
- `subscribe-to`: Where to expect the measurements to be published
- `cluster-tag`: Default value for the cluster tag
- `http-api`:
- `address`: Address to bind to, for example `0.0.0.0:8080`
- `https-cert-file` and `https-key-file`: Optional, if provided enable HTTPS using those files as certificate/key

View File

@ -33,53 +33,58 @@ func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string
defer nc.Close()
var wg sync.WaitGroup
var sub *nats.Subscription
var subs []*nats.Subscription
msgs := make(chan *nats.Msg, workers*2)
if workers > 1 {
wg.Add(workers)
for _, sc := range conf.Subscriptions {
clusterTag := sc.ClusterTag
var sub *nats.Subscription
if workers > 1 {
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, conf.ClusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
for i := 0; i < workers; i++ {
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
}
}
wg.Done()
}()
wg.Done()
}()
}
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
msgs <- m
})
} else {
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
})
}
sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) {
msgs <- m
})
} else {
sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, conf.ClusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
})
if err != nil {
return err
}
log.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address)
subs = append(subs, sub)
}
if err != nil {
return err
}
log.Printf("NATS subscription to '%s' on '%s' established\n", conf.SubscribeTo, conf.Address)
<-ctx.Done()
err = sub.Unsubscribe()
for _, sub := range subs {
err = sub.Unsubscribe()
if err != nil {
log.Printf("NATS unsubscribe failed: %s", err.Error())
}
}
close(msgs)
wg.Wait()
if err != nil {
return err
}
nc.Close()
log.Println("NATS connection closed")
return nil

View File

@ -38,21 +38,23 @@ type NatsConfig struct {
// Address of the nats server
Address string `json:"address"`
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
// Username/Password, optional
Username string `json:"username"`
Password string `json:"password"`
Subscriptions []struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
} `json:"subscriptions"`
}
type Config struct {
Metrics map[string]MetricConfig `json:"metrics"`
RetentionInMemory string `json:"retention-in-memory"`
Nats *NatsConfig `json:"nats"`
Nats []*NatsConfig `json:"nats"`
JwtPublicKey string `json:"jwt-public-key"`
HttpConfig *HttpConfig `json:"http-api"`
Checkpoints struct {
@ -237,17 +239,20 @@ func main() {
}()
if conf.Nats != nil {
wg.Add(1)
for _, natsConf := range conf.Nats {
// TODO: When multiple nats configs share a URL, do a single connect.
wg.Add(1)
nc := natsConf
go func() {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(nc, decodeLine, 1, ctx)
go func() {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(conf.Nats, decodeLine, 1, ctx)
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
}
}
wg.Wait()