Compare commits

..

3 Commits

Author SHA1 Message Date
Thomas Roehl
157bd6f0cc Initial version of a RabbitMQ/AMQP sink 2024-04-19 17:59:43 +02:00
Thomas Roehl
16c796a2b8 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2024-04-10 19:57:54 +02:00
Thomas Roehl
b6c4769db3 Remove stray error check 2024-04-10 19:57:46 +02:00
6 changed files with 389 additions and 231 deletions

View File

@@ -99,10 +99,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
continue continue
} }
y := lp.FromInfluxMetric(c) output <- lp.FromInfluxMetric(c)
if err == nil {
output <- y
}
} }
} }
for _, file := range m.files { for _, file := range m.files {
@@ -121,10 +118,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
if skip { if skip {
continue continue
} }
y := lp.FromInfluxMetric(f) output <- lp.FromInfluxMetric(f)
if err == nil {
output <- y
}
} }
} }
} }

352
sinks/amqpSink.go Normal file
View File

@@ -0,0 +1,352 @@
package sinks
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/exp/slices"
)
type AmqpSinkConfig struct {
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
// See: metricSink.go
defaultSinkConfig
// Additional config options, for AmqpSink
QueueName string `json:"queue_name"`
// Maximum number of points sent to server in single request.
// Default: 1000
BatchSize int `json:"batch_size,omitempty"`
// Time interval for delayed sending of metrics.
// If the buffers are already filled before the end of this interval,
// the metrics are sent without further delay.
// Default: 1s
FlushInterval string `json:"flush_delay,omitempty"`
flushDelay time.Duration
Hostname string `json:"hostname"`
Port int `json:"port"`
PublishTimeout string `json:"publish_timeout,omitempty"`
publishTimeout time.Duration
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type AmqpSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink
config AmqpSinkConfig // entry point to the AmqpSinkConfig
// influx line protocol encoder
encoder influx.Encoder
// number of records stored in the encoder
numRecordsInEncoder int
// List of tags and meta data tags which should be used as tags
extended_tag_list []key_value_pair
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
// so this encoderLock has to protect the encoder and numRecordsInEncoder
encoderLock sync.Mutex
// timer to run Flush()
flushTimer *time.Timer
// Lock to assure that only one timer is running at a time
timerLock sync.Mutex
// WaitGroup to ensure only one send operation is running at a time
sendWaitGroup sync.WaitGroup
client *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
}
// Implement functions required for Sink interface
// Write(...), Flush(), Close()
// See: metricSink.go
// Code to submit a single CCMetric to the sink
func (s *AmqpSink) Write(m lp.CCMetric) error {
// Lock for encoder usage
s.encoderLock.Lock()
// Encode measurement name
s.encoder.StartLine(m.Name())
// copy tags and meta data which should be used as tags
s.extended_tag_list = s.extended_tag_list[:0]
for key, value := range m.Tags() {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
for _, key := range s.config.MetaAsTags {
if value, ok := m.GetMeta(key); ok {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
}
// Encode tags (they musts be in lexical order)
slices.SortFunc(
s.extended_tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range s.extended_tag_list {
s.encoder.AddTag(
s.extended_tag_list[i].key,
s.extended_tag_list[i].value,
)
}
// Encode fields
for key, value := range m.Fields() {
s.encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
s.encoder.EndLine(m.Time())
// Check for encoder errors
if err := s.encoder.Err(); err != nil {
// Unlock encoder usage
s.encoderLock.Unlock()
return fmt.Errorf("encoding failed: %v", err)
}
s.numRecordsInEncoder++
if s.config.flushDelay == 0 {
// Unlock encoder usage
s.encoderLock.Unlock()
// Directly flush if no flush delay is configured
return s.Flush()
} else if s.numRecordsInEncoder == s.config.BatchSize {
// Unlock encoder usage
s.encoderLock.Unlock()
// Stop flush timer
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
s.timerLock.Unlock()
}
}
// Flush if batch size is reached
return s.Flush()
} else if s.timerLock.TryLock() {
// Setup flush timer when flush delay is configured
// and no other timer is already running
if s.flushTimer != nil {
// Restarting existing flush timer
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
s.flushTimer.Reset(s.config.flushDelay)
} else {
// Creating and starting flush timer
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
s.flushTimer = time.AfterFunc(
s.config.flushDelay,
func() {
defer s.timerLock.Unlock()
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
}
})
}
}
// Unlock encoder usage
s.encoderLock.Unlock()
return nil
}
// If the sink uses batched sends internally, you can tell to flush its buffers
func (s *AmqpSink) Flush() error {
// Lock for encoder usage
// Own lock for as short as possible: the time it takes to clone the buffer.
s.encoderLock.Lock()
buf := slices.Clone(s.encoder.Bytes())
numRecordsInBuf := s.numRecordsInEncoder
s.encoder.Reset()
s.numRecordsInEncoder = 0
// Unlock encoder usage
s.encoderLock.Unlock()
if len(buf) == 0 {
return nil
}
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
// Asynchron send of encoder metrics
s.sendWaitGroup.Add(1)
go func() {
defer s.sendWaitGroup.Done()
//startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay)
defer cancel()
err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: buf,
})
if err != nil {
cclog.ComponentError(s.name, err.Error())
}
}()
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *AmqpSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
// Stop existing timer and immediately flush
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
s.timerLock.Unlock()
}
}
// Flush
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
}
// Wait for send operations to finish
s.sendWaitGroup.Wait()
s.client.Close()
s.client = nil
}
// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
s := new(AmqpSink)
// Set name of sampleSink
// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
// Set defaults in s.config
// Allow overwriting these defaults by reading config JSON
s.config.PublishTimeout = "4s"
s.config.publishTimeout = time.Duration(4) * time.Second
s.config.Hostname = "localhost"
s.config.Port = 1883
// Read in the config JSON
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&s.config); err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Check if all required fields in the config are set
// E.g. use 'len(s.config.Option) > 0' for string settings
if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
s.config.publishTimeout = t
} else {
err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
s.config.flushDelay = t
} else {
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
userpart := ""
if len(s.config.Username) > 0 {
userpart = s.config.Username
if len(s.config.Password) > 0 {
userpart += ":" + s.config.Password
}
userpart += "@"
}
url = fmt.Sprintf("amqp://%s%s", userpart, url)
// Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s)
c, err := amqp.Dial(url)
if err != nil {
return nil, err
}
s.client = c
ch, err := c.Channel()
if err != nil {
s.client.Close()
return nil, err
}
s.channel = ch
q, err := ch.QueueDeclare(
s.config.QueueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
s.channel.Close()
s.client.Close()
return nil, err
}
s.queue = q
// Return (nil, meaningful error message) in case of errors
return s, nil
}

