mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-06-08 04:37:29 +02:00
Merge pull request #556 from ClusterCockpit/release/v1.5
Fix critical/severe issues in init, startup and shutdown
This commit is contained in:
@@ -78,6 +78,12 @@ type NatsAPI struct {
|
||||
jobCh chan natsMessage
|
||||
// nodeCh receives node state messages for processing by worker goroutines.
|
||||
nodeCh chan natsMessage
|
||||
// stop signals worker goroutines and subscription callbacks to stop.
|
||||
// Closing it (via Shutdown) makes workers exit and callbacks drop further
|
||||
// messages instead of blocking; the channels are never closed so in-flight
|
||||
// callbacks can never send on a closed channel.
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
|
||||
@@ -99,6 +105,7 @@ func NewNatsAPI() *NatsAPI {
|
||||
JobRepository: repository.GetJobRepository(),
|
||||
jobCh: make(chan natsMessage, jobConc),
|
||||
nodeCh: make(chan natsMessage, nodeConc),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start worker goroutines
|
||||
@@ -112,17 +119,36 @@ func NewNatsAPI() *NatsAPI {
|
||||
return api
|
||||
}
|
||||
|
||||
// Shutdown stops the worker goroutines and tells subscription callbacks to stop
|
||||
// enqueueing. It is safe to call multiple times. Callers must ensure the NATS
|
||||
// client is closed first so no new callbacks are invoked.
|
||||
func (api *NatsAPI) Shutdown() {
|
||||
api.stopOnce.Do(func() {
|
||||
close(api.stop)
|
||||
})
|
||||
}
|
||||
|
||||
// jobWorker processes job event messages from the job channel.
|
||||
func (api *NatsAPI) jobWorker() {
|
||||
for msg := range api.jobCh {
|
||||
api.handleJobEvent(msg.subject, msg.data)
|
||||
for {
|
||||
select {
|
||||
case <-api.stop:
|
||||
return
|
||||
case msg := <-api.jobCh:
|
||||
api.handleJobEvent(msg.subject, msg.data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nodeWorker processes node state messages from the node channel.
|
||||
func (api *NatsAPI) nodeWorker() {
|
||||
for msg := range api.nodeCh {
|
||||
api.handleNodeState(msg.subject, msg.data)
|
||||
for {
|
||||
select {
|
||||
case <-api.stop:
|
||||
return
|
||||
case msg := <-api.nodeCh:
|
||||
api.handleNodeState(msg.subject, msg.data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,13 +166,19 @@ func (api *NatsAPI) StartSubscriptions() error {
|
||||
s := config.Keys.APISubjects
|
||||
|
||||
if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
|
||||
api.jobCh <- natsMessage{subject: subject, data: data}
|
||||
select {
|
||||
case api.jobCh <- natsMessage{subject: subject, data: data}:
|
||||
case <-api.stop:
|
||||
}
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
|
||||
api.nodeCh <- natsMessage{subject: subject, data: data}
|
||||
select {
|
||||
case api.nodeCh <- natsMessage{subject: subject, data: data}:
|
||||
case <-api.stop:
|
||||
}
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user