Introduce central nats client

This commit is contained in:
2025-12-16 09:35:33 +01:00
parent 72b2560ecf
commit 14f1192ccb
4 changed files with 323 additions and 0 deletions

View File

@@ -30,6 +30,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/tagger"
"github.com/ClusterCockpit/cc-backend/internal/taskmanager"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/nats"
"github.com/ClusterCockpit/cc-backend/web"
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
@@ -267,6 +268,13 @@ func generateJWT(authHandle *auth.Authentication, username string) error {
}
func initSubsystems() error {
// Initialize nats client
natsConfig := ccconf.GetPackageConfig("nats")
if err := nats.Init(natsConfig); err != nil {
return fmt.Errorf("initializing nats client: %w", err)
}
nats.Connect()
// Initialize job archive
archiveCfg := ccconf.GetPackageConfig("archive")
if archiveCfg == nil {

View File

@@ -31,6 +31,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/routerConfig"
"github.com/ClusterCockpit/cc-backend/pkg/nats"
"github.com/ClusterCockpit/cc-backend/web"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/runtimeEnv"
@@ -363,6 +364,11 @@ func (s *Server) Shutdown(ctx context.Context) {
shutdownCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
nc := nats.GetClient()
if nc != nil {
nc.Close()
}
// First shut down the server gracefully (waiting for all ongoing requests)
if err := s.server.Shutdown(shutdownCtx); err != nil {
cclog.Errorf("Server shutdown error: %v", err)

246
pkg/nats/client.go Normal file
View File

@@ -0,0 +1,246 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package nats provides a generic NATS messaging client for publish/subscribe communication.
//
// The package wraps the nats.go library with connection management, automatic reconnection
// handling, and subscription tracking. It supports multiple authentication methods including
// username/password and credential files.
//
// # Configuration
//
// Configure the client via JSON in the application config:
//
// {
// "nats": {
// "address": "nats://localhost:4222",
// "username": "user",
// "password": "secret"
// }
// }
//
// Or using a credentials file:
//
// {
// "nats": {
// "address": "nats://localhost:4222",
// "creds-file-path": "/path/to/creds.json"
// }
// }
//
// # Usage
//
// The package provides a singleton client initialized once and retrieved globally:
//
// nats.Init(rawConfig)
// nats.Connect()
//
// client := nats.GetClient()
// client.Subscribe("events", func(subject string, data []byte) {
// fmt.Printf("Received: %s\n", data)
// })
//
// client.Publish("events", []byte("hello"))
//
// # Thread Safety
//
// All Client methods are safe for concurrent use.
package nats
import (
"context"
"fmt"
"sync"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/nats-io/nats.go"
)
var (
clientOnce sync.Once
clientInstance *Client
)
// Client wraps a NATS connection with subscription management.
type Client struct {
conn *nats.Conn
subscriptions []*nats.Subscription
mu sync.Mutex
}
// MessageHandler is a callback function for processing received messages.
type MessageHandler func(subject string, data []byte)
// Connect initializes the singleton NATS client using the global Keys config.
func Connect() {
clientOnce.Do(func() {
if Keys.Address == "" {
cclog.Warn("NATS: no address configured, skipping connection")
return
}
client, err := NewClient(nil)
if err != nil {
cclog.Errorf("NATS connection failed: %v", err)
return
}
clientInstance = client
})
}
// GetClient returns the singleton NATS client instance.
func GetClient() *Client {
if clientInstance == nil {
cclog.Warn("NATS client not initialized")
}
return clientInstance
}
// NewClient creates a new NATS client. If cfg is nil, uses the global Keys config.
func NewClient(cfg *NatsConfig) (*Client, error) {
if cfg == nil {
cfg = &Keys
}
if cfg.Address == "" {
return nil, fmt.Errorf("NATS address is required")
}
var opts []nats.Option
if cfg.Username != "" && cfg.Password != "" {
opts = append(opts, nats.UserInfo(cfg.Username, cfg.Password))
}
if cfg.CredsFilePath != "" {
opts = append(opts, nats.UserCredentials(cfg.CredsFilePath))
}
opts = append(opts, nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
if err != nil {
cclog.Warnf("NATS disconnected: %v", err)
}
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
cclog.Infof("NATS reconnected to %s", nc.ConnectedUrl())
}))
opts = append(opts, nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
cclog.Errorf("NATS error: %v", err)
}))
nc, err := nats.Connect(cfg.Address, opts...)
if err != nil {
return nil, fmt.Errorf("NATS connect failed: %w", err)
}
cclog.Infof("NATS connected to %s", cfg.Address)
return &Client{
conn: nc,
subscriptions: make([]*nats.Subscription, 0),
}, nil
}
// Subscribe registers a handler for messages on the given subject.
func (c *Client) Subscribe(subject string, handler MessageHandler) error {
c.mu.Lock()
defer c.mu.Unlock()
sub, err := c.conn.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Subject, msg.Data)
})
if err != nil {
return fmt.Errorf("NATS subscribe to '%s' failed: %w", subject, err)
}
c.subscriptions = append(c.subscriptions, sub)
cclog.Infof("NATS subscribed to '%s'", subject)
return nil
}
// SubscribeQueue registers a handler with queue group for load-balanced message processing.
func (c *Client) SubscribeQueue(subject, queue string, handler MessageHandler) error {
c.mu.Lock()
defer c.mu.Unlock()
sub, err := c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
handler(msg.Subject, msg.Data)
})
if err != nil {
return fmt.Errorf("NATS queue subscribe to '%s' (queue: %s) failed: %w", subject, queue, err)
}
c.subscriptions = append(c.subscriptions, sub)
cclog.Infof("NATS queue subscribed to '%s' (queue: %s)", subject, queue)
return nil
}
// SubscribeChan subscribes to a subject and delivers messages to the provided channel.
func (c *Client) SubscribeChan(subject string, ch chan *nats.Msg) error {
c.mu.Lock()
defer c.mu.Unlock()
sub, err := c.conn.ChanSubscribe(subject, ch)
if err != nil {
return fmt.Errorf("NATS chan subscribe to '%s' failed: %w", subject, err)
}
c.subscriptions = append(c.subscriptions, sub)
cclog.Infof("NATS chan subscribed to '%s'", subject)
return nil
}
// Publish sends data to the specified subject.
func (c *Client) Publish(subject string, data []byte) error {
if err := c.conn.Publish(subject, data); err != nil {
return fmt.Errorf("NATS publish to '%s' failed: %w", subject, err)
}
return nil
}
// Request sends a request and waits for a response with the given context timeout.
func (c *Client) Request(subject string, data []byte, timeout context.Context) ([]byte, error) {
msg, err := c.conn.RequestWithContext(timeout, subject, data)
if err != nil {
return nil, fmt.Errorf("NATS request to '%s' failed: %w", subject, err)
}
return msg.Data, nil
}
// Flush flushes the connection buffer to ensure all published messages are sent.
func (c *Client) Flush() error {
return c.conn.Flush()
}
// Close unsubscribes all subscriptions and closes the NATS connection.
func (c *Client) Close() {
c.mu.Lock()
defer c.mu.Unlock()
for _, sub := range c.subscriptions {
if err := sub.Unsubscribe(); err != nil {
cclog.Warnf("NATS unsubscribe failed: %v", err)
}
}
c.subscriptions = nil
if c.conn != nil {
c.conn.Close()
cclog.Info("NATS connection closed")
}
}
// IsConnected returns true if the client has an active connection.
func (c *Client) IsConnected() bool {
return c.conn != nil && c.conn.IsConnected()
}
// Connection returns the underlying NATS connection for advanced usage.
func (c *Client) Connection() *nats.Conn {
return c.conn
}