33
sinks/amqpSink.md Normal file
View File

@@ -0,0 +1,33 @@
## `amqp` sink
The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name in the configuration file
### Configuration structure
```json
{
"<name>": {
"type": "amqp",
"queue_name" : "myqueue",
"batch_size" : 1000,
"flush_delay": "4s",
"publish_timeout": "1s",
"host": "dbhost.example.com",
"port": 5672,
"username": "exampleuser",
"password" : "examplepw",
"meta_as_tags" : [],
}
}
```
- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
- `queue_name`: All metrics are published to this queue
- `host`: Hostname of the RabbitMQ server
- `port`: Port number of the RabbitMQ server
- `username`: Username for basic authentication
- `password`: Password for basic authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `publish_timeout`: Timeout for each publication operation (default `1s`)
- `flush_delay`: Group metrics coming in to a single batch (default `4s`)
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)

View File

@@ -21,6 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
"influxdb": NewInfluxSink, "influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink, "influxasync": NewInfluxAsyncSink,
"http": NewHttpSink, "http": NewHttpSink,
"amqp": NewAmqpSink,
"rabbitmq": NewAmqpSink,
} }
// Metric collector manager data structure // Metric collector manager data structure

View File

@@ -1,159 +0,0 @@
package sinks
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"log/syslog"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)
var AvailableSyslogPriorities map[string]syslog.Priority = map[string]syslog.Priority{
"LOG_EMERG": syslog.LOG_EMERG,
"LOG_ALERT": syslog.LOG_ALERT,
"LOG_CRIT": syslog.LOG_CRIT,
"LOG_ERR": syslog.LOG_ERR,
"LOG_NOTICE": syslog.LOG_NOTICE,
"LOG_INFO": syslog.LOG_INFO,
"LOG_DEBUG": syslog.LOG_DEBUG,
"LOG_USER": syslog.LOG_USER,
"LOG_MAIL": syslog.LOG_MAIL,
"LOG_DAEMON": syslog.LOG_DAEMON,
"LOG_AUTH": syslog.LOG_AUTH,
"LOG_SYSLOG": syslog.LOG_SYSLOG,
"LOG_LPR": syslog.LOG_LPR,
"LOG_NEWS": syslog.LOG_NEWS,
"LOG_UUCP": syslog.LOG_UUCP,
"LOG_CRON": syslog.LOG_CRON,
"LOG_AUTHPRIV": syslog.LOG_AUTHPRIV,
"LOG_FTP": syslog.LOG_FTP,
"LOG_LOCAL0": syslog.LOG_LOCAL0,
"LOG_LOCAL1": syslog.LOG_LOCAL1,
"LOG_LOCAL2": syslog.LOG_LOCAL2,
"LOG_LOCAL3": syslog.LOG_LOCAL3,
"LOG_LOCAL4": syslog.LOG_LOCAL4,
"LOG_LOCAL5": syslog.LOG_LOCAL5,
"LOG_LOCAL6": syslog.LOG_LOCAL6,
"LOG_LOCAL7": syslog.LOG_LOCAL7,
}
type SyslogSinkConfig struct {
defaultSinkConfig
SyslogTag string `json:"syslog_tag"`
SyslogPriories []string `json:"syslog_priorities"`
SyslogWriter string `json:"syslog_writer"`
}
type SyslogSink struct {
sink
config SyslogSinkConfig // entry point to the SyslogSinkConfig
syslogPrios syslog.Priority
out *syslog.Writer
}
func (s *SyslogSink) Write(point lp.CCMetric) error {
var err error = nil
p := point.ToLineProtocol(s.meta_as_tags)
switch s.config.SyslogWriter {
case "info":
err = s.out.Info(p)
case "debug":
err = s.out.Debug(p)
case "alert":
err = s.out.Alert(p)
case "emerg":
err = s.out.Emerg(p)
case "err":
err = s.out.Err(p)
case "notice":
err = s.out.Notice(p)
case "warning":
err = s.out.Warning(p)
}
return err
}
func (s *SyslogSink) Flush() error {
return nil
}
func (s *SyslogSink) Close() {
s.out.Close()
cclog.ComponentDebug(s.name, "CLOSE")
}
func NewSyslogSink(name string, config json.RawMessage) (Sink, error) {
var err error = nil
s := new(SyslogSink)
s.name = fmt.Sprintf("SyslogSink(%s)", name)
// Read in the config JSON
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&s.config); err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
if len(s.config.SyslogTag) == 0 {
err := errors.New("syslog tag must be non-empty")
cclog.ComponentError(s.name, err.Error())
return nil, err
}
if len(s.config.SyslogPriories) == 0 {
err := errors.New("syslog prioritiey must be non-empty")
cclog.ComponentError(s.name, err.Error())
return nil, err
}
for _, p := range s.config.SyslogPriories {
if prio, ok := AvailableSyslogPriorities[p]; !ok {
err := fmt.Errorf("invalid syslog priority '%s'", p)
cclog.ComponentError(s.name, err.Error())
return nil, err
} else {
s.syslogPrios |= prio
}
}
mywriter := ""
for _, w := range []string{
"info",
"debug",
"alert",
"emerg",
"err",
"notice",
"warning",
} {
if s.config.SyslogWriter == w {
mywriter = w
}
}
if len(mywriter) == 0 {
err := fmt.Errorf("invalid syslog writer '%s'", s.config.SyslogWriter)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
s.out, err = syslog.New(s.syslogPrios, s.config.SyslogTag)
if err != nil {
cclog.ComponentError(s.name, err.Error())
return nil, err
}
return s, nil
}

