Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git (synced 2025-07-19 11:21:41 +02:00)

Comparing commits: `likwid_col`...`amqp_sink` (1 commit)

Commit 157bd6f0cc
In the likwid collector's `takeMeasurement` function, the lockfile-creation block is removed and the error strings change:

```diff
@@ -374,14 +374,6 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
 	}
 	defer watcher.Close()
 	if len(m.config.LockfilePath) > 0 {
-		if _, err := os.Stat(m.config.LockfilePath); os.IsNotExist(err) {
-			file, err := os.Create(m.config.LockfilePath)
-			if err != nil {
-				cclog.ComponentError(m.name, "Cannot create lockfile", m.config.LockfilePath, ":", err.Error())
-				return true, err
-			}
-			file.Close()
-		}
 		info, err := os.Stat(m.config.LockfilePath)
 		if err != nil {
 			return true, err
@@ -390,9 +382,9 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
 		if uid != uint32(os.Getuid()) {
 			usr, err := user.LookupId(fmt.Sprint(uid))
 			if err == nil {
-				return true, fmt.Errorf("access to performance counters locked by %s", usr.Username)
+				return true, fmt.Errorf("Access to performance counters locked by %s", usr.Username)
 			} else {
-				return true, fmt.Errorf("access to performance counters locked by %d", uid)
+				return true, fmt.Errorf("Access to performance counters locked by %d", uid)
 			}
 		}
 		err = watcher.Add(m.config.LockfilePath)
```
In the collector documentation, the hunk `@@ -267,45 +267,3 @@ IPC PMC0/PMC1 -> {` removes the entire "Internal structure" section; only the closing example fence and the hint "The script `scripts/likwid_perfgroup_to_cc_config.py` might help you." remain. The removed text:
### Internal structure

This section describes the internal structure of the `likwid` collector.

#### At initialization

After setting the defaults, the configuration is read.

Based on the configuration, the library is searched using `dlopen` to check whether it makes sense to proceed at all.

Next, the user-given metrics are tested to ensure they can be evaluated. For this, a list of all user-given events/counters is created with the value `1.0` and provided to the metric evaluator. The same is done for the global metrics, using the metric names with the value `1.0`. If the evaluator does not fail, the metric can be evaluated and the collector initialization can proceed.
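A minimal sketch of this validation trick; the `evaluate` helper is hypothetical and stands in for the collector's real metric evaluator:

```go
package main

import "fmt"

// evaluate is a hypothetical stand-in for the collector's metric evaluator;
// it only knows the single expression used in this example.
func evaluate(expr string, vars map[string]float64) (float64, error) {
	if expr == "PMC0/PMC1" {
		return vars["PMC0"] / vars["PMC1"], nil
	}
	return 0, fmt.Errorf("cannot evaluate %q", expr)
}

func main() {
	// Plug in 1.0 for every user-given counter: if the expression
	// evaluates without error, the metric is considered usable later on.
	testVars := map[string]float64{"PMC0": 1.0, "PMC1": 1.0}
	if _, err := evaluate("PMC0/PMC1", testVars); err != nil {
		fmt.Println("metric rejected:", err)
		return
	}
	fmt.Println("metric accepted")
}
```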
A separate thread is started to do the measurements. This is not a common goroutine but a real application thread under full control. This is required because LIKWID's access system tracks the processes of the using application, and the PID must not change between measurements; otherwise the access system would have to be torn down and reopened.

With the separate thread, the access system is initialized by setting the user-given access mode and adding all hardware threads.
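The pinning could be sketched like this; it illustrates the pattern only and is not the collector's actual code (`request` and `cmdChan` are invented names):

```go
package main

import (
	"fmt"
	"runtime"
)

// request is one unit of work for the dedicated measurement thread.
type request struct {
	run  func()        // work to execute on the pinned thread
	done chan struct{} // closed when the work has finished
}

// startMeasurementThread spawns a goroutine that stays pinned to a single
// OS thread for its whole lifetime, so the thread ID seen by LIKWID's
// access system never changes between measurements.
func startMeasurementThread() chan<- request {
	cmdChan := make(chan request)
	go func() {
		runtime.LockOSThread() // pin to this OS thread; never unlocked
		for req := range cmdChan {
			req.run()
			close(req.done)
		}
	}()
	return cmdChan
}

func main() {
	cmd := startMeasurementThread()
	done := make(chan struct{})
	// Every "measurement" is funneled through the channel and therefore
	// always executes on the same OS thread.
	cmd <- request{run: func() { fmt.Println("measuring...") }, done: done}
	<-done
}
```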
LIKWID generally measures per hardware thread, but some counters exist only per, e.g., CPU socket (often memory traffic) and are read by only some hardware threads. For this, the collector gets the system topology through LIKWID and creates mappings like 'hwthread to list offset'. With these, the hardware threads responsible for a topological entity can be determined, because they read the counters of the per-socket units. The mappings are used later in the measurement phase.
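Such mappings might look roughly like this; `HWThread` and the "first hardware thread per socket is responsible" rule are assumptions made for illustration:

```go
package main

import "fmt"

// HWThread is a simplified stand-in for one entry of the LIKWID topology.
type HWThread struct {
	ID     int // hardware thread ID
	Socket int // CPU socket the hardware thread belongs to
}

// buildMappings derives two lookup tables from the topology: the list
// offset of every hardware thread, and one "responsible" hardware thread
// per socket that reads the per-socket counters.
func buildMappings(topo []HWThread) (map[int]int, map[int]int) {
	hwthreadToOffset := make(map[int]int)
	socketResponsible := make(map[int]int)
	for offset, t := range topo {
		hwthreadToOffset[t.ID] = offset
		if _, ok := socketResponsible[t.Socket]; !ok {
			// first hardware thread seen on a socket reads its counters
			socketResponsible[t.Socket] = t.ID
		}
	}
	return hwthreadToOffset, socketResponsible
}

func main() {
	topo := []HWThread{{ID: 0, Socket: 0}, {ID: 1, Socket: 0}, {ID: 2, Socket: 1}, {ID: 3, Socket: 1}}
	offsets, responsible := buildMappings(topo)
	fmt.Println(offsets, responsible)
}
```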
In the end, the base CPU frequency of the system is read; it may be used in the metric evaluation.

#### Measurements
The reading of events is done by the separate application thread.

It traverses all configured event sets, creates valid LIKWID event strings out of them, and passes them on to take a measurement. This could be done only once, but when the LIKWID lock changes, LIKWID has to be reopened completely to provide access again, and with this reopening the already added event sets are gone.
LIKWID has its own locking mechanism using a lock file, where not the content of the file but its owner is of interest. To track changes of the file, an `fsnotify` watcher is installed on it. If the file does not exist, it is created and consequently owned by the same user as `cc-metric-collector`. The LikwidCollector has to watch the file on its own because LIKWID does not provide proper error handling for this.
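The watch-and-check-owner mechanism could be sketched as follows; a simplified illustration assuming Linux (`syscall.Stat_t`) and the `fsnotify` package, with an example lock file path:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"os/user"
	"syscall"

	"github.com/fsnotify/fsnotify"
)

// checkLockOwner returns an error if the lock file is owned by another user.
func checkLockOwner(path string) error {
	info, err := os.Stat(path)
	if err != nil {
		return err
	}
	stat := info.Sys().(*syscall.Stat_t) // Linux-specific owner lookup
	if stat.Uid != uint32(os.Getuid()) {
		if usr, err := user.LookupId(fmt.Sprint(stat.Uid)); err == nil {
			return fmt.Errorf("access to performance counters locked by %s", usr.Username)
		}
		return fmt.Errorf("access to performance counters locked by %d", stat.Uid)
	}
	return nil
}

func main() {
	const lockfile = "/var/run/likwid.lock" // example path

	// Create the file if it is missing, so it belongs to our own user.
	if _, err := os.Stat(lockfile); os.IsNotExist(err) {
		f, err := os.Create(lockfile)
		if err != nil {
			log.Fatal(err)
		}
		f.Close()
	}

	// Watch the file so ownership changes are noticed between measurements.
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()
	if err := watcher.Add(lockfile); err != nil {
		log.Fatal(err)
	}

	if err := checkLockOwner(lockfile); err != nil {
		log.Fatal(err)
	}
}
```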
Each call to the LIKWID library for loading the event set, setting up the counting facilities, and starting and stopping the counters is wrapped in lockfile checks to ensure no state change happens in between. If the file owner changes, the LikwidCollector cannot access the counters anymore, so no further operation can be done and measurement stops.

Although start/stop would be sufficient, the LikwidCollector performs start, read, wait, read, `getLastResult`, stop. The reason might be "historic" but is not 100% clear anymore. The author failed to document it ;)

#### Metric evaluation

After each measurement, the metrics of the event set are evaluated directly. The collector updates the counter-to-result mapping with the new measurements, calls the evaluator, and generates the `CCMetric` with the user-given settings if the metric should be published. Each metric-name-to-result pair is stored for the global metric evaluation, which is done as a final step.
#### Shutdown

Since each measurement involves a complete initialize-to-finalize cycle of the LIKWID library, only the topology module needs to be closed.

Moreover, the separate application thread is stopped.
New file `sinks/amqpSink.go` (+352 lines):

```go
package sinks

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
	influx "github.com/influxdata/line-protocol/v2/lineprotocol"
	amqp "github.com/rabbitmq/amqp091-go"
	"golang.org/x/exp/slices"
)

type AmqpSinkConfig struct {
	// defines JSON tags for 'type' and 'meta_as_tags' (string list)
	// See: metricSink.go
	defaultSinkConfig
	// Additional config options for AmqpSink
	QueueName string `json:"queue_name"`
	// Maximum number of points sent to the server in a single request.
	// Default: 1000
	BatchSize int `json:"batch_size,omitempty"`

	// Time interval for delayed sending of metrics.
	// If the buffers are already filled before the end of this interval,
	// the metrics are sent without further delay.
	// Default: 1s
	FlushInterval string `json:"flush_delay,omitempty"`
	flushDelay    time.Duration

	Hostname string `json:"hostname"`
	Port     int    `json:"port"`
	// Timeout for each publish operation. Default: 4s
	PublishTimeout string `json:"publish_timeout,omitempty"`
	publishTimeout time.Duration
	Username       string `json:"username,omitempty"`
	Password       string `json:"password,omitempty"`
}

type AmqpSink struct {
	// declares elements 'name' and 'meta_as_tags' (string to bool map!)
	sink
	config AmqpSinkConfig // entry point to the AmqpSinkConfig
	// influx line protocol encoder
	encoder influx.Encoder
	// number of records stored in the encoder
	numRecordsInEncoder int
	// List of tags and meta data tags which should be used as tags
	extended_tag_list []key_value_pair
	// Flush() runs in another goroutine and accesses the influx line protocol encoder,
	// so this encoderLock has to protect the encoder and numRecordsInEncoder
	encoderLock sync.Mutex

	// timer to run Flush()
	flushTimer *time.Timer
	// Lock to assure that only one timer is running at a time
	timerLock sync.Mutex

	// WaitGroup to wait for outstanding send operations before closing
	sendWaitGroup sync.WaitGroup

	client  *amqp.Connection
	channel *amqp.Channel
	queue   amqp.Queue
}

// Implement functions required for Sink interface
// Write(...), Flush(), Close()
// See: metricSink.go

// Code to submit a single CCMetric to the sink
func (s *AmqpSink) Write(m lp.CCMetric) error {

	// Lock for encoder usage
	s.encoderLock.Lock()

	// Encode measurement name
	s.encoder.StartLine(m.Name())

	// copy tags and meta data which should be used as tags
	s.extended_tag_list = s.extended_tag_list[:0]
	for key, value := range m.Tags() {
		s.extended_tag_list =
			append(
				s.extended_tag_list,
				key_value_pair{
					key:   key,
					value: value,
				},
			)
	}
	for _, key := range s.config.MetaAsTags {
		if value, ok := m.GetMeta(key); ok {
			s.extended_tag_list =
				append(
					s.extended_tag_list,
					key_value_pair{
						key:   key,
						value: value,
					},
				)
		}
	}

	// Encode tags (they must be in lexical order)
	slices.SortFunc(
		s.extended_tag_list,
		func(a key_value_pair, b key_value_pair) int {
			if a.key < b.key {
				return -1
			}
			if a.key > b.key {
				return +1
			}
			return 0
		},
	)
	for i := range s.extended_tag_list {
		s.encoder.AddTag(
			s.extended_tag_list[i].key,
			s.extended_tag_list[i].value,
		)
	}

	// Encode fields
	for key, value := range m.Fields() {
		s.encoder.AddField(key, influx.MustNewValue(value))
	}

	// Encode time stamp
	s.encoder.EndLine(m.Time())

	// Check for encoder errors
	if err := s.encoder.Err(); err != nil {
		// Unlock encoder usage
		s.encoderLock.Unlock()

		return fmt.Errorf("encoding failed: %v", err)
	}
	s.numRecordsInEncoder++

	if s.config.flushDelay == 0 {
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Directly flush if no flush delay is configured
		return s.Flush()
	} else if s.numRecordsInEncoder == s.config.BatchSize {
		// Unlock encoder usage
		s.encoderLock.Unlock()

		// Stop flush timer
		if s.flushTimer != nil {
			if ok := s.flushTimer.Stop(); ok {
				cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
				s.timerLock.Unlock()
			}
		}

		// Flush if batch size is reached
		return s.Flush()
	} else if s.timerLock.TryLock() {

		// Setup flush timer when flush delay is configured
		// and no other timer is already running
		if s.flushTimer != nil {

			// Restarting existing flush timer
			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
			s.flushTimer.Reset(s.config.flushDelay)
		} else {

			// Creating and starting flush timer
			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
			s.flushTimer = time.AfterFunc(
				s.config.flushDelay,
				func() {
					defer s.timerLock.Unlock()
					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
					if err := s.Flush(); err != nil {
						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
					}
				})
		}
	}

	// Unlock encoder usage
	s.encoderLock.Unlock()
	return nil
}

// If the sink uses batched sends internally, you can tell it to flush its buffers
func (s *AmqpSink) Flush() error {

	// Lock for encoder usage
	// Own the lock for as short as possible: the time it takes to clone the buffer.
	s.encoderLock.Lock()

	buf := slices.Clone(s.encoder.Bytes())
	numRecordsInBuf := s.numRecordsInEncoder
	s.encoder.Reset()
	s.numRecordsInEncoder = 0

	// Unlock encoder usage
	s.encoderLock.Unlock()

	if len(buf) == 0 {
		return nil
	}

	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")

	// Asynchronous send of the encoded metrics
	s.sendWaitGroup.Add(1)
	go func() {
		defer s.sendWaitGroup.Done()
		// Bound the publish operation by the configured publish timeout
		ctx, cancel := context.WithTimeout(context.Background(), s.config.publishTimeout)
		defer cancel()
		err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        buf,
		})
		if err != nil {
			cclog.ComponentError(s.name, err.Error())
		}
	}()
	return nil
}

// Close sink: close network connection, close files, close libraries, ...
func (s *AmqpSink) Close() {

	cclog.ComponentDebug(s.name, "CLOSE")

	// Stop existing timer and immediately flush
	if s.flushTimer != nil {
		if ok := s.flushTimer.Stop(); ok {
			s.timerLock.Unlock()
		}
	}

	// Flush
	if err := s.Flush(); err != nil {
		cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
	}

	// Wait for send operations to finish
	s.sendWaitGroup.Wait()

	s.client.Close()
	s.client = nil
}

// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
	s := new(AmqpSink)

	// Set name of the sink
	// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
	s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here

	// Set defaults in s.config
	// Allow overwriting these defaults by reading config JSON

	s.config.BatchSize = 1000
	s.config.FlushInterval = "1s"
	s.config.flushDelay = time.Second
	s.config.PublishTimeout = "4s"
	s.config.publishTimeout = time.Duration(4) * time.Second
	s.config.Hostname = "localhost"
	s.config.Port = 5672 // default AMQP port

	// Read in the config JSON
	if len(config) > 0 {
		d := json.NewDecoder(bytes.NewReader(config))
		d.DisallowUnknownFields()
		if err := d.Decode(&s.config); err != nil {
			cclog.ComponentError(s.name, "Error reading config:", err.Error())
			return nil, err
		}
	}

	// Create lookup map to use meta infos as tags in the output metric
	s.meta_as_tags = make(map[string]bool)
	for _, k := range s.config.MetaAsTags {
		s.meta_as_tags[k] = true
	}

	// Check if all required fields in the config are set
	// E.g. use 'len(s.config.Option) > 0' for string settings
	if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
		s.config.publishTimeout = t
	} else {
		err := fmt.Errorf("failed to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
		cclog.ComponentError(s.name, err.Error())
		return nil, err
	}
	if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
		s.config.flushDelay = t
	} else {
		err := fmt.Errorf("failed to parse duration for FlushInterval: %s", s.config.FlushInterval)
		cclog.ComponentError(s.name, err.Error())
		return nil, err
	}

	url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
	userpart := ""
	if len(s.config.Username) > 0 {
		userpart = s.config.Username
		if len(s.config.Password) > 0 {
			userpart += ":" + s.config.Password
		}
		userpart += "@"
	}
	url = fmt.Sprintf("amqp://%s%s", userpart, url)

	// Establish connection to the server
	c, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}
	s.client = c

	ch, err := c.Channel()
	if err != nil {
		s.client.Close()
		return nil, err
	}
	s.channel = ch

	q, err := ch.QueueDeclare(
		s.config.QueueName, // name
		false,              // durable
		false,              // delete when unused
		false,              // exclusive
		false,              // no-wait
		nil,                // arguments
	)
	if err != nil {
		s.channel.Close()
		s.client.Close()
		return nil, err
	}
	s.queue = q

	// Return (nil, meaningful error message) in case of errors
	return s, nil
}
```
New file `sinks/amqpSink.md` (+33 lines):

## `amqp` sink

The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name set in the configuration file.

### Configuration structure

```json
{
  "<name>": {
    "type": "amqp",
    "queue_name": "myqueue",
    "batch_size": 1000,
    "flush_delay": "4s",
    "publish_timeout": "1s",
    "hostname": "dbhost.example.com",
    "port": 5672,
    "username": "exampleuser",
    "password": "examplepw",
    "meta_as_tags": []
  }
}
```

- `type`: makes the sink an `amqp` sink; `rabbitmq` is allowed as an alias
- `queue_name`: all metrics are published to this queue
- `hostname`: hostname of the RabbitMQ server
- `port`: port number of the RabbitMQ server
- `username`: username for basic authentication
- `password`: password for basic authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `publish_timeout`: timeout for each publish operation (default `4s`)
- `flush_delay`: group incoming metrics into a single batch (default `1s`)
- `batch_size`: maximal batch size; if `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default `1000`)

For testing, the queue content can be inspected with a small consumer, as sketched after this list.
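A hedged sketch of such a consumer, using the same `amqp091-go` library as the sink; the connection values mirror the example configuration above and are not defaults:

```go
package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Connection values taken from the example configuration above.
	conn, err := amqp.Dial("amqp://exampleuser:examplepw@dbhost.example.com:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	// Consume from the queue the sink publishes to (auto-ack for simplicity).
	msgs, err := ch.Consume("myqueue", "", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	for msg := range msgs {
		// Each message body is one batch of influx line protocol records.
		fmt.Print(string(msg.Body))
	}
}
```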
The new sink constructor is registered in the `AvailableSinks` map under both names:

```diff
@@ -21,6 +21,8 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
 	"influxdb":    NewInfluxSink,
 	"influxasync": NewInfluxAsyncSink,
 	"http":        NewHttpSink,
+	"amqp":        NewAmqpSink,
+	"rabbitmq":    NewAmqpSink,
 }

 // Metric collector manager data structure
```