diff --git a/internal/natsMessenger/natsMessenger.go b/internal/natsMessenger/natsMessenger.go index 0e5b06b..24fc4b1 100644 --- a/internal/natsMessenger/natsMessenger.go +++ b/internal/natsMessenger/natsMessenger.go @@ -27,7 +27,6 @@ func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) { return SetupNatsMessenger(config) } -// StartJobNatsMessage model type DevNatsMessage struct { Content string `json:"content"` } @@ -89,7 +88,7 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error go nmr.Server.Start() - if !nmr.Server.ReadyForConnections(4 * time.Second) { + if !nmr.Server.ReadyForConnections(3 * time.Second) { log.Error("nats server not ready for connection") return nil, fmt.Errorf("nats server not ready for connection") } @@ -104,15 +103,13 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error } // Subscribe - sub, err := startJobListener(nmr.Connection) + nmr.Subscriptions, err = setupSubscriptions(nmr.Connection) if err != nil { - log.Error("startJobListener subscription error") + log.Error("error when subscribing to channels") return nil, err - } else { - log.Infof("NATS subscription to 'start-job' on port '%d' established\n", config.Port) - nmr.Subscriptions = append(nmr.Subscriptions, sub) } + log.Infof("NATS server and subscriptions on port '%d' established\n", config.Port) return &nmr, nil } @@ -129,49 +126,111 @@ func (nm *NatsMessenger) StopNatsMessenger() { log.Info("NATS connections closed and server shut down") } -// Listeners: Subscribe to specified channels for actions +func setupSubscriptions(conn *nats.Conn) (subs []*nats.Subscription, err error) { + + if startSub, err := startJobListener(conn); err != nil { + log.Infof("Subscription to 'start-job' failed: %s", err) + } else { + log.Info("Subscribed to 'start-job'") + subs = append(subs, startSub) + } + + if stopSub, err := stopJobListener(conn); err != nil { + log.Infof("Subscription to 'stop-job' failed: %s", err) + } else { + log.Info("Subscribed to 'stop-job'") + subs = append(subs, stopSub) + } + + if deleteSub, err := deleteJobListener(conn); err == nil { + log.Infof("Subscription to 'delete-job' failed: %s", err) + } else { + log.Info("Subscribed to 'delete-job'") + subs = append(subs, deleteSub) + } + + if eventSub, err := jobEventListener(conn); err != nil { + log.Infof("Subscription to 'job-event' failed: %s", err) + } else { + log.Info("Subscribed to 'job-event'") + subs = append(subs, eventSub) + } + + return subs, err +} + +// Listeners: Subscribe to specified channels and handle with specific handler functions func startJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { - - sub, err = conn.Subscribe("start-job", func(m *nats.Msg) { - var job DevNatsMessage - if err := json.Unmarshal(m.Data, &job); err != nil { - log.Error("Error while unmarshaling raw json nats message content") + return conn.Subscribe("start-job", func(m *nats.Msg) { + var req DevNatsMessage + if err := json.Unmarshal(m.Data, &req); err != nil { + log.Error("Error while unmarshaling raw json nats message content: startJob") } - if err := startJobHandler(job); err != nil { + if err := startJobHandler(req); err != nil { log.Errorf("error: %s", err.Error()) } }) - - if err != nil { - return nil, err - } else { - return sub, nil - } } -func (nm *NatsMessenger) stopJobListener(conn *nats.Conn) { +func stopJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { + return conn.Subscribe("stop-job", func(m *nats.Msg) { + var req DevNatsMessage + if err := json.Unmarshal(m.Data, &req); err != nil { + log.Error("Error while unmarshaling raw json nats message content: stopJob") + } + + if err := stopJobHandler(req); err != nil { + log.Errorf("error: %s", err.Error()) + } + }) } -func (nm *NatsMessenger) deleteJobListener(conn *nats.Conn) { +func deleteJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { + return conn.Subscribe("delete-job", func(m *nats.Msg) { + var req DevNatsMessage + if err := json.Unmarshal(m.Data, &req); err != nil { + log.Error("Error while unmarshaling raw json nats message content: deleteJob") + } + + if err := deleteJobHandler(req); err != nil { + log.Errorf("error: %s", err.Error()) + } + }) } -func (nm *NatsMessenger) jobEventListener(conn *nats.Conn) { +func jobEventListener(conn *nats.Conn) (sub *nats.Subscription, err error) { + return conn.Subscribe("job-event", func(m *nats.Msg) { + var req DevNatsMessage + if err := json.Unmarshal(m.Data, &req); err != nil { + log.Error("Error while unmarshaling raw json nats message content: jobEvent") + } + + if err := jobEventHandler(req); err != nil { + log.Errorf("error: %s", err.Error()) + } + }) } // Handlers: Take content of message and perform action, e.g. adding job in db -func startJobHandler(job DevNatsMessage) (err error) { - log.Debugf("CALLED HANDLER FOR startJob: %s", job.Content) +func startJobHandler(req DevNatsMessage) (err error) { + log.Debugf("CALLED HANDLER FOR startJob: %s", req.Content) return nil } -func (nm *NatsMessenger) stopJobHandler() { +func stopJobHandler(req DevNatsMessage) (err error) { + log.Debugf("CALLED HANDLER FOR stopJob: %s", req.Content) + return nil } -func (nm *NatsMessenger) deleteJobHandler() { +func deleteJobHandler(req DevNatsMessage) (err error) { + log.Debugf("CALLED HANDLER FOR deleteJob: %s", req.Content) + return nil } -func (nm *NatsMessenger) jobEventHandler() { +func jobEventHandler(req DevNatsMessage) (err error) { + log.Debugf("CALLED HANDLER FOR jobEvent: %s", req.Content) + return nil }