View File

@@ -1,64 +0,0 @@
## `syslog` sink
The `syslog` sink provides an easy way to submit metrics and events to the Syslog logging system.
### Configuration structure
```json
{
"<name>": {
"type": "syslog",
"meta_as_tags" : [],
"syslog_tag" : "mytag",
"syslog_priorities" : [
"LOG_INFO",
"LOG_LOCAL7",
],
"syslog_writer" : "info"
}
}
```
- `type`: makes the sink an `syslog` sink
- `meta_as_tags`: print meta information as tags in the output (optional)
- `syslog_tag`: The tag submitted with each message
- `syslog_priorities`: List of syslog priorities. Will be OR'd together
- `syslog_writer`: Submit metrics with this write level
#### Possible priorities for `syslog_priorities`
- `LOG_EMERG`
- `LOG_ALERT`
- `LOG_CRIT`
- `LOG_ERR`
- `LOG_NOTICE`
- `LOG_INFO`
- `LOG_DEBUG`
- `LOG_USER`
- `LOG_MAIL`
- `LOG_DAEMON`
- `LOG_AUTH`
- `LOG_SYSLOG`
- `LOG_LPR`
- `LOG_NEWS`
- `LOG_UUCP`
- `LOG_CRON`
- `LOG_AUTHPRIV`
- `LOG_FTP`
- `LOG_LOCAL0`
- `LOG_LOCAL1`
- `LOG_LOCAL2`
- `LOG_LOCAL3`
- `LOG_LOCAL4`
- `LOG_LOCAL5`
- `LOG_LOCAL6`
- `LOG_LOCAL7`
#### Possible writers for `syslog_writer`
- `info`
- `debug`
- `alert`
- `emerg`
- `err`
- `notice`
- `warning`