Added test implementation for Nats subscriber

This commit is contained in:
Bole Ma
2023-07-07 16:15:05 +02:00
parent e5f7ad8ac0
commit 7aa7750177
4 changed files with 152 additions and 1 deletions

View File

@@ -4,7 +4,14 @@
// license that can be found in the LICENSE file.
package scheduler
import "encoding/json"
import (
"encoding/json"
"log"
"strings"
"sync"
"github.com/nats-io/nats.go"
)
type SlurmNatsConfig struct {
URL string `json:"url"`
@@ -15,6 +22,53 @@ type SlurmNatsScheduler struct {
}
func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
servers := []string{"nats://127.0.0.1:4222", "nats://127.0.0.1:1223"}
nc, err := nats.Connect(strings.Join(servers, ","))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
getStatusTxt := func(nc *nats.Conn) string {
switch nc.Status() {
case nats.CONNECTED:
return "Connected"
case nats.CLOSED:
return "Closed"
default:
return "Other"
}
}
log.Printf("The connection status is %v\n", getStatusTxt(nc))
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatal(err)
}
defer ec.Close()
// Define the object
type encodedMessage struct {
ServerName string
ResponseCode int
}
wg := sync.WaitGroup{}
wg.Add(1)
// Subscribe
if _, err := ec.Subscribe("stopJob", func(s *encodedMessage) {
log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode)
if s.ResponseCode == 500 {
wg.Done()
}
}); err != nil {
log.Fatal(err)
}
// Wait for a message to come in
wg.Wait()
return nil
}