mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-03 18:25:07 +01:00 
			
		
		
		
	Initial version of a RabbitMQ/AMQP sink
This commit is contained in:
		
							
								
								
									
										352
									
								
								sinks/amqpSink.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										352
									
								
								sinks/amqpSink.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,352 @@
 | 
			
		||||
package sinks
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
 | 
			
		||||
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
 | 
			
		||||
	amqp "github.com/rabbitmq/amqp091-go"
 | 
			
		||||
	"golang.org/x/exp/slices"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AmqpSinkConfig struct {
 | 
			
		||||
	// defines JSON tags for 'type' and 'meta_as_tags' (string list)
 | 
			
		||||
	// See: metricSink.go
 | 
			
		||||
	defaultSinkConfig
 | 
			
		||||
	// Additional config options, for AmqpSink
 | 
			
		||||
	QueueName string `json:"queue_name"`
 | 
			
		||||
	// Maximum number of points sent to server in single request.
 | 
			
		||||
	// Default: 1000
 | 
			
		||||
	BatchSize int `json:"batch_size,omitempty"`
 | 
			
		||||
 | 
			
		||||
	// Time interval for delayed sending of metrics.
 | 
			
		||||
	// If the buffers are already filled before the end of this interval,
 | 
			
		||||
	// the metrics are sent without further delay.
 | 
			
		||||
	// Default: 1s
 | 
			
		||||
	FlushInterval string `json:"flush_delay,omitempty"`
 | 
			
		||||
	flushDelay    time.Duration
 | 
			
		||||
 | 
			
		||||
	Hostname       string `json:"hostname"`
 | 
			
		||||
	Port           int    `json:"port"`
 | 
			
		||||
	PublishTimeout string `json:"publish_timeout,omitempty"`
 | 
			
		||||
	publishTimeout time.Duration
 | 
			
		||||
	Username       string `json:"username,omitempty"`
 | 
			
		||||
	Password       string `json:"password,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type AmqpSink struct {
 | 
			
		||||
	// declares elements 	'name' and 'meta_as_tags' (string to bool map!)
 | 
			
		||||
	sink
 | 
			
		||||
	config AmqpSinkConfig // entry point to the AmqpSinkConfig
 | 
			
		||||
	// influx line protocol encoder
 | 
			
		||||
	encoder influx.Encoder
 | 
			
		||||
	// number of records stored in the encoder
 | 
			
		||||
	numRecordsInEncoder int
 | 
			
		||||
	// List of tags and meta data tags which should be used as tags
 | 
			
		||||
	extended_tag_list []key_value_pair
 | 
			
		||||
	// Flush() runs in another goroutine and accesses the influx line protocol encoder,
 | 
			
		||||
	// so this encoderLock has to protect the encoder and numRecordsInEncoder
 | 
			
		||||
	encoderLock sync.Mutex
 | 
			
		||||
 | 
			
		||||
	// timer to run Flush()
 | 
			
		||||
	flushTimer *time.Timer
 | 
			
		||||
	// Lock to assure that only one timer is running at a time
 | 
			
		||||
	timerLock sync.Mutex
 | 
			
		||||
 | 
			
		||||
	// WaitGroup to ensure only one send operation is running at a time
 | 
			
		||||
	sendWaitGroup sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
	client  *amqp.Connection
 | 
			
		||||
	channel *amqp.Channel
 | 
			
		||||
	queue   amqp.Queue
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Implement functions required for Sink interface
 | 
			
		||||
// Write(...), Flush(), Close()
 | 
			
		||||
// See: metricSink.go
 | 
			
		||||
 | 
			
		||||
// Code to submit a single CCMetric to the sink
 | 
			
		||||
func (s *AmqpSink) Write(m lp.CCMetric) error {
 | 
			
		||||
 | 
			
		||||
	// Lock for encoder usage
 | 
			
		||||
	s.encoderLock.Lock()
 | 
			
		||||
 | 
			
		||||
	// Encode measurement name
 | 
			
		||||
	s.encoder.StartLine(m.Name())
 | 
			
		||||
 | 
			
		||||
	// copy tags and meta data which should be used as tags
 | 
			
		||||
	s.extended_tag_list = s.extended_tag_list[:0]
 | 
			
		||||
	for key, value := range m.Tags() {
 | 
			
		||||
		s.extended_tag_list =
 | 
			
		||||
			append(
 | 
			
		||||
				s.extended_tag_list,
 | 
			
		||||
				key_value_pair{
 | 
			
		||||
					key:   key,
 | 
			
		||||
					value: value,
 | 
			
		||||
				},
 | 
			
		||||
			)
 | 
			
		||||
	}
 | 
			
		||||
	for _, key := range s.config.MetaAsTags {
 | 
			
		||||
		if value, ok := m.GetMeta(key); ok {
 | 
			
		||||
			s.extended_tag_list =
 | 
			
		||||
				append(
 | 
			
		||||
					s.extended_tag_list,
 | 
			
		||||
					key_value_pair{
 | 
			
		||||
						key:   key,
 | 
			
		||||
						value: value,
 | 
			
		||||
					},
 | 
			
		||||
				)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Encode tags (they musts be in lexical order)
 | 
			
		||||
	slices.SortFunc(
 | 
			
		||||
		s.extended_tag_list,
 | 
			
		||||
		func(a key_value_pair, b key_value_pair) int {
 | 
			
		||||
			if a.key < b.key {
 | 
			
		||||
				return -1
 | 
			
		||||
			}
 | 
			
		||||
			if a.key > b.key {
 | 
			
		||||
				return +1
 | 
			
		||||
			}
 | 
			
		||||
			return 0
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
	for i := range s.extended_tag_list {
 | 
			
		||||
		s.encoder.AddTag(
 | 
			
		||||
			s.extended_tag_list[i].key,
 | 
			
		||||
			s.extended_tag_list[i].value,
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Encode fields
 | 
			
		||||
	for key, value := range m.Fields() {
 | 
			
		||||
		s.encoder.AddField(key, influx.MustNewValue(value))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Encode time stamp
 | 
			
		||||
	s.encoder.EndLine(m.Time())
 | 
			
		||||
 | 
			
		||||
	// Check for encoder errors
 | 
			
		||||
	if err := s.encoder.Err(); err != nil {
 | 
			
		||||
		// Unlock encoder usage
 | 
			
		||||
		s.encoderLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		return fmt.Errorf("encoding failed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	s.numRecordsInEncoder++
 | 
			
		||||
 | 
			
		||||
	if s.config.flushDelay == 0 {
 | 
			
		||||
		// Unlock encoder usage
 | 
			
		||||
		s.encoderLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		// Directly flush if no flush delay is configured
 | 
			
		||||
		return s.Flush()
 | 
			
		||||
	} else if s.numRecordsInEncoder == s.config.BatchSize {
 | 
			
		||||
		// Unlock encoder usage
 | 
			
		||||
		s.encoderLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		// Stop flush timer
 | 
			
		||||
		if s.flushTimer != nil {
 | 
			
		||||
			if ok := s.flushTimer.Stop(); ok {
 | 
			
		||||
				cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
 | 
			
		||||
				s.timerLock.Unlock()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Flush if batch size is reached
 | 
			
		||||
		return s.Flush()
 | 
			
		||||
	} else if s.timerLock.TryLock() {
 | 
			
		||||
 | 
			
		||||
		// Setup flush timer when flush delay is configured
 | 
			
		||||
		// and no other timer is already running
 | 
			
		||||
		if s.flushTimer != nil {
 | 
			
		||||
 | 
			
		||||
			// Restarting existing flush timer
 | 
			
		||||
			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
 | 
			
		||||
			s.flushTimer.Reset(s.config.flushDelay)
 | 
			
		||||
		} else {
 | 
			
		||||
 | 
			
		||||
			// Creating and starting flush timer
 | 
			
		||||
			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
 | 
			
		||||
			s.flushTimer = time.AfterFunc(
 | 
			
		||||
				s.config.flushDelay,
 | 
			
		||||
				func() {
 | 
			
		||||
					defer s.timerLock.Unlock()
 | 
			
		||||
					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
 | 
			
		||||
					if err := s.Flush(); err != nil {
 | 
			
		||||
						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
 | 
			
		||||
					}
 | 
			
		||||
				})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Unlock encoder usage
 | 
			
		||||
	s.encoderLock.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// If the sink uses batched sends internally, you can tell to flush its buffers
 | 
			
		||||
func (s *AmqpSink) Flush() error {
 | 
			
		||||
 | 
			
		||||
	// Lock for encoder usage
 | 
			
		||||
	// Own lock for as short as possible: the time it takes to clone the buffer.
 | 
			
		||||
	s.encoderLock.Lock()
 | 
			
		||||
 | 
			
		||||
	buf := slices.Clone(s.encoder.Bytes())
 | 
			
		||||
	numRecordsInBuf := s.numRecordsInEncoder
 | 
			
		||||
	s.encoder.Reset()
 | 
			
		||||
	s.numRecordsInEncoder = 0
 | 
			
		||||
 | 
			
		||||
	// Unlock encoder usage
 | 
			
		||||
	s.encoderLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if len(buf) == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
 | 
			
		||||
 | 
			
		||||
	// Asynchron send of encoder metrics
 | 
			
		||||
	s.sendWaitGroup.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer s.sendWaitGroup.Done()
 | 
			
		||||
		//startTime := time.Now()
 | 
			
		||||
		ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay)
 | 
			
		||||
		defer cancel()
 | 
			
		||||
		err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
 | 
			
		||||
			ContentType: "text/plain",
 | 
			
		||||
			Body:        buf,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			cclog.ComponentError(s.name, err.Error())
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close sink: close network connection, close files, close libraries, ...
 | 
			
		||||
func (s *AmqpSink) Close() {
 | 
			
		||||
 | 
			
		||||
	cclog.ComponentDebug(s.name, "CLOSE")
 | 
			
		||||
 | 
			
		||||
	// Stop existing timer and immediately flush
 | 
			
		||||
	if s.flushTimer != nil {
 | 
			
		||||
		if ok := s.flushTimer.Stop(); ok {
 | 
			
		||||
			s.timerLock.Unlock()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Flush
 | 
			
		||||
	if err := s.Flush(); err != nil {
 | 
			
		||||
		cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Wait for send operations to finish
 | 
			
		||||
	s.sendWaitGroup.Wait()
 | 
			
		||||
 | 
			
		||||
	s.client.Close()
 | 
			
		||||
	s.client = nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New function to create a new instance of the sink
 | 
			
		||||
// Initialize the sink by giving it a name and reading in the config JSON
 | 
			
		||||
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
 | 
			
		||||
	s := new(AmqpSink)
 | 
			
		||||
 | 
			
		||||
	// Set name of sampleSink
 | 
			
		||||
	// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
 | 
			
		||||
	s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
 | 
			
		||||
 | 
			
		||||
	// Set defaults in s.config
 | 
			
		||||
	// Allow overwriting these defaults by reading config JSON
 | 
			
		||||
 | 
			
		||||
	s.config.PublishTimeout = "4s"
 | 
			
		||||
	s.config.publishTimeout = time.Duration(4) * time.Second
 | 
			
		||||
	s.config.Hostname = "localhost"
 | 
			
		||||
	s.config.Port = 1883
 | 
			
		||||
 | 
			
		||||
	// Read in the config JSON
 | 
			
		||||
	if len(config) > 0 {
 | 
			
		||||
		d := json.NewDecoder(bytes.NewReader(config))
 | 
			
		||||
		d.DisallowUnknownFields()
 | 
			
		||||
		if err := d.Decode(&s.config); err != nil {
 | 
			
		||||
			cclog.ComponentError(s.name, "Error reading config:", err.Error())
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create lookup map to use meta infos as tags in the output metric
 | 
			
		||||
	s.meta_as_tags = make(map[string]bool)
 | 
			
		||||
	for _, k := range s.config.MetaAsTags {
 | 
			
		||||
		s.meta_as_tags[k] = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check if all required fields in the config are set
 | 
			
		||||
	// E.g. use 'len(s.config.Option) > 0' for string settings
 | 
			
		||||
	if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
 | 
			
		||||
		s.config.publishTimeout = t
 | 
			
		||||
	} else {
 | 
			
		||||
		err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
 | 
			
		||||
		cclog.ComponentError(s.name, err.Error())
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
 | 
			
		||||
		s.config.flushDelay = t
 | 
			
		||||
	} else {
 | 
			
		||||
		err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
 | 
			
		||||
		cclog.ComponentError(s.name, err.Error())
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
 | 
			
		||||
	userpart := ""
 | 
			
		||||
	if len(s.config.Username) > 0 {
 | 
			
		||||
		userpart = s.config.Username
 | 
			
		||||
		if len(s.config.Password) > 0 {
 | 
			
		||||
			userpart += ":" + s.config.Password
 | 
			
		||||
		}
 | 
			
		||||
		userpart += "@"
 | 
			
		||||
	}
 | 
			
		||||
	url = fmt.Sprintf("amqp://%s%s", userpart, url)
 | 
			
		||||
 | 
			
		||||
	// Establish connection to the server, library, ...
 | 
			
		||||
	// Check required files exist and lookup path(s) of executable(s)
 | 
			
		||||
	c, err := amqp.Dial(url)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	s.client = c
 | 
			
		||||
 | 
			
		||||
	ch, err := c.Channel()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		s.client.Close()
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	s.channel = ch
 | 
			
		||||
 | 
			
		||||
	q, err := ch.QueueDeclare(
 | 
			
		||||
		s.config.QueueName, // name
 | 
			
		||||
		false,              // durable
 | 
			
		||||
		false,              // delete when unused
 | 
			
		||||
		false,              // exclusive
 | 
			
		||||
		false,              // no-wait
 | 
			
		||||
		nil,                // arguments
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		s.channel.Close()
 | 
			
		||||
		s.client.Close()
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	s.queue = q
 | 
			
		||||
 | 
			
		||||
	// Return (nil, meaningful error message) in case of errors
 | 
			
		||||
	return s, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										33
									
								
								sinks/amqpSink.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								sinks/amqpSink.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
			
		||||
## `amqp` sink
 | 
			
		||||
 | 
			
		||||
The `amqp` sink publishes all metrics to a queue on a RabbitMQ (AMQP) server. The routing key used for publishing is the queue name from the configuration file.
 | 
			
		||||
 | 
			
		||||
### Configuration structure
 | 
			
		||||
 | 
			
		||||
```json
 | 
			
		||||
{
 | 
			
		||||
  "<name>": {
 | 
			
		||||
    "type": "amqp",
 | 
			
		||||
    "queue_name" : "myqueue",
 | 
			
		||||
    "batch_size" : 1000,
 | 
			
		||||
    "flush_delay": "4s",
 | 
			
		||||
    "publish_timeout": "1s",
 | 
			
		||||
    "hostname": "dbhost.example.com",
 | 
			
		||||
    "port": 5672,
 | 
			
		||||
    "username": "exampleuser",
 | 
			
		||||
    "password" : "examplepw",
 | 
			
		||||
    "meta_as_tags" : []
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
 | 
			
		||||
- `queue_name`: All metrics are published to this queue
 | 
			
		||||
- `hostname`: Hostname of the RabbitMQ server (default `localhost`)
 | 
			
		||||
- `port`: Port number of the RabbitMQ server
 | 
			
		||||
- `username`: Username for basic authentication
 | 
			
		||||
- `password`: Password for basic authentication
 | 
			
		||||
- `meta_as_tags`: List of meta information keys that are sent as tags in the output (optional)
 | 
			
		||||
- `publish_timeout`: Timeout for each publish operation (default `4s`)
 | 
			
		||||
- `flush_delay`: Group metrics coming in to a single batch (default `4s`)
 | 
			
		||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)
 | 
			
		||||
@@ -21,6 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
 | 
			
		||||
	"influxdb":    NewInfluxSink,
 | 
			
		||||
	"influxasync": NewInfluxAsyncSink,
 | 
			
		||||
	"http":        NewHttpSink,
 | 
			
		||||
	"amqp":        NewAmqpSink,
 | 
			
		||||
	"rabbitmq":    NewAmqpSink,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Metric collector manager data structure
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user