From 499b4287f93230a33b41b72e049f2ee75e7cf39e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 23 Jan 2026 10:04:41 +0100 Subject: [PATCH] Switch to cclib nats client --- cmd/cc-backend/main.go | 2 +- cmd/cc-backend/server.go | 3 +- internal/api/nats.go | 2 +- pkg/metricstore/lineprotocol.go | 2 +- pkg/nats/client.go | 246 -------------------------------- pkg/nats/config.go | 63 -------- 6 files changed, 5 insertions(+), 313 deletions(-) delete mode 100644 pkg/nats/client.go delete mode 100644 pkg/nats/config.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index e6753902..0e93944c 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -29,10 +29,10 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/taskmanager" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/metricstore" - "github.com/ClusterCockpit/cc-backend/pkg/nats" "github.com/ClusterCockpit/cc-backend/web" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/nats" "github.com/ClusterCockpit/cc-lib/v2/runtime" "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/util" diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 5473271c..c6e30824 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + // Package main provides the entry point for the ClusterCockpit backend server. // This file contains HTTP server setup, routing configuration, and // authentication middleware integration. @@ -31,9 +32,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/pkg/metricstore" - "github.com/ClusterCockpit/cc-backend/pkg/nats" "github.com/ClusterCockpit/cc-backend/web" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/nats" "github.com/ClusterCockpit/cc-lib/v2/runtime" "github.com/gorilla/handlers" "github.com/gorilla/mux" diff --git a/internal/api/nats.go b/internal/api/nats.go index 9fdfbbb2..bbbd151f 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -16,9 +16,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/repository" - "github.com/ClusterCockpit/cc-backend/pkg/nats" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" + "github.com/ClusterCockpit/cc-lib/v2/nats" "github.com/ClusterCockpit/cc-lib/v2/receivers" "github.com/ClusterCockpit/cc-lib/v2/schema" influx "github.com/influxdata/line-protocol/v2/lineprotocol" diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index ca8ea138..bfbbef2d 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "github.com/ClusterCockpit/cc-backend/pkg/nats" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/nats" "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/influxdata/line-protocol/v2/lineprotocol" ) diff --git a/pkg/nats/client.go b/pkg/nats/client.go deleted file mode 100644 index 3222a525..00000000 --- a/pkg/nats/client.go +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -// Package nats provides a generic NATS messaging client for publish/subscribe communication. -// -// The package wraps the nats.go library with connection management, automatic reconnection -// handling, and subscription tracking. It supports multiple authentication methods including -// username/password and credential files. -// -// # Configuration -// -// Configure the client via JSON in the application config: -// -// { -// "nats": { -// "address": "nats://localhost:4222", -// "username": "user", -// "password": "secret" -// } -// } -// -// Or using a credentials file: -// -// { -// "nats": { -// "address": "nats://localhost:4222", -// "creds-file-path": "/path/to/creds.json" -// } -// } -// -// # Usage -// -// The package provides a singleton client initialized once and retrieved globally: -// -// nats.Init(rawConfig) -// nats.Connect() -// -// client := nats.GetClient() -// client.Subscribe("events", func(subject string, data []byte) { -// fmt.Printf("Received: %s\n", data) -// }) -// -// client.Publish("events", []byte("hello")) -// -// # Thread Safety -// -// All Client methods are safe for concurrent use. -package nats - -import ( - "context" - "fmt" - "sync" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/nats-io/nats.go" -) - -var ( - clientOnce sync.Once - clientInstance *Client -) - -// Client wraps a NATS connection with subscription management. -type Client struct { - conn *nats.Conn - subscriptions []*nats.Subscription - mu sync.Mutex -} - -// MessageHandler is a callback function for processing received messages. -type MessageHandler func(subject string, data []byte) - -// Connect initializes the singleton NATS client using the global Keys config. -func Connect() { - clientOnce.Do(func() { - if Keys.Address == "" { - cclog.Info("NATS: no address configured, skipping connection") - return - } - - client, err := NewClient(nil) - if err != nil { - cclog.Warnf("NATS connection failed: %v", err) - return - } - - clientInstance = client - }) -} - -// GetClient returns the singleton NATS client instance. -func GetClient() *Client { - if clientInstance == nil { - cclog.Warn("NATS client not initialized") - } - return clientInstance -} - -// NewClient creates a new NATS client. If cfg is nil, uses the global Keys config. -func NewClient(cfg *NatsConfig) (*Client, error) { - if cfg == nil { - cfg = &Keys - } - - if cfg.Address == "" { - return nil, fmt.Errorf("NATS address is required") - } - - var opts []nats.Option - - if cfg.Username != "" && cfg.Password != "" { - opts = append(opts, nats.UserInfo(cfg.Username, cfg.Password)) - } - - if cfg.CredsFilePath != "" { - opts = append(opts, nats.UserCredentials(cfg.CredsFilePath)) - } - - opts = append(opts, nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { - if err != nil { - cclog.Warnf("NATS disconnected: %v", err) - } - })) - - opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { - cclog.Infof("NATS reconnected to %s", nc.ConnectedUrl()) - })) - - opts = append(opts, nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { - cclog.Errorf("NATS error: %v", err) - })) - - nc, err := nats.Connect(cfg.Address, opts...) - if err != nil { - return nil, fmt.Errorf("NATS connect failed: %w", err) - } - - cclog.Infof("NATS connected to %s", cfg.Address) - - return &Client{ - conn: nc, - subscriptions: make([]*nats.Subscription, 0), - }, nil -} - -// Subscribe registers a handler for messages on the given subject. -func (c *Client) Subscribe(subject string, handler MessageHandler) error { - c.mu.Lock() - defer c.mu.Unlock() - - sub, err := c.conn.Subscribe(subject, func(msg *nats.Msg) { - handler(msg.Subject, msg.Data) - }) - if err != nil { - return fmt.Errorf("NATS subscribe to '%s' failed: %w", subject, err) - } - - c.subscriptions = append(c.subscriptions, sub) - cclog.Infof("NATS subscribed to '%s'", subject) - return nil -} - -// SubscribeQueue registers a handler with queue group for load-balanced message processing. -func (c *Client) SubscribeQueue(subject, queue string, handler MessageHandler) error { - c.mu.Lock() - defer c.mu.Unlock() - - sub, err := c.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) { - handler(msg.Subject, msg.Data) - }) - if err != nil { - return fmt.Errorf("NATS queue subscribe to '%s' (queue: %s) failed: %w", subject, queue, err) - } - - c.subscriptions = append(c.subscriptions, sub) - cclog.Infof("NATS queue subscribed to '%s' (queue: %s)", subject, queue) - return nil -} - -// SubscribeChan subscribes to a subject and delivers messages to the provided channel. -func (c *Client) SubscribeChan(subject string, ch chan *nats.Msg) error { - c.mu.Lock() - defer c.mu.Unlock() - - sub, err := c.conn.ChanSubscribe(subject, ch) - if err != nil { - return fmt.Errorf("NATS chan subscribe to '%s' failed: %w", subject, err) - } - - c.subscriptions = append(c.subscriptions, sub) - cclog.Infof("NATS chan subscribed to '%s'", subject) - return nil -} - -// Publish sends data to the specified subject. -func (c *Client) Publish(subject string, data []byte) error { - if err := c.conn.Publish(subject, data); err != nil { - return fmt.Errorf("NATS publish to '%s' failed: %w", subject, err) - } - return nil -} - -// Request sends a request and waits for a response with the given context timeout. -func (c *Client) Request(subject string, data []byte, timeout context.Context) ([]byte, error) { - msg, err := c.conn.RequestWithContext(timeout, subject, data) - if err != nil { - return nil, fmt.Errorf("NATS request to '%s' failed: %w", subject, err) - } - return msg.Data, nil -} - -// Flush flushes the connection buffer to ensure all published messages are sent. -func (c *Client) Flush() error { - return c.conn.Flush() -} - -// Close unsubscribes all subscriptions and closes the NATS connection. -func (c *Client) Close() { - c.mu.Lock() - defer c.mu.Unlock() - - for _, sub := range c.subscriptions { - if err := sub.Unsubscribe(); err != nil { - cclog.Warnf("NATS unsubscribe failed: %v", err) - } - } - c.subscriptions = nil - - if c.conn != nil { - c.conn.Close() - cclog.Info("NATS connection closed") - } -} - -// IsConnected returns true if the client has an active connection. -func (c *Client) IsConnected() bool { - return c.conn != nil && c.conn.IsConnected() -} - -// Connection returns the underlying NATS connection for advanced usage. -func (c *Client) Connection() *nats.Conn { - return c.conn -} diff --git a/pkg/nats/config.go b/pkg/nats/config.go deleted file mode 100644 index c9ab48a5..00000000 --- a/pkg/nats/config.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package nats - -import ( - "bytes" - "encoding/json" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" -) - -// NatsConfig holds the configuration for connecting to a NATS server. -type NatsConfig struct { - Address string `json:"address"` // NATS server address (e.g., "nats://localhost:4222") - Username string `json:"username"` // Username for authentication (optional) - Password string `json:"password"` // Password for authentication (optional) - CredsFilePath string `json:"creds-file-path"` // Path to credentials file (optional) -} - -// Keys holds the global NATS configuration loaded via Init. -var Keys NatsConfig - -const ConfigSchema = `{ - "type": "object", - "description": "Configuration for NATS messaging client.", - "properties": { - "address": { - "description": "Address of the NATS server (e.g., 'nats://localhost:4222').", - "type": "string" - }, - "username": { - "description": "Username for NATS authentication (optional).", - "type": "string" - }, - "password": { - "description": "Password for NATS authentication (optional).", - "type": "string" - }, - "creds-file-path": { - "description": "Path to NATS credentials file for authentication (optional).", - "type": "string" - } - }, - "required": ["address"] -}` - -// Init initializes the global Keys configuration from JSON. -func Init(rawConfig json.RawMessage) error { - var err error - - if rawConfig != nil { - dec := json.NewDecoder(bytes.NewReader(rawConfig)) - dec.DisallowUnknownFields() - if err = dec.Decode(&Keys); err != nil { - cclog.Errorf("Error while initializing nats client: %s", err.Error()) - } - } - - return err -}