mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-11-15 07:33:49 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1f05acce0 |
@@ -1,33 +0,0 @@
|
|||||||
## `amqp` sink
|
|
||||||
|
|
||||||
The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name in the configuration file
|
|
||||||
|
|
||||||
### Configuration structure
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"<name>": {
|
|
||||||
"type": "amqp",
|
|
||||||
"queue_name" : "myqueue",
|
|
||||||
"batch_size" : 1000,
|
|
||||||
"flush_delay": "4s",
|
|
||||||
"publish_timeout": "1s",
|
|
||||||
"host": "dbhost.example.com",
|
|
||||||
"port": 5672,
|
|
||||||
"username": "exampleuser",
|
|
||||||
"password" : "examplepw",
|
|
||||||
"meta_as_tags" : [],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
|
|
||||||
- `queue_name`: All metrics are published to this queue
|
|
||||||
- `host`: Hostname of the RabbitMQ server
|
|
||||||
- `port`: Port number of the RabbitMQ server
|
|
||||||
- `username`: Username for basic authentication
|
|
||||||
- `password`: Password for basic authentication
|
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
|
||||||
- `publish_timeout`: Timeout for each publication operation (default `1s`)
|
|
||||||
- `flush_delay`: Group metrics coming in to a single batch (default `4s`)
|
|
||||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)
|
|
||||||
@@ -2,26 +2,28 @@ package sinks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
|
"github.com/go-mqtt/mqtt"
|
||||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AmqpSinkConfig struct {
|
type MqttSinkConfig struct {
|
||||||
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
|
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
|
||||||
// See: metricSink.go
|
// See: metricSink.go
|
||||||
defaultSinkConfig
|
defaultSinkConfig
|
||||||
// Additional config options, for AmqpSink
|
// Additional config options, for MqttSink
|
||||||
QueueName string `json:"queue_name"`
|
ClientID string `json:"client_id"`
|
||||||
|
PersistenceDirectory string `json:"persistence_directory,omitempty"`
|
||||||
// Maximum number of points sent to server in single request.
|
// Maximum number of points sent to server in single request.
|
||||||
// Default: 1000
|
// Default: 1000
|
||||||
BatchSize int `json:"batch_size,omitempty"`
|
BatchSize int `json:"batch_size,omitempty"`
|
||||||
@@ -33,18 +35,20 @@ type AmqpSinkConfig struct {
|
|||||||
FlushInterval string `json:"flush_delay,omitempty"`
|
FlushInterval string `json:"flush_delay,omitempty"`
|
||||||
flushDelay time.Duration
|
flushDelay time.Duration
|
||||||
|
|
||||||
Hostname string `json:"hostname"`
|
DialProtocol string `json:"dial_protocol"`
|
||||||
Port int `json:"port"`
|
Hostname string `json:"hostname"`
|
||||||
PublishTimeout string `json:"publish_timeout,omitempty"`
|
Port int `json:"port"`
|
||||||
publishTimeout time.Duration
|
PauseTimeout string `json:"pause_timeout"`
|
||||||
Username string `json:"username,omitempty"`
|
pauseTimeout time.Duration
|
||||||
Password string `json:"password,omitempty"`
|
KeepAlive uint16 `json:"keep_alive_seconds"`
|
||||||
|
Username string `json:"username,omitempty"`
|
||||||
|
Password string `json:"password,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type AmqpSink struct {
|
type MqttSink struct {
|
||||||
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
||||||
sink
|
sink
|
||||||
config AmqpSinkConfig // entry point to the AmqpSinkConfig
|
config MqttSinkConfig // entry point to the MqttSinkConfig
|
||||||
// influx line protocol encoder
|
// influx line protocol encoder
|
||||||
encoder influx.Encoder
|
encoder influx.Encoder
|
||||||
// number of records stored in the encoder
|
// number of records stored in the encoder
|
||||||
@@ -63,9 +67,8 @@ type AmqpSink struct {
|
|||||||
// WaitGroup to ensure only one send operation is running at a time
|
// WaitGroup to ensure only one send operation is running at a time
|
||||||
sendWaitGroup sync.WaitGroup
|
sendWaitGroup sync.WaitGroup
|
||||||
|
|
||||||
client *amqp.Connection
|
client *mqtt.Client
|
||||||
channel *amqp.Channel
|
mqttconfig mqtt.Config
|
||||||
queue amqp.Queue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement functions required for Sink interface
|
// Implement functions required for Sink interface
|
||||||
@@ -73,7 +76,7 @@ type AmqpSink struct {
|
|||||||
// See: metricSink.go
|
// See: metricSink.go
|
||||||
|
|
||||||
// Code to submit a single CCMetric to the sink
|
// Code to submit a single CCMetric to the sink
|
||||||
func (s *AmqpSink) Write(m lp.CCMetric) error {
|
func (s *MqttSink) Write(m lp.CCMetric) error {
|
||||||
|
|
||||||
// Lock for encoder usage
|
// Lock for encoder usage
|
||||||
s.encoderLock.Lock()
|
s.encoderLock.Lock()
|
||||||
@@ -194,7 +197,7 @@ func (s *AmqpSink) Write(m lp.CCMetric) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If the sink uses batched sends internally, you can tell to flush its buffers
|
// If the sink uses batched sends internally, you can tell to flush its buffers
|
||||||
func (s *AmqpSink) Flush() error {
|
func (s *MqttSink) Flush() error {
|
||||||
|
|
||||||
// Lock for encoder usage
|
// Lock for encoder usage
|
||||||
// Own lock for as short as possible: the time it takes to clone the buffer.
|
// Own lock for as short as possible: the time it takes to clone the buffer.
|
||||||
@@ -219,21 +222,37 @@ func (s *AmqpSink) Flush() error {
|
|||||||
go func() {
|
go func() {
|
||||||
defer s.sendWaitGroup.Done()
|
defer s.sendWaitGroup.Done()
|
||||||
//startTime := time.Now()
|
//startTime := time.Now()
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay)
|
for {
|
||||||
defer cancel()
|
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID)
|
||||||
err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
|
switch {
|
||||||
ContentType: "text/plain",
|
case err == nil:
|
||||||
Body: buf,
|
return
|
||||||
})
|
|
||||||
if err != nil {
|
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
|
||||||
cclog.ComponentError(s.name, err.Error())
|
return
|
||||||
|
|
||||||
|
case errors.Is(err, mqtt.ErrMax):
|
||||||
|
time.Sleep(s.config.pauseTimeout)
|
||||||
|
|
||||||
|
default:
|
||||||
|
time.Sleep(s.config.pauseTimeout)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for err := range exchange {
|
||||||
|
if errors.Is(err, mqtt.ErrClosed) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close sink: close network connection, close files, close libraries, ...
|
// Close sink: close network connection, close files, close libraries, ...
|
||||||
func (s *AmqpSink) Close() {
|
func (s *MqttSink) Close() {
|
||||||
|
|
||||||
cclog.ComponentDebug(s.name, "CLOSE")
|
cclog.ComponentDebug(s.name, "CLOSE")
|
||||||
|
|
||||||
@@ -258,18 +277,19 @@ func (s *AmqpSink) Close() {
|
|||||||
|
|
||||||
// New function to create a new instance of the sink
|
// New function to create a new instance of the sink
|
||||||
// Initialize the sink by giving it a name and reading in the config JSON
|
// Initialize the sink by giving it a name and reading in the config JSON
|
||||||
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
|
func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(AmqpSink)
|
s := new(MqttSink)
|
||||||
|
|
||||||
// Set name of sampleSink
|
// Set name of sampleSink
|
||||||
// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
|
// The name should be chosen in such a way that different instances of MqttSink can be distinguished
|
||||||
s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
|
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here
|
||||||
|
|
||||||
// Set defaults in s.config
|
// Set defaults in s.config
|
||||||
// Allow overwriting these defaults by reading config JSON
|
// Allow overwriting these defaults by reading config JSON
|
||||||
|
|
||||||
s.config.PublishTimeout = "4s"
|
s.config.PauseTimeout = "4s"
|
||||||
s.config.publishTimeout = time.Duration(4) * time.Second
|
s.config.pauseTimeout = time.Duration(4) * time.Second
|
||||||
|
s.config.DialProtocol = "tcp"
|
||||||
s.config.Hostname = "localhost"
|
s.config.Hostname = "localhost"
|
||||||
s.config.Port = 1883
|
s.config.Port = 1883
|
||||||
|
|
||||||
@@ -291,10 +311,10 @@ func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
|
|
||||||
// Check if all required fields in the config are set
|
// Check if all required fields in the config are set
|
||||||
// E.g. use 'len(s.config.Option) > 0' for string settings
|
// E.g. use 'len(s.config.Option) > 0' for string settings
|
||||||
if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
|
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil {
|
||||||
s.config.publishTimeout = t
|
s.config.pauseTimeout = t
|
||||||
} else {
|
} else {
|
||||||
err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
|
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout)
|
||||||
cclog.ComponentError(s.name, err.Error())
|
cclog.ComponentError(s.name, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -306,46 +326,46 @@ func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
|
switch s.config.DialProtocol {
|
||||||
userpart := ""
|
case "tcp", "udp":
|
||||||
if len(s.config.Username) > 0 {
|
default:
|
||||||
userpart = s.config.Username
|
err := errors.New("failed to parse dial protocol, allowed: tcp, udp")
|
||||||
if len(s.config.Password) > 0 {
|
cclog.ComponentError(s.name, err.Error())
|
||||||
userpart += ":" + s.config.Password
|
return nil, err
|
||||||
}
|
}
|
||||||
userpart += "@"
|
|
||||||
|
var persistence mqtt.Persistence
|
||||||
|
if len(s.config.PersistenceDirectory) > 0 {
|
||||||
|
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
|
||||||
|
} else {
|
||||||
|
tmpdir, err := os.MkdirTemp("", "mqtt")
|
||||||
|
if err == nil {
|
||||||
|
persistence = mqtt.FileSystem(tmpdir)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
url = fmt.Sprintf("amqp://%s%s", userpart, url)
|
|
||||||
|
|
||||||
// Establish connection to the server, library, ...
|
// Establish connection to the server, library, ...
|
||||||
// Check required files exist and lookup path(s) of executable(s)
|
// Check required files exist and lookup path(s) of executable(s)
|
||||||
c, err := amqp.Dial(url)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.client = c
|
|
||||||
|
|
||||||
ch, err := c.Channel()
|
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
|
||||||
if err != nil {
|
|
||||||
s.client.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.channel = ch
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
s.mqttconfig = mqtt.Config{
|
||||||
s.config.QueueName, // name
|
Dialer: dialer,
|
||||||
false, // durable
|
PauseTimeout: s.config.pauseTimeout,
|
||||||
false, // delete when unused
|
KeepAlive: uint16(s.config.KeepAlive),
|
||||||
false, // exclusive
|
}
|
||||||
false, // no-wait
|
if len(s.config.Username) > 0 {
|
||||||
nil, // arguments
|
s.mqttconfig.UserName = s.config.Username
|
||||||
)
|
}
|
||||||
|
if len(s.config.Password) > 0 {
|
||||||
|
s.mqttconfig.Password = []byte(s.config.Password)
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.channel.Close()
|
|
||||||
s.client.Close()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.queue = q
|
s.client = client
|
||||||
|
|
||||||
// Return (nil, meaningful error message) in case of errors
|
// Return (nil, meaningful error message) in case of errors
|
||||||
return s, nil
|
return s, nil
|
||||||
39
sinks/mqttSink.md
Normal file
39
sinks/mqttSink.md
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
## `mqtt` sink
|
||||||
|
|
||||||
|
The `mqtt` sink publishes all metrics into a MQTT network.
|
||||||
|
|
||||||
|
### Configuration structure
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"<name>": {
|
||||||
|
"type": "mqtt",
|
||||||
|
"client_id" : "myid",
|
||||||
|
"persistence_directory": "/tmp",
|
||||||
|
"batch_size": 1000,
|
||||||
|
"flush_delay": "1s",
|
||||||
|
"dial_protocol": "tcp",
|
||||||
|
"host": "dbhost.example.com",
|
||||||
|
"port": 1883,
|
||||||
|
"user": "exampleuser",
|
||||||
|
"password" : "examplepw",
|
||||||
|
"pause_timeout": "1s",
|
||||||
|
"keep_alive_seconds": 10,
|
||||||
|
"meta_as_tags" : [],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- `type`: makes the sink an `mqtt` sink
|
||||||
|
- `client_id`: MQTT clients use a client_id to talk to the broker
|
||||||
|
- `persistence_directory`: MQTT stores messages temporarly on disc if the broker is not available. Folder needs to be writable (default: `/tmp`)
|
||||||
|
- `pause_timeout`: Waittime when published failed
|
||||||
|
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
|
||||||
|
- `flush_delay`: Group metrics coming in to a single batch
|
||||||
|
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
||||||
|
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
|
||||||
|
- `host`: Hostname of the MQTT broker
|
||||||
|
- `port`: Port number of the MQTT broker
|
||||||
|
- `user`: Username for authentication
|
||||||
|
- `password`: Password for authentication
|
||||||
|
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
||||||
@@ -21,8 +21,7 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
|
|||||||
"influxdb": NewInfluxSink,
|
"influxdb": NewInfluxSink,
|
||||||
"influxasync": NewInfluxAsyncSink,
|
"influxasync": NewInfluxAsyncSink,
|
||||||
"http": NewHttpSink,
|
"http": NewHttpSink,
|
||||||
"amqp": NewAmqpSink,
|
"mqtt": NewMqttSink,
|
||||||
"rabbitmq": NewAmqpSink,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric collector manager data structure
|
// Metric collector manager data structure
|
||||||
|
|||||||
Reference in New Issue
Block a user