diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 5ec712a..29a4467 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -216,10 +216,7 @@ func main() { // Start NATS Messenger if Config exists wg.Add(1) - nm, err := natsMessenger.New(config.Keys.Nats) - if err != nil { - log.Fatal("Error on NATS startup!") - } + nm := natsMessenger.GetNatsMessenger(config.Keys.Nats) wg.Done() // Start HTTP server diff --git a/internal/natsMessenger/natsMessenger.go b/internal/natsMessenger/natsMessenger.go index c679d54..e535f2e 100644 --- a/internal/natsMessenger/natsMessenger.go +++ b/internal/natsMessenger/natsMessenger.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "os" + "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/importer" @@ -31,9 +32,8 @@ type NatsMessenger struct { jwtPubKey ed25519.PublicKey } -func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) { - return SetupNatsMessenger(config) -} +var natsMessengerInstance *NatsMessenger +var once sync.Once type DevNatsMessage struct { Content string `json:"content"` @@ -72,75 +72,71 @@ type ReceiveEventNatsRequest struct { Value *int64 `json:"value,omitempty" example:"150"` // Optional Value Set for Evenr, eg powercap } -// Check auth and setup listeners to channels - -// ns *server.Server, nc *nats.Conn, subs []*nats.Subscription, err error -func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error) { +// Get Singleton +func GetNatsMessenger(config *schema.NatsConfig) *NatsMessenger { // Check if Config present if config == nil { log.Info("No NATS config found: Skip NATS init.") - return nil, nil + return nil } - // Init Raw - nmr := NatsMessenger{ - Server: nil, - Connection: nil, - Subscriptions: []*nats.Subscription{}, - JobRepository: repository.GetJobRepository(), - jwtPubKey: nil, - } + if natsMessengerInstance == nil { + once.Do( + func() { + // Raw Init + var err error + natsMessengerInstance = &NatsMessenger{ + Server: nil, + Connection: nil, + Subscriptions: []*nats.Subscription{}, + JobRepository: repository.GetJobRepository(), + jwtPubKey: nil, + } + // Init JWT PubKey + pubKey := os.Getenv("JWT_PUBLIC_KEY") + if pubKey == "" { + log.Warn("environment variable 'JWT_PUBLIC_KEY' not set (token based authentication will not work for nats: abort setup)") + } else { + if bytes, err := base64.StdEncoding.DecodeString(pubKey); err != nil { + log.Warn("Could not decode JWT public key") + } else { + natsMessengerInstance.jwtPubKey = ed25519.PublicKey(bytes) + } + } - // Init JWT PubKey - pubKey := os.Getenv("JWT_PUBLIC_KEY") - if pubKey == "" { - log.Warn("environment variable 'JWT_PUBLIC_KEY' not set (token based authentication will not work for nats: abort setup)") - return nil, fmt.Errorf("environment variable 'JWT_PUBLIC_KEY' not set (token based authentication will not work for nats: abort nats setup)") + // Start Nats Server + // Note: You can configure things like Host, Port, Authorization, and much more using server.Options. + opts := &server.Options{Port: config.Port} + if natsMessengerInstance.Server, err = server.NewServer(opts); err != nil { + log.Error("nats server error on creation") + } + + go natsMessengerInstance.Server.Start() + + if !natsMessengerInstance.Server.ReadyForConnections(3 * time.Second) { + log.Error("nats server not ready for connection") + } + + // Connect + var copts []nats.Option + if natsMessengerInstance.Connection, err = nats.Connect(natsMessengerInstance.Server.ClientURL(), copts...); err != nil { + natsMessengerInstance.Server.Shutdown() + log.Error("nats connection could not be established: nats shut down") + } + + // Subscribe + if err = natsMessengerInstance.setupSubscriptions(); err != nil { + log.Error("error when subscribing to channels: nats shut down") + natsMessengerInstance.Connection.Close() + natsMessengerInstance.Server.Shutdown() + } + }) + log.Infof("NATS server and subscriptions on port '%d' established\n", config.Port) } else { - bytes, err := base64.StdEncoding.DecodeString(pubKey) - if err != nil { - log.Warn("Could not decode JWT public key") - return nil, err - } - nmr.jwtPubKey = ed25519.PublicKey(bytes) + log.Infof("Single NatsMessenger instance already created on port '%d'\n", config.Port) } - // Start Nats Server - // Note: You can configure things like Host, Port, Authorization, and much more using server.Options. - opts := &server.Options{Port: config.Port} - nmr.Server, err = server.NewServer(opts) - - if err != nil { - log.Error("nats server error on creation") - return nil, err - } - - go nmr.Server.Start() - - if !nmr.Server.ReadyForConnections(3 * time.Second) { - log.Error("nats server not ready for connection") - return nil, fmt.Errorf("nats server not ready for connection") - } - - // Connect - var copts []nats.Option - nmr.Connection, err = nats.Connect(nmr.Server.ClientURL(), copts...) - if nmr.Connection == nil { - nmr.Server.Shutdown() - log.Error("nats connection could not be established: nats shut down") - return nil, err - } - - // Subscribe - if err = nmr.setupSubscriptions(); err != nil { - log.Error("error when subscribing to channels: nats shut down") - nmr.Connection.Close() - nmr.Server.Shutdown() - return nil, err - } - - log.Infof("NATS server and subscriptions on port '%d' established\n", config.Port) - return &nmr, nil + return natsMessengerInstance } func (nm *NatsMessenger) StopNatsMessenger() {