diff --git a/sinks/amqpSink.go b/sinks/amqpSink.go
new file mode 100644
index 0000000..c16a77d
--- /dev/null
+++ b/sinks/amqpSink.go
@@ -0,0 +1,423 @@
+package sinks
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"net/url"
+	"strconv"
+	"sync"
+	"time"
+
+	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
+	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
+	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
+	amqp "github.com/rabbitmq/amqp091-go"
+	"golang.org/x/exp/slices"
+)
+
+// Defaults for options which are not set in the config JSON
+const (
+	amqpDefaultHostname       = "localhost"
+	amqpDefaultPort           = 5672 // standard AMQP port
+	amqpDefaultBatchSize      = 1000
+	amqpDefaultFlushDelay     = "1s"
+	amqpDefaultPublishTimeout = "4s"
+)
+
+// AmqpSinkConfig holds the JSON configuration of an AmqpSink.
+type AmqpSinkConfig struct {
+	// defines JSON tags for 'type' and 'meta_as_tags' (string list)
+	// See: metricSink.go
+	defaultSinkConfig
+
+	// Name of the queue the metrics are published to.
+	// The queue is declared when the sink is created.
+	QueueName string `json:"queue_name"`
+
+	// Number of buffered metrics that triggers a flush without waiting
+	// for the end of the flush delay.
+	// Default: 1000
+	BatchSize int `json:"batch_size,omitempty"`
+
+	// Time interval for delayed sending of metrics.
+	// If the buffers are already filled before the end of this interval,
+	// the metrics are sent without further delay.
+	// With a delay of 0 every metric is flushed directly.
+	// Default: 1s
+	FlushInterval string `json:"flush_delay,omitempty"`
+	flushDelay    time.Duration
+
+	// Address of the AMQP server.
+	// Default: localhost:5672
+	Hostname string `json:"hostname"`
+	Port     int    `json:"port"`
+
+	// Timeout for a single publish operation.
+	// Default: 4s
+	PublishTimeout string `json:"publish_timeout,omitempty"`
+	publishTimeout time.Duration
+
+	// Optional credentials. The password is only used if a username is set.
+	Username string `json:"username,omitempty"`
+	Password string `json:"password,omitempty"`
+}
+
+// validate checks the required options and converts the duration strings
+// of the config into time.Duration values.
+func (c *AmqpSinkConfig) validate() error {
+	if len(c.QueueName) == 0 {
+		return errors.New("missing required config option 'queue_name'")
+	}
+
+	t, err := time.ParseDuration(c.PublishTimeout)
+	if err != nil {
+		return fmt.Errorf("parsing 'publish_timeout': %w", err)
+	}
+	if t <= 0 {
+		// A non-positive timeout lets every publish operation expire immediately
+		return fmt.Errorf("'publish_timeout' must be positive, got %q", c.PublishTimeout)
+	}
+	c.publishTimeout = t
+
+	t, err = time.ParseDuration(c.FlushInterval)
+	if err != nil {
+		return fmt.Errorf("parsing 'flush_delay': %w", err)
+	}
+	if t < 0 {
+		return fmt.Errorf("'flush_delay' must not be negative, got %q", c.FlushInterval)
+	}
+	c.flushDelay = t
+
+	return nil
+}
+
+// AmqpSink buffers metrics in influx line protocol and publishes them
+// in batches to a queue of an AMQP server.
+type AmqpSink struct {
+	// declares elements 'name' and 'meta_as_tags' (string to bool map!)
+	sink
+	config AmqpSinkConfig // entry point to the AmqpSinkConfig
+
+	// influx line protocol encoder
+	encoder influx.Encoder
+	// number of records stored in the encoder
+	numRecordsInEncoder int
+	// List of tags and meta data tags which should be used as tags
+	extended_tag_list []key_value_pair
+	// Flush() runs in another goroutine and accesses the influx line protocol encoder,
+	// so this encoderLock has to protect the encoder and numRecordsInEncoder
+	encoderLock sync.Mutex
+
+	// timer to run Flush()
+	flushTimer *time.Timer
+	// Lock to assure that only one timer is running at a time
+	timerLock sync.Mutex
+
+	// WaitGroup tracking the asynchronous send operations,
+	// so that Close() can wait for them to finish
+	sendWaitGroup sync.WaitGroup
+
+	client  *amqp.Connection
+	channel *amqp.Channel
+	queue   amqp.Queue
+}
+
+// Implement functions required for Sink interface
+// Write(...), Flush(), Close()
+// See: metricSink.go
+
+// encodeMetric appends the metric as one line to the line protocol encoder.
+// The caller has to hold the encoderLock.
+func (s *AmqpSink) encodeMetric(m lp.CCMetric) error {
+	// Encode measurement name
+	s.encoder.StartLine(m.Name())
+
+	// copy tags and meta data which should be used as tags
+	s.extended_tag_list = s.extended_tag_list[:0]
+	for key, value := range m.Tags() {
+		s.extended_tag_list = append(s.extended_tag_list, key_value_pair{key: key, value: value})
+	}
+	for _, key := range s.config.MetaAsTags {
+		if value, ok := m.GetMeta(key); ok {
+			s.extended_tag_list = append(s.extended_tag_list, key_value_pair{key: key, value: value})
+		}
+	}
+
+	// Encode tags (they must be in lexical order)
+	slices.SortFunc(
+		s.extended_tag_list,
+		func(a key_value_pair, b key_value_pair) int {
+			if a.key < b.key {
+				return -1
+			}
+			if a.key > b.key {
+				return +1
+			}
+			return 0
+		},
+	)
+	for i := range s.extended_tag_list {
+		s.encoder.AddTag(s.extended_tag_list[i].key, s.extended_tag_list[i].value)
+	}
+
+	// Encode fields
+	for key, value := range m.Fields() {
+		s.encoder.AddField(key, influx.MustNewValue(value))
+	}
+
+	// Encode time stamp
+	s.encoder.EndLine(m.Time())
+
+	// Check for encoder errors
+	if err := s.encoder.Err(); err != nil {
+		// The encoder reports its first error until the error is cleared.
+		// Clear it, so that one bad metric does not fail all following writes
+		s.encoder.ClearErr()
+		return fmt.Errorf("encoding failed: %w", err)
+	}
+	return nil
+}
+
+// Write encodes a single CCMetric into the buffer of the sink.
+// Depending on the configured flush delay and batch size the buffer
+// is flushed directly or a timer for a delayed flush is started.
+func (s *AmqpSink) Write(m lp.CCMetric) error {
+	// Lock for encoder usage
+	s.encoderLock.Lock()
+
+	if err := s.encodeMetric(m); err != nil {
+		// Unlock encoder usage
+		s.encoderLock.Unlock()
+		return err
+	}
+	s.numRecordsInEncoder++
+
+	switch {
+	case s.config.flushDelay == 0:
+		// Unlock encoder usage, Flush() acquires the lock itself
+		s.encoderLock.Unlock()
+
+		// Directly flush if no flush delay is configured
+		return s.Flush()
+
+	case s.numRecordsInEncoder == s.config.BatchSize:
+		// Unlock encoder usage, Flush() acquires the lock itself
+		s.encoderLock.Unlock()
+
+		// Stop flush timer. If the timer was stopped before it fired, the
+		// timer function will not release the timer lock, so release it here
+		if s.flushTimer != nil && s.flushTimer.Stop() {
+			cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
+			s.timerLock.Unlock()
+		}
+
+		// Flush if batch size is reached
+		return s.Flush()
+
+	case s.timerLock.TryLock():
+		// Setup flush timer when flush delay is configured
+		// and no other timer is already running
+		if s.flushTimer != nil {
+			// Restarting existing flush timer
+			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
+			s.flushTimer.Reset(s.config.flushDelay)
+		} else {
+			// Creating and starting flush timer
+			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
+			s.flushTimer = time.AfterFunc(
+				s.config.flushDelay,
+				func() {
+					defer s.timerLock.Unlock()
+					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
+					if err := s.Flush(); err != nil {
+						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
+					}
+				})
+		}
+	}
+
+	// Unlock encoder usage
+	s.encoderLock.Unlock()
+	return nil
+}
+
+// Flush publishes all buffered metrics as a single message to the queue.
+// The buffer is copied and reset while holding the encoder lock, the
+// publish operation itself runs asynchronously. Publish errors are
+// therefore only logged and Flush always returns nil.
+func (s *AmqpSink) Flush() error {
+	// Lock for encoder usage
+	// Own lock for as short as possible: the time it takes to clone the buffer.
+	s.encoderLock.Lock()
+
+	buf := slices.Clone(s.encoder.Bytes())
+	numRecordsInBuf := s.numRecordsInEncoder
+	s.encoder.Reset()
+	s.numRecordsInEncoder = 0
+
+	// Unlock encoder usage
+	s.encoderLock.Unlock()
+
+	if len(buf) == 0 {
+		return nil
+	}
+
+	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
+
+	// Asynchronous send of encoded metrics
+	s.sendWaitGroup.Add(1)
+	go func() {
+		defer s.sendWaitGroup.Done()
+
+		// Limit the publish operation by the publish timeout. The flush delay
+		// must not be used here: it may be 0, which would expire the context
+		// before anything is sent
+		ctx, cancel := context.WithTimeout(context.Background(), s.config.publishTimeout)
+		defer cancel()
+
+		err := s.channel.PublishWithContext(
+			ctx,
+			"",           // exchange: default exchange
+			s.queue.Name, // routing key
+			false,        // mandatory
+			false,        // immediate
+			amqp.Publishing{
+				ContentType: "text/plain",
+				Body:        buf,
+			})
+		if err != nil {
+			cclog.ComponentError(s.name, "Flush(): publishing", numRecordsInBuf, "metrics failed:", err.Error())
+		}
+	}()
+	return nil
+}
+
+// Close stops a pending flush timer, flushes the remaining metrics, waits
+// for all send operations to finish and closes the channel and the
+// connection to the AMQP server.
+func (s *AmqpSink) Close() {
+	cclog.ComponentDebug(s.name, "CLOSE")
+
+	// Stop existing timer and immediately flush
+	if s.flushTimer != nil && s.flushTimer.Stop() {
+		s.timerLock.Unlock()
+	}
+
+	// Flush
+	if err := s.Flush(); err != nil {
+		cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
+	}
+
+	// Wait for send operations to finish
+	s.sendWaitGroup.Wait()
+
+	// Close the channel before the connection it belongs to
+	if s.channel != nil {
+		if err := s.channel.Close(); err != nil {
+			cclog.ComponentError(s.name, "Close():", "Closing channel failed:", err)
+		}
+	}
+	if s.client != nil {
+		if err := s.client.Close(); err != nil {
+			cclog.ComponentError(s.name, "Close():", "Closing connection failed:", err)
+		}
+		s.client = nil
+	}
+}
+
+// NewAmqpSink creates a new instance of the AMQP sink.
+// It reads the config JSON, connects to the AMQP server and declares the
+// queue the metrics are published to.
+func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(AmqpSink)
+
+	// Set name of AmqpSink
+	// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
+	s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
+
+	// Set defaults in s.config
+	// Allow overwriting these defaults by reading config JSON
+	s.config.Hostname = amqpDefaultHostname
+	s.config.Port = amqpDefaultPort
+	s.config.BatchSize = amqpDefaultBatchSize
+	s.config.FlushInterval = amqpDefaultFlushDelay
+	s.config.PublishTimeout = amqpDefaultPublishTimeout
+
+	// Read in the config JSON
+	if len(config) > 0 {
+		d := json.NewDecoder(bytes.NewReader(config))
+		d.DisallowUnknownFields()
+		if err := d.Decode(&s.config); err != nil {
+			cclog.ComponentError(s.name, "Error reading config:", err.Error())
+			return nil, err
+		}
+	}
+
+	// Create lookup map to use meta infos as tags in the output metric
+	s.meta_as_tags = make(map[string]bool)
+	for _, k := range s.config.MetaAsTags {
+		s.meta_as_tags[k] = true
+	}
+
+	// Check if all required fields in the config are set
+	// and parse the configured durations
+	if err := s.config.validate(); err != nil {
+		cclog.ComponentError(s.name, err.Error())
+		return nil, err
+	}
+
+	// Build the connection URI. The credentials are added using net/url,
+	// so that special characters in username and password get escaped
+	hostport := net.JoinHostPort(s.config.Hostname, strconv.Itoa(s.config.Port))
+	uri := url.URL{Scheme: "amqp", Host: hostport}
+	if len(s.config.Username) > 0 {
+		if len(s.config.Password) > 0 {
+			uri.User = url.UserPassword(s.config.Username, s.config.Password)
+		} else {
+			uri.User = url.User(s.config.Username)
+		}
+	}
+
+	// Establish connection to the server
+	c, err := amqp.Dial(uri.String())
+	if err != nil {
+		err = fmt.Errorf("connecting to AMQP server %s: %w", hostport, err)
+		cclog.ComponentError(s.name, err.Error())
+		return nil, err
+	}
+	s.client = c
+
+	ch, err := c.Channel()
+	if err != nil {
+		// Best-effort cleanup, the channel error is the one to report
+		s.client.Close()
+		err = fmt.Errorf("opening channel: %w", err)
+		cclog.ComponentError(s.name, err.Error())
+		return nil, err
+	}
+	s.channel = ch
+
+	q, err := ch.QueueDeclare(
+		s.config.QueueName, // name
+		false,              // durable
+		false,              // delete when unused
+		false,              // exclusive
+		false,              // no-wait
+		nil,                // arguments
+	)
+	if err != nil {
+		// Best-effort cleanup, the declare error is the one to report
+		s.channel.Close()
+		s.client.Close()
+		err = fmt.Errorf("declaring queue %q: %w", s.config.QueueName, err)
+		cclog.ComponentError(s.name, err.Error())
+		return nil, err
+	}
+	s.queue = q
+
+	return s, nil
+}
diff --git a/sinks/amqpSink.md b/sinks/amqpSink.md
new file mode 100644
index 0000000..4f1704b
--- /dev/null
+++ b/sinks/amqpSink.md
@@ -0,0 +1,33 @@
+## `amqp` sink
+
+The `amqp` sink publishes all metrics to a queue on an AMQP server such as RabbitMQ. Messages are sent through the default exchange with the configured queue name as routing key. Each message contains a batch of metrics in InfluxDB line protocol.
+
+### Configuration structure
+
+```json
+{
+  "<name>": {
+    "type": "amqp",
+    "queue_name": "myqueue",
+    "batch_size": 1000,
+    "flush_delay": "1s",
+    "publish_timeout": "4s",
+    "hostname": "dbhost.example.com",
+    "port": 5672,
+    "username": "exampleuser",
+    "password": "examplepw",
+    "meta_as_tags": []
+  }
+}
+```
+
+- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
+- `queue_name`: All metrics are published to this queue (required)
+- `hostname`: Hostname of the AMQP server (default: `localhost`)
+- `port`: Port number of the AMQP server (default: `5672`)
+- `username`: Username for authentication (optional)
+- `password`: Password for authentication (optional, only used together with `username`)
+- `meta_as_tags`: List of meta information keys that are added as tags to the output metrics (optional)
+- `publish_timeout`: Timeout for each publish operation (default: `4s`)
+- `flush_delay`: Group incoming metrics into a single batch and send it after this delay; with `0s` every metric is sent immediately (default: `1s`)
+- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)
diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go
index cd2680f..255dcac 100644
--- a/sinks/sinkManager.go
+++ b/sinks/sinkManager.go
@@ -21,6 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
 	"influxdb":    NewInfluxSink,
 	"influxasync": NewInfluxAsyncSink,
 	"http":        NewHttpSink,
+	"amqp":        NewAmqpSink,
+	"rabbitmq":    NewAmqpSink,
 }
 
 // Metric collector manager data structure