From 902fcf95105601ac705db84017582b171242c772 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 22 Feb 2022 14:03:45 +0100 Subject: [PATCH] Allow multiple nats subscriptions --- README.md | 4 ++- lineprotocol.go | 71 ++++++++++++++++++++++++++----------------------- metric-store.go | 39 +++++++++++++++------------ 3 files changed, 63 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index d759fc4..40ac0ae 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,9 @@ All durations are specified as string that will be parsed [like this](https://pk - `nats`: - `address`: Url of NATS.io server, example: "nats://localhost:4222" - `username` and `password`: Optional, if provided use those for the connection - - `subscribe-to`: Where to expect the measurements to be published + - `subscriptions`: + - `subscribe-to`: Where to expect the measurements to be published + - `cluster-tag`: Default value for the cluster tag - `http-api`: - `address`: Address to bind to, for example `0.0.0.0:8080` - `https-cert-file` and `https-key-file`: Optional, if provided enable HTTPS using those files as certificate/key diff --git a/lineprotocol.go b/lineprotocol.go index d8b4af5..33562d8 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -33,53 +33,58 @@ func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string defer nc.Close() var wg sync.WaitGroup - var sub *nats.Subscription + var subs []*nats.Subscription msgs := make(chan *nats.Msg, workers*2) - if workers > 1 { - wg.Add(workers) + for _, sc := range conf.Subscriptions { + clusterTag := sc.ClusterTag + var sub *nats.Subscription + if workers > 1 { + wg.Add(workers) - for i := 0; i < workers; i++ { - go func() { - for m := range msgs { - dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec, conf.ClusterTag); err != nil { - log.Printf("error: %s\n", err.Error()) + for i := 0; i < workers; i++ { + go func() { + for m := range msgs { + dec := lineprotocol.NewDecoderWithBytes(m.Data) + if err := handleLine(dec, clusterTag); err != nil { + log.Printf("error: %s\n", err.Error()) + } } - } - wg.Done() - }() + wg.Done() + }() + } + + sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { + msgs <- m + }) + } else { + sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { + dec := lineprotocol.NewDecoderWithBytes(m.Data) + if err := handleLine(dec, clusterTag); err != nil { + log.Printf("error: %s\n", err.Error()) + } + }) } - sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) { - msgs <- m - }) - } else { - sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) { - dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec, conf.ClusterTag); err != nil { - log.Printf("error: %s\n", err.Error()) - } - }) + if err != nil { + return err + } + log.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address) + subs = append(subs, sub) } - if err != nil { - return err - } - - log.Printf("NATS subscription to '%s' on '%s' established\n", conf.SubscribeTo, conf.Address) - <-ctx.Done() - err = sub.Unsubscribe() + for _, sub := range subs { + err = sub.Unsubscribe() + if err != nil { + log.Printf("NATS unsubscribe failed: %s", err.Error()) + } + } close(msgs) wg.Wait() - if err != nil { - return err - } - nc.Close() log.Println("NATS connection closed") return nil diff --git a/metric-store.go b/metric-store.go index 28f3e9a..25a3b21 100644 --- a/metric-store.go +++ b/metric-store.go @@ -38,21 +38,23 @@ type NatsConfig struct { // Address of the nats server Address string `json:"address"` - // Channel name - SubscribeTo string `json:"subscribe-to"` - - // Allow lines without a cluster tag, use this as default, optional - ClusterTag string `json:"cluster-tag"` - // Username/Password, optional Username string `json:"username"` Password string `json:"password"` + + Subscriptions []struct { + // Channel name + SubscribeTo string `json:"subscribe-to"` + + // Allow lines without a cluster tag, use this as default, optional + ClusterTag string `json:"cluster-tag"` + } `json:"subscriptions"` } type Config struct { Metrics map[string]MetricConfig `json:"metrics"` RetentionInMemory string `json:"retention-in-memory"` - Nats *NatsConfig `json:"nats"` + Nats []*NatsConfig `json:"nats"` JwtPublicKey string `json:"jwt-public-key"` HttpConfig *HttpConfig `json:"http-api"` Checkpoints struct { @@ -237,17 +239,20 @@ func main() { }() if conf.Nats != nil { - wg.Add(1) + for _, natsConf := range conf.Nats { + // TODO: When multiple nats configs share a URL, do a single connect. + wg.Add(1) + nc := natsConf + go func() { + // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) + err := ReceiveNats(nc, decodeLine, 1, ctx) - go func() { - // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) - err := ReceiveNats(conf.Nats, decodeLine, 1, ctx) - - if err != nil { - log.Fatal(err) - } - wg.Done() - }() + if err != nil { + log.Fatal(err) + } + wg.Done() + }() + } } wg.Wait()