Compare commits

..

1 Commits

Author SHA1 Message Date
Thomas Roehl
157bd6f0cc Initial version of a RabbitMQ/AMQP sink 2024-04-19 17:59:43 +02:00
4 changed files with 103 additions and 128 deletions

View File

@@ -2,28 +2,26 @@ package sinks
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net" "net"
"os"
"sync" "sync"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/go-mqtt/mqtt"
influx "github.com/influxdata/line-protocol/v2/lineprotocol" influx "github.com/influxdata/line-protocol/v2/lineprotocol"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
) )
type MqttSinkConfig struct { type AmqpSinkConfig struct {
// defines JSON tags for 'type' and 'meta_as_tags' (string list) // defines JSON tags for 'type' and 'meta_as_tags' (string list)
// See: metricSink.go // See: metricSink.go
defaultSinkConfig defaultSinkConfig
// Additional config options, for MqttSink // Additional config options, for AmqpSink
ClientID string `json:"client_id"` QueueName string `json:"queue_name"`
PersistenceDirectory string `json:"persistence_directory,omitempty"`
// Maximum number of points sent to server in single request. // Maximum number of points sent to server in single request.
// Default: 1000 // Default: 1000
BatchSize int `json:"batch_size,omitempty"` BatchSize int `json:"batch_size,omitempty"`
@@ -35,20 +33,18 @@ type MqttSinkConfig struct {
FlushInterval string `json:"flush_delay,omitempty"` FlushInterval string `json:"flush_delay,omitempty"`
flushDelay time.Duration flushDelay time.Duration
DialProtocol string `json:"dial_protocol"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Port int `json:"port"` Port int `json:"port"`
PauseTimeout string `json:"pause_timeout"` PublishTimeout string `json:"publish_timeout,omitempty"`
pauseTimeout time.Duration publishTimeout time.Duration
KeepAlive uint16 `json:"keep_alive_seconds"`
Username string `json:"username,omitempty"` Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
} }
type MqttSink struct { type AmqpSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!) // declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink sink
config MqttSinkConfig // entry point to the MqttSinkConfig config AmqpSinkConfig // entry point to the AmqpSinkConfig
// influx line protocol encoder // influx line protocol encoder
encoder influx.Encoder encoder influx.Encoder
// number of records stored in the encoder // number of records stored in the encoder
@@ -67,8 +63,9 @@ type MqttSink struct {
// WaitGroup to ensure only one send operation is running at a time // WaitGroup to ensure only one send operation is running at a time
sendWaitGroup sync.WaitGroup sendWaitGroup sync.WaitGroup
client *mqtt.Client client *amqp.Connection
mqttconfig mqtt.Config channel *amqp.Channel
queue amqp.Queue
} }
// Implement functions required for Sink interface // Implement functions required for Sink interface
@@ -76,7 +73,7 @@ type MqttSink struct {
// See: metricSink.go // See: metricSink.go
// Code to submit a single CCMetric to the sink // Code to submit a single CCMetric to the sink
func (s *MqttSink) Write(m lp.CCMetric) error { func (s *AmqpSink) Write(m lp.CCMetric) error {
// Lock for encoder usage // Lock for encoder usage
s.encoderLock.Lock() s.encoderLock.Lock()
@@ -197,7 +194,7 @@ func (s *MqttSink) Write(m lp.CCMetric) error {
} }
// If the sink uses batched sends internally, you can tell to flush its buffers // If the sink uses batched sends internally, you can tell to flush its buffers
func (s *MqttSink) Flush() error { func (s *AmqpSink) Flush() error {
// Lock for encoder usage // Lock for encoder usage
// Own lock for as short as possible: the time it takes to clone the buffer. // Own lock for as short as possible: the time it takes to clone the buffer.
@@ -222,37 +219,21 @@ func (s *MqttSink) Flush() error {
go func() { go func() {
defer s.sendWaitGroup.Done() defer s.sendWaitGroup.Done()
//startTime := time.Now() //startTime := time.Now()
for { ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay)
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID) defer cancel()
switch { err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
case err == nil: ContentType: "text/plain",
return Body: buf,
})
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed): if err != nil {
return cclog.ComponentError(s.name, err.Error())
case errors.Is(err, mqtt.ErrMax):
time.Sleep(s.config.pauseTimeout)
default:
time.Sleep(s.config.pauseTimeout)
continue
}
for err := range exchange {
if errors.Is(err, mqtt.ErrClosed) {
return
}
}
return
} }
}() }()
return nil return nil
} }
// Close sink: close network connection, close files, close libraries, ... // Close sink: close network connection, close files, close libraries, ...
func (s *MqttSink) Close() { func (s *AmqpSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE") cclog.ComponentDebug(s.name, "CLOSE")
@@ -277,19 +258,18 @@ func (s *MqttSink) Close() {
// New function to create a new instance of the sink // New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON // Initialize the sink by giving it a name and reading in the config JSON
func NewMqttSink(name string, config json.RawMessage) (Sink, error) { func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
s := new(MqttSink) s := new(AmqpSink)
// Set name of sampleSink // Set name of sampleSink
// The name should be chosen in such a way that different instances of MqttSink can be distinguished // The name should be chosen in such a way that different instances of AmqpSink can be distinguished
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
// Set defaults in s.config // Set defaults in s.config
// Allow overwriting these defaults by reading config JSON // Allow overwriting these defaults by reading config JSON
s.config.PauseTimeout = "4s" s.config.PublishTimeout = "4s"
s.config.pauseTimeout = time.Duration(4) * time.Second s.config.publishTimeout = time.Duration(4) * time.Second
s.config.DialProtocol = "tcp"
s.config.Hostname = "localhost" s.config.Hostname = "localhost"
s.config.Port = 1883 s.config.Port = 1883
@@ -311,10 +291,10 @@ func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
// Check if all required fields in the config are set // Check if all required fields in the config are set
// E.g. use 'len(s.config.Option) > 0' for string settings // E.g. use 'len(s.config.Option) > 0' for string settings
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil { if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
s.config.pauseTimeout = t s.config.publishTimeout = t
} else { } else {
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout) err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
cclog.ComponentError(s.name, err.Error()) cclog.ComponentError(s.name, err.Error())
return nil, err return nil, err
} }
@@ -326,46 +306,46 @@ func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
switch s.config.DialProtocol { url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
case "tcp", "udp": userpart := ""
default: if len(s.config.Username) > 0 {
err := errors.New("failed to parse dial protocol, allowed: tcp, udp") userpart = s.config.Username
cclog.ComponentError(s.name, err.Error()) if len(s.config.Password) > 0 {
return nil, err userpart += ":" + s.config.Password
}
var persistence mqtt.Persistence
if len(s.config.PersistenceDirectory) > 0 {
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
} else {
tmpdir, err := os.MkdirTemp("", "mqtt")
if err == nil {
persistence = mqtt.FileSystem(tmpdir)
} }
userpart += "@"
} }
url = fmt.Sprintf("amqp://%s%s", userpart, url)
// Establish connection to the server, library, ... // Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s) // Check required files exist and lookup path(s) of executable(s)
c, err := amqp.Dial(url)
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
s.mqttconfig = mqtt.Config{
Dialer: dialer,
PauseTimeout: s.config.pauseTimeout,
KeepAlive: uint16(s.config.KeepAlive),
}
if len(s.config.Username) > 0 {
s.mqttconfig.UserName = s.config.Username
}
if len(s.config.Password) > 0 {
s.mqttconfig.Password = []byte(s.config.Password)
}
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.client = client s.client = c
ch, err := c.Channel()
if err != nil {
s.client.Close()
return nil, err
}
s.channel = ch
q, err := ch.QueueDeclare(
s.config.QueueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
s.channel.Close()
s.client.Close()
return nil, err
}
s.queue = q
// Return (nil, meaningful error message) in case of errors // Return (nil, meaningful error message) in case of errors
return s, nil return s, nil

33
sinks/amqpSink.md Normal file
View File

@@ -0,0 +1,33 @@
## `amqp` sink
The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name in the configuration file
### Configuration structure
```json
{
"<name>": {
"type": "amqp",
"queue_name" : "myqueue",
"batch_size" : 1000,
"flush_delay": "4s",
"publish_timeout": "1s",
"host": "dbhost.example.com",
"port": 5672,
"username": "exampleuser",
"password" : "examplepw",
"meta_as_tags" : [],
}
}
```
- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
- `queue_name`: All metrics are published to this queue
- `host`: Hostname of the RabbitMQ server
- `port`: Port number of the RabbitMQ server
- `username`: Username for basic authentication
- `password`: Password for basic authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `publish_timeout`: Timeout for each publication operation (default `1s`)
- `flush_delay`: Group metrics coming in to a single batch (default `4s`)
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)

View File

@@ -1,39 +0,0 @@
## `mqtt` sink
The `mqtt` sink publishes all metrics into a MQTT network.
### Configuration structure
```json
{
"<name>": {
"type": "mqtt",
"client_id" : "myid",
"persistence_directory": "/tmp",
"batch_size": 1000,
"flush_delay": "1s",
"dial_protocol": "tcp",
"host": "dbhost.example.com",
"port": 1883,
"user": "exampleuser",
"password" : "examplepw",
"pause_timeout": "1s",
"keep_alive_seconds": 10,
"meta_as_tags" : [],
}
}
```
- `type`: makes the sink an `mqtt` sink
- `client_id`: MQTT clients use a client_id to talk to the broker
- `persistence_directory`: MQTT stores messages temporarly on disc if the broker is not available. Folder needs to be writable (default: `/tmp`)
- `pause_timeout`: Waittime when published failed
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
- `flush_delay`: Group metrics coming in to a single batch
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
- `host`: Hostname of the MQTT broker
- `port`: Port number of the MQTT broker
- `user`: Username for authentication
- `password`: Password for authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)

View File

@@ -21,7 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
"influxdb": NewInfluxSink, "influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink, "influxasync": NewInfluxAsyncSink,
"http": NewHttpSink, "http": NewHttpSink,
"mqtt": NewMqttSink, "amqp": NewAmqpSink,
"rabbitmq": NewAmqpSink,
} }
// Metric collector manager data structure // Metric collector manager data structure