63
pkg/nats/config.go Normal file
View File

@@ -0,0 +1,63 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package nats
import (
"bytes"
"encoding/json"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)
// NatsConfig holds the configuration for connecting to a NATS server.
type NatsConfig struct {
Address string `json:"address"` // NATS server address (e.g., "nats://localhost:4222")
Username string `json:"username"` // Username for authentication (optional)
Password string `json:"password"` // Password for authentication (optional)
CredsFilePath string `json:"creds-file-path"` // Path to credentials file (optional)
}
// Keys holds the global NATS configuration loaded via Init.
var Keys NatsConfig
const ConfigSchema = `{
"type": "object",
"description": "Configuration for NATS messaging client.",
"properties": {
"address": {
"description": "Address of the NATS server (e.g., 'nats://localhost:4222').",
"type": "string"
},
"username": {
"description": "Username for NATS authentication (optional).",
"type": "string"
},
"password": {
"description": "Password for NATS authentication (optional).",
"type": "string"
},
"creds-file-path": {
"description": "Path to NATS credentials file for authentication (optional).",
"type": "string"
}
},
"required": ["address"]
}`
// Init initializes the global Keys configuration from JSON.
func Init(rawConfig json.RawMessage) error {
var err error
if rawConfig != nil {
dec := json.NewDecoder(bytes.NewReader(rawConfig))
dec.DisallowUnknownFields()
if err = dec.Decode(&Keys); err != nil {
cclog.Errorf("Error while initializing nats client: %s", err.Error())
}
}
return err